summaryrefslogtreecommitdiffstats
path: root/feature-server-pool/src/test/java
diff options
context:
space:
mode:
authorStraubs, Ralph (rs8887) <rs8887@att.com>2020-02-04 03:26:30 -0600
committerChou, Joseph (jc2555) <jc2555@att.com>2020-03-04 10:33:48 -0500
commit001320ed1ecbdf3b2f58d261f008f681da3f4c67 (patch)
treebeb71c4577fd6b4f9bc843a5b0a58566a77009f2 /feature-server-pool/src/test/java
parentbbcf7265e9b74e689bd8974f9684a6cd5c95fd9f (diff)
Add feature-server-pool to the ONAP drools-pdp repository.
Issue-ID: POLICY-2351 Change-Id: I8ddde547a73a51c04c8dd9f1d66232e8281599a9 Signed-off-by: Straubs, Ralph (rs8887) <rs8887@att.com>
Diffstat (limited to 'feature-server-pool/src/test/java')
-rw-r--r--feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/AdapterImpl.java456
-rw-r--r--feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/BucketWrapperImpl.java173
-rw-r--r--feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/ServerWrapperImpl.java146
-rw-r--r--feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/TargetLockWrapperImpl.java195
-rw-r--r--feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/Adapter.java353
-rw-r--r--feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/BlockingClassLoader.java176
-rw-r--r--feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/BucketWrapper.java132
-rw-r--r--feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/ServerWrapper.java103
-rw-r--r--feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/SimDmaap.java327
-rw-r--r--feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/TargetLockWrapper.java98
-rw-r--r--feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/Test1.java912
-rw-r--r--feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/TestDroolsObject.java58
12 files changed, 3129 insertions, 0 deletions
diff --git a/feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/AdapterImpl.java b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/AdapterImpl.java
new file mode 100644
index 00000000..bac13f18
--- /dev/null
+++ b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/AdapterImpl.java
@@ -0,0 +1,456 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-server-pool
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.serverpool;
+
+import static org.awaitility.Awaitility.await;
+
+import java.io.PrintStream;
+import java.nio.file.Paths;
+import java.util.Properties;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.kie.api.runtime.KieSession;
+
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicListener;
+
+import org.onap.policy.drools.core.PolicyContainer;
+import org.onap.policy.drools.core.PolicySession;
+import org.onap.policy.drools.core.PolicySessionFeatureApiConstants;
+
+import org.onap.policy.drools.serverpooltest.Adapter;
+import org.onap.policy.drools.serverpooltest.BucketWrapper;
+import org.onap.policy.drools.serverpooltest.ServerWrapper;
+import org.onap.policy.drools.serverpooltest.TargetLockWrapper;
+
+import org.onap.policy.drools.system.PolicyController;
+import org.onap.policy.drools.system.PolicyEngineConstants;
+import org.onap.policy.drools.util.KieUtils;
+import org.onap.policy.drools.utils.PropertyUtil;
+import org.powermock.reflect.Whitebox;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements the 'Adapter' interface. There is one 'AdapterImpl'
+ * class for each simulated host, and one instance of each 'AdapterImpl' class.
+ */
+public class AdapterImpl extends Adapter {
+ private static Logger logger = LoggerFactory.getLogger(AdapterImpl.class);
+
+ // Each 'AdapterImpl' instance has it's own class object, making it a
+ // singleton. There is only a single 'Adapter' class object, and all
+ // 'AdapterImpl' classes are derived from it.
+ private static AdapterImpl adapter = null;
+
+ // this is the adapter index
+ private int index;
+
+ // this will refer to the Drools session 'PolicyController' instance
+ private PolicyController policyController = null;
+
+ // this will refer to the Drools session 'PolicySession' instance
+ private PolicySession policySession = null;
+
+ // used by Drools session to signal back to Junit tests
+ private LinkedBlockingQueue<String> inotificationQueue =
+ new LinkedBlockingQueue<>();
+
+ // provides indirect references to a select set of static 'Server' methods
+ private static ServerWrapper.Static serverStatic =
+ new ServerWrapperImpl.Static();
+
+ // provides indirect references to a select set of static 'Bucket' methods
+ private static BucketWrapper.Static bucketStatic =
+ new BucketWrapperImpl.Static();
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void init(int index) throws Exception {
+ adapter = this;
+ this.index = index;
+
+ PolicyEngineConstants.getManager().configure(new Properties());
+ PolicyEngineConstants.getManager().start();
+
+ // Note that this method does basically what
+ // 'FeatureServerPool.afterStart(PolicyEngine)' does, but allows us to
+ // specify different properties for each of the 6 simulated hosts
+ logger.info("{}: Running: AdapterImpl.init({}), class hash code = {}",
+ this, index, AdapterImpl.class.hashCode());
+
+ Properties prop = new Properties();
+ prop.setProperty("server.pool.discovery.servers", "127.0.63.250");
+ prop.setProperty("server.pool.discovery.topic", "DISCOVERY-TOPIC");
+ prop.setProperty("server.pool.server.ipAddress", "127.0.63." + index);
+ prop.setProperty("server.pool.server.port", "20000");
+
+ prop.setProperty("keyword.path", "requestID,CommonHeader.RequestID,key");
+
+ prop.setProperty("keyword.org.onap.policy.m2.base.Transaction.lookup",
+ "getRequestID()");
+ prop.setProperty("keyword.org.onap.policy.controlloop.ControlLoopEvent.lookup", "requestID");
+ prop.setProperty("keyword.org.onap.policy.drools.serverpool.TargetLock.lookup", "getOwnerKey()");
+ prop.setProperty("keyword.java.lang.String.lookup", "toString()");
+ prop.setProperty("keyword.org.onap.policy.drools.serverpooltest.TestDroolsObject.lookup",
+ "getKey()");
+ prop.setProperty("keyword.org.onap.policy.drools.serverpooltest.Test1$KeywordWrapper.lookup", "key");
+
+ TargetLock.startup();
+ Server.startup(prop);
+
+ // use reflection to set private static field
+ // 'FeatureServerPool.droolsTimeoutMillis'
+ Whitebox.setInternalState(FeatureServerPool.class, "droolsTimeoutMillis",
+ ServerPoolProperties.DEFAULT_BUCKET_DROOLS_TIMEOUT);
+
+ // use reflection to set private static field
+ // 'FeatureServerPool.timeToLiveSecond'
+ Whitebox.setInternalState(FeatureServerPool.class, "timeToLiveSecond",
+ String.valueOf(ServerPoolProperties.DEFAULT_BUCKET_TIME_TO_LIVE));
+
+ // use reflection to call private static method
+ // 'FeatureServerPool.buildKeywordTable()'
+ Whitebox.invokeMethod(FeatureServerPool.class, "buildKeywordTable");
+
+ Bucket.Backup.register(new FeatureServerPool.DroolsSessionBackup());
+ Bucket.Backup.register(new TargetLock.LockBackup());
+
+ // dump out feature lists
+ logger.info("{}: ServerPoolApi features list: {}",
+ this, ServerPoolApi.impl.getList());
+ logger.info("{}: PolicySessionFeatureApi features list: {}",
+ this, PolicySessionFeatureApiConstants.getImpl().getList());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void shutdown() {
+ policyController.stop();
+ Server.shutdown();
+
+ PolicyEngineConstants.getManager().stop();
+ PolicyEngineConstants.getManager().getExecutorService().shutdown();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public LinkedBlockingQueue<String> notificationQueue() {
+ return inotificationQueue;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean waitForInit(long endTime) throws InterruptedException {
+ try {
+ // wait until a leader is elected
+ await().atMost(endTime - System.currentTimeMillis(),
+ TimeUnit.MILLISECONDS).until(() -> Leader.getLeader() != null);
+
+ // wait for each bucket to have an owner
+ for (int i = 0 ; i < Bucket.BUCKETCOUNT ; i += 1) {
+ Bucket bucket = Bucket.getBucket(i);
+ while (bucket.getOwner() == null) {
+ Thread.sleep(Math.min(endTime - System.currentTimeMillis(), 100L));
+ }
+ //await().atMost(endTime - System.currentTimeMillis(),
+ //TimeUnit.MILLISECONDS).until(() -> bucket.getOwner() != null);
+ }
+ } catch (IllegalArgumentException e) {
+ // 'Thread.sleep()' was passed a negative time-out value --
+ // time is up
+ logger.debug("AdapterImpl waitForInit error", e);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ServerWrapper.Static getServerStatic() {
+ return serverStatic;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ServerWrapper getLeader() {
+ return ServerWrapperImpl.getWrapper(Leader.getLeader());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public BucketWrapper.Static getBucketStatic() {
+ return bucketStatic;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public TargetLockWrapper newTargetLock(
+ String key, String ownerKey, TargetLockWrapper.Owner owner, boolean waitForLock) {
+
+ return TargetLockWrapperImpl.newTargetLock(key, ownerKey, owner, waitForLock);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public TargetLockWrapper newTargetLock(String key, String ownerKey, TargetLockWrapper.Owner owner) {
+ return TargetLockWrapperImpl.newTargetLock(key, ownerKey, owner);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void dumpLocks(PrintStream out, boolean detail) {
+ try {
+ TargetLock.DumpLocks.dumpLocks(out, detail);
+ } catch (Exception e) {
+ logger.error("{}: Exception in 'dumpLocks'", this, e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String createController() {
+ Properties properties;
+
+ // set the thread class loader to be the same as the one associated
+ // with the 'AdapterImpl' instance, so it will be inherited by any
+ // new threads created (the Drools session thread, in particular)
+ ClassLoader saveClassLoader =
+ Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(AdapterImpl.class.getClassLoader());
+
+ try {
+ // build and install Drools artifact
+ KieUtils.installArtifact(
+ Paths.get("src/test/resources/drools-artifact-1.1/src/main/resources/META-INF/kmodule.xml").toFile(),
+ Paths.get("src/test/resources/drools-artifact-1.1/pom.xml").toFile(),
+ "src/main/resources/rules/org/onap/policy/drools/core/test/rules.drl",
+ Paths.get("src/test/resources/drools-artifact-1.1/src/main/resources/rules.drl").toFile());
+
+ // load properties from file
+ properties = PropertyUtil.getProperties("src/test/resources/TestController-controller.properties");
+ } catch (Exception e) {
+ e.printStackTrace();
+ Thread.currentThread().setContextClassLoader(saveClassLoader);
+ return e.toString();
+ }
+
+ StringBuilder sb = new StringBuilder();
+ try {
+ // create and start 'PolicyController'
+ policyController = PolicyEngineConstants.getManager()
+ .createPolicyController("TestController", properties);
+ policyController.start();
+
+ // dump out container information (used for debugging tests)
+ sb.append("PolicyContainer count: ")
+ .append(PolicyContainer.getPolicyContainers().size()).append('\n');
+ for (PolicyContainer policyContainer :
+ PolicyContainer.getPolicyContainers()) {
+ sb.append(" name = ")
+ .append(policyContainer.getName())
+ .append('\n')
+ .append(" session count = ")
+ .append(policyContainer.getPolicySessions().size())
+ .append('\n');
+ for (PolicySession pc : policyContainer.getPolicySessions()) {
+ policySession = pc;
+ }
+ }
+ } finally {
+ Thread.currentThread().setContextClassLoader(saveClassLoader);
+ }
+ return sb.toString();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void sendEvent(String key) {
+ /*
+ * Note: the dumping out of package information was useful in tracking
+ * down strange Drools behavior that was eventually tied to the
+ * Drools class loader.
+ */
+ logger.info("{}: Calling 'sendEvent': packages = {}", this,
+ policySession.getKieSession().getKieBase().getKiePackages());
+ ((TopicListener)policyController).onTopicEvent(
+ CommInfrastructure.UEB, "JUNIT-TEST-TOPIC",
+ "{\"key\":\"" + key + "\"}");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public KieSession getKieSession() {
+ return policySession == null ? null : policySession.getKieSession();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void insertDrools(Object object) {
+ if (policySession != null) {
+ /*
+ * this will eventually be changed to use the
+ * 'PolicySession.insertObject(...)' method
+ */
+ new FeatureServerPool().insertDrools(policySession, object);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isForeign(Object... objects) {
+ boolean rval = false;
+ ClassLoader myClassLoader = AdapterImpl.class.getClassLoader();
+ for (Object o : objects) {
+ Class clazz = o.getClass();
+ ClassLoader objClassLoader = clazz.getClassLoader();
+
+ try {
+ if (myClassLoader != objClassLoader
+ && clazz != myClassLoader.loadClass(clazz.getName())) {
+ rval = true;
+ logger.info("{}: FOREIGN OBJECT ({}) - {}",
+ this, getAdapter(objClassLoader), o);
+ }
+ } catch (ClassNotFoundException e) {
+ rval = true;
+ logger.error("{}: FOREIGN OBJECT -- CLASS NOT FOUND ({}) - {}",
+ this, getAdapter(objClassLoader), o);
+ }
+ }
+ return rval;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String findKey(String prefix, int startingIndex, ServerWrapper host) {
+ String rval = null;
+
+ // try up to 10000 numeric values to locate one on a particular host
+ for (int i = 0 ; i < 10000 ; i += 1) {
+ // generate key, and see if it is on the desired server
+ String testString = prefix + (startingIndex + i);
+ if (ServerWrapperImpl.getWrapper(
+ Bucket.bucketToServer(Bucket.bucketNumber(testString))) == host) {
+ // we have one that works
+ rval = testString;
+ break;
+ }
+ }
+ return rval;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String findKey(String prefix, int startingIndex) {
+ return findKey(prefix, startingIndex, serverStatic.getThisServer());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String findKey(String prefix) {
+ return findKey(prefix, 1, serverStatic.getThisServer());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String toString() {
+ return "AdapterImpl[" + index + "]";
+ }
+
+ /**
+ * Return an Adapter.
+ *
+ * @return the 'Adapter' instance associated with the ClassLoader associated
+ * with the current thread
+ */
+ public static Adapter getAdapter() {
+ /*
+ * Note that 'return(adapter)' doesn't work as expected when called from
+ * within a 'Drools' session, because of the strange way that the Drools
+ * 'ClassLoader' works -- it bypasses 'AdapterClassLoader' when doing
+ * class lookups, even though it is the immediate parent of the Drools
+ * session class loader.
+ */
+ return getAdapter(Thread.currentThread().getContextClassLoader());
+ }
+
+ /**
+ * Return an Adapter.
+ *
+ * @param classLoader a ClassLoader instance
+ * @return the 'Adapter' instance associated with the specified ClassLoader
+ */
+ public static Adapter getAdapter(ClassLoader classLoader) {
+ try {
+ // locate the 'AdapterImpl' class associated with a particular
+ // 'ClassLoader' (which may be different from the current one)
+ Class<?> thisAdapterClass =
+ classLoader.loadClass("org.onap.policy.drools.serverpool.AdapterImpl");
+
+ // return the 'adapter' field value
+ return Whitebox.getInternalState(thisAdapterClass, "adapter");
+ } catch (Exception e) {
+ e.printStackTrace();
+ return null;
+ }
+ }
+}
diff --git a/feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/BucketWrapperImpl.java b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/BucketWrapperImpl.java
new file mode 100644
index 00000000..ca89d99a
--- /dev/null
+++ b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/BucketWrapperImpl.java
@@ -0,0 +1,173 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-server-pool
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.serverpool;
+
+import java.io.PrintStream;
+import java.util.IdentityHashMap;
+
+import org.onap.policy.drools.serverpooltest.BucketWrapper;
+import org.onap.policy.drools.serverpooltest.ServerWrapper;
+
+/**
+ * This class implements the 'BucketWrapper' interface. There is one
+ * 'BucketWrapperImpl' class for each simulated host.
+ */
+public class BucketWrapperImpl implements BucketWrapper {
+ // this maps a 'Bucket' instance on this host to an associated wrapper
+ private static IdentityHashMap<Bucket, BucketWrapperImpl> bucketToWrapper =
+ new IdentityHashMap<>();
+
+ // this is the 'Bucket' instance associated with the wrapper
+ private Bucket bucket;
+
+ /**
+ * This method maps a 'Bucket' instance into a 'BucketWrapperImpl'
+ * instance. The goal is to have only a single 'BucketWrapperImpl' instance
+ * for each 'Bucket' instance, so that testing for identity will work
+ * as expected.
+ *
+ * @param bucket the 'Bucket' instance
+ * @return the associated 'BucketWrapperImpl' instance
+ */
+ static synchronized BucketWrapperImpl getWrapper(Bucket bucket) {
+ if (bucket == null) {
+ return null;
+ }
+ BucketWrapperImpl rval = bucketToWrapper.get(bucket);
+ if (rval == null) {
+ // a matching entry does not yet exist -- create one
+ rval = new BucketWrapperImpl(bucket);
+ bucketToWrapper.put(bucket, rval);
+ }
+ return rval;
+ }
+
+ /**
+ * Constructor - initialize the 'bucket' field.
+ */
+ BucketWrapperImpl(Bucket bucket) {
+ this.bucket = bucket;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public int getBucketNumber() {
+ return bucket.getIndex();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ServerWrapper getOwner() {
+ return ServerWrapperImpl.getWrapper(bucket.getOwner());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ServerWrapper getPrimaryBackup() {
+ return ServerWrapperImpl.getWrapper(bucket.getPrimaryBackup());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ServerWrapper getSecondaryBackup() {
+ return ServerWrapperImpl.getWrapper(bucket.getSecondaryBackup());
+ }
+
+ /* ============================================================ */
+
+ /**
+ * This class implements the 'BucketWrapper.Static' interface. There is
+ * one 'BucketWrapperImpl.Static' class, and one instance for each
+ * simulated host
+ */
+ public static class Static implements BucketWrapper.Static {
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public int getBucketCount() {
+ return Bucket.BUCKETCOUNT;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public int bucketNumber(String value) {
+ return Bucket.bucketNumber(value);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ServerWrapper bucketToServer(int bucketNumber) {
+ return ServerWrapperImpl.getWrapper(Bucket.bucketToServer(bucketNumber));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public BucketWrapper getBucket(int bucketNumber) {
+ return getWrapper(Bucket.getBucket(bucketNumber));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isKeyOnThisServer(String key) {
+ return Bucket.isKeyOnThisServer(key);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void moveBucket(PrintStream out, int bucketNumber, String newHostUuid) {
+ ClassLoader save = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(
+ BucketWrapperImpl.class.getClassLoader());
+ Bucket.moveBucket(out, bucketNumber, newHostUuid);
+ } finally {
+ Thread.currentThread().setContextClassLoader(save);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void dumpAdjuncts(PrintStream out) {
+ Bucket.dumpAdjuncts(out);
+ }
+ }
+}
diff --git a/feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/ServerWrapperImpl.java b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/ServerWrapperImpl.java
new file mode 100644
index 00000000..468d12af
--- /dev/null
+++ b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/ServerWrapperImpl.java
@@ -0,0 +1,146 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-server-pool
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.serverpool;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.IdentityHashMap;
+import java.util.UUID;
+
+import org.onap.policy.drools.serverpooltest.ServerWrapper;
+
+/**
+ * This class implements the 'ServerWrapper' interface. There is one
+ * 'ServerWrapperImpl' class for each simulated host.
+ */
+public class ServerWrapperImpl implements ServerWrapper {
+ // this maps a 'Server' instance on this host to an associated wrapper
+ private static IdentityHashMap<Server, ServerWrapperImpl> serverToWrapper =
+ new IdentityHashMap<>();
+
+ // this is the 'Server' instance associated with the wrapper
+ private Server server;
+
+ /**
+ * This method maps a 'Server' instance into a 'ServerWrapperImpl'
+ * instance. The goal is to have only a single 'ServerWrapperImpl' instance
+ * for each 'Server' instance, so that testing for identity will work
+ * as expected.
+ *
+ * @param server the 'Server' instance
+ * @return the associated 'ServerWrapperImpl' instance
+ * ('null' if 'server' is 'null')
+ */
+ static synchronized ServerWrapperImpl getWrapper(Server server) {
+ if (server == null) {
+ return null;
+ }
+ ServerWrapperImpl rval = serverToWrapper.computeIfAbsent(server,
+ (key) -> new ServerWrapperImpl(server));
+ return rval;
+ }
+
+ /**
+ * Constructor - initialize the 'server' field.
+ */
+ private ServerWrapperImpl(Server server) {
+ this.server = server;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String toString() {
+ return server.toString();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public UUID getUuid() {
+ return server.getUuid();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isActive() {
+ return server.isActive();
+ }
+
+ /* ============================================================ */
+
+ /**
+ * This class implements the 'ServerWrapper.Static' interface. There is
+ * one 'ServerWrapperImpl.Static' class, and one instance for each
+ * simulated host
+ */
+ public static class Static implements ServerWrapper.Static {
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ServerWrapper getThisServer() {
+ return getWrapper(Server.getThisServer());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ServerWrapper getFirstServer() {
+ return getWrapper(Server.getFirstServer());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ServerWrapper getServer(UUID uuid) {
+ return getWrapper(Server.getServer(uuid));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public int getServerCount() {
+ return Server.getServerCount();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Collection<ServerWrapper> getServers() {
+ // build an 'ArrayList' which mirrors the set of servers
+ ArrayList<ServerWrapper> rval = new ArrayList<>(Server.getServerCount());
+
+ for (Server server : Server.getServers()) {
+ rval.add(getWrapper(server));
+ }
+ return rval;
+ }
+ }
+}
diff --git a/feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/TargetLockWrapperImpl.java b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/TargetLockWrapperImpl.java
new file mode 100644
index 00000000..4f496986
--- /dev/null
+++ b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/TargetLockWrapperImpl.java
@@ -0,0 +1,195 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-server-pool
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.serverpool;
+
+import java.io.Serializable;
+import java.util.IdentityHashMap;
+
+import org.onap.policy.drools.core.lock.Lock;
+import org.onap.policy.drools.core.lock.LockCallback;
+import org.onap.policy.drools.serverpooltest.TargetLockWrapper;
+
+/**
+ * This class implements the 'TargetLockWrapper' interface. There is one
+ * 'TargetLockWrapperImpl' class for each simulated host.
+ */
+public class TargetLockWrapperImpl implements TargetLockWrapper {
+ // this is the 'TargetLock' instance associated with the wrapper
+ private TargetLock targetLock;
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean free() {
+ return targetLock.free();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isActive() {
+ return targetLock.isActive();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public State getState() {
+ return TargetLockWrapper.State.valueOf(targetLock.getState().toString());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getOwnerKey() {
+ return targetLock.getOwnerKey();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String toString() {
+ return "TLW-"
+ + String.valueOf(AdapterImpl.getAdapter(TargetLockWrapperImpl.class.getClassLoader()))
+ + "[" + targetLock.toString() + "]";
+ }
+
+ /**
+ * This method creates a new 'TargetLock'. Internally, an 'OwnerAdapter'
+ * instance is built as well, which translates the 'LockCallback'
+ * callbacks to 'TargetLockWrapper.Owner' callbacks. As with the call to
+ * 'new TargetLock(...)', it is possible for the callback occur before
+ * this method returns -- this can happen if the 'key' hashes to a bucket
+ * owned by the current host.
+ *
+ * @param key string key identifying the lock
+ * @param ownerKey string key identifying the owner, which must hash to
+ * a bucket owned by the current host (it is typically a 'RequestID')
+ * @param owner owner of the lock (will be notified when going from
+ * WAITING to ACTIVE)
+ * @param waitForLock this controls the behavior when 'key' is already
+ * locked - 'true' means wait for it to be freed, 'false' means fail
+ * @return a 'TargetLockWrapper' instance associated with the new
+ * 'TargetLock.
+ */
+ static TargetLockWrapper newTargetLock(
+ String key, String ownerKey, TargetLockWrapper.Owner owner, boolean waitForLock) {
+
+ TargetLockWrapperImpl rval = new TargetLockWrapperImpl();
+ rval.targetLock =
+ new TargetLock(key, ownerKey,
+ TargetLockWrapperImpl.getOwnerAdapter(rval, owner),
+ waitForLock);
+ return rval;
+ }
+
+ /**
+ * This method creates a new 'TargetLock'. Internally, an 'OwnerAdapter'
+ * instance is built as well, which translates the 'LockCallback'
+ * callbacks to 'TargetLockWrapper.Owner' callbacks. As with the call to
+ * 'new TargetLock(...)', it is possible for the callback occur before
+ * this method returns -- this can happen if the 'key' hashes to a bucket
+ * owned by the current host.
+ *
+ * @param key string key identifying the lock
+ * @param ownerKey string key identifying the owner, which must hash to
+ * a bucket owned by the current host (it is typically a 'RequestID')
+ * @param owner owner of the lock (will be notified when going from
+ * WAITING to ACTIVE)
+ * @return a 'TargetLockWrapper' instance associated with the new
+ * 'TargetLock.
+ */
+ static TargetLockWrapper newTargetLock(
+ String key, String ownerKey, TargetLockWrapper.Owner owner) {
+
+ TargetLockWrapperImpl rval = new TargetLockWrapperImpl();
+ rval.targetLock =
+ new TargetLock(key, ownerKey,
+ TargetLockWrapperImpl.getOwnerAdapter(rval, owner));
+ return rval;
+ }
+
+ /* ============================================================ */
+
+ /**
+ * This method builds an adapter that implements the 'LockCallback'
+ * callback interface, translating it to 'TargetLockWrapper.Owner'.
+ *
+ * @param targetLock the TargetLockWrapper that is using this adapter
+ * @param owner the 'TargetLockWrapper.Owner' callback
+ * @return an adapter implementing the 'LockCallback' interface
+ * ('null' is returned if 'owner' is null -- this is an error condition,
+ * but is used to verify the error handling of the 'TargetLock'
+ * constructor.
+ */
+ public static LockCallback getOwnerAdapter(
+ TargetLockWrapper targetLock, TargetLockWrapper.Owner owner) {
+
+ return owner == null ? null : new OwnerAdapter(targetLock, owner);
+ }
+
+ /**
+ * This class is an adapter that implements the 'LockCallback' callback
+ * interface, translating it to a 'TargetLockWrapper.Owner' callback.
+ */
+ public static class OwnerAdapter implements LockCallback, Serializable {
+ // the 'TargetLockWrapper' instance to pass as an argument in the callback
+ TargetLockWrapper targetLock;
+
+ // the 'TargetLockWrapper.Owner' callback
+ TargetLockWrapper.Owner owner;
+
+ /**
+ * Constructor - initialize the adapter.
+ *
+ * @param targetLock this will be passed as an argument in the callback
+ * @param owner the object implementing the 'TargetLockWrapper.Owner'
+ * interface
+ */
+ private OwnerAdapter(TargetLockWrapper targetLock, TargetLockWrapper.Owner owner) {
+ this.targetLock = targetLock;
+ this.owner = owner;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void lockAvailable(Lock lock) {
+ // forward 'lockAvailable' callback
+ owner.lockAvailable(targetLock);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void lockUnavailable(Lock lock) {
+ // forward 'lockUnavailable' callback
+ owner.lockUnavailable(targetLock);
+ }
+ }
+}
diff --git a/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/Adapter.java b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/Adapter.java
new file mode 100644
index 00000000..03b970b5
--- /dev/null
+++ b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/Adapter.java
@@ -0,0 +1,353 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-server-pool
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.serverpooltest;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
+import java.io.PrintStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.kie.api.runtime.KieSession;
+import org.onap.policy.common.utils.network.NetworkUtil;
+import org.onap.policy.drools.serverpool.Util;
+
+/**
+ * This is a common base class for 6 'AdapterImpl' instances, all running
+ * with their own copies of the server pool classes, and a number of the ONAP
+ * classes. The purpose is to simulate 6 separate hosts in a server pool.
+ * Note that there is potentially a 7th copy of any of these classes, which is
+ * the one loaded with the system class loader. Ideally, those classes
+ * shouldn't be referred to, but there were some problems during testing,
+ * where they unexpectedly were (prompting a change in
+ * 'ExtendedObjectInputStream'). This is referred to as the 'null' host,
+ * where the classes may exist, but have not gone through initialization.
+ */
+public abstract class Adapter {
+ // 'true' indicates that initialization is still needed
+ private static boolean initNeeded = true;
+
+ // Each 'Adapter' instance is implemented by 'AdapterImpl', but loaded
+ // with a different class loader that provides each with a different copy
+ // of the set of classes with packages in the list below (see references to
+ // 'BlockingClassLoader').
+ public static Adapter[] adapters = new Adapter[6];
+
+ /**
+ * Ensure that all adapters have been initialized.
+ */
+ public static void ensureInit() throws Exception {
+ synchronized (Adapter.class) {
+ if (initNeeded) {
+ initNeeded = false;
+
+ // start DMAAP Simulator
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ SimDmaap.start();
+ }
+ }, "DMAAP Simulator").start();
+
+ // wait 1 second to allow time for the port 3904 listener
+ assertTrue(NetworkUtil.isTcpPortOpen(SimDmaap.HOSTNAME, 3904, 50, 1000));
+
+ // build 'BlockingClassLoader'
+ BlockingClassLoader bcl = new BlockingClassLoader(
+ Adapter.class.getClassLoader(),
+ // All 'org.onap.policy.*' classes are adapter-specific, except
+ // for the exclusions listed below.
+ "org.onap.policy.*"
+ );
+ bcl.addExclude("org.onap.policy.drools.core.DroolsRunnable");
+ bcl.addExclude("org.onap.policy.drools.serverpooltest.*");
+
+ // build URL list for class loader
+ URL[] urls = {};
+
+ // iterate through 'adapter' entries
+ ClassLoader saveClassLoader =
+ Thread.currentThread().getContextClassLoader();
+ if (saveClassLoader instanceof URLClassLoader) {
+ urls = ((URLClassLoader)saveClassLoader).getURLs();
+ } else {
+ // the parent is not a 'URLClassLoader' --
+ // try to get this information from 'java.class.path'
+ ArrayList<URL> tmpUrls = new ArrayList<>();
+ for (String entry : System.getProperty("java.class.path").split(
+ File.pathSeparator)) {
+ if (new File(entry).isDirectory()) {
+ tmpUrls.add(new URL("file:" + entry + "/"));
+ } else {
+ tmpUrls.add(new URL("file:" + entry));
+ }
+ }
+ urls = tmpUrls.toArray(new URL[0]);
+ }
+ try {
+ for (int i = 0 ; i < adapters.length ; i += 1) {
+ // Build a new 'ClassLoader' for this adapter. The
+ // 'ClassLoader' hierarchy is:
+ //
+ // AdapterClassLoader(one copy per Adapter) ->
+ // BlockingClassLoader ->
+ // base ClassLoader (with the complete URL list)
+ ClassLoader classLoader =
+ new AdapterClassLoader(i, urls, bcl);
+
+ // set the current thread class loader, which should be
+ // inherited by any child threads created during
+ // the initialization of the adapter
+ Thread.currentThread().setContextClassLoader(classLoader);
+
+ // now, build the adapter -- it is not just a new instance,
+ // but a new copy of class 'AdapterImpl'
+ Adapter adapter = (Adapter) classLoader.loadClass(
+ "org.onap.policy.drools.serverpool.AdapterImpl")
+ .newInstance();
+
+ // initialize the adapter
+ adapter.init(i);
+ adapters[i] = adapter;
+ }
+ } finally {
+ // restore the class loader to that used during the Junit tests
+ Thread.currentThread().setContextClassLoader(saveClassLoader);
+ }
+ }
+ }
+ }
+
+ /**
+ * Shut everything down.
+ */
+ public static void ensureShutdown() {
+ for (Adapter adapter : adapters) {
+ adapter.shutdown();
+ }
+ SimDmaap.stop();
+ // not sure why the following is started
+ Util.shutdown();
+ }
+
+ /**
+ * Runs server pool initialization for a particular host.
+ *
+ * @param index the index of the adapter (0-5)
+ */
+ public abstract void init(int index) throws Exception;
+
+ /**
+ * Shuts down the server pool for this host.
+ */
+ public abstract void shutdown();
+
+ /**
+ * Return a 'LinkedBlockingQueue' instance, which is used as a way for
+ * Drools code to signal back to running Junit tests.
+ *
+ * @return a 'LinkedBlockingQueue' instance, which is used as a way for
+ * Drools code to signal back to running Junit tests
+ */
+ public abstract LinkedBlockingQueue<String> notificationQueue();
+
+ /**
+ * This method blocks and waits for all buckets to have owners, or for
+ * a timeout, whichever occurs first.
+ *
+ * @param endTime the point at which timeout occurs
+ * @return 'true' if all buckets have owners, 'false' if a timeout occurred
+ */
+ public abstract boolean waitForInit(long endTime) throws InterruptedException;
+
+ /**
+ * Return an object providing indirect references to a select set of
+ * static 'Server' methods.
+ *
+ * @return an object providing indirect references to a select set of
+ * static 'Server' methods
+ */
+ public abstract ServerWrapper.Static getServerStatic();
+
+ /**
+ * Return an object providing an indirect reference to the lead 'Server'
+ * object.
+ *
+ * @return an object providing an indirect reference to the lead 'Server'
+ * object
+ */
+ public abstract ServerWrapper getLeader();
+
+ /**
+ * Return an object providing indirect references to a select set of
+ * static 'Bucket' methods.
+ *
+ * @return an object providing indirect references to a select set of
+ * static 'Bucket' methods
+ */
+ public abstract BucketWrapper.Static getBucketStatic();
+
+ /**
+ * Create a new 'TargetLock' instance, returning an indirect reference.
+ *
+ * @param key string key identifying the lock
+ * @param ownerKey string key identifying the owner, which must hash to
+ * a bucket owned by the current host (it is typically a 'RequestID')
+ * @param owner owner of the lock (will be notified when going from
+ * WAITING to ACTIVE)
+ * @param waitForLock this controls the behavior when 'key' is already
+ * locked - 'true' means wait for it to be freed, 'false' means fail
+ */
+ public abstract TargetLockWrapper newTargetLock(
+ String key, String ownerKey, TargetLockWrapper.Owner owner,
+ boolean waitForLock);
+
+ /**
+ * Create a new 'TargetLock' instance, returning an indirect reference.
+ *
+ * @param key string key identifying the lock
+ * @param ownerKey string key identifying the owner, which must hash to
+ * a bucket owned by the current host (it is typically a 'RequestID')
+ * @param owner owner of the lock (will be notified when going from
+ * WAITING to ACTIVE)
+ */
+ public abstract TargetLockWrapper newTargetLock(
+ String key, String ownerKey, TargetLockWrapper.Owner owner);
+
+ /**
+ * Call 'TargetLock.DumpLocks.dumpLocks'
+ *
+ * @param out where the output should be displayed
+ * @param detail 'true' provides additional bucket and host information
+ * (but abbreviates all UUIDs in order to avoid excessive
+ * line length)
+ */
+ public abstract void dumpLocks(PrintStream out, boolean detail);
+
+ /**
+ * Create and initialize PolicyController 'TestController', and start
+ * the associated Drools container and session.
+ *
+ * @return a string containing controller session information
+ */
+ public abstract String createController();
+
+ /**
+ * Send an event in the form of a JSON message string. The message is
+ * sent to JUNIT-TEST-TOPIC, and the JSON object is converted to a
+ * 'TestDroolsObject' (all compatible with the Drools session created by
+ * 'createController'.
+ *
+ * @param key determines the bucket number, which affects which host the
+ * message is eventually routed to
+ */
+ public abstract void sendEvent(String key);
+
+ /**
+ * Return the one-and-only 'KieSession' on this host.
+ *
+ * @return the one-and-only 'KieSession' on this host
+ */
+ public abstract KieSession getKieSession();
+
+ /**
+ * Insert an object into the one-and-only Drools session.
+ *
+ * @param object the object to insert
+ */
+ public abstract void insertDrools(Object object);
+
+ // some test utilities
+
+ /**
+ * Determine whether any of the objects passed as parameters are of a class
+ * that belongs to different adapter. Print messages are displayed
+ * for any that do occur.
+ *
+ * @param objects one or more objects to be tested
+ * @return 'true' if one or more are foreign
+ */
+ public abstract boolean isForeign(Object... objects);
+
+ /**
+ * This method is used to generate keys that hash to a bucket associated
+ * with a particular server. The algorithm generates a key using 'prefix'
+ * concatenated with a numeric value, and searches for the first one on
+ * the desired host. It will try up to 10000 indices before giving up --
+ * each host owns 1/6 of the buckets, should the 10000 number should be
+ * way more than enough. The tests are written with the assumption that
+ * a valid key will be returned, and 'NullPointerException' is an acceptable
+ * way to handle the situation if this doesn't work out somehow.
+ *
+ * @param prefix the first portion of the key
+ * @param startingIndex the first index to try
+ * @param host this indicates the 'Server' instance to locate, which must
+ * not be foreign to this adapter
+ * @return a key associated with 'host' ('null' if not found)
+ */
+ public abstract String findKey(String prefix, int startingIndex, ServerWrapper host);
+
+ /**
+ * Equivalent to 'findKey(prefix, startingIndex, THIS-SERVER)'.
+ *
+ * @param prefix the first portion of the key
+ * @param startingIndex the first index to try
+ * @return a key associated with 'host' ('null' if not found)
+ */
+ public abstract String findKey(String prefix, int startingIndex);
+
+ /**
+ * Equivalent to 'findKey(prefix, 1, THIS-SERVER)'.
+ *
+ * @param prefix the first portion of the key
+ * @return a key associated with 'host' ('null' if not found)
+ */
+ public abstract String findKey(String prefix);
+
+ /* ============================================================ */
+
+ /**
+ * This class is basically a 'URLClassLoader', but with a 'toString()'
+ * method that indicates the host and adapter number.
+ */
+ public static class AdapterClassLoader extends URLClassLoader {
+ private int index;
+
+ public AdapterClassLoader(int index, URL[] urls, ClassLoader parent) {
+ super(urls, parent);
+ this.index = index;
+ }
+
+ @Override
+ public String toString() {
+ return "AdapterClassLoader(" + index + ")";
+ }
+ }
+}
diff --git a/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/BlockingClassLoader.java b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/BlockingClassLoader.java
new file mode 100644
index 00000000..7930bf27
--- /dev/null
+++ b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/BlockingClassLoader.java
@@ -0,0 +1,176 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-server-pool
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.serverpooltest;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.NoSuchElementException;
+
+/**
+ * Ordinarily, a 'ClassLoader' first attempts to load a class via the
+ * parent 'ClassLoader'. If that fails, it attempts to load it "locally"
+ * by whatever mechanism the class loader supports.
+ * This 'ClassLoader' instance blocks attempts to load specific classes,
+ * throwing a 'ClassNotFoundException'. This doesn't seem useful on the
+ * surface, but it forces all child 'ClassLoader' instances to do the lookup
+ * themselves. In addition, each child 'ClassLoader' will have their own
+ * copy of the classes they load, providing a way to have multiple copies of
+ * the same class running within the same JVM. Each child 'ClassLoader' can
+ * be viewed as having a separate name space.
+ */
+public class BlockingClassLoader extends ClassLoader {
+ // these are the set of packages to block
+ private HashSet<String> packages;
+
+ // these are the prefixes of class names to block
+ private ArrayList<String> prefixes;
+
+ // these specific classes will not be blocked, even if they are in one
+ // of the packages indicated by 'packages'
+ private HashSet<String> excludes = new HashSet<String>();
+
+ // these are the prefixes of class names to exclude
+ private ArrayList<String> excludePrefixes = new ArrayList<>();
+
+ /**
+ * Constructor -- initialize the 'ClassLoader' and 'packages' variable.
+ *
+ * @param parent the parent ClassLoader
+ * @param packages variable number of packages to block
+ */
+ public BlockingClassLoader(ClassLoader parent, String... packages) {
+ super(parent);
+ this.packages = new HashSet<>();
+ this.prefixes = new ArrayList<>();
+ for (String pkg : packages) {
+ if (pkg.endsWith("*")) {
+ prefixes.add(pkg.substring(0, pkg.length() - 1));
+ } else {
+ this.packages.add(pkg);
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected Class<?> findClass(String name) throws ClassNotFoundException {
+ // throws a 'ClassNotFoundException' if we are blocking this one
+ testClass(name);
+
+ // not blocking this one -- pass it on to the superclass
+ return super.findClass(name);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Enumeration<URL> getResources(String name) {
+ // in order to avoid replicated resources, we return an empty set
+ return new Enumeration<URL>() {
+ public boolean hasMoreElements() {
+ return false;
+ }
+
+ public URL nextElement() {
+ throw new NoSuchElementException("'BlockingClassLoader' blocks duplicate resources");
+ }
+ };
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Class<?> loadClass(String name) throws ClassNotFoundException {
+ // throws a 'ClassNotFoundException' if we are blocking this one
+ testClass(name);
+
+ // not blocking this one -- pass it on to the superclass
+ return super.loadClass(name);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+ // throws a 'ClassNotFoundException' if we are blocking this one
+ testClass(name);
+
+ // not blocking this one -- pass it on to the superclass
+ return super.loadClass(name, resolve);
+ }
+
+ /**
+ * Add an entry to the list of classes that should NOT be blocked.
+ *
+ * @param name the full name of a class that shouldn't be blocked
+ */
+ public void addExclude(String name) {
+ if (name.endsWith("*")) {
+ excludePrefixes.add(name.substring(0, name.length() - 1));
+ } else {
+ excludes.add(name);
+ }
+ }
+
+ /**
+ * This method looks at a class name -- if it should be blocked, a
+ * 'ClassNotFoundException' is thrown. Otherwise, it does nothing.
+ *
+ * @param name the name of the class to be tested
+ * @throws ClassNotFoundException if this class should be blocked
+ */
+ private void testClass(String name) throws ClassNotFoundException {
+ if (excludes.contains(name)) {
+ // allow this one
+ return;
+ }
+
+ for (String prefix : excludePrefixes) {
+ if (name.startsWith(prefix)) {
+ // allow this one
+ return;
+ }
+ }
+
+ // extract the package from the class name -- throw a
+ // 'ClassNotFoundException' if the package is in the list
+ // being blocked
+ int index = name.lastIndexOf('.');
+ if (index >= 0) {
+ if (packages.contains(name.substring(0,index))) {
+ throw(new ClassNotFoundException(name));
+ }
+
+ for (String prefix : prefixes) {
+ if (name.startsWith(prefix)) {
+ throw(new ClassNotFoundException(name));
+ }
+ }
+ }
+ }
+}
diff --git a/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/BucketWrapper.java b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/BucketWrapper.java
new file mode 100644
index 00000000..2628513c
--- /dev/null
+++ b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/BucketWrapper.java
@@ -0,0 +1,132 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-server-pool
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.serverpooltest;
+
+import java.io.PrintStream;
+
+/**
+ * This class provides base classes for accessing the various 'Bucket'
+ * classes. There is a separate copy of the 'Bucket' class for each
+ * adapter, and this wrapper was created to give them a common interface.
+ */
+public interface BucketWrapper {
+ /**
+ * This calls the 'Bucket.getBucketNumber()' method
+ *
+ * @return the bucket number
+ */
+ public int getBucketNumber();
+
+ /**
+ * This calls the 'Bucket.getOwner()' method
+ *
+ * @return a 'ServerWrapper' instance that corresponds to the owner
+ * of the bucket ('null' if unassigned)
+ */
+ public ServerWrapper getOwner();
+
+ /**
+ * This calls the 'Bucket.getPrimaryBackup()' method
+ *
+ * @return a 'ServerWrapper' instance that corresponds to the primary backup
+ * host for the bucket ('null' if unassigned)
+ */
+ public ServerWrapper getPrimaryBackup();
+
+ /**
+ * This calls the 'Bucket.getPrimaryBackup()' method
+ *
+ * @return a 'ServerWrapper' instance that corresponds to the secondary
+ * backup host for the bucket ('null' if unassigned)
+ */
+ public ServerWrapper getSecondaryBackup();
+
+ /* ============================================================ */
+
+ /**
+ * This class provides access to the static 'Bucket' methods. There are
+ * multiple 'Bucket' classes (one for each 'Adapter'), and each has
+ * a corresponding 'BucketWrapper.Static' instance. In other words, there
+ * is one 'Bucket.Static' instance for each simulated host.
+ */
+ public interface Static {
+ /**
+ * This returns the value of 'Bucket.BUCKETCOUNT'
+ *
+ * @return the number of Bucket instances in the bucket table
+ */
+ public int getBucketCount();
+
+ /**
+ * This calls the static 'Bucket.bucketNumber(String)' method
+ *
+ * @param value the keyword to be converted
+ * @return the bucket number
+ */
+ public int bucketNumber(String value);
+
+ /**
+ * This calls the static 'Bucket.bucketToServer(int)' method
+ *
+ * @param bucketNumber a bucket number in the range 0-1023
+ * @return a 'ServerWrapper' for the server that currently handles the
+ * bucket, or 'null' if none is currently assigned
+ */
+ public ServerWrapper bucketToServer(int bucketNumber);
+
+ /**
+ * This calls the static 'Bucket.getBucket(int)' method
+ *
+ * @param bucketNumber a bucket number in the range 0-1023
+ * @return A 'BucketWrapper' for the Bucket associated with
+ * this bucket number
+ */
+ public BucketWrapper getBucket(int bucketNumber);
+
+ /**
+ * This calls the static 'Bucket.isKeyOnThisServer(String)' method
+ *
+ * @param key the keyword to be hashed
+ * @return 'true' if the associated bucket is assigned to this server,
+ * 'false' if not
+ */
+ public boolean isKeyOnThisServer(String key);
+
+ /**
+ * This calls the static 'Bucket.moveBucket(PrintStream, int, String)'
+ * method (the one associated with the '/cmd/moveBucket' REST call).
+ *
+ * @param out the 'PrintStream' to use for displaying information
+ * @param bucketNumber the bucket number to be moved
+ * @param newHostUuid the UUID of the destination host (if 'null', a
+ * destination host will be chosen at random)
+ */
+ public void moveBucket(PrintStream out, int bucketNumber, String newHostUuid);
+
+ /**
+ * This calls the static 'Bucket.dumpAdjuncts(PrintStream)' method
+ * (the one associated with the '/cmd/dumpBucketAdjuncts' REST call).
+ *
+ * @param out the 'PrintStream' to use for displaying information
+ */
+ public void dumpAdjuncts(PrintStream out);
+ }
+}
diff --git a/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/ServerWrapper.java b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/ServerWrapper.java
new file mode 100644
index 00000000..e31a6817
--- /dev/null
+++ b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/ServerWrapper.java
@@ -0,0 +1,103 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-server-pool
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.serverpooltest;
+
+import java.util.Collection;
+import java.util.UUID;
+
+/**
+ * This class provides base classes for accessing the various 'Server'
+ * classes. There is a separate copy of the 'Server' class for each
+ * adapter, and this wrapper was created to give them a common interface.
+ */
+public interface ServerWrapper {
+ /**
+ * This calls the 'Server.toString()' method
+ *
+ * @return a string of the form 'Server[UUID]'
+ */
+ public String toString();
+
+ /**
+ * This calls the 'Server.getUuid()' method
+ *
+ * @return the UUID associated with this Server
+ */
+ public UUID getUuid();
+
+ /**
+ * This calls the 'Server.isActive()' method
+ *
+ * @return 'true' if the this server is active, and 'false' if not
+ */
+ public boolean isActive();
+
+ /* ============================================================ */
+
+ /**
+ * This class provides access to the static 'Server' methods. There are
+ * multiple 'Server' classes (one for each 'Adapter'), and each has
+ * a corresponding 'ServerWrapper.Static' instance. In other words, there
+ * is one 'Server.Static' instance for each simulated host.
+ */
+ public interface Static {
+ /**
+ * This calls the static 'Server.getThisServer()' method
+ *
+ * @return a 'ServerWrapper' instance that corresponds to the Server
+ * instance associated with this simulated host
+ */
+ public ServerWrapper getThisServer();
+
+ /**
+ * This calls the static 'Server.getFirstServer()' method
+ *
+ * @return a 'ServerWrapper' instance that corresponds to the first
+ * 'Server' instance in the 'servers' list (the one with the
+ * lowest UUID)
+ */
+ public ServerWrapper getFirstServer();
+
+ /**
+ * This calls the static 'Server.getServer(UUID)' method
+ *
+ * @param uuid the key to the lookup
+ * @return a 'ServerWrapper' instance that corresponds to the associated
+ * 'Server' instance ('null' if none)
+ */
+ public ServerWrapper getServer(UUID uuid);
+
+ /**
+ * This calls the static 'Server.getServerCount()' method
+ *
+ * @return a count of the number of servers
+ */
+ public int getServerCount();
+
+ /**
+ * This calls the static 'Server.getServers()' method
+ *
+ * @return the complete list of servers, each with a 'ServerWrapper'
+ * referring to the 'Server'
+ */
+ public Collection<ServerWrapper> getServers();
+ }
+}
diff --git a/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/SimDmaap.java b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/SimDmaap.java
new file mode 100644
index 00000000..74fef07f
--- /dev/null
+++ b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/SimDmaap.java
@@ -0,0 +1,327 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-server-pool
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.serverpooltest;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class simulates a UEB/DMAAP server.
+ */
+
+@Path("/")
+public class SimDmaap {
+ private static Logger logger = LoggerFactory.getLogger(SimDmaap.class);
+ public static final String HOSTNAME = "127.0.63.250";
+
+ // miscellaneous Jetty/Servlet parameters
+ private static ServletContextHandler context;
+ private static Server jettyServer;
+ private static ServerConnector connector;
+ private static ServletHolder holder;
+
+ /**
+ * Do whatever needs to be done to start the server. I don't know exactly
+ * what abstractions the various pieces provide, but the following code
+ * ties the pieces together, and starts up the server.
+ */
+ public static void start() {
+ try {
+ context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+ context.setContextPath("/");
+
+ jettyServer = new Server();
+
+ connector = new ServerConnector(jettyServer);
+ connector.setName("simdmaap");
+ connector.setReuseAddress(true);
+ connector.setPort(3904);
+ connector.setHost("127.0.63.250");
+
+ jettyServer.addConnector(connector);
+ jettyServer.setHandler(context);
+
+ holder = context.addServlet(org.glassfish.jersey.servlet.ServletContainer.class.getName(), "/*");
+ holder.setInitParameter(
+ "jersey.config.server.provider.classnames",
+ "com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider"
+ + "," + SimDmaap.class.getName());
+
+ jettyServer.start();
+ jettyServer.join();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Cleanly shut down the server.
+ */
+ public static void stop() {
+ try {
+ if (jettyServer != null) {
+ jettyServer.stop();
+ jettyServer = null;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ /* ============================================================ */
+
+ // maps topic name to 'Topic' instance
+ static Map<String,Topic> topicTable = new ConcurrentHashMap<>();
+
+ /**
+ * Each instance of this class corresponds to a DMAAP or UEB topic.
+ */
+ static class Topic {
+ // topic name
+ String topic;
+
+ // maps group name into group instance
+ Map<String,Group> groupTable = new ConcurrentHashMap<>();
+
+ /**
+ * Create or get a Topic.
+ *
+ * @param name the topic name
+ * @return the associated Topic instance
+ */
+ static Topic createOrGet(String name) {
+ // look up the topic name
+ Topic topicObj = topicTable.get(name);
+ if (topicObj == null) {
+ // no entry found -- the following will create one, without
+ // the need for explicit synchronization
+ topicTable.putIfAbsent(name, new Topic(name));
+ topicObj = topicTable.get(name);
+ }
+ return topicObj;
+ }
+
+ /**
+ * Constructor - initialize the 'topic' field.
+ *
+ * @param topic the topic name
+ */
+ private Topic(String topic) {
+ this.topic = topic;
+ }
+
+ /**
+ * Handle an incoming '/events/{topic}' POST REST message.
+ *
+ * @param the body of the REST message
+ * @return the appropriate JSON response
+ */
+ String post(String data) {
+ // start of message processing
+ long startTime = System.currentTimeMillis();
+
+ // current and ending indices to the 'data' field
+ int cur = 0;
+ int end = data.length();
+
+ // the number of messages retrieved so far
+ int messageCount = 0;
+
+ while (cur < end) {
+ // The body of the message may consist of multiple JSON messages,
+ // each preceded by 3 integers separated by '.'. The second one
+ // is the length, in bytes (the third seems to be some kind of
+ // channel identifier).
+
+ int leftBrace = data.indexOf('{', cur);
+ if (leftBrace < 0) {
+ // no more messages
+ break;
+ }
+ String[] prefix = data.substring(cur,leftBrace).split("\\.");
+ if (prefix.length == 3) {
+ try {
+ // determine length of message, and advance current position
+ int length = Integer.valueOf(prefix[1]);
+ cur = leftBrace + length;
+
+ // extract message, and update count -- each double quote
+ // has a '\' character placed before it, so the overall
+ // message can be placed in double quotes, and parsed as
+ // a literal string
+ String message = data.substring(leftBrace, cur)
+ .replace("\\", "\\\\").replace("\"", "\\\"")
+ .replace("\n", "\\n");
+ messageCount += 1;
+
+ // send to all listening groups
+ for (Group group : groupTable.values()) {
+ group.messages.add(message);
+ }
+ } catch (Exception e) {
+ logger.error("{}: {}", prefix[1], e);
+ break;
+ }
+ } else if (cur == 0) {
+ // there is only a single message -- extract it, and update count
+ String message = data.substring(leftBrace, end)
+ .replace("\\", "\\\\").replace("\"", "\\\"")
+ .replace("\n", "\\n");
+ messageCount += 1;
+
+ // send to all listening grops
+ for (Group group : groupTable.values()) {
+ group.messages.add(message);
+ }
+ break;
+ } else {
+ // don't know what this is -- toss it
+ break;
+ }
+ }
+
+ // generate response message
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ return "{\n"
+ + " \"count\": " + messageCount + ",\n"
+ + " \"serverTimeMs\": " + elapsedTime + "\n"
+ + "}";
+ }
+
+ /**
+ * read one or more incoming messages.
+ *
+ * @param group the 'consumerGroup' value
+ * @param timeout how long to wait for a message, in milliseconds
+ * @param limit the maximum number of messages to receive
+ * @return a JSON array, containing somewhere between 0 and 'limit' messages
+ */
+ String get(String group, long timeout, int limit) throws InterruptedException {
+ // look up the group -- create one if it doesn't exist
+ Group groupObj = groupTable.get(group);
+ if (groupObj == null) {
+ // no entry found -- the following will create one, without
+ // the need for explicit synchronization
+ groupTable.putIfAbsent(group, new Group());
+ groupObj = groupTable.get(group);
+ }
+
+ // pass it on to the 'Group' instance
+ return groupObj.get(timeout, limit);
+ }
+ }
+
+ /* ============================================================ */
+
+ /**
+ * Each instance of this class corresponds to a Consumer Group.
+ */
+ static class Group {
+ // messages queued for this group
+ private BlockingQueue<String> messages = new LinkedBlockingQueue<>();
+
+ /**
+ * Retrieve messages sent to this group.
+ *
+ * @param timeout how long to wait for a message, in milliseconds
+ * @param limit the maximum number of messages to receive
+ * @return a JSON array, containing somewhere between 0 and 'limit' messages
+ */
+ String get(long timeout, int limit) throws InterruptedException {
+ String message = messages.poll(timeout, TimeUnit.MILLISECONDS);
+ if (message == null) {
+ // timed out without messages
+ return "[]";
+ }
+
+ // use 'StringBuilder' to assemble the response -- add the first message
+ StringBuilder builder = new StringBuilder();
+ builder.append("[\"").append(message);
+
+ // add up to '<limit>-1' more messages
+ for (int i = 1 ; i < limit ; i += 1) {
+ // fetch the next message -- don't wait if it isn't currently there
+ message = messages.poll();
+ if (message == null) {
+ // no more currently available
+ break;
+ }
+ builder.append("\",\"").append(message);
+ }
+ builder.append("\"]");
+ return builder.toString();
+ }
+ }
+
+ /* ============================================================ */
+
+ /**
+ * Incoming messages from the caller to the simulator.
+ */
+ @POST
+ @Path("/events/{topic}")
+ @Consumes("application/cambria")
+ @Produces(MediaType.APPLICATION_JSON)
+ public String send(@PathParam("topic") String topic,
+ String data) {
+ logger.info("Send: topic={}", topic);
+ return Topic.createOrGet(topic).post(data);
+ }
+
+ /**
+ * Send messages from the simulator to the caller.
+ */
+ @GET
+ @Path("/events/{topic}/{group}/{id}")
+ @Consumes(MediaType.TEXT_PLAIN)
+ @Produces(MediaType.APPLICATION_JSON)
+ public String receive(@PathParam("topic") String topic,
+ @PathParam("group") String group,
+ @PathParam("id") String id,
+ @QueryParam("timeout") long timeout,
+ @QueryParam("limit") int limit)
+ throws InterruptedException {
+
+ logger.info("Receive: topic={}, group={}, id={}, timeout={}, limit={}",
+ topic, group, id, timeout, limit);
+ return Topic.createOrGet(topic).get(group, timeout, limit);
+ }
+}
diff --git a/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/TargetLockWrapper.java b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/TargetLockWrapper.java
new file mode 100644
index 00000000..ce9f39e0
--- /dev/null
+++ b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/TargetLockWrapper.java
@@ -0,0 +1,98 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-server-pool
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.serverpooltest;
+
+import java.io.Serializable;
+
+/**
+ * This class provides base classes for accessing the various 'TargetLock'
+ * classes. There is a separate copy of the 'TargetLock' class for each
+ * adapter, and this wrapper was created to give them a common interface.
+ */
+public interface TargetLockWrapper extends Serializable {
+ /**
+ * There is a separate copy of 'TargetLock.State' for each adapter --
+ * The 'TargetLockWrapper.getState()' maps these into a common
+ * 'TargetLockWrapper.State' enumeration.
+ */
+ public enum State {
+ WAITING, ACTIVE, FREE, LOST
+ }
+
+ /**
+ * This calls the 'TargetLock.free()' method
+ *
+ * @return 'true' if successful, 'false' if it was not locked, or there
+ * appears to be corruption in 'LocalLocks' tables
+ */
+ public boolean free();
+
+ /**
+ * This calls the 'TargetLock.isActive()' method
+ *
+ * @return 'true' if the lock is in the ACTIVE state, and 'false' if not
+ */
+ public boolean isActive();
+
+ /**
+ * This calls the 'TargetLock.getState()' method
+ *
+ * @return the current state of the lock, as a 'TargetLockWrapper.State'
+ */
+ public State getState();
+
+ /**
+ * This calls the 'TargetLock.getOwnerKey()' method
+ *
+ * @return the owner key field
+ */
+ public String getOwnerKey();
+
+ /**
+ * Return the value returned by 'TargetLock.toString()'.
+ *
+ * @return the value returned by 'TargetLock.toString()'
+ */
+ public String toString();
+
+ /* ============================================================ */
+
+ /**
+ * This interface mimics the 'LockCallback' interface, with the
+ * exception that 'TargetLockWrapper' is used as the arguments to the
+ * callback methods.
+ */
+ public static interface Owner {
+ /**
+ * Callback indicates the lock was successful.
+ *
+ * @param lock the 'TargetLockWrapper' instance
+ */
+ public void lockAvailable(TargetLockWrapper lock);
+
+ /**
+ * Callback indicates the lock request has failed.
+ *
+ * @param lock the 'TargetLockWrapper' instance
+ */
+ public void lockUnavailable(TargetLockWrapper lock);
+ }
+}
diff --git a/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/Test1.java b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/Test1.java
new file mode 100644
index 00000000..2178fec1
--- /dev/null
+++ b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/Test1.java
@@ -0,0 +1,912 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-server-pool
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.serverpooltest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.io.Serializable;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.UUID;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.awaitility.Durations;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.kie.api.runtime.KieSession;
+import org.onap.policy.drools.core.DroolsRunnable;
+import org.onap.policy.drools.serverpool.BucketWrapperImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Test1 {
+ private static Logger logger = LoggerFactory.getLogger(Test1.class);
+
+ // indicates that Drools containers need to be initialized
+ private static boolean needControllerInit = true;
+
+ private static int initialCount = 0;
+
+ private static int threadList(String header, boolean stackTrace) {
+ logger.info("***** threadList: {} *****", header);
+ Thread[] thr = new Thread[1000];
+ int count = Thread.enumerate(thr);
+
+ if (count > thr.length) {
+ count = thr.length;
+ }
+ for (int i = 0 ; i < count ; i += 1) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(" ").append(thr[i]);
+ if (stackTrace) {
+ for (StackTraceElement ste : thr[i].getStackTrace()) {
+ sb.append("\n ").append(ste);
+ }
+ }
+ logger.info(sb.toString());
+ }
+ logger.info("***** end threadList: {}, count = {} *****", header, count);
+ return count;
+ }
+
+ /**
+ * Set up environment prior to running tests.
+ */
+ @BeforeClass
+ public static void init() throws Exception {
+ initialCount = threadList("BeforeClass", false);
+
+ // create 6 adapters, corresponding to 6 'Server' instances
+ Adapter.ensureInit();
+
+ // make sure initialization has completed
+ long endTime = System.currentTimeMillis() + 60000L;
+ for (Adapter adapter : Adapter.adapters) {
+ assertTrue(adapter.toString() + ": Bucket assignments incomplete",
+ adapter.waitForInit(endTime));
+ }
+ }
+
+ public static boolean verifyComplete() {
+ return Thread.enumerate(new Thread[initialCount + 1]) == initialCount;
+ }
+
+ /**
+ * Clean up after tests have finished.
+ */
+ @AfterClass
+ public static void finish() throws InterruptedException {
+ threadList("AfterClass", false);
+ if (needControllerInit) {
+ return;
+ }
+ // shut down Server Pools and DMAAP Simulator
+ Adapter.ensureShutdown();
+
+ // updates for persistence may still be in progress -- wait 5 seconds
+ threadList("AfterEnsureShutdown", false);
+
+ try {
+ initialCount = initialCount + 1; // one for await thread
+ await().atMost(Durations.ONE_MINUTE)
+ .with().pollInterval(Durations.ONE_SECOND)
+ .until(() -> verifyComplete());
+ } finally {
+ threadList("AfterSleep", true);
+ }
+
+ // look at KieSession objects
+ for (Adapter adapter : Adapter.adapters) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(adapter.toString())
+ .append(": ")
+ .append(adapter.getKieSession().getObjects().size())
+ .append(" objects");
+ for (Object o : adapter.getKieSession().getObjects()) {
+ sb.append("\n ").append(o);
+ }
+ LinkedBlockingQueue<String> lbq = adapter.notificationQueue();
+ if (!lbq.isEmpty()) {
+ sb.append("\n")
+ .append(adapter.toString())
+ .append(": ")
+ .append(lbq.size())
+ .append(" queued entries");
+ for (String string : lbq) {
+ sb.append("\n ").append(string);
+ }
+ }
+ logger.info(sb.toString());
+ }
+
+ // this was used during test debugging to verify that no adjuncts
+ // were created on the 'null' host -- there shouldn't be any
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ PrintStream out = new PrintStream(bos, true);
+ new BucketWrapperImpl.Static().dumpAdjuncts(out);
+ logger.info(out.toString());
+ }
+
+ /**
+ * Initialize all Drools controllers, if needed.
+ */
+ static void ensureControllersInitialized() {
+ if (needControllerInit) {
+ needControllerInit = false;
+ for (Adapter adapter : Adapter.adapters) {
+ String rval = adapter.createController();
+ logger.info("{}: Got the following from PolicyController:\n{}",
+ adapter, rval);
+ }
+ }
+ }
+
+ /**
+ * make sure all servers have agreed on a lead server.
+ */
+ @Test
+ public void checkLeadServer() {
+ Adapter firstAdapter = Adapter.adapters[0];
+ UUID leaderUuid = firstAdapter.getLeader().getUuid();
+ for (Adapter adapter : Adapter.adapters) {
+ UUID uuid = adapter.getLeader().getUuid();
+ assertTrue(adapter.toString() + " has UUID " + uuid
+ + " (expected UUID " + leaderUuid + ")",
+ uuid.equals(leaderUuid));
+ }
+ }
+
+ /**
+ * make sure all servers agree on bucket distribution.
+ */
+ @Test
+ public void startup() throws Exception {
+ Adapter firstAdapter = Adapter.adapters[0];
+ BucketWrapper.Static firstBucketStatic = firstAdapter.getBucketStatic();
+
+ for (Adapter adapter : Adapter.adapters) {
+ BucketWrapper.Static bucketStatic = adapter.getBucketStatic();
+ if (adapter == firstAdapter) {
+ // make sure an owner and primary backup have been chosen
+ // for each bucket (secondary backups aren't implemented yet)
+ for (int i = 0 ; i < bucketStatic.getBucketCount() ; i += 1) {
+ BucketWrapper bucket = bucketStatic.getBucket(i);
+ assertNotNull(bucket.getOwner());
+ assertNotNull(bucket.getPrimaryBackup());
+ }
+ } else {
+ // make sure the bucket assignments are consistent with
+ // the primary backup
+ for (int i = 0 ; i < bucketStatic.getBucketCount() ; i += 1) {
+ BucketWrapper firstBucket = firstBucketStatic.getBucket(i);
+ BucketWrapper bucket = bucketStatic.getBucket(i);
+ assertEquals(firstBucket.getOwner().getUuid(),
+ bucket.getOwner().getUuid());
+ assertEquals(firstBucket.getPrimaryBackup().getUuid(),
+ bucket.getPrimaryBackup().getUuid());
+ }
+ }
+ }
+ }
+
+ // test 'TargetLock'
+ @Test
+ public void testTargetLock() throws InterruptedException {
+ // test locks on different hosts
+ lockTests(Adapter.adapters[5], Adapter.adapters[0]);
+
+ // test locks on the same host
+ lockTests(Adapter.adapters[2], Adapter.adapters[2]);
+
+ Adapter adapter0 = Adapter.adapters[0];
+ Adapter adapter5 = Adapter.adapters[5];
+ String ownerKey = adapter0.findKey("owner");
+ String key = adapter5.findKey("key");
+ LockOwner owner = new LockOwner();
+
+ // some exceptions
+ Throwable thrown = catchThrowable(() -> {
+ adapter0.newTargetLock(null, ownerKey, owner);
+ });
+ assertThat(thrown).isInstanceOf(IllegalArgumentException.class)
+ .hasNoCause()
+ .hasMessageContaining("TargetLock: 'key' can't be null");
+
+ thrown = catchThrowable(() -> {
+ adapter0.newTargetLock(key, null, owner);
+ });
+ assertThat(thrown).isInstanceOf(IllegalArgumentException.class)
+ .hasNoCause()
+ .hasMessageContaining("TargetLock: 'ownerKey' can't be null");
+
+ thrown = catchThrowable(() -> {
+ adapter5.newTargetLock(key, ownerKey, owner);
+ });
+ assertThat(thrown).isInstanceOf(IllegalArgumentException.class)
+ .hasNoCause()
+ .hasMessageContaining("not currently assigned to this server");
+
+ thrown = catchThrowable(() -> {
+ adapter0.newTargetLock(key, ownerKey, null);
+ });
+ assertThat(thrown).isInstanceOf(IllegalArgumentException.class)
+ .hasNoCause()
+ .hasMessageContaining("TargetLock: 'owner' can't be null");
+ }
+
+ /**
+ * Run some 'TargetLock' tests.
+ *
+ * @param keyAdapter this is the adapter for the key, which determines
+ * where the server-side data will reside
+ * @param ownerAdapter this is the adapter associated with the requestor
+ */
+ void lockTests(Adapter keyAdapter, Adapter ownerAdapter) throws InterruptedException {
+ // choose 'key' and 'ownerKey' values that map to buckets owned
+ // by their respective adapters
+ String key = keyAdapter.findKey("key");
+ String ownerKey = ownerAdapter.findKey("owner");
+
+ // this receives and queues callback notifications
+ LockOwner owner = new LockOwner();
+
+ // first lock -- should succeed
+ TargetLockWrapper tl1 = ownerAdapter.newTargetLock(key, ownerKey, owner);
+ assertLockAvailable(owner, tl1);
+ //assertArrayEquals(new Object[] {"lockAvailable", tl1},
+ // owner.poll(5, TimeUnit.SECONDS));
+ assertNull(owner.poll());
+ assertTrue(tl1.isActive());
+ assertEquals(TargetLockWrapper.State.ACTIVE, tl1.getState());
+ assertEquals(ownerKey, tl1.getOwnerKey());
+
+ // second lock -- should fail (lock in use)
+ TargetLockWrapper tl2 =
+ ownerAdapter.newTargetLock(key, ownerKey, owner, false);
+ assertLockUnavailable(owner, tl2);
+
+ assertNull(owner.poll());
+ assertFalse(tl2.isActive());
+ assertEquals(TargetLockWrapper.State.FREE, tl2.getState());
+ assertEquals(ownerKey, tl2.getOwnerKey());
+
+ // third and fourth locks -- should wait
+ TargetLockWrapper tl3 = ownerAdapter.newTargetLock(key, ownerKey, owner);
+ TargetLockWrapper tl4 = ownerAdapter.newTargetLock(key, ownerKey, owner);
+ assertNull(owner.poll(5, TimeUnit.SECONDS)); // nothing immediately
+ assertFalse(tl3.isActive());
+ assertFalse(tl4.isActive());
+ assertEquals(TargetLockWrapper.State.WAITING, tl3.getState());
+ assertEquals(TargetLockWrapper.State.WAITING, tl4.getState());
+
+ // free third lock before ever getting a callback
+ assertTrue(tl3.free());
+ assertFalse(tl3.isActive());
+ assertEquals(TargetLockWrapper.State.FREE, tl3.getState());
+ assertFalse(tl3.free());
+
+ // free first lock
+ assertTrue(tl1.free());
+ assertFalse(tl1.isActive());
+ assertEquals(TargetLockWrapper.State.FREE, tl1.getState());
+ assertFalse(tl1.free()); // already free
+
+ // fourth lock should be active now (or soon)
+ assertLockAvailable(owner, tl4);
+ assertNull(owner.poll());
+ assertTrue(tl4.isActive());
+ assertEquals(TargetLockWrapper.State.ACTIVE, tl4.getState());
+
+ // free fourth lock
+ assertTrue(tl4.free());
+ assertFalse(tl4.isActive());
+ assertEquals(TargetLockWrapper.State.FREE, tl4.getState());
+ }
+
+ /**
+ * Test sending of intra-server and inter-server messages.
+ */
+ @Test
+ public void topicSendTests() throws InterruptedException {
+ ensureControllersInitialized();
+
+ // sender and receiver are the same
+ topicSendTest(Adapter.adapters[2], Adapter.adapters[2], false);
+
+ // sender and receiver are different
+ topicSendTest(Adapter.adapters[5], Adapter.adapters[4], false);
+ }
+
+ /**
+ * Send a message from 'sender' to 'receiver' -- the message is delivered
+ * as an incoming 'TopicListener' event, is processed by
+ * 'FeatureServerPool.beforeOffer' (PolicyControllerFeatureApi), which
+ * will route it based upon keyword. At the destination end, it should
+ * be converted to an instance of 'TestDroolsObject', and inserted into
+ * the Drools session. The associated Drools rule should fire, and the
+ * message is placed in the notification queue. The message is then
+ * retracted, unless the string 'SAVE' appears within the message, which
+ * is the case if the 'save' parameter is set.
+ *
+ * @param sender the adapter associated with the sending end
+ * @param receiver the adapter associated with the receiving end
+ * @param save if 'true' the message is not retracted, if 'false' it is
+ * retracted
+ */
+ String topicSendTest(Adapter sender, Adapter receiver, boolean save) throws InterruptedException {
+ // generate base message -- include 'SAVE' in the message if 'save' is set
+ String message = "From " + sender.toString() + " to "
+ + receiver.toString();
+ if (save) {
+ message += " (SAVE)";
+ }
+ message += ": " + UUID.randomUUID().toString() + ".";
+
+ // add a numeric suffix to the message, such that it will be routed
+ // to 'receiver'
+ message = receiver.findKey(message);
+
+ // send the message
+ sender.sendEvent(message);
+
+ // verify that it has been received
+ assertEquals(message,
+ receiver.notificationQueue().poll(60, TimeUnit.SECONDS));
+ return message;
+ }
+
+ /**
+ * Return the Adapter associated with the current lead server.
+ *
+ * @return the Adapter associated with the current lead server
+ * ('null' if there is no leader)
+ */
+ private Adapter getLeader() {
+ for (Adapter adapter : Adapter.adapters) {
+ if (adapter.getLeader() == adapter.getServerStatic().getThisServer()) {
+ // we have located the leader
+ return adapter;
+ }
+ }
+ throw new AssertionError();
+ }
+
+ /**
+ * Test migration of sessions from one server to another.
+ */
+ @Test
+ public void sessionMigrationTest() throws InterruptedException {
+ ensureControllersInitialized();
+
+ // select adapters for roles
+ Adapter sender = Adapter.adapters[1];
+ Adapter receiver = Adapter.adapters[3];
+ Adapter newReceiver = Adapter.adapters[5];
+
+ // determine current leader
+ Adapter leader = getLeader();
+
+ // send message from 'sender' to 'receiver', and save it
+ String message = topicSendTest(sender, receiver, true);
+
+ // verify where the bucket is and is not
+ assertTrue(receiver.getBucketStatic().isKeyOnThisServer(message));
+ assertFalse(newReceiver.getBucketStatic().isKeyOnThisServer(message));
+
+ // move to the new host
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ PrintStream out = new PrintStream(bos, true);
+ leader.getBucketStatic().moveBucket(
+ out, receiver.getBucketStatic().bucketNumber(message),
+ newReceiver.getServerStatic().getThisServer().getUuid().toString());
+ logger.info(bos.toString());
+
+ // poll up to 10 seconds for the bucket to be updated
+ TestDroolsObject matchingObject = new TestDroolsObject(message);
+ await().atMost(50000L, TimeUnit.MILLISECONDS)
+ .until(() -> (new ArrayList<Object>(newReceiver.getKieSession().getObjects())
+ .contains(matchingObject)));
+
+ // verify where the bucket is and is not
+ assertFalse(receiver.getBucketStatic().isKeyOnThisServer(message));
+ assertTrue(newReceiver.getBucketStatic().isKeyOnThisServer(message));
+ }
+
+ /**
+ * Test migration of locks from one server to another.
+ */
+ @Test
+ public void lockMigrationTest() throws InterruptedException {
+ ensureControllersInitialized();
+
+ // select adapters for roles -- '*Server' refers to the 'key' end,
+ // and '*Client' refers to the 'ownerKey' end
+ final Adapter oldServer = Adapter.adapters[0];
+ final Adapter newServer = Adapter.adapters[1];
+ final Adapter oldClient = Adapter.adapters[2];
+ final Adapter newClient = Adapter.adapters[3];
+
+ // determine the current leader
+ final Adapter leader = getLeader();
+
+ // choose 'key' and 'ownerKey' values associated with
+ // 'oldServer' and 'oldClient', respectively
+ String key = oldServer.findKey("key");
+ String ownerKey = oldClient.findKey("owner");
+ LockOwner owner = new LockOwner();
+
+ // allocate lock 1
+ TargetLockWrapper tl1 = oldClient.newTargetLock(key, ownerKey, owner);
+ assertLockAvailable(owner, tl1);
+
+ // allocate a lock 2, which should be in the 'WAITING' state
+ TargetLockWrapper tl2 = oldClient.newTargetLock(key, ownerKey, owner);
+ assertNull(owner.poll(5, TimeUnit.SECONDS)); // nothing immediately
+
+ // verify lock states
+ assertEquals(TargetLockWrapper.State.ACTIVE, tl1.getState());
+ assertEquals(TargetLockWrapper.State.WAITING, tl2.getState());
+
+ // verify key buckets (before)
+ assertTrue(oldServer.getBucketStatic().isKeyOnThisServer(key));
+ assertFalse(newServer.getBucketStatic().isKeyOnThisServer(key));
+
+ // move key buckets to new host (server side)
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ PrintStream out = new PrintStream(bos, true);
+ leader.getBucketStatic().moveBucket(
+ out, oldServer.getBucketStatic().bucketNumber(key),
+ newServer.getServerStatic().getThisServer().getUuid().toString());
+ logger.info(bos.toString());
+
+ logger.debug("lock migration test - before: new isKeyOnThisServer: {}, "
+ + "old isKeyOnThisServer: {}, time: {}",
+ newServer.getBucketStatic().isKeyOnThisServer(key),
+ oldServer.getBucketStatic().isKeyOnThisServer(key),
+ new SimpleDateFormat("yyyy-MM-dd kk:mm:ss").format(new Date()));
+
+ await().atMost(10000L, TimeUnit.MILLISECONDS).until(() ->
+ newServer.getBucketStatic().isKeyOnThisServer(key)
+ && oldServer.getBucketStatic().isKeyOnThisServer(key) == false);
+
+ logger.debug("lock migration test - after : new isKeyOnThisServer: {}, "
+ + "old isKeyOnThisServer: {}, time: {}",
+ newServer.getBucketStatic().isKeyOnThisServer(key),
+ oldServer.getBucketStatic().isKeyOnThisServer(key),
+ new SimpleDateFormat("yyyy-MM-dd kk:mm:ss").format(new Date()));
+
+ // verify key buckets (after)
+ assertFalse(oldServer.getBucketStatic().isKeyOnThisServer(key));
+ assertTrue(newServer.getBucketStatic().isKeyOnThisServer(key));
+
+ // we should be able to free lock1 now, and lock2 should go active,
+ // indicating that the server side is still working
+ assertTrue(tl1.free());
+
+ assertLockAvailable(owner, tl2);
+ assertEquals(TargetLockWrapper.State.ACTIVE, tl2.getState());
+
+ // create a third lock
+ TargetLockWrapper tl3 = oldClient.newTargetLock(key, ownerKey, owner);
+ assertNull(owner.poll(5, TimeUnit.SECONDS)); // nothing immediately
+ assertEquals(TargetLockWrapper.State.WAITING, tl3.getState());
+
+ // insert active objects in Drools session, which is about to be moved
+ // (if we don't do this, the client objects won't be relocated)
+ oldClient.getKieSession().insert(new KeywordWrapper(ownerKey, "lmt.owner", owner));
+ oldClient.getKieSession().insert(new KeywordWrapper(ownerKey, "lmt.tl2", tl2));
+ oldClient.getKieSession().insert(new KeywordWrapper(ownerKey, "lmt.tl3", tl3));
+
+ // dumping out some state information as part of debugging --
+ // I see no reason to remove it now
+ {
+ bos = new ByteArrayOutputStream();
+ out = new PrintStream(bos, true);
+ out.println("BEFORE: tl2=" + tl2 + "\ntl3=" + tl3);
+ oldClient.dumpLocks(out, true);
+ oldClient.getBucketStatic().dumpAdjuncts(out);
+ logger.debug(bos.toString());
+ }
+
+ // don't need these any more -- we will get them back on the new host
+ tl1 = tl2 = tl3 = null;
+ owner = null;
+
+ // verify ownerKey buckets (before)
+ assertTrue(oldClient.getBucketStatic().isKeyOnThisServer(ownerKey));
+ assertFalse(newClient.getBucketStatic().isKeyOnThisServer(ownerKey));
+
+ // move ownerKey buckets to new host (client side)
+ bos = new ByteArrayOutputStream();
+ out = new PrintStream(bos, true);
+ leader.getBucketStatic().moveBucket(
+ out, oldClient.getBucketStatic().bucketNumber(ownerKey),
+ newClient.getServerStatic().getThisServer().getUuid().toString());
+ logger.info(bos.toString());
+
+ logger.debug("lock migration test2 - before: new isKeyOnThisServer: {}, "
+ + "old isKeyOnThisServer: {}, time: {}",
+ newClient.getBucketStatic().isKeyOnThisServer(ownerKey),
+ oldClient.getBucketStatic().isKeyOnThisServer(ownerKey),
+ new SimpleDateFormat("yyyy-MM-dd kk:mm:ss").format(new Date()));
+
+ await().atMost(Durations.FIVE_SECONDS)
+ .with().pollInterval(Durations.ONE_HUNDRED_MILLISECONDS)
+ .until(() -> newClient.getBucketStatic().isKeyOnThisServer(ownerKey)
+ && oldClient.getBucketStatic().isKeyOnThisServer(ownerKey) == false);
+
+ logger.debug("lock migration test2 - before: new isKeyOnThisServer: {}, "
+ + "old isKeyOnThisServer: {}, time: {}",
+ newClient.getBucketStatic().isKeyOnThisServer(ownerKey),
+ oldClient.getBucketStatic().isKeyOnThisServer(ownerKey),
+ new SimpleDateFormat("yyyy-MM-dd kk:mm:ss").format(new Date()));
+
+ // verify ownerKey buckets (before)
+ assertFalse(oldClient.getBucketStatic().isKeyOnThisServer(ownerKey));
+ assertTrue(newClient.getBucketStatic().isKeyOnThisServer(ownerKey));
+
+ // now, we need to locate 'tl2', 'tl3', and 'owner' in Drools memory
+ await().atMost(Durations.FIVE_SECONDS)
+ .with().pollInterval(Durations.ONE_SECOND)
+ .until(() -> newClient.getKieSession() != null);
+ KieSession kieSession = newClient.getKieSession();
+ for (Object obj : new ArrayList<Object>(kieSession.getObjects())) {
+ if (obj instanceof KeywordWrapper) {
+ KeywordWrapper kw = (KeywordWrapper)obj;
+
+ if ("lmt.owner".equals(kw.id)) {
+ owner = kw.getObject(LockOwner.class);
+ } else if ("lmt.tl2".equals(kw.id)) {
+ tl2 = kw.getObject(TargetLockWrapper.class);
+ } else if ("lmt.tl3".equals(kw.id)) {
+ tl3 = kw.getObject(TargetLockWrapper.class);
+ }
+ kieSession.delete(kieSession.getFactHandle(obj));
+ }
+ }
+
+ // make sure we found everything
+ assertNotNull(tl2);
+ assertNotNull(tl3);
+ assertNotNull(owner);
+ assertFalse(newClient.isForeign(tl2, tl3, owner));
+
+ // verify the states of 'tl2' and 'tl3'
+ assertEquals(TargetLockWrapper.State.ACTIVE, tl2.getState());
+ assertEquals(TargetLockWrapper.State.WAITING, tl3.getState());
+
+ // dumping out some state information as part of debugging --
+ // I see no reason to remove it now
+ {
+ bos = new ByteArrayOutputStream();
+ out = new PrintStream(bos, true);
+ out.println("AFTER: tl2=" + tl2 + "\ntl3=" + tl3);
+ newClient.dumpLocks(out, true);
+ newClient.getBucketStatic().dumpAdjuncts(out);
+ logger.debug(bos.toString());
+ }
+
+ // now, we should be able to free 'tl2', and 'tl3' should go active
+ assertNull(owner.poll(5, TimeUnit.SECONDS)); // nothing immediately
+ assertTrue(tl2.free());
+
+ assertLockAvailable(owner, tl3);
+ assertEquals(TargetLockWrapper.State.ACTIVE, tl3.getState());
+ assertTrue(tl3.free());
+ }
+
+ private void assertLockAvailable(LockOwner owner, TargetLockWrapper tl) {
+ AtomicReference<Object[]> objArray = new AtomicReference<>();
+ await().atMost(300000, TimeUnit.MILLISECONDS).until(() -> {
+ objArray.set(owner.poll(5, TimeUnit.SECONDS));
+ return objArray.get() != null;
+ });
+ assertArrayEquals(new Object[] {"lockAvailable", tl}, objArray.get());
+ }
+
+ private void assertLockUnavailable(LockOwner owner, TargetLockWrapper tl) {
+ AtomicReference<Object[]> objArray = new AtomicReference<>();
+ await().atMost(300000, TimeUnit.MILLISECONDS).until(() -> {
+ objArray.set(owner.poll(5, TimeUnit.SECONDS));
+ return objArray.get() != null;
+ });
+ assertArrayEquals(new Object[] {"lockUnavailable", tl}, objArray.get());
+ }
+
+ /**
+ * Test cleanup of locks that have been abandoned.
+ */
+ @Test
+ public void abandonedLocks() throws InterruptedException {
+ // choose adapters
+ Adapter keyAdapter = Adapter.adapters[3];
+ Adapter ownerAdapter = Adapter.adapters[4];
+
+ // generate compatible keys
+ String key = keyAdapter.findKey("abandonedLocks.key");
+ String ownerKey = ownerAdapter.findKey("abandonedLocks.owner");
+
+ // receiver of callback notifications
+ LockOwner owner = new LockOwner();
+
+ // first lock -- should succeed
+ TargetLockWrapper tl1 = ownerAdapter.newTargetLock(key, ownerKey, owner);
+ //assertLockAvailable(owner, tl1);
+ assertArrayEquals(new Object[] {"lockAvailable", tl1},
+ owner.poll(5, TimeUnit.SECONDS));
+
+ // second lock -- should wait
+ final TargetLockWrapper tl2 = ownerAdapter.newTargetLock(key, ownerKey, owner);
+ assertNull(owner.poll(5, TimeUnit.SECONDS)); // nothing immediately
+
+ // abandon first lock, and do a GC cycle -- tl2 should go active
+ tl1 = null;
+ System.gc();
+ //assertLockAvailable(owner, tl2);
+ assertArrayEquals(new Object[] {"lockAvailable", tl2},
+ owner.poll(5, TimeUnit.SECONDS));
+ assertTrue(tl2.isActive());
+ assertEquals(TargetLockWrapper.State.ACTIVE, tl2.getState());
+
+ // free tl2
+ assertTrue(tl2.free());
+ assertFalse(tl2.isActive());
+ assertEquals(TargetLockWrapper.State.FREE, tl2.getState());
+ }
+
+ /**
+ * Test locks within Drools sessions.
+ */
+ @Test
+ public void locksWithinDrools() throws InterruptedException {
+ ensureControllersInitialized();
+
+ // choose adapters
+ Adapter keyAdapter = Adapter.adapters[3];
+ Adapter ownerAdapter = Adapter.adapters[4];
+
+ // generate compatible keys
+ final String key = keyAdapter.findKey("locksWithinDrools.key");
+ final String ownerKey = ownerAdapter.findKey("locksWithinDrools.owner");
+
+ // need a 'LockOwner' variant
+ final LockOwner owner = new LockOwner() {
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void lockAvailable(TargetLockWrapper lock) {
+ // insert notification in 'LinkedBlockingQueue'
+ add(new Object[] {"lockAvailable", lock, Thread.currentThread()});
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void lockUnavailable(TargetLockWrapper lock) {
+ // insert notification in 'LinkedBlockingQueue'
+ add(new Object[] {"lockUnavailable", lock, Thread.currentThread()});
+ }
+ };
+
+ // generate first lock outside of Drools
+ final TargetLockWrapper tl1 = ownerAdapter.newTargetLock(key, ownerKey, owner);
+ Object[] response = owner.poll(5, TimeUnit.SECONDS);
+ assertNotNull(response);
+ assertEquals(3, response.length);
+ assertEquals("lockAvailable", response[0]);
+ assertEquals(tl1, response[1]);
+
+ // now, generate one from within Drools
+ ownerAdapter.getKieSession().insert(new DroolsRunnable() {
+ @Override
+ public void run() {
+ // create lock, which should block
+ TargetLockWrapper tl2 =
+ ownerAdapter.newTargetLock(key, ownerKey, owner);
+ owner.add(new Object[] {"tl2Data", tl2, Thread.currentThread()});
+ }
+ });
+
+ // fetch data from Drools thread
+ response = owner.poll(5, TimeUnit.SECONDS);
+ assertNotNull(response);
+ assertEquals(3, response.length);
+ assertEquals("tl2Data", response[0]);
+
+ TargetLockWrapper tl2 = null;
+ Thread droolsThread = null;
+
+ if (response[1] instanceof TargetLockWrapper) {
+ tl2 = (TargetLockWrapper) response[1];
+ }
+ if (response[2] instanceof Thread) {
+ droolsThread = (Thread) response[2];
+ }
+
+ assertNotNull(tl2);
+ assertNotNull(droolsThread);
+
+ // tl2 should still be waiting
+ assertNull(owner.poll(5, TimeUnit.SECONDS));
+ assertFalse(tl2.isActive());
+ assertEquals(TargetLockWrapper.State.WAITING, tl2.getState());
+
+ // free tl1
+ assertTrue(tl1.free());
+
+ // verify that 'tl2' is now available,
+ // and the call back ran in the Drools thread
+ assertArrayEquals(new Object[] {"lockAvailable", tl2, droolsThread},
+ owner.poll(5, TimeUnit.SECONDS));
+ assertTrue(tl2.isActive());
+ assertEquals(TargetLockWrapper.State.ACTIVE, tl2.getState());
+
+ // free tl2
+ assertTrue(tl2.free());
+ }
+
+ /**
+ * Test insertion of objects into Drools memory.
+ */
+ @Test
+ public void insertDrools() throws InterruptedException {
+ Adapter adapter1 = Adapter.adapters[1];
+ final Adapter adapter2 = Adapter.adapters[2];
+
+ // check whether we can insert objects locally (adapter1 -> adapter1)
+ String key1 = adapter1.findKey("insertDrools1-");
+ adapter1.insertDrools(new KeywordWrapper(key1, "insertDroolsLocal", null));
+
+ await().atMost(Durations.TEN_SECONDS)
+ .with().pollInterval(Durations.ONE_SECOND)
+ .until(() -> adapter1.getKieSession() != null);
+
+ KieSession kieSession;
+ boolean found = false;
+ kieSession = adapter1.getKieSession();
+ for (Object obj : new ArrayList<Object>(kieSession.getObjects())) {
+ if (obj instanceof KeywordWrapper
+ && "insertDroolsLocal".equals(((KeywordWrapper) obj).id)) {
+ found = true;
+ kieSession.delete(kieSession.getFactHandle(obj));
+ break;
+ }
+ }
+ assertTrue(found);
+
+ // check whether we can insert objects remotely (adapter1 -> adapter2)
+ String key2 = adapter2.findKey("insertDrools2-");
+ adapter1.insertDrools(new KeywordWrapper(key2, "insertDroolsRemote", null));
+
+ // it would be nice to test for this, rather than sleep
+ await().atMost(Durations.FIVE_SECONDS)
+ .with().pollInterval(Durations.ONE_HUNDRED_MILLISECONDS)
+ .until(() -> adapter2.getKieSession() != null);
+
+ found = false;
+ kieSession = adapter2.getKieSession();
+ for (Object obj : new ArrayList<Object>(kieSession.getObjects())) {
+ if (obj instanceof KeywordWrapper
+ && "insertDroolsRemote".equals(((KeywordWrapper) obj).id)) {
+ found = true;
+ kieSession.delete(kieSession.getFactHandle(obj));
+ break;
+ }
+ }
+ assertTrue(found);
+ }
+
+ /* ============================================================ */
+
+ /**
+ * This class implements the 'LockCallback' interface, and
+ * makes callback responses available via a 'LinkedBlockingQueue'.
+ */
+ public static class LockOwner extends LinkedBlockingQueue<Object[]>
+ implements TargetLockWrapper.Owner, Serializable {
+ /**
+ * Constructor -- initialize the 'LinkedBlockingQueue'.
+ */
+ public LockOwner() {
+ super();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void lockAvailable(TargetLockWrapper lock) {
+ // insert notification in 'LinkedBlockingQueue'
+ add(new Object[] {"lockAvailable", lock});
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void lockUnavailable(TargetLockWrapper lock) {
+ // insert notification in 'LinkedBlockingQueue'
+ add(new Object[] {"lockUnavailable", lock});
+ }
+ }
+
+ /* ============================================================ */
+
+ /**
+ * This class is used to insert objects in Drools memory to support
+ * testing.
+ */
+ public static class KeywordWrapper implements Serializable {
+ // this is the keyword, which determines the associated bucket,
+ // which then determines when this object is migrated
+ public String key;
+
+ // this is an identifier, which can be used to select objects
+ // on the receiving end
+ public String id;
+
+ // this is the object being wrapped
+ public Serializable obj;
+
+ /**
+ * Constructor -- initialize fields.
+ *
+ * @param key keyword, which determines the associated bucket
+ * @param id string identifier, used to match objects from the sending
+ * to the receiving end
+ * @param obj the object being wrapped
+ */
+ public KeywordWrapper(String key, String id, Serializable obj) {
+ this.key = key;
+ this.id = id;
+ this.obj = obj;
+ }
+
+ /**
+ * This is used to extract objects on the receiving end. If the class
+ * matches, we get the expected object. If the class does not match,
+ * we get 'null', and the test should fail.
+ *
+ * @param clazz the expected class of the 'obj' field
+ * @return the object (if 'clazz' matches), 'null' if it does not
+ */
+ public <T> T getObject(Class<T> clazz) {
+ return clazz.isInstance(obj) ? clazz.cast(obj) : null;
+ }
+ }
+}
diff --git a/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/TestDroolsObject.java b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/TestDroolsObject.java
new file mode 100644
index 00000000..43aa5de7
--- /dev/null
+++ b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/TestDroolsObject.java
@@ -0,0 +1,58 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-server-pool
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.serverpooltest;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+/**
+ * Instances of this class can be inserted into a Drools session, and used
+ * to test things like message routing and bucket migration.
+ */
+@Getter
+@Setter
+@ToString
+@EqualsAndHashCode
+public class TestDroolsObject implements Serializable {
+ // determines the bucket number
+ private String key;
+
+ /**
+ * Constructor - no key specified.
+ */
+ public TestDroolsObject() {
+ this.key = null;
+ }
+
+ /**
+ * Constructor - initialize the key.
+ *
+ * @param key key that is hashed to determine the bucket number
+ */
+ public TestDroolsObject(String key) {
+ this.key = key;
+ }
+}