diff options
Diffstat (limited to 'context')
7 files changed, 197 insertions, 69 deletions
diff --git a/context/context-test-utils/src/main/java/org/onap/policy/apex/context/test/distribution/SequentialContextInstantiation.java b/context/context-test-utils/src/main/java/org/onap/policy/apex/context/test/distribution/SequentialContextInstantiation.java index e789ae90c..dbf5ab2c6 100644 --- a/context/context-test-utils/src/main/java/org/onap/policy/apex/context/test/distribution/SequentialContextInstantiation.java +++ b/context/context-test-utils/src/main/java/org/onap/policy/apex/context/test/distribution/SequentialContextInstantiation.java @@ -308,7 +308,7 @@ public class SequentialContextInstantiation { testBadItem001.setByteValue(BYTE_VAL); testBadItem002.setIntValue(INT_VAL); testBadItem003.setLongValue(LONG_VAL); - testBadItem004.setFloatValue(new Float(FLOAT_VAL)); + testBadItem004.setFloatValue(FLOAT_VAL); testBadItem005.setDoubleValue(PI_VAL); testBadItem006.setStringValue(STRING_GLOBAL_VAL); testBadItem007.setLongValue(testDate.getTime()); diff --git a/context/context-test-utils/src/main/java/org/onap/policy/apex/context/test/locking/ConcurrentContext.java b/context/context-test-utils/src/main/java/org/onap/policy/apex/context/test/locking/ConcurrentContext.java index 2eea957c6..a400bc79a 100644 --- a/context/context-test-utils/src/main/java/org/onap/policy/apex/context/test/locking/ConcurrentContext.java +++ b/context/context-test-utils/src/main/java/org/onap/policy/apex/context/test/locking/ConcurrentContext.java @@ -20,7 +20,13 @@ package org.onap.policy.apex.context.test.locking; +import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.onap.policy.apex.context.ContextAlbum; import org.onap.policy.apex.context.ContextException; @@ -28,6 +34,7 @@ import org.onap.policy.apex.context.Distributor; import org.onap.policy.apex.context.impl.distribution.DistributorFactory; import org.onap.policy.apex.context.test.concepts.TestContextLongItem; import org.onap.policy.apex.context.test.factory.TestContextAlbumFactory; +import org.onap.policy.apex.context.test.utils.IntegrationThreadFactory; import org.onap.policy.apex.model.basicmodel.concepts.ApexException; import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; import org.onap.policy.apex.model.basicmodel.handling.ApexModelException; @@ -41,8 +48,6 @@ import org.slf4j.ext.XLoggerFactory; * @author Liam Fallon (liam.fallon@ericsson.com) */ public class ConcurrentContext { - private static final int TEN_MILLISECONDS = 10; - // Logger for this class private static final XLogger LOGGER = XLoggerFactory.getXLogger(ConcurrentContext.class); @@ -75,45 +80,68 @@ public class ConcurrentContext { LOGGER.debug("starting JVMs and threads . . ."); - final Thread[] threadArray = new Thread[threadCount]; + final String name = getThreadFactoryName(jvmCount, testType); + final IntegrationThreadFactory threadFactory = new IntegrationThreadFactory(name); + final ExecutorService executorService = Executors.newFixedThreadPool(threadCount, threadFactory); + + final List<Closeable> tasks = new ArrayList<>(threadCount); + + addShutDownHook(tasks); // Check if we have a single JVM or multiple JVMs - int runningThreadCount = -1; if (jvmCount == 1) { // Run everything in this JVM for (int t = 0; t < threadCount; t++) { - threadArray[t] = new Thread(new ConcurrentContextThread(0, t, threadLoops)); - threadArray[t].setName(testType + ":TestConcurrentContextThread_0_" + t); - threadArray[t].start(); + final ConcurrentContextThread task = new ConcurrentContextThread(0, t, threadLoops); + tasks.add(task); + executorService.execute(task); } - runningThreadCount = threadCount; } else { // Spawn JVMs to run the tests for (int j = 0; j < jvmCount; j++) { - threadArray[j] = new Thread(new ConcurrentContextJVMThread(testType, j, threadCount, threadLoops)); - threadArray[j].setName(testType + ":TestConcurrentContextJVMThread_" + j); - threadArray[j].start(); + final ConcurrentContextJVMThread task = + new ConcurrentContextJVMThread(testType, j, threadCount, threadLoops); + tasks.add(task); + executorService.execute(task); } - runningThreadCount = jvmCount; } - boolean allFinished; - do { - allFinished = true; - for (int i = 0; i < runningThreadCount; i++) { - if (threadArray[i].isAlive()) { - allFinished = false; + try { + executorService.shutdown(); + // wait for threads to finish, if not Timeout + executorService.awaitTermination(10, TimeUnit.MINUTES); + } catch (final InterruptedException interruptedException) { + LOGGER.error("Exception while waiting for threads to finish", interruptedException); + } + + LOGGER.info("Shutting down now ..."); + executorService.shutdownNow(); + + return concurrentContext.verifyAndClearContext(jvmCount, threadCount, threadLoops); + } + + + private void addShutDownHook(final List<Closeable> tasks) { + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + LOGGER.info("Shutting down ..."); + for (final Closeable task : tasks) { try { - Thread.sleep(TEN_MILLISECONDS); - } catch (final Exception e) { + task.close(); + } catch (final IOException ioException) { + LOGGER.error("Unable to close task ... ", ioException); } - break; } } - } while (!allFinished); + }); + } - return concurrentContext.verifyAndClearContext(jvmCount, threadCount, threadLoops); + + private String getThreadFactoryName(final int jvmCount, final String testType) { + return jvmCount == 1 ? testType + ":TestConcurrentContextThread_0_" + : testType + ":TestConcurrentContextJVMThread_"; } /** diff --git a/context/context-test-utils/src/main/java/org/onap/policy/apex/context/test/locking/ConcurrentContextJVM.java b/context/context-test-utils/src/main/java/org/onap/policy/apex/context/test/locking/ConcurrentContextJVM.java index 0145bd161..b76b00840 100644 --- a/context/context-test-utils/src/main/java/org/onap/policy/apex/context/test/locking/ConcurrentContextJVM.java +++ b/context/context-test-utils/src/main/java/org/onap/policy/apex/context/test/locking/ConcurrentContextJVM.java @@ -20,20 +20,25 @@ package org.onap.policy.apex.context.test.locking; -import com.google.gson.Gson; - import java.net.InetAddress; import java.net.NetworkInterface; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Enumeration; +import java.util.List; import java.util.Map.Entry; import java.util.TreeSet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.onap.policy.apex.context.ContextAlbum; import org.onap.policy.apex.context.Distributor; import org.onap.policy.apex.context.impl.distribution.DistributorFactory; import org.onap.policy.apex.context.test.factory.TestContextAlbumFactory; +import org.onap.policy.apex.context.test.utils.IntegrationThreadFactory; import org.onap.policy.apex.model.basicmodel.concepts.ApexException; import org.onap.policy.apex.model.basicmodel.concepts.AxArtifactKey; import org.onap.policy.apex.model.basicmodel.service.AbstractParameters; @@ -42,6 +47,8 @@ import org.onap.policy.apex.model.contextmodel.concepts.AxContextModel; import org.slf4j.ext.XLogger; import org.slf4j.ext.XLoggerFactory; +import com.google.gson.Gson; + /** * The Class ConcurrentContextJVM tests concurrent use of context in a single JVM. * @@ -51,9 +58,16 @@ public final class ConcurrentContextJVM { // Logger for this class private static final XLogger LOGGER = XLoggerFactory.getXLogger(ConcurrentContextJVM.class); - private static final int TEN_MILLISECONDS = 10; private static final int IPV4_ADDRESS_LENGTH = 4; + private final int jvmNo; + + private final int threadCount; + + private final int threadLoops; + + private final ExecutorService executorService; + /** * The Constructor. * @@ -63,9 +77,15 @@ public final class ConcurrentContextJVM { * @param threadLoops the thread loops * @throws ApexException the apex exception */ - private ConcurrentContextJVM(final String testType, final int jvmNo, final int threadCount, final int threadLoops) - throws ApexException { - super(); + private ConcurrentContextJVM(final String testType, final int jvmNo, final int threadCount, final int threadLoops) { + this.jvmNo = jvmNo; + this.threadCount = threadCount; + this.threadLoops = threadLoops; + final String name = testType + ":ConcurrentContextThread_" + jvmNo; + this.executorService = Executors.newFixedThreadPool(threadCount, new IntegrationThreadFactory(name)); + } + + public void execute() throws ApexException { LOGGER.debug("starting JVMs and threads . . ."); final AxArtifactKey distributorKey = new AxArtifactKey("ApexDistributor" + jvmNo, "0.0.1"); @@ -87,34 +107,28 @@ public final class ConcurrentContextJVM { assert (lTypeAlbum != null); lTypeAlbum.setUserArtifactStack(usedArtifactStackArray); - final Thread[] threadArray = new Thread[threadCount]; + final List<Future<?>> tasks = new ArrayList<>(threadCount); for (int t = 0; t < threadCount; t++) { - threadArray[t] = new Thread(new ConcurrentContextThread(jvmNo, t, threadLoops)); - threadArray[t].setName(testType + ":ConcurrentContextThread_" + jvmNo + "_" + t); - threadArray[t].start(); - LOGGER.debug("started thread " + threadArray[t].getName()); + tasks.add(executorService.submit(new ConcurrentContextThread(jvmNo, t, threadLoops))); } - boolean allFinished; - do { - allFinished = true; - for (int t = 0; t < threadCount; t++) { - if (threadArray[t].isAlive()) { - allFinished = false; - try { - Thread.sleep(TEN_MILLISECONDS); - } catch (final Exception e) { - } - break; - } - } - } while (!allFinished); + try { + executorService.shutdown(); + // wait for threads to finish, if not Timeout + executorService.awaitTermination(10, TimeUnit.MINUTES); + } catch (final InterruptedException interruptedException) { + LOGGER.error("Exception while waiting for threads to finish", interruptedException); + } LOGGER.debug("threads finished, end value is {}", lTypeAlbum.get("testValue")); contextDistributor.clear(); + LOGGER.info("Shutting down now ... "); + executorService.shutdownNow(); } + + /** * The main method. * @@ -129,7 +143,7 @@ public final class ConcurrentContextJVM { // CHECKSTYLE:OFF: checkstyle:magicNumber // An even number of arguments greater than 3 - if (args.length < 4 || (args.length % 2 != 0)) { + if (args.length < 7) { LOGGER.error("invalid arguments: " + Arrays.toString(args)); LOGGER.error( "usage: TestConcurrentContextJVM testType jvmNo threadCount threadLoops [parameterKey parameterJson].... "); @@ -139,6 +153,7 @@ public final class ConcurrentContextJVM { int jvmNo = -1; int threadCount = -1; int threadLoops = -1; + String hazelCastfileLocation = null; try { jvmNo = Integer.parseInt(args[1]); @@ -161,7 +176,16 @@ public final class ConcurrentContextJVM { return; } - for (int p = 4; p < args.length - 1; p += 2) { + try { + hazelCastfileLocation = args[4].trim(); + } catch (final Exception e) { + LOGGER.error("invalid argument hazelcast file location", e); + return; + } + + System.setProperty("hazelcast.config", hazelCastfileLocation); + + for (int p = 5; p < args.length - 1; p += 2) { @SuppressWarnings("rawtypes") final Class parametersClass = Class.forName(args[p]); final AbstractParameters parameters = @@ -175,12 +199,17 @@ public final class ConcurrentContextJVM { } try { - new ConcurrentContextJVM(args[0], jvmNo, threadCount, threadLoops); + final ConcurrentContextJVM concurrentContextJVM = + new ConcurrentContextJVM(args[0], jvmNo, threadCount, threadLoops); + concurrentContextJVM.execute(); + } catch (final Exception e) { LOGGER.error("error running test in JVM", e); return; } // CHECKSTYLE:ON: checkstyle:magicNumber + + } /** @@ -190,11 +219,11 @@ public final class ConcurrentContextJVM { */ public static void configure() throws Exception { System.setProperty("java.net.preferIPv4Stack", "true"); - System.setProperty("hazelcast.config", "src/test/resources/hazelcast/hazelcast.xml"); - - // The JGroups IP address must be set to a real (not loopback) IP address for Infinispan to work. IN order to + // The JGroups IP address must be set to a real (not loopback) IP address for Infinispan to + // work. IN order to // ensure that all - // the JVMs in a test pick up the same IP address, this function sets te address to be the first non-loopback + // the JVMs in a test pick up the same IP address, this function sets te address to be the + // first non-loopback // IPv4 address // on a host final TreeSet<String> ipAddressSet = new TreeSet<String>(); diff --git a/context/context-test-utils/src/main/java/org/onap/policy/apex/context/test/locking/ConcurrentContextJVMThread.java b/context/context-test-utils/src/main/java/org/onap/policy/apex/context/test/locking/ConcurrentContextJVMThread.java index da2741f7b..995772f05 100644 --- a/context/context-test-utils/src/main/java/org/onap/policy/apex/context/test/locking/ConcurrentContextJVMThread.java +++ b/context/context-test-utils/src/main/java/org/onap/policy/apex/context/test/locking/ConcurrentContextJVMThread.java @@ -20,9 +20,8 @@ package org.onap.policy.apex.context.test.locking; -import com.google.gson.Gson; - import java.io.BufferedReader; +import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; @@ -36,12 +35,14 @@ import org.onap.policy.apex.model.basicmodel.service.ParameterService; import org.slf4j.ext.XLogger; import org.slf4j.ext.XLoggerFactory; +import com.google.gson.Gson; + /** * The Class TestConcurrentContextThread tests concurrent use of context. * * @author Liam Fallon (liam.fallon@ericsson.com) */ -public class ConcurrentContextJVMThread implements Runnable { +public class ConcurrentContextJVMThread implements Runnable, Closeable { // Logger for this class private static final XLogger LOGGER = XLoggerFactory.getXLogger(ConcurrentContextJVMThread.class); @@ -49,6 +50,7 @@ public class ConcurrentContextJVMThread implements Runnable { private final int jvm; private final int threadCount; private final int target; + private Process process = null; /** * The Constructor. @@ -77,6 +79,8 @@ public class ConcurrentContextJVMThread implements Runnable { final List<String> commandList = new ArrayList<>(); commandList.add(System.getProperty("java.home") + System.getProperty("file.separator") + "bin" + System.getProperty("file.separator") + "java"); + commandList.add("-Xms512m"); + commandList.add("-Xmx512m"); commandList.add("-cp"); commandList.add(System.getProperty("java.class.path")); commandList.add(ConcurrentContextJVM.class.getCanonicalName()); @@ -84,6 +88,7 @@ public class ConcurrentContextJVMThread implements Runnable { commandList.add(new Integer(jvm).toString()); commandList.add(new Integer(threadCount).toString()); commandList.add(new Integer(target).toString()); + commandList.add(System.getProperty("hazelcast.config")); for (final Entry<Class<?>, AbstractParameters> parameterServiceEntry : ParameterService.getAll()) { commandList.add(parameterServiceEntry.getKey().getCanonicalName()); @@ -95,7 +100,7 @@ public class ConcurrentContextJVMThread implements Runnable { // Run the JVM final ProcessBuilder processBuilder = new ProcessBuilder(commandList); processBuilder.redirectErrorStream(true); - Process process; + try { process = processBuilder.start(); @@ -121,4 +126,14 @@ public class ConcurrentContextJVMThread implements Runnable { LOGGER.error("Error occured while writing JVM Output for command ", ioException); } } + + + @Override + public void close() { + LOGGER.info("Shutting down {} thread ...", Thread.currentThread().getName()); + if (process != null) { + LOGGER.info("Destroying process ..."); + process.destroy(); + } + } } diff --git a/context/context-test-utils/src/main/java/org/onap/policy/apex/context/test/locking/ConcurrentContextThread.java b/context/context-test-utils/src/main/java/org/onap/policy/apex/context/test/locking/ConcurrentContextThread.java index 554c3d005..75a704557 100644 --- a/context/context-test-utils/src/main/java/org/onap/policy/apex/context/test/locking/ConcurrentContextThread.java +++ b/context/context-test-utils/src/main/java/org/onap/policy/apex/context/test/locking/ConcurrentContextThread.java @@ -20,6 +20,8 @@ package org.onap.policy.apex.context.test.locking; +import java.io.Closeable; + import org.onap.policy.apex.context.ContextAlbum; import org.onap.policy.apex.context.ContextException; import org.onap.policy.apex.context.Distributor; @@ -38,7 +40,8 @@ import org.slf4j.ext.XLoggerFactory; * * @author Liam Fallon (liam.fallon@ericsson.com) */ -public class ConcurrentContextThread implements Runnable { +public class ConcurrentContextThread implements Runnable, Closeable { + private static final String VALUE = "testValue"; // Logger for this class private static final XLogger LOGGER = XLoggerFactory.getXLogger(ConcurrentContextThread.class); private final Distributor distributor; @@ -108,8 +111,8 @@ public class ConcurrentContextThread implements Runnable { } try { - lTypeAlbum.lockForWriting("testValue"); - final TestContextLongItem item = (TestContextLongItem) lTypeAlbum.get("testValue"); + lTypeAlbum.lockForWriting(VALUE); + final TestContextLongItem item = (TestContextLongItem) lTypeAlbum.get(VALUE); final long value = item.getLongValue(); LOGGER.info("completed TestConcurrentContextThread_" + jvm + "_" + instance + ", value=" + value); } catch (final Exception e) { @@ -117,7 +120,7 @@ public class ConcurrentContextThread implements Runnable { LOGGER.error("failed TestConcurrentContextThread_" + jvm + "_" + instance); } finally { try { - lTypeAlbum.unlockForWriting("testValue"); + lTypeAlbum.unlockForWriting(VALUE); distributor.shutdown(); } catch (final ContextException e) { LOGGER.error("could not unlock test context album item", e); @@ -129,18 +132,23 @@ public class ConcurrentContextThread implements Runnable { private void updateAlbum(final ContextAlbum lTypeAlbum) throws Exception { for (int i = 0; i < threadLoops; i++) { try { - lTypeAlbum.lockForWriting("testValue"); - TestContextLongItem item = (TestContextLongItem) lTypeAlbum.get("testValue"); + lTypeAlbum.lockForWriting(VALUE); + TestContextLongItem item = (TestContextLongItem) lTypeAlbum.get(VALUE); if (item != null) { long value = item.getLongValue(); item.setLongValue(++value); } else { item = new TestContextLongItem(0L); } - lTypeAlbum.put("testValue", item); + lTypeAlbum.put(VALUE, item); } finally { - lTypeAlbum.unlockForWriting("testValue"); + lTypeAlbum.unlockForWriting(VALUE); } } } + + @Override + public void close() { + LOGGER.info("Shutting down {} thread ...", Thread.currentThread().getName()); + } } diff --git a/context/context-test-utils/src/main/java/org/onap/policy/apex/context/test/utils/Constants.java b/context/context-test-utils/src/main/java/org/onap/policy/apex/context/test/utils/Constants.java index 90e586e72..ac1e951d5 100644 --- a/context/context-test-utils/src/main/java/org/onap/policy/apex/context/test/utils/Constants.java +++ b/context/context-test-utils/src/main/java/org/onap/policy/apex/context/test/utils/Constants.java @@ -51,7 +51,7 @@ public class Constants { public static final String TEST_POLICY_CONTEXT_ITEM = TestPolicyContextItem.class.getName(); public static final TimeZone TIME_ZONE = TimeZone.getTimeZone("Europe/Dublin"); - public static final AxArtifactKey[] USED_ARTIFACT_STACK_ARRAY = {new AxArtifactKey("testC-top", VERSION), + public static AxArtifactKey[] USED_ARTIFACT_STACK_ARRAY = {new AxArtifactKey("testC-top", VERSION), new AxArtifactKey("testC-next", VERSION), new AxArtifactKey("testC-bot", VERSION)}; private Constants() {} diff --git a/context/context-test-utils/src/main/java/org/onap/policy/apex/context/test/utils/IntegrationThreadFactory.java b/context/context-test-utils/src/main/java/org/onap/policy/apex/context/test/utils/IntegrationThreadFactory.java new file mode 100644 index 000000000..b5ea84d2f --- /dev/null +++ b/context/context-test-utils/src/main/java/org/onap/policy/apex/context/test/utils/IntegrationThreadFactory.java @@ -0,0 +1,48 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ +package org.onap.policy.apex.context.test.utils; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +public class IntegrationThreadFactory implements ThreadFactory { + private static final XLogger LOGGER = XLoggerFactory.getXLogger(IntegrationThreadFactory.class); + + + private final String threadFactoryName; + + private final AtomicInteger counter = new AtomicInteger(); + + public IntegrationThreadFactory(final String threadFactoryName) { + this.threadFactoryName = threadFactoryName; + } + + @Override + public Thread newThread(final Runnable runnable) { + final Thread thread = new Thread(runnable); + thread.setName(threadFactoryName + "_" + counter.getAndIncrement()); + LOGGER.debug("started thread " + thread.getName()); + return thread; + } + +} |