From 001320ed1ecbdf3b2f58d261f008f681da3f4c67 Mon Sep 17 00:00:00 2001 From: "Straubs, Ralph (rs8887)" Date: Tue, 4 Feb 2020 03:26:30 -0600 Subject: Add feature-server-pool to the ONAP drools-pdp repository. Issue-ID: POLICY-2351 Change-Id: I8ddde547a73a51c04c8dd9f1d66232e8281599a9 Signed-off-by: Straubs, Ralph (rs8887) --- .../onap/policy/drools/serverpool/AdapterImpl.java | 456 +++++++++++ .../drools/serverpool/BucketWrapperImpl.java | 173 ++++ .../drools/serverpool/ServerWrapperImpl.java | 146 ++++ .../drools/serverpool/TargetLockWrapperImpl.java | 195 +++++ .../onap/policy/drools/serverpooltest/Adapter.java | 353 ++++++++ .../drools/serverpooltest/BlockingClassLoader.java | 176 ++++ .../drools/serverpooltest/BucketWrapper.java | 132 +++ .../drools/serverpooltest/ServerWrapper.java | 103 +++ .../policy/drools/serverpooltest/SimDmaap.java | 327 ++++++++ .../drools/serverpooltest/TargetLockWrapper.java | 98 +++ .../onap/policy/drools/serverpooltest/Test1.java | 912 +++++++++++++++++++++ .../drools/serverpooltest/TestDroolsObject.java | 58 ++ 12 files changed, 3129 insertions(+) create mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/AdapterImpl.java create mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/BucketWrapperImpl.java create mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/ServerWrapperImpl.java create mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/TargetLockWrapperImpl.java create mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/Adapter.java create mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/BlockingClassLoader.java create mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/BucketWrapper.java create mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/ServerWrapper.java create mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/SimDmaap.java create mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/TargetLockWrapper.java create mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/Test1.java create mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/TestDroolsObject.java (limited to 'feature-server-pool/src/test/java') diff --git a/feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/AdapterImpl.java b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/AdapterImpl.java new file mode 100644 index 00000000..bac13f18 --- /dev/null +++ b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/AdapterImpl.java @@ -0,0 +1,456 @@ +/* + * ============LICENSE_START======================================================= + * feature-server-pool + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.serverpool; + +import static org.awaitility.Awaitility.await; + +import java.io.PrintStream; +import java.nio.file.Paths; +import java.util.Properties; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.kie.api.runtime.KieSession; + +import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; +import org.onap.policy.common.endpoints.event.comm.TopicListener; + +import org.onap.policy.drools.core.PolicyContainer; +import org.onap.policy.drools.core.PolicySession; +import org.onap.policy.drools.core.PolicySessionFeatureApiConstants; + +import org.onap.policy.drools.serverpooltest.Adapter; +import org.onap.policy.drools.serverpooltest.BucketWrapper; +import org.onap.policy.drools.serverpooltest.ServerWrapper; +import org.onap.policy.drools.serverpooltest.TargetLockWrapper; + +import org.onap.policy.drools.system.PolicyController; +import org.onap.policy.drools.system.PolicyEngineConstants; +import org.onap.policy.drools.util.KieUtils; +import org.onap.policy.drools.utils.PropertyUtil; +import org.powermock.reflect.Whitebox; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class implements the 'Adapter' interface. There is one 'AdapterImpl' + * class for each simulated host, and one instance of each 'AdapterImpl' class. + */ +public class AdapterImpl extends Adapter { + private static Logger logger = LoggerFactory.getLogger(AdapterImpl.class); + + // Each 'AdapterImpl' instance has it's own class object, making it a + // singleton. There is only a single 'Adapter' class object, and all + // 'AdapterImpl' classes are derived from it. + private static AdapterImpl adapter = null; + + // this is the adapter index + private int index; + + // this will refer to the Drools session 'PolicyController' instance + private PolicyController policyController = null; + + // this will refer to the Drools session 'PolicySession' instance + private PolicySession policySession = null; + + // used by Drools session to signal back to Junit tests + private LinkedBlockingQueue inotificationQueue = + new LinkedBlockingQueue<>(); + + // provides indirect references to a select set of static 'Server' methods + private static ServerWrapper.Static serverStatic = + new ServerWrapperImpl.Static(); + + // provides indirect references to a select set of static 'Bucket' methods + private static BucketWrapper.Static bucketStatic = + new BucketWrapperImpl.Static(); + + /** + * {@inheritDoc} + */ + @Override + public void init(int index) throws Exception { + adapter = this; + this.index = index; + + PolicyEngineConstants.getManager().configure(new Properties()); + PolicyEngineConstants.getManager().start(); + + // Note that this method does basically what + // 'FeatureServerPool.afterStart(PolicyEngine)' does, but allows us to + // specify different properties for each of the 6 simulated hosts + logger.info("{}: Running: AdapterImpl.init({}), class hash code = {}", + this, index, AdapterImpl.class.hashCode()); + + Properties prop = new Properties(); + prop.setProperty("server.pool.discovery.servers", "127.0.63.250"); + prop.setProperty("server.pool.discovery.topic", "DISCOVERY-TOPIC"); + prop.setProperty("server.pool.server.ipAddress", "127.0.63." + index); + prop.setProperty("server.pool.server.port", "20000"); + + prop.setProperty("keyword.path", "requestID,CommonHeader.RequestID,key"); + + prop.setProperty("keyword.org.onap.policy.m2.base.Transaction.lookup", + "getRequestID()"); + prop.setProperty("keyword.org.onap.policy.controlloop.ControlLoopEvent.lookup", "requestID"); + prop.setProperty("keyword.org.onap.policy.drools.serverpool.TargetLock.lookup", "getOwnerKey()"); + prop.setProperty("keyword.java.lang.String.lookup", "toString()"); + prop.setProperty("keyword.org.onap.policy.drools.serverpooltest.TestDroolsObject.lookup", + "getKey()"); + prop.setProperty("keyword.org.onap.policy.drools.serverpooltest.Test1$KeywordWrapper.lookup", "key"); + + TargetLock.startup(); + Server.startup(prop); + + // use reflection to set private static field + // 'FeatureServerPool.droolsTimeoutMillis' + Whitebox.setInternalState(FeatureServerPool.class, "droolsTimeoutMillis", + ServerPoolProperties.DEFAULT_BUCKET_DROOLS_TIMEOUT); + + // use reflection to set private static field + // 'FeatureServerPool.timeToLiveSecond' + Whitebox.setInternalState(FeatureServerPool.class, "timeToLiveSecond", + String.valueOf(ServerPoolProperties.DEFAULT_BUCKET_TIME_TO_LIVE)); + + // use reflection to call private static method + // 'FeatureServerPool.buildKeywordTable()' + Whitebox.invokeMethod(FeatureServerPool.class, "buildKeywordTable"); + + Bucket.Backup.register(new FeatureServerPool.DroolsSessionBackup()); + Bucket.Backup.register(new TargetLock.LockBackup()); + + // dump out feature lists + logger.info("{}: ServerPoolApi features list: {}", + this, ServerPoolApi.impl.getList()); + logger.info("{}: PolicySessionFeatureApi features list: {}", + this, PolicySessionFeatureApiConstants.getImpl().getList()); + } + + /** + * {@inheritDoc} + */ + @Override + public void shutdown() { + policyController.stop(); + Server.shutdown(); + + PolicyEngineConstants.getManager().stop(); + PolicyEngineConstants.getManager().getExecutorService().shutdown(); + } + + /** + * {@inheritDoc} + */ + @Override + public LinkedBlockingQueue notificationQueue() { + return inotificationQueue; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean waitForInit(long endTime) throws InterruptedException { + try { + // wait until a leader is elected + await().atMost(endTime - System.currentTimeMillis(), + TimeUnit.MILLISECONDS).until(() -> Leader.getLeader() != null); + + // wait for each bucket to have an owner + for (int i = 0 ; i < Bucket.BUCKETCOUNT ; i += 1) { + Bucket bucket = Bucket.getBucket(i); + while (bucket.getOwner() == null) { + Thread.sleep(Math.min(endTime - System.currentTimeMillis(), 100L)); + } + //await().atMost(endTime - System.currentTimeMillis(), + //TimeUnit.MILLISECONDS).until(() -> bucket.getOwner() != null); + } + } catch (IllegalArgumentException e) { + // 'Thread.sleep()' was passed a negative time-out value -- + // time is up + logger.debug("AdapterImpl waitForInit error", e); + return false; + } + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public ServerWrapper.Static getServerStatic() { + return serverStatic; + } + + /** + * {@inheritDoc} + */ + @Override + public ServerWrapper getLeader() { + return ServerWrapperImpl.getWrapper(Leader.getLeader()); + } + + /** + * {@inheritDoc} + */ + @Override + public BucketWrapper.Static getBucketStatic() { + return bucketStatic; + } + + /** + * {@inheritDoc} + */ + @Override + public TargetLockWrapper newTargetLock( + String key, String ownerKey, TargetLockWrapper.Owner owner, boolean waitForLock) { + + return TargetLockWrapperImpl.newTargetLock(key, ownerKey, owner, waitForLock); + } + + /** + * {@inheritDoc} + */ + @Override + public TargetLockWrapper newTargetLock(String key, String ownerKey, TargetLockWrapper.Owner owner) { + return TargetLockWrapperImpl.newTargetLock(key, ownerKey, owner); + } + + /** + * {@inheritDoc} + */ + @Override + public void dumpLocks(PrintStream out, boolean detail) { + try { + TargetLock.DumpLocks.dumpLocks(out, detail); + } catch (Exception e) { + logger.error("{}: Exception in 'dumpLocks'", this, e); + } + } + + /** + * {@inheritDoc} + */ + @Override + public String createController() { + Properties properties; + + // set the thread class loader to be the same as the one associated + // with the 'AdapterImpl' instance, so it will be inherited by any + // new threads created (the Drools session thread, in particular) + ClassLoader saveClassLoader = + Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(AdapterImpl.class.getClassLoader()); + + try { + // build and install Drools artifact + KieUtils.installArtifact( + Paths.get("src/test/resources/drools-artifact-1.1/src/main/resources/META-INF/kmodule.xml").toFile(), + Paths.get("src/test/resources/drools-artifact-1.1/pom.xml").toFile(), + "src/main/resources/rules/org/onap/policy/drools/core/test/rules.drl", + Paths.get("src/test/resources/drools-artifact-1.1/src/main/resources/rules.drl").toFile()); + + // load properties from file + properties = PropertyUtil.getProperties("src/test/resources/TestController-controller.properties"); + } catch (Exception e) { + e.printStackTrace(); + Thread.currentThread().setContextClassLoader(saveClassLoader); + return e.toString(); + } + + StringBuilder sb = new StringBuilder(); + try { + // create and start 'PolicyController' + policyController = PolicyEngineConstants.getManager() + .createPolicyController("TestController", properties); + policyController.start(); + + // dump out container information (used for debugging tests) + sb.append("PolicyContainer count: ") + .append(PolicyContainer.getPolicyContainers().size()).append('\n'); + for (PolicyContainer policyContainer : + PolicyContainer.getPolicyContainers()) { + sb.append(" name = ") + .append(policyContainer.getName()) + .append('\n') + .append(" session count = ") + .append(policyContainer.getPolicySessions().size()) + .append('\n'); + for (PolicySession pc : policyContainer.getPolicySessions()) { + policySession = pc; + } + } + } finally { + Thread.currentThread().setContextClassLoader(saveClassLoader); + } + return sb.toString(); + } + + /** + * {@inheritDoc} + */ + @Override + public void sendEvent(String key) { + /* + * Note: the dumping out of package information was useful in tracking + * down strange Drools behavior that was eventually tied to the + * Drools class loader. + */ + logger.info("{}: Calling 'sendEvent': packages = {}", this, + policySession.getKieSession().getKieBase().getKiePackages()); + ((TopicListener)policyController).onTopicEvent( + CommInfrastructure.UEB, "JUNIT-TEST-TOPIC", + "{\"key\":\"" + key + "\"}"); + } + + /** + * {@inheritDoc} + */ + @Override + public KieSession getKieSession() { + return policySession == null ? null : policySession.getKieSession(); + } + + /** + * {@inheritDoc} + */ + @Override + public void insertDrools(Object object) { + if (policySession != null) { + /* + * this will eventually be changed to use the + * 'PolicySession.insertObject(...)' method + */ + new FeatureServerPool().insertDrools(policySession, object); + } + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isForeign(Object... objects) { + boolean rval = false; + ClassLoader myClassLoader = AdapterImpl.class.getClassLoader(); + for (Object o : objects) { + Class clazz = o.getClass(); + ClassLoader objClassLoader = clazz.getClassLoader(); + + try { + if (myClassLoader != objClassLoader + && clazz != myClassLoader.loadClass(clazz.getName())) { + rval = true; + logger.info("{}: FOREIGN OBJECT ({}) - {}", + this, getAdapter(objClassLoader), o); + } + } catch (ClassNotFoundException e) { + rval = true; + logger.error("{}: FOREIGN OBJECT -- CLASS NOT FOUND ({}) - {}", + this, getAdapter(objClassLoader), o); + } + } + return rval; + } + + /** + * {@inheritDoc} + */ + @Override + public String findKey(String prefix, int startingIndex, ServerWrapper host) { + String rval = null; + + // try up to 10000 numeric values to locate one on a particular host + for (int i = 0 ; i < 10000 ; i += 1) { + // generate key, and see if it is on the desired server + String testString = prefix + (startingIndex + i); + if (ServerWrapperImpl.getWrapper( + Bucket.bucketToServer(Bucket.bucketNumber(testString))) == host) { + // we have one that works + rval = testString; + break; + } + } + return rval; + } + + /** + * {@inheritDoc} + */ + @Override + public String findKey(String prefix, int startingIndex) { + return findKey(prefix, startingIndex, serverStatic.getThisServer()); + } + + /** + * {@inheritDoc} + */ + @Override + public String findKey(String prefix) { + return findKey(prefix, 1, serverStatic.getThisServer()); + } + + /** + * {@inheritDoc} + */ + @Override + public String toString() { + return "AdapterImpl[" + index + "]"; + } + + /** + * Return an Adapter. + * + * @return the 'Adapter' instance associated with the ClassLoader associated + * with the current thread + */ + public static Adapter getAdapter() { + /* + * Note that 'return(adapter)' doesn't work as expected when called from + * within a 'Drools' session, because of the strange way that the Drools + * 'ClassLoader' works -- it bypasses 'AdapterClassLoader' when doing + * class lookups, even though it is the immediate parent of the Drools + * session class loader. + */ + return getAdapter(Thread.currentThread().getContextClassLoader()); + } + + /** + * Return an Adapter. + * + * @param classLoader a ClassLoader instance + * @return the 'Adapter' instance associated with the specified ClassLoader + */ + public static Adapter getAdapter(ClassLoader classLoader) { + try { + // locate the 'AdapterImpl' class associated with a particular + // 'ClassLoader' (which may be different from the current one) + Class thisAdapterClass = + classLoader.loadClass("org.onap.policy.drools.serverpool.AdapterImpl"); + + // return the 'adapter' field value + return Whitebox.getInternalState(thisAdapterClass, "adapter"); + } catch (Exception e) { + e.printStackTrace(); + return null; + } + } +} diff --git a/feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/BucketWrapperImpl.java b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/BucketWrapperImpl.java new file mode 100644 index 00000000..ca89d99a --- /dev/null +++ b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/BucketWrapperImpl.java @@ -0,0 +1,173 @@ +/* + * ============LICENSE_START======================================================= + * feature-server-pool + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.serverpool; + +import java.io.PrintStream; +import java.util.IdentityHashMap; + +import org.onap.policy.drools.serverpooltest.BucketWrapper; +import org.onap.policy.drools.serverpooltest.ServerWrapper; + +/** + * This class implements the 'BucketWrapper' interface. There is one + * 'BucketWrapperImpl' class for each simulated host. + */ +public class BucketWrapperImpl implements BucketWrapper { + // this maps a 'Bucket' instance on this host to an associated wrapper + private static IdentityHashMap bucketToWrapper = + new IdentityHashMap<>(); + + // this is the 'Bucket' instance associated with the wrapper + private Bucket bucket; + + /** + * This method maps a 'Bucket' instance into a 'BucketWrapperImpl' + * instance. The goal is to have only a single 'BucketWrapperImpl' instance + * for each 'Bucket' instance, so that testing for identity will work + * as expected. + * + * @param bucket the 'Bucket' instance + * @return the associated 'BucketWrapperImpl' instance + */ + static synchronized BucketWrapperImpl getWrapper(Bucket bucket) { + if (bucket == null) { + return null; + } + BucketWrapperImpl rval = bucketToWrapper.get(bucket); + if (rval == null) { + // a matching entry does not yet exist -- create one + rval = new BucketWrapperImpl(bucket); + bucketToWrapper.put(bucket, rval); + } + return rval; + } + + /** + * Constructor - initialize the 'bucket' field. + */ + BucketWrapperImpl(Bucket bucket) { + this.bucket = bucket; + } + + /** + * {@inheritDoc} + */ + @Override + public int getBucketNumber() { + return bucket.getIndex(); + } + + /** + * {@inheritDoc} + */ + @Override + public ServerWrapper getOwner() { + return ServerWrapperImpl.getWrapper(bucket.getOwner()); + } + + /** + * {@inheritDoc} + */ + @Override + public ServerWrapper getPrimaryBackup() { + return ServerWrapperImpl.getWrapper(bucket.getPrimaryBackup()); + } + + /** + * {@inheritDoc} + */ + @Override + public ServerWrapper getSecondaryBackup() { + return ServerWrapperImpl.getWrapper(bucket.getSecondaryBackup()); + } + + /* ============================================================ */ + + /** + * This class implements the 'BucketWrapper.Static' interface. There is + * one 'BucketWrapperImpl.Static' class, and one instance for each + * simulated host + */ + public static class Static implements BucketWrapper.Static { + /** + * {@inheritDoc} + */ + @Override + public int getBucketCount() { + return Bucket.BUCKETCOUNT; + } + + /** + * {@inheritDoc} + */ + @Override + public int bucketNumber(String value) { + return Bucket.bucketNumber(value); + } + + /** + * {@inheritDoc} + */ + @Override + public ServerWrapper bucketToServer(int bucketNumber) { + return ServerWrapperImpl.getWrapper(Bucket.bucketToServer(bucketNumber)); + } + + /** + * {@inheritDoc} + */ + @Override + public BucketWrapper getBucket(int bucketNumber) { + return getWrapper(Bucket.getBucket(bucketNumber)); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isKeyOnThisServer(String key) { + return Bucket.isKeyOnThisServer(key); + } + + /** + * {@inheritDoc} + */ + @Override + public void moveBucket(PrintStream out, int bucketNumber, String newHostUuid) { + ClassLoader save = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader( + BucketWrapperImpl.class.getClassLoader()); + Bucket.moveBucket(out, bucketNumber, newHostUuid); + } finally { + Thread.currentThread().setContextClassLoader(save); + } + } + + /** + * {@inheritDoc} + */ + @Override + public void dumpAdjuncts(PrintStream out) { + Bucket.dumpAdjuncts(out); + } + } +} diff --git a/feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/ServerWrapperImpl.java b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/ServerWrapperImpl.java new file mode 100644 index 00000000..468d12af --- /dev/null +++ b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/ServerWrapperImpl.java @@ -0,0 +1,146 @@ +/* + * ============LICENSE_START======================================================= + * feature-server-pool + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.serverpool; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.IdentityHashMap; +import java.util.UUID; + +import org.onap.policy.drools.serverpooltest.ServerWrapper; + +/** + * This class implements the 'ServerWrapper' interface. There is one + * 'ServerWrapperImpl' class for each simulated host. + */ +public class ServerWrapperImpl implements ServerWrapper { + // this maps a 'Server' instance on this host to an associated wrapper + private static IdentityHashMap serverToWrapper = + new IdentityHashMap<>(); + + // this is the 'Server' instance associated with the wrapper + private Server server; + + /** + * This method maps a 'Server' instance into a 'ServerWrapperImpl' + * instance. The goal is to have only a single 'ServerWrapperImpl' instance + * for each 'Server' instance, so that testing for identity will work + * as expected. + * + * @param server the 'Server' instance + * @return the associated 'ServerWrapperImpl' instance + * ('null' if 'server' is 'null') + */ + static synchronized ServerWrapperImpl getWrapper(Server server) { + if (server == null) { + return null; + } + ServerWrapperImpl rval = serverToWrapper.computeIfAbsent(server, + (key) -> new ServerWrapperImpl(server)); + return rval; + } + + /** + * Constructor - initialize the 'server' field. + */ + private ServerWrapperImpl(Server server) { + this.server = server; + } + + /** + * {@inheritDoc} + */ + @Override + public String toString() { + return server.toString(); + } + + /** + * {@inheritDoc} + */ + @Override + public UUID getUuid() { + return server.getUuid(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isActive() { + return server.isActive(); + } + + /* ============================================================ */ + + /** + * This class implements the 'ServerWrapper.Static' interface. There is + * one 'ServerWrapperImpl.Static' class, and one instance for each + * simulated host + */ + public static class Static implements ServerWrapper.Static { + /** + * {@inheritDoc} + */ + @Override + public ServerWrapper getThisServer() { + return getWrapper(Server.getThisServer()); + } + + /** + * {@inheritDoc} + */ + @Override + public ServerWrapper getFirstServer() { + return getWrapper(Server.getFirstServer()); + } + + /** + * {@inheritDoc} + */ + @Override + public ServerWrapper getServer(UUID uuid) { + return getWrapper(Server.getServer(uuid)); + } + + /** + * {@inheritDoc} + */ + @Override + public int getServerCount() { + return Server.getServerCount(); + } + + /** + * {@inheritDoc} + */ + @Override + public Collection getServers() { + // build an 'ArrayList' which mirrors the set of servers + ArrayList rval = new ArrayList<>(Server.getServerCount()); + + for (Server server : Server.getServers()) { + rval.add(getWrapper(server)); + } + return rval; + } + } +} diff --git a/feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/TargetLockWrapperImpl.java b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/TargetLockWrapperImpl.java new file mode 100644 index 00000000..4f496986 --- /dev/null +++ b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/TargetLockWrapperImpl.java @@ -0,0 +1,195 @@ +/* + * ============LICENSE_START======================================================= + * feature-server-pool + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.serverpool; + +import java.io.Serializable; +import java.util.IdentityHashMap; + +import org.onap.policy.drools.core.lock.Lock; +import org.onap.policy.drools.core.lock.LockCallback; +import org.onap.policy.drools.serverpooltest.TargetLockWrapper; + +/** + * This class implements the 'TargetLockWrapper' interface. There is one + * 'TargetLockWrapperImpl' class for each simulated host. + */ +public class TargetLockWrapperImpl implements TargetLockWrapper { + // this is the 'TargetLock' instance associated with the wrapper + private TargetLock targetLock; + + /** + * {@inheritDoc} + */ + @Override + public boolean free() { + return targetLock.free(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isActive() { + return targetLock.isActive(); + } + + /** + * {@inheritDoc} + */ + @Override + public State getState() { + return TargetLockWrapper.State.valueOf(targetLock.getState().toString()); + } + + /** + * {@inheritDoc} + */ + @Override + public String getOwnerKey() { + return targetLock.getOwnerKey(); + } + + /** + * {@inheritDoc} + */ + @Override + public String toString() { + return "TLW-" + + String.valueOf(AdapterImpl.getAdapter(TargetLockWrapperImpl.class.getClassLoader())) + + "[" + targetLock.toString() + "]"; + } + + /** + * This method creates a new 'TargetLock'. Internally, an 'OwnerAdapter' + * instance is built as well, which translates the 'LockCallback' + * callbacks to 'TargetLockWrapper.Owner' callbacks. As with the call to + * 'new TargetLock(...)', it is possible for the callback occur before + * this method returns -- this can happen if the 'key' hashes to a bucket + * owned by the current host. + * + * @param key string key identifying the lock + * @param ownerKey string key identifying the owner, which must hash to + * a bucket owned by the current host (it is typically a 'RequestID') + * @param owner owner of the lock (will be notified when going from + * WAITING to ACTIVE) + * @param waitForLock this controls the behavior when 'key' is already + * locked - 'true' means wait for it to be freed, 'false' means fail + * @return a 'TargetLockWrapper' instance associated with the new + * 'TargetLock. + */ + static TargetLockWrapper newTargetLock( + String key, String ownerKey, TargetLockWrapper.Owner owner, boolean waitForLock) { + + TargetLockWrapperImpl rval = new TargetLockWrapperImpl(); + rval.targetLock = + new TargetLock(key, ownerKey, + TargetLockWrapperImpl.getOwnerAdapter(rval, owner), + waitForLock); + return rval; + } + + /** + * This method creates a new 'TargetLock'. Internally, an 'OwnerAdapter' + * instance is built as well, which translates the 'LockCallback' + * callbacks to 'TargetLockWrapper.Owner' callbacks. As with the call to + * 'new TargetLock(...)', it is possible for the callback occur before + * this method returns -- this can happen if the 'key' hashes to a bucket + * owned by the current host. + * + * @param key string key identifying the lock + * @param ownerKey string key identifying the owner, which must hash to + * a bucket owned by the current host (it is typically a 'RequestID') + * @param owner owner of the lock (will be notified when going from + * WAITING to ACTIVE) + * @return a 'TargetLockWrapper' instance associated with the new + * 'TargetLock. + */ + static TargetLockWrapper newTargetLock( + String key, String ownerKey, TargetLockWrapper.Owner owner) { + + TargetLockWrapperImpl rval = new TargetLockWrapperImpl(); + rval.targetLock = + new TargetLock(key, ownerKey, + TargetLockWrapperImpl.getOwnerAdapter(rval, owner)); + return rval; + } + + /* ============================================================ */ + + /** + * This method builds an adapter that implements the 'LockCallback' + * callback interface, translating it to 'TargetLockWrapper.Owner'. + * + * @param targetLock the TargetLockWrapper that is using this adapter + * @param owner the 'TargetLockWrapper.Owner' callback + * @return an adapter implementing the 'LockCallback' interface + * ('null' is returned if 'owner' is null -- this is an error condition, + * but is used to verify the error handling of the 'TargetLock' + * constructor. + */ + public static LockCallback getOwnerAdapter( + TargetLockWrapper targetLock, TargetLockWrapper.Owner owner) { + + return owner == null ? null : new OwnerAdapter(targetLock, owner); + } + + /** + * This class is an adapter that implements the 'LockCallback' callback + * interface, translating it to a 'TargetLockWrapper.Owner' callback. + */ + public static class OwnerAdapter implements LockCallback, Serializable { + // the 'TargetLockWrapper' instance to pass as an argument in the callback + TargetLockWrapper targetLock; + + // the 'TargetLockWrapper.Owner' callback + TargetLockWrapper.Owner owner; + + /** + * Constructor - initialize the adapter. + * + * @param targetLock this will be passed as an argument in the callback + * @param owner the object implementing the 'TargetLockWrapper.Owner' + * interface + */ + private OwnerAdapter(TargetLockWrapper targetLock, TargetLockWrapper.Owner owner) { + this.targetLock = targetLock; + this.owner = owner; + } + + /** + * {@inheritDoc} + */ + @Override + public void lockAvailable(Lock lock) { + // forward 'lockAvailable' callback + owner.lockAvailable(targetLock); + } + + /** + * {@inheritDoc} + */ + @Override + public void lockUnavailable(Lock lock) { + // forward 'lockUnavailable' callback + owner.lockUnavailable(targetLock); + } + } +} diff --git a/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/Adapter.java b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/Adapter.java new file mode 100644 index 00000000..03b970b5 --- /dev/null +++ b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/Adapter.java @@ -0,0 +1,353 @@ +/* + * ============LICENSE_START======================================================= + * feature-server-pool + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.serverpooltest; + +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectStreamClass; +import java.io.PrintStream; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.concurrent.LinkedBlockingQueue; + +import org.kie.api.runtime.KieSession; +import org.onap.policy.common.utils.network.NetworkUtil; +import org.onap.policy.drools.serverpool.Util; + +/** + * This is a common base class for 6 'AdapterImpl' instances, all running + * with their own copies of the server pool classes, and a number of the ONAP + * classes. The purpose is to simulate 6 separate hosts in a server pool. + * Note that there is potentially a 7th copy of any of these classes, which is + * the one loaded with the system class loader. Ideally, those classes + * shouldn't be referred to, but there were some problems during testing, + * where they unexpectedly were (prompting a change in + * 'ExtendedObjectInputStream'). This is referred to as the 'null' host, + * where the classes may exist, but have not gone through initialization. + */ +public abstract class Adapter { + // 'true' indicates that initialization is still needed + private static boolean initNeeded = true; + + // Each 'Adapter' instance is implemented by 'AdapterImpl', but loaded + // with a different class loader that provides each with a different copy + // of the set of classes with packages in the list below (see references to + // 'BlockingClassLoader'). + public static Adapter[] adapters = new Adapter[6]; + + /** + * Ensure that all adapters have been initialized. + */ + public static void ensureInit() throws Exception { + synchronized (Adapter.class) { + if (initNeeded) { + initNeeded = false; + + // start DMAAP Simulator + new Thread(new Runnable() { + @Override + public void run() { + SimDmaap.start(); + } + }, "DMAAP Simulator").start(); + + // wait 1 second to allow time for the port 3904 listener + assertTrue(NetworkUtil.isTcpPortOpen(SimDmaap.HOSTNAME, 3904, 50, 1000)); + + // build 'BlockingClassLoader' + BlockingClassLoader bcl = new BlockingClassLoader( + Adapter.class.getClassLoader(), + // All 'org.onap.policy.*' classes are adapter-specific, except + // for the exclusions listed below. + "org.onap.policy.*" + ); + bcl.addExclude("org.onap.policy.drools.core.DroolsRunnable"); + bcl.addExclude("org.onap.policy.drools.serverpooltest.*"); + + // build URL list for class loader + URL[] urls = {}; + + // iterate through 'adapter' entries + ClassLoader saveClassLoader = + Thread.currentThread().getContextClassLoader(); + if (saveClassLoader instanceof URLClassLoader) { + urls = ((URLClassLoader)saveClassLoader).getURLs(); + } else { + // the parent is not a 'URLClassLoader' -- + // try to get this information from 'java.class.path' + ArrayList tmpUrls = new ArrayList<>(); + for (String entry : System.getProperty("java.class.path").split( + File.pathSeparator)) { + if (new File(entry).isDirectory()) { + tmpUrls.add(new URL("file:" + entry + "/")); + } else { + tmpUrls.add(new URL("file:" + entry)); + } + } + urls = tmpUrls.toArray(new URL[0]); + } + try { + for (int i = 0 ; i < adapters.length ; i += 1) { + // Build a new 'ClassLoader' for this adapter. The + // 'ClassLoader' hierarchy is: + // + // AdapterClassLoader(one copy per Adapter) -> + // BlockingClassLoader -> + // base ClassLoader (with the complete URL list) + ClassLoader classLoader = + new AdapterClassLoader(i, urls, bcl); + + // set the current thread class loader, which should be + // inherited by any child threads created during + // the initialization of the adapter + Thread.currentThread().setContextClassLoader(classLoader); + + // now, build the adapter -- it is not just a new instance, + // but a new copy of class 'AdapterImpl' + Adapter adapter = (Adapter) classLoader.loadClass( + "org.onap.policy.drools.serverpool.AdapterImpl") + .newInstance(); + + // initialize the adapter + adapter.init(i); + adapters[i] = adapter; + } + } finally { + // restore the class loader to that used during the Junit tests + Thread.currentThread().setContextClassLoader(saveClassLoader); + } + } + } + } + + /** + * Shut everything down. + */ + public static void ensureShutdown() { + for (Adapter adapter : adapters) { + adapter.shutdown(); + } + SimDmaap.stop(); + // not sure why the following is started + Util.shutdown(); + } + + /** + * Runs server pool initialization for a particular host. + * + * @param index the index of the adapter (0-5) + */ + public abstract void init(int index) throws Exception; + + /** + * Shuts down the server pool for this host. + */ + public abstract void shutdown(); + + /** + * Return a 'LinkedBlockingQueue' instance, which is used as a way for + * Drools code to signal back to running Junit tests. + * + * @return a 'LinkedBlockingQueue' instance, which is used as a way for + * Drools code to signal back to running Junit tests + */ + public abstract LinkedBlockingQueue notificationQueue(); + + /** + * This method blocks and waits for all buckets to have owners, or for + * a timeout, whichever occurs first. + * + * @param endTime the point at which timeout occurs + * @return 'true' if all buckets have owners, 'false' if a timeout occurred + */ + public abstract boolean waitForInit(long endTime) throws InterruptedException; + + /** + * Return an object providing indirect references to a select set of + * static 'Server' methods. + * + * @return an object providing indirect references to a select set of + * static 'Server' methods + */ + public abstract ServerWrapper.Static getServerStatic(); + + /** + * Return an object providing an indirect reference to the lead 'Server' + * object. + * + * @return an object providing an indirect reference to the lead 'Server' + * object + */ + public abstract ServerWrapper getLeader(); + + /** + * Return an object providing indirect references to a select set of + * static 'Bucket' methods. + * + * @return an object providing indirect references to a select set of + * static 'Bucket' methods + */ + public abstract BucketWrapper.Static getBucketStatic(); + + /** + * Create a new 'TargetLock' instance, returning an indirect reference. + * + * @param key string key identifying the lock + * @param ownerKey string key identifying the owner, which must hash to + * a bucket owned by the current host (it is typically a 'RequestID') + * @param owner owner of the lock (will be notified when going from + * WAITING to ACTIVE) + * @param waitForLock this controls the behavior when 'key' is already + * locked - 'true' means wait for it to be freed, 'false' means fail + */ + public abstract TargetLockWrapper newTargetLock( + String key, String ownerKey, TargetLockWrapper.Owner owner, + boolean waitForLock); + + /** + * Create a new 'TargetLock' instance, returning an indirect reference. + * + * @param key string key identifying the lock + * @param ownerKey string key identifying the owner, which must hash to + * a bucket owned by the current host (it is typically a 'RequestID') + * @param owner owner of the lock (will be notified when going from + * WAITING to ACTIVE) + */ + public abstract TargetLockWrapper newTargetLock( + String key, String ownerKey, TargetLockWrapper.Owner owner); + + /** + * Call 'TargetLock.DumpLocks.dumpLocks' + * + * @param out where the output should be displayed + * @param detail 'true' provides additional bucket and host information + * (but abbreviates all UUIDs in order to avoid excessive + * line length) + */ + public abstract void dumpLocks(PrintStream out, boolean detail); + + /** + * Create and initialize PolicyController 'TestController', and start + * the associated Drools container and session. + * + * @return a string containing controller session information + */ + public abstract String createController(); + + /** + * Send an event in the form of a JSON message string. The message is + * sent to JUNIT-TEST-TOPIC, and the JSON object is converted to a + * 'TestDroolsObject' (all compatible with the Drools session created by + * 'createController'. + * + * @param key determines the bucket number, which affects which host the + * message is eventually routed to + */ + public abstract void sendEvent(String key); + + /** + * Return the one-and-only 'KieSession' on this host. + * + * @return the one-and-only 'KieSession' on this host + */ + public abstract KieSession getKieSession(); + + /** + * Insert an object into the one-and-only Drools session. + * + * @param object the object to insert + */ + public abstract void insertDrools(Object object); + + // some test utilities + + /** + * Determine whether any of the objects passed as parameters are of a class + * that belongs to different adapter. Print messages are displayed + * for any that do occur. + * + * @param objects one or more objects to be tested + * @return 'true' if one or more are foreign + */ + public abstract boolean isForeign(Object... objects); + + /** + * This method is used to generate keys that hash to a bucket associated + * with a particular server. The algorithm generates a key using 'prefix' + * concatenated with a numeric value, and searches for the first one on + * the desired host. It will try up to 10000 indices before giving up -- + * each host owns 1/6 of the buckets, should the 10000 number should be + * way more than enough. The tests are written with the assumption that + * a valid key will be returned, and 'NullPointerException' is an acceptable + * way to handle the situation if this doesn't work out somehow. + * + * @param prefix the first portion of the key + * @param startingIndex the first index to try + * @param host this indicates the 'Server' instance to locate, which must + * not be foreign to this adapter + * @return a key associated with 'host' ('null' if not found) + */ + public abstract String findKey(String prefix, int startingIndex, ServerWrapper host); + + /** + * Equivalent to 'findKey(prefix, startingIndex, THIS-SERVER)'. + * + * @param prefix the first portion of the key + * @param startingIndex the first index to try + * @return a key associated with 'host' ('null' if not found) + */ + public abstract String findKey(String prefix, int startingIndex); + + /** + * Equivalent to 'findKey(prefix, 1, THIS-SERVER)'. + * + * @param prefix the first portion of the key + * @return a key associated with 'host' ('null' if not found) + */ + public abstract String findKey(String prefix); + + /* ============================================================ */ + + /** + * This class is basically a 'URLClassLoader', but with a 'toString()' + * method that indicates the host and adapter number. + */ + public static class AdapterClassLoader extends URLClassLoader { + private int index; + + public AdapterClassLoader(int index, URL[] urls, ClassLoader parent) { + super(urls, parent); + this.index = index; + } + + @Override + public String toString() { + return "AdapterClassLoader(" + index + ")"; + } + } +} diff --git a/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/BlockingClassLoader.java b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/BlockingClassLoader.java new file mode 100644 index 00000000..7930bf27 --- /dev/null +++ b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/BlockingClassLoader.java @@ -0,0 +1,176 @@ +/* + * ============LICENSE_START======================================================= + * feature-server-pool + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.serverpooltest; + +import java.net.URL; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.NoSuchElementException; + +/** + * Ordinarily, a 'ClassLoader' first attempts to load a class via the + * parent 'ClassLoader'. If that fails, it attempts to load it "locally" + * by whatever mechanism the class loader supports. + * This 'ClassLoader' instance blocks attempts to load specific classes, + * throwing a 'ClassNotFoundException'. This doesn't seem useful on the + * surface, but it forces all child 'ClassLoader' instances to do the lookup + * themselves. In addition, each child 'ClassLoader' will have their own + * copy of the classes they load, providing a way to have multiple copies of + * the same class running within the same JVM. Each child 'ClassLoader' can + * be viewed as having a separate name space. + */ +public class BlockingClassLoader extends ClassLoader { + // these are the set of packages to block + private HashSet packages; + + // these are the prefixes of class names to block + private ArrayList prefixes; + + // these specific classes will not be blocked, even if they are in one + // of the packages indicated by 'packages' + private HashSet excludes = new HashSet(); + + // these are the prefixes of class names to exclude + private ArrayList excludePrefixes = new ArrayList<>(); + + /** + * Constructor -- initialize the 'ClassLoader' and 'packages' variable. + * + * @param parent the parent ClassLoader + * @param packages variable number of packages to block + */ + public BlockingClassLoader(ClassLoader parent, String... packages) { + super(parent); + this.packages = new HashSet<>(); + this.prefixes = new ArrayList<>(); + for (String pkg : packages) { + if (pkg.endsWith("*")) { + prefixes.add(pkg.substring(0, pkg.length() - 1)); + } else { + this.packages.add(pkg); + } + } + } + + /** + * {@inheritDoc} + */ + @Override + protected Class findClass(String name) throws ClassNotFoundException { + // throws a 'ClassNotFoundException' if we are blocking this one + testClass(name); + + // not blocking this one -- pass it on to the superclass + return super.findClass(name); + } + + /** + * {@inheritDoc} + */ + @Override + public Enumeration getResources(String name) { + // in order to avoid replicated resources, we return an empty set + return new Enumeration() { + public boolean hasMoreElements() { + return false; + } + + public URL nextElement() { + throw new NoSuchElementException("'BlockingClassLoader' blocks duplicate resources"); + } + }; + } + + /** + * {@inheritDoc} + */ + @Override + public Class loadClass(String name) throws ClassNotFoundException { + // throws a 'ClassNotFoundException' if we are blocking this one + testClass(name); + + // not blocking this one -- pass it on to the superclass + return super.loadClass(name); + } + + /** + * {@inheritDoc} + */ + @Override + protected Class loadClass(String name, boolean resolve) throws ClassNotFoundException { + // throws a 'ClassNotFoundException' if we are blocking this one + testClass(name); + + // not blocking this one -- pass it on to the superclass + return super.loadClass(name, resolve); + } + + /** + * Add an entry to the list of classes that should NOT be blocked. + * + * @param name the full name of a class that shouldn't be blocked + */ + public void addExclude(String name) { + if (name.endsWith("*")) { + excludePrefixes.add(name.substring(0, name.length() - 1)); + } else { + excludes.add(name); + } + } + + /** + * This method looks at a class name -- if it should be blocked, a + * 'ClassNotFoundException' is thrown. Otherwise, it does nothing. + * + * @param name the name of the class to be tested + * @throws ClassNotFoundException if this class should be blocked + */ + private void testClass(String name) throws ClassNotFoundException { + if (excludes.contains(name)) { + // allow this one + return; + } + + for (String prefix : excludePrefixes) { + if (name.startsWith(prefix)) { + // allow this one + return; + } + } + + // extract the package from the class name -- throw a + // 'ClassNotFoundException' if the package is in the list + // being blocked + int index = name.lastIndexOf('.'); + if (index >= 0) { + if (packages.contains(name.substring(0,index))) { + throw(new ClassNotFoundException(name)); + } + + for (String prefix : prefixes) { + if (name.startsWith(prefix)) { + throw(new ClassNotFoundException(name)); + } + } + } + } +} diff --git a/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/BucketWrapper.java b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/BucketWrapper.java new file mode 100644 index 00000000..2628513c --- /dev/null +++ b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/BucketWrapper.java @@ -0,0 +1,132 @@ +/* + * ============LICENSE_START======================================================= + * feature-server-pool + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.serverpooltest; + +import java.io.PrintStream; + +/** + * This class provides base classes for accessing the various 'Bucket' + * classes. There is a separate copy of the 'Bucket' class for each + * adapter, and this wrapper was created to give them a common interface. + */ +public interface BucketWrapper { + /** + * This calls the 'Bucket.getBucketNumber()' method + * + * @return the bucket number + */ + public int getBucketNumber(); + + /** + * This calls the 'Bucket.getOwner()' method + * + * @return a 'ServerWrapper' instance that corresponds to the owner + * of the bucket ('null' if unassigned) + */ + public ServerWrapper getOwner(); + + /** + * This calls the 'Bucket.getPrimaryBackup()' method + * + * @return a 'ServerWrapper' instance that corresponds to the primary backup + * host for the bucket ('null' if unassigned) + */ + public ServerWrapper getPrimaryBackup(); + + /** + * This calls the 'Bucket.getPrimaryBackup()' method + * + * @return a 'ServerWrapper' instance that corresponds to the secondary + * backup host for the bucket ('null' if unassigned) + */ + public ServerWrapper getSecondaryBackup(); + + /* ============================================================ */ + + /** + * This class provides access to the static 'Bucket' methods. There are + * multiple 'Bucket' classes (one for each 'Adapter'), and each has + * a corresponding 'BucketWrapper.Static' instance. In other words, there + * is one 'Bucket.Static' instance for each simulated host. + */ + public interface Static { + /** + * This returns the value of 'Bucket.BUCKETCOUNT' + * + * @return the number of Bucket instances in the bucket table + */ + public int getBucketCount(); + + /** + * This calls the static 'Bucket.bucketNumber(String)' method + * + * @param value the keyword to be converted + * @return the bucket number + */ + public int bucketNumber(String value); + + /** + * This calls the static 'Bucket.bucketToServer(int)' method + * + * @param bucketNumber a bucket number in the range 0-1023 + * @return a 'ServerWrapper' for the server that currently handles the + * bucket, or 'null' if none is currently assigned + */ + public ServerWrapper bucketToServer(int bucketNumber); + + /** + * This calls the static 'Bucket.getBucket(int)' method + * + * @param bucketNumber a bucket number in the range 0-1023 + * @return A 'BucketWrapper' for the Bucket associated with + * this bucket number + */ + public BucketWrapper getBucket(int bucketNumber); + + /** + * This calls the static 'Bucket.isKeyOnThisServer(String)' method + * + * @param key the keyword to be hashed + * @return 'true' if the associated bucket is assigned to this server, + * 'false' if not + */ + public boolean isKeyOnThisServer(String key); + + /** + * This calls the static 'Bucket.moveBucket(PrintStream, int, String)' + * method (the one associated with the '/cmd/moveBucket' REST call). + * + * @param out the 'PrintStream' to use for displaying information + * @param bucketNumber the bucket number to be moved + * @param newHostUuid the UUID of the destination host (if 'null', a + * destination host will be chosen at random) + */ + public void moveBucket(PrintStream out, int bucketNumber, String newHostUuid); + + /** + * This calls the static 'Bucket.dumpAdjuncts(PrintStream)' method + * (the one associated with the '/cmd/dumpBucketAdjuncts' REST call). + * + * @param out the 'PrintStream' to use for displaying information + */ + public void dumpAdjuncts(PrintStream out); + } +} diff --git a/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/ServerWrapper.java b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/ServerWrapper.java new file mode 100644 index 00000000..e31a6817 --- /dev/null +++ b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/ServerWrapper.java @@ -0,0 +1,103 @@ +/* + * ============LICENSE_START======================================================= + * feature-server-pool + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.serverpooltest; + +import java.util.Collection; +import java.util.UUID; + +/** + * This class provides base classes for accessing the various 'Server' + * classes. There is a separate copy of the 'Server' class for each + * adapter, and this wrapper was created to give them a common interface. + */ +public interface ServerWrapper { + /** + * This calls the 'Server.toString()' method + * + * @return a string of the form 'Server[UUID]' + */ + public String toString(); + + /** + * This calls the 'Server.getUuid()' method + * + * @return the UUID associated with this Server + */ + public UUID getUuid(); + + /** + * This calls the 'Server.isActive()' method + * + * @return 'true' if the this server is active, and 'false' if not + */ + public boolean isActive(); + + /* ============================================================ */ + + /** + * This class provides access to the static 'Server' methods. There are + * multiple 'Server' classes (one for each 'Adapter'), and each has + * a corresponding 'ServerWrapper.Static' instance. In other words, there + * is one 'Server.Static' instance for each simulated host. + */ + public interface Static { + /** + * This calls the static 'Server.getThisServer()' method + * + * @return a 'ServerWrapper' instance that corresponds to the Server + * instance associated with this simulated host + */ + public ServerWrapper getThisServer(); + + /** + * This calls the static 'Server.getFirstServer()' method + * + * @return a 'ServerWrapper' instance that corresponds to the first + * 'Server' instance in the 'servers' list (the one with the + * lowest UUID) + */ + public ServerWrapper getFirstServer(); + + /** + * This calls the static 'Server.getServer(UUID)' method + * + * @param uuid the key to the lookup + * @return a 'ServerWrapper' instance that corresponds to the associated + * 'Server' instance ('null' if none) + */ + public ServerWrapper getServer(UUID uuid); + + /** + * This calls the static 'Server.getServerCount()' method + * + * @return a count of the number of servers + */ + public int getServerCount(); + + /** + * This calls the static 'Server.getServers()' method + * + * @return the complete list of servers, each with a 'ServerWrapper' + * referring to the 'Server' + */ + public Collection getServers(); + } +} diff --git a/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/SimDmaap.java b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/SimDmaap.java new file mode 100644 index 00000000..74fef07f --- /dev/null +++ b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/SimDmaap.java @@ -0,0 +1,327 @@ +/* + * ============LICENSE_START======================================================= + * feature-server-pool + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.serverpooltest; + +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.MediaType; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class simulates a UEB/DMAAP server. + */ + +@Path("/") +public class SimDmaap { + private static Logger logger = LoggerFactory.getLogger(SimDmaap.class); + public static final String HOSTNAME = "127.0.63.250"; + + // miscellaneous Jetty/Servlet parameters + private static ServletContextHandler context; + private static Server jettyServer; + private static ServerConnector connector; + private static ServletHolder holder; + + /** + * Do whatever needs to be done to start the server. I don't know exactly + * what abstractions the various pieces provide, but the following code + * ties the pieces together, and starts up the server. + */ + public static void start() { + try { + context = new ServletContextHandler(ServletContextHandler.SESSIONS); + context.setContextPath("/"); + + jettyServer = new Server(); + + connector = new ServerConnector(jettyServer); + connector.setName("simdmaap"); + connector.setReuseAddress(true); + connector.setPort(3904); + connector.setHost("127.0.63.250"); + + jettyServer.addConnector(connector); + jettyServer.setHandler(context); + + holder = context.addServlet(org.glassfish.jersey.servlet.ServletContainer.class.getName(), "/*"); + holder.setInitParameter( + "jersey.config.server.provider.classnames", + "com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider" + + "," + SimDmaap.class.getName()); + + jettyServer.start(); + jettyServer.join(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * Cleanly shut down the server. + */ + public static void stop() { + try { + if (jettyServer != null) { + jettyServer.stop(); + jettyServer = null; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + /* ============================================================ */ + + // maps topic name to 'Topic' instance + static Map topicTable = new ConcurrentHashMap<>(); + + /** + * Each instance of this class corresponds to a DMAAP or UEB topic. + */ + static class Topic { + // topic name + String topic; + + // maps group name into group instance + Map groupTable = new ConcurrentHashMap<>(); + + /** + * Create or get a Topic. + * + * @param name the topic name + * @return the associated Topic instance + */ + static Topic createOrGet(String name) { + // look up the topic name + Topic topicObj = topicTable.get(name); + if (topicObj == null) { + // no entry found -- the following will create one, without + // the need for explicit synchronization + topicTable.putIfAbsent(name, new Topic(name)); + topicObj = topicTable.get(name); + } + return topicObj; + } + + /** + * Constructor - initialize the 'topic' field. + * + * @param topic the topic name + */ + private Topic(String topic) { + this.topic = topic; + } + + /** + * Handle an incoming '/events/{topic}' POST REST message. + * + * @param the body of the REST message + * @return the appropriate JSON response + */ + String post(String data) { + // start of message processing + long startTime = System.currentTimeMillis(); + + // current and ending indices to the 'data' field + int cur = 0; + int end = data.length(); + + // the number of messages retrieved so far + int messageCount = 0; + + while (cur < end) { + // The body of the message may consist of multiple JSON messages, + // each preceded by 3 integers separated by '.'. The second one + // is the length, in bytes (the third seems to be some kind of + // channel identifier). + + int leftBrace = data.indexOf('{', cur); + if (leftBrace < 0) { + // no more messages + break; + } + String[] prefix = data.substring(cur,leftBrace).split("\\."); + if (prefix.length == 3) { + try { + // determine length of message, and advance current position + int length = Integer.valueOf(prefix[1]); + cur = leftBrace + length; + + // extract message, and update count -- each double quote + // has a '\' character placed before it, so the overall + // message can be placed in double quotes, and parsed as + // a literal string + String message = data.substring(leftBrace, cur) + .replace("\\", "\\\\").replace("\"", "\\\"") + .replace("\n", "\\n"); + messageCount += 1; + + // send to all listening groups + for (Group group : groupTable.values()) { + group.messages.add(message); + } + } catch (Exception e) { + logger.error("{}: {}", prefix[1], e); + break; + } + } else if (cur == 0) { + // there is only a single message -- extract it, and update count + String message = data.substring(leftBrace, end) + .replace("\\", "\\\\").replace("\"", "\\\"") + .replace("\n", "\\n"); + messageCount += 1; + + // send to all listening grops + for (Group group : groupTable.values()) { + group.messages.add(message); + } + break; + } else { + // don't know what this is -- toss it + break; + } + } + + // generate response message + long elapsedTime = System.currentTimeMillis() - startTime; + return "{\n" + + " \"count\": " + messageCount + ",\n" + + " \"serverTimeMs\": " + elapsedTime + "\n" + + "}"; + } + + /** + * read one or more incoming messages. + * + * @param group the 'consumerGroup' value + * @param timeout how long to wait for a message, in milliseconds + * @param limit the maximum number of messages to receive + * @return a JSON array, containing somewhere between 0 and 'limit' messages + */ + String get(String group, long timeout, int limit) throws InterruptedException { + // look up the group -- create one if it doesn't exist + Group groupObj = groupTable.get(group); + if (groupObj == null) { + // no entry found -- the following will create one, without + // the need for explicit synchronization + groupTable.putIfAbsent(group, new Group()); + groupObj = groupTable.get(group); + } + + // pass it on to the 'Group' instance + return groupObj.get(timeout, limit); + } + } + + /* ============================================================ */ + + /** + * Each instance of this class corresponds to a Consumer Group. + */ + static class Group { + // messages queued for this group + private BlockingQueue messages = new LinkedBlockingQueue<>(); + + /** + * Retrieve messages sent to this group. + * + * @param timeout how long to wait for a message, in milliseconds + * @param limit the maximum number of messages to receive + * @return a JSON array, containing somewhere between 0 and 'limit' messages + */ + String get(long timeout, int limit) throws InterruptedException { + String message = messages.poll(timeout, TimeUnit.MILLISECONDS); + if (message == null) { + // timed out without messages + return "[]"; + } + + // use 'StringBuilder' to assemble the response -- add the first message + StringBuilder builder = new StringBuilder(); + builder.append("[\"").append(message); + + // add up to '-1' more messages + for (int i = 1 ; i < limit ; i += 1) { + // fetch the next message -- don't wait if it isn't currently there + message = messages.poll(); + if (message == null) { + // no more currently available + break; + } + builder.append("\",\"").append(message); + } + builder.append("\"]"); + return builder.toString(); + } + } + + /* ============================================================ */ + + /** + * Incoming messages from the caller to the simulator. + */ + @POST + @Path("/events/{topic}") + @Consumes("application/cambria") + @Produces(MediaType.APPLICATION_JSON) + public String send(@PathParam("topic") String topic, + String data) { + logger.info("Send: topic={}", topic); + return Topic.createOrGet(topic).post(data); + } + + /** + * Send messages from the simulator to the caller. + */ + @GET + @Path("/events/{topic}/{group}/{id}") + @Consumes(MediaType.TEXT_PLAIN) + @Produces(MediaType.APPLICATION_JSON) + public String receive(@PathParam("topic") String topic, + @PathParam("group") String group, + @PathParam("id") String id, + @QueryParam("timeout") long timeout, + @QueryParam("limit") int limit) + throws InterruptedException { + + logger.info("Receive: topic={}, group={}, id={}, timeout={}, limit={}", + topic, group, id, timeout, limit); + return Topic.createOrGet(topic).get(group, timeout, limit); + } +} diff --git a/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/TargetLockWrapper.java b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/TargetLockWrapper.java new file mode 100644 index 00000000..ce9f39e0 --- /dev/null +++ b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/TargetLockWrapper.java @@ -0,0 +1,98 @@ +/* + * ============LICENSE_START======================================================= + * feature-server-pool + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.serverpooltest; + +import java.io.Serializable; + +/** + * This class provides base classes for accessing the various 'TargetLock' + * classes. There is a separate copy of the 'TargetLock' class for each + * adapter, and this wrapper was created to give them a common interface. + */ +public interface TargetLockWrapper extends Serializable { + /** + * There is a separate copy of 'TargetLock.State' for each adapter -- + * The 'TargetLockWrapper.getState()' maps these into a common + * 'TargetLockWrapper.State' enumeration. + */ + public enum State { + WAITING, ACTIVE, FREE, LOST + } + + /** + * This calls the 'TargetLock.free()' method + * + * @return 'true' if successful, 'false' if it was not locked, or there + * appears to be corruption in 'LocalLocks' tables + */ + public boolean free(); + + /** + * This calls the 'TargetLock.isActive()' method + * + * @return 'true' if the lock is in the ACTIVE state, and 'false' if not + */ + public boolean isActive(); + + /** + * This calls the 'TargetLock.getState()' method + * + * @return the current state of the lock, as a 'TargetLockWrapper.State' + */ + public State getState(); + + /** + * This calls the 'TargetLock.getOwnerKey()' method + * + * @return the owner key field + */ + public String getOwnerKey(); + + /** + * Return the value returned by 'TargetLock.toString()'. + * + * @return the value returned by 'TargetLock.toString()' + */ + public String toString(); + + /* ============================================================ */ + + /** + * This interface mimics the 'LockCallback' interface, with the + * exception that 'TargetLockWrapper' is used as the arguments to the + * callback methods. + */ + public static interface Owner { + /** + * Callback indicates the lock was successful. + * + * @param lock the 'TargetLockWrapper' instance + */ + public void lockAvailable(TargetLockWrapper lock); + + /** + * Callback indicates the lock request has failed. + * + * @param lock the 'TargetLockWrapper' instance + */ + public void lockUnavailable(TargetLockWrapper lock); + } +} diff --git a/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/Test1.java b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/Test1.java new file mode 100644 index 00000000..2178fec1 --- /dev/null +++ b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/Test1.java @@ -0,0 +1,912 @@ +/* + * ============LICENSE_START======================================================= + * feature-server-pool + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.serverpooltest; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchThrowable; +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.io.Serializable; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.UUID; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.awaitility.Durations; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.kie.api.runtime.KieSession; +import org.onap.policy.drools.core.DroolsRunnable; +import org.onap.policy.drools.serverpool.BucketWrapperImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Test1 { + private static Logger logger = LoggerFactory.getLogger(Test1.class); + + // indicates that Drools containers need to be initialized + private static boolean needControllerInit = true; + + private static int initialCount = 0; + + private static int threadList(String header, boolean stackTrace) { + logger.info("***** threadList: {} *****", header); + Thread[] thr = new Thread[1000]; + int count = Thread.enumerate(thr); + + if (count > thr.length) { + count = thr.length; + } + for (int i = 0 ; i < count ; i += 1) { + StringBuilder sb = new StringBuilder(); + sb.append(" ").append(thr[i]); + if (stackTrace) { + for (StackTraceElement ste : thr[i].getStackTrace()) { + sb.append("\n ").append(ste); + } + } + logger.info(sb.toString()); + } + logger.info("***** end threadList: {}, count = {} *****", header, count); + return count; + } + + /** + * Set up environment prior to running tests. + */ + @BeforeClass + public static void init() throws Exception { + initialCount = threadList("BeforeClass", false); + + // create 6 adapters, corresponding to 6 'Server' instances + Adapter.ensureInit(); + + // make sure initialization has completed + long endTime = System.currentTimeMillis() + 60000L; + for (Adapter adapter : Adapter.adapters) { + assertTrue(adapter.toString() + ": Bucket assignments incomplete", + adapter.waitForInit(endTime)); + } + } + + public static boolean verifyComplete() { + return Thread.enumerate(new Thread[initialCount + 1]) == initialCount; + } + + /** + * Clean up after tests have finished. + */ + @AfterClass + public static void finish() throws InterruptedException { + threadList("AfterClass", false); + if (needControllerInit) { + return; + } + // shut down Server Pools and DMAAP Simulator + Adapter.ensureShutdown(); + + // updates for persistence may still be in progress -- wait 5 seconds + threadList("AfterEnsureShutdown", false); + + try { + initialCount = initialCount + 1; // one for await thread + await().atMost(Durations.ONE_MINUTE) + .with().pollInterval(Durations.ONE_SECOND) + .until(() -> verifyComplete()); + } finally { + threadList("AfterSleep", true); + } + + // look at KieSession objects + for (Adapter adapter : Adapter.adapters) { + StringBuilder sb = new StringBuilder(); + sb.append(adapter.toString()) + .append(": ") + .append(adapter.getKieSession().getObjects().size()) + .append(" objects"); + for (Object o : adapter.getKieSession().getObjects()) { + sb.append("\n ").append(o); + } + LinkedBlockingQueue lbq = adapter.notificationQueue(); + if (!lbq.isEmpty()) { + sb.append("\n") + .append(adapter.toString()) + .append(": ") + .append(lbq.size()) + .append(" queued entries"); + for (String string : lbq) { + sb.append("\n ").append(string); + } + } + logger.info(sb.toString()); + } + + // this was used during test debugging to verify that no adjuncts + // were created on the 'null' host -- there shouldn't be any + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + PrintStream out = new PrintStream(bos, true); + new BucketWrapperImpl.Static().dumpAdjuncts(out); + logger.info(out.toString()); + } + + /** + * Initialize all Drools controllers, if needed. + */ + static void ensureControllersInitialized() { + if (needControllerInit) { + needControllerInit = false; + for (Adapter adapter : Adapter.adapters) { + String rval = adapter.createController(); + logger.info("{}: Got the following from PolicyController:\n{}", + adapter, rval); + } + } + } + + /** + * make sure all servers have agreed on a lead server. + */ + @Test + public void checkLeadServer() { + Adapter firstAdapter = Adapter.adapters[0]; + UUID leaderUuid = firstAdapter.getLeader().getUuid(); + for (Adapter adapter : Adapter.adapters) { + UUID uuid = adapter.getLeader().getUuid(); + assertTrue(adapter.toString() + " has UUID " + uuid + + " (expected UUID " + leaderUuid + ")", + uuid.equals(leaderUuid)); + } + } + + /** + * make sure all servers agree on bucket distribution. + */ + @Test + public void startup() throws Exception { + Adapter firstAdapter = Adapter.adapters[0]; + BucketWrapper.Static firstBucketStatic = firstAdapter.getBucketStatic(); + + for (Adapter adapter : Adapter.adapters) { + BucketWrapper.Static bucketStatic = adapter.getBucketStatic(); + if (adapter == firstAdapter) { + // make sure an owner and primary backup have been chosen + // for each bucket (secondary backups aren't implemented yet) + for (int i = 0 ; i < bucketStatic.getBucketCount() ; i += 1) { + BucketWrapper bucket = bucketStatic.getBucket(i); + assertNotNull(bucket.getOwner()); + assertNotNull(bucket.getPrimaryBackup()); + } + } else { + // make sure the bucket assignments are consistent with + // the primary backup + for (int i = 0 ; i < bucketStatic.getBucketCount() ; i += 1) { + BucketWrapper firstBucket = firstBucketStatic.getBucket(i); + BucketWrapper bucket = bucketStatic.getBucket(i); + assertEquals(firstBucket.getOwner().getUuid(), + bucket.getOwner().getUuid()); + assertEquals(firstBucket.getPrimaryBackup().getUuid(), + bucket.getPrimaryBackup().getUuid()); + } + } + } + } + + // test 'TargetLock' + @Test + public void testTargetLock() throws InterruptedException { + // test locks on different hosts + lockTests(Adapter.adapters[5], Adapter.adapters[0]); + + // test locks on the same host + lockTests(Adapter.adapters[2], Adapter.adapters[2]); + + Adapter adapter0 = Adapter.adapters[0]; + Adapter adapter5 = Adapter.adapters[5]; + String ownerKey = adapter0.findKey("owner"); + String key = adapter5.findKey("key"); + LockOwner owner = new LockOwner(); + + // some exceptions + Throwable thrown = catchThrowable(() -> { + adapter0.newTargetLock(null, ownerKey, owner); + }); + assertThat(thrown).isInstanceOf(IllegalArgumentException.class) + .hasNoCause() + .hasMessageContaining("TargetLock: 'key' can't be null"); + + thrown = catchThrowable(() -> { + adapter0.newTargetLock(key, null, owner); + }); + assertThat(thrown).isInstanceOf(IllegalArgumentException.class) + .hasNoCause() + .hasMessageContaining("TargetLock: 'ownerKey' can't be null"); + + thrown = catchThrowable(() -> { + adapter5.newTargetLock(key, ownerKey, owner); + }); + assertThat(thrown).isInstanceOf(IllegalArgumentException.class) + .hasNoCause() + .hasMessageContaining("not currently assigned to this server"); + + thrown = catchThrowable(() -> { + adapter0.newTargetLock(key, ownerKey, null); + }); + assertThat(thrown).isInstanceOf(IllegalArgumentException.class) + .hasNoCause() + .hasMessageContaining("TargetLock: 'owner' can't be null"); + } + + /** + * Run some 'TargetLock' tests. + * + * @param keyAdapter this is the adapter for the key, which determines + * where the server-side data will reside + * @param ownerAdapter this is the adapter associated with the requestor + */ + void lockTests(Adapter keyAdapter, Adapter ownerAdapter) throws InterruptedException { + // choose 'key' and 'ownerKey' values that map to buckets owned + // by their respective adapters + String key = keyAdapter.findKey("key"); + String ownerKey = ownerAdapter.findKey("owner"); + + // this receives and queues callback notifications + LockOwner owner = new LockOwner(); + + // first lock -- should succeed + TargetLockWrapper tl1 = ownerAdapter.newTargetLock(key, ownerKey, owner); + assertLockAvailable(owner, tl1); + //assertArrayEquals(new Object[] {"lockAvailable", tl1}, + // owner.poll(5, TimeUnit.SECONDS)); + assertNull(owner.poll()); + assertTrue(tl1.isActive()); + assertEquals(TargetLockWrapper.State.ACTIVE, tl1.getState()); + assertEquals(ownerKey, tl1.getOwnerKey()); + + // second lock -- should fail (lock in use) + TargetLockWrapper tl2 = + ownerAdapter.newTargetLock(key, ownerKey, owner, false); + assertLockUnavailable(owner, tl2); + + assertNull(owner.poll()); + assertFalse(tl2.isActive()); + assertEquals(TargetLockWrapper.State.FREE, tl2.getState()); + assertEquals(ownerKey, tl2.getOwnerKey()); + + // third and fourth locks -- should wait + TargetLockWrapper tl3 = ownerAdapter.newTargetLock(key, ownerKey, owner); + TargetLockWrapper tl4 = ownerAdapter.newTargetLock(key, ownerKey, owner); + assertNull(owner.poll(5, TimeUnit.SECONDS)); // nothing immediately + assertFalse(tl3.isActive()); + assertFalse(tl4.isActive()); + assertEquals(TargetLockWrapper.State.WAITING, tl3.getState()); + assertEquals(TargetLockWrapper.State.WAITING, tl4.getState()); + + // free third lock before ever getting a callback + assertTrue(tl3.free()); + assertFalse(tl3.isActive()); + assertEquals(TargetLockWrapper.State.FREE, tl3.getState()); + assertFalse(tl3.free()); + + // free first lock + assertTrue(tl1.free()); + assertFalse(tl1.isActive()); + assertEquals(TargetLockWrapper.State.FREE, tl1.getState()); + assertFalse(tl1.free()); // already free + + // fourth lock should be active now (or soon) + assertLockAvailable(owner, tl4); + assertNull(owner.poll()); + assertTrue(tl4.isActive()); + assertEquals(TargetLockWrapper.State.ACTIVE, tl4.getState()); + + // free fourth lock + assertTrue(tl4.free()); + assertFalse(tl4.isActive()); + assertEquals(TargetLockWrapper.State.FREE, tl4.getState()); + } + + /** + * Test sending of intra-server and inter-server messages. + */ + @Test + public void topicSendTests() throws InterruptedException { + ensureControllersInitialized(); + + // sender and receiver are the same + topicSendTest(Adapter.adapters[2], Adapter.adapters[2], false); + + // sender and receiver are different + topicSendTest(Adapter.adapters[5], Adapter.adapters[4], false); + } + + /** + * Send a message from 'sender' to 'receiver' -- the message is delivered + * as an incoming 'TopicListener' event, is processed by + * 'FeatureServerPool.beforeOffer' (PolicyControllerFeatureApi), which + * will route it based upon keyword. At the destination end, it should + * be converted to an instance of 'TestDroolsObject', and inserted into + * the Drools session. The associated Drools rule should fire, and the + * message is placed in the notification queue. The message is then + * retracted, unless the string 'SAVE' appears within the message, which + * is the case if the 'save' parameter is set. + * + * @param sender the adapter associated with the sending end + * @param receiver the adapter associated with the receiving end + * @param save if 'true' the message is not retracted, if 'false' it is + * retracted + */ + String topicSendTest(Adapter sender, Adapter receiver, boolean save) throws InterruptedException { + // generate base message -- include 'SAVE' in the message if 'save' is set + String message = "From " + sender.toString() + " to " + + receiver.toString(); + if (save) { + message += " (SAVE)"; + } + message += ": " + UUID.randomUUID().toString() + "."; + + // add a numeric suffix to the message, such that it will be routed + // to 'receiver' + message = receiver.findKey(message); + + // send the message + sender.sendEvent(message); + + // verify that it has been received + assertEquals(message, + receiver.notificationQueue().poll(60, TimeUnit.SECONDS)); + return message; + } + + /** + * Return the Adapter associated with the current lead server. + * + * @return the Adapter associated with the current lead server + * ('null' if there is no leader) + */ + private Adapter getLeader() { + for (Adapter adapter : Adapter.adapters) { + if (adapter.getLeader() == adapter.getServerStatic().getThisServer()) { + // we have located the leader + return adapter; + } + } + throw new AssertionError(); + } + + /** + * Test migration of sessions from one server to another. + */ + @Test + public void sessionMigrationTest() throws InterruptedException { + ensureControllersInitialized(); + + // select adapters for roles + Adapter sender = Adapter.adapters[1]; + Adapter receiver = Adapter.adapters[3]; + Adapter newReceiver = Adapter.adapters[5]; + + // determine current leader + Adapter leader = getLeader(); + + // send message from 'sender' to 'receiver', and save it + String message = topicSendTest(sender, receiver, true); + + // verify where the bucket is and is not + assertTrue(receiver.getBucketStatic().isKeyOnThisServer(message)); + assertFalse(newReceiver.getBucketStatic().isKeyOnThisServer(message)); + + // move to the new host + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + PrintStream out = new PrintStream(bos, true); + leader.getBucketStatic().moveBucket( + out, receiver.getBucketStatic().bucketNumber(message), + newReceiver.getServerStatic().getThisServer().getUuid().toString()); + logger.info(bos.toString()); + + // poll up to 10 seconds for the bucket to be updated + TestDroolsObject matchingObject = new TestDroolsObject(message); + await().atMost(50000L, TimeUnit.MILLISECONDS) + .until(() -> (new ArrayList(newReceiver.getKieSession().getObjects()) + .contains(matchingObject))); + + // verify where the bucket is and is not + assertFalse(receiver.getBucketStatic().isKeyOnThisServer(message)); + assertTrue(newReceiver.getBucketStatic().isKeyOnThisServer(message)); + } + + /** + * Test migration of locks from one server to another. + */ + @Test + public void lockMigrationTest() throws InterruptedException { + ensureControllersInitialized(); + + // select adapters for roles -- '*Server' refers to the 'key' end, + // and '*Client' refers to the 'ownerKey' end + final Adapter oldServer = Adapter.adapters[0]; + final Adapter newServer = Adapter.adapters[1]; + final Adapter oldClient = Adapter.adapters[2]; + final Adapter newClient = Adapter.adapters[3]; + + // determine the current leader + final Adapter leader = getLeader(); + + // choose 'key' and 'ownerKey' values associated with + // 'oldServer' and 'oldClient', respectively + String key = oldServer.findKey("key"); + String ownerKey = oldClient.findKey("owner"); + LockOwner owner = new LockOwner(); + + // allocate lock 1 + TargetLockWrapper tl1 = oldClient.newTargetLock(key, ownerKey, owner); + assertLockAvailable(owner, tl1); + + // allocate a lock 2, which should be in the 'WAITING' state + TargetLockWrapper tl2 = oldClient.newTargetLock(key, ownerKey, owner); + assertNull(owner.poll(5, TimeUnit.SECONDS)); // nothing immediately + + // verify lock states + assertEquals(TargetLockWrapper.State.ACTIVE, tl1.getState()); + assertEquals(TargetLockWrapper.State.WAITING, tl2.getState()); + + // verify key buckets (before) + assertTrue(oldServer.getBucketStatic().isKeyOnThisServer(key)); + assertFalse(newServer.getBucketStatic().isKeyOnThisServer(key)); + + // move key buckets to new host (server side) + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + PrintStream out = new PrintStream(bos, true); + leader.getBucketStatic().moveBucket( + out, oldServer.getBucketStatic().bucketNumber(key), + newServer.getServerStatic().getThisServer().getUuid().toString()); + logger.info(bos.toString()); + + logger.debug("lock migration test - before: new isKeyOnThisServer: {}, " + + "old isKeyOnThisServer: {}, time: {}", + newServer.getBucketStatic().isKeyOnThisServer(key), + oldServer.getBucketStatic().isKeyOnThisServer(key), + new SimpleDateFormat("yyyy-MM-dd kk:mm:ss").format(new Date())); + + await().atMost(10000L, TimeUnit.MILLISECONDS).until(() -> + newServer.getBucketStatic().isKeyOnThisServer(key) + && oldServer.getBucketStatic().isKeyOnThisServer(key) == false); + + logger.debug("lock migration test - after : new isKeyOnThisServer: {}, " + + "old isKeyOnThisServer: {}, time: {}", + newServer.getBucketStatic().isKeyOnThisServer(key), + oldServer.getBucketStatic().isKeyOnThisServer(key), + new SimpleDateFormat("yyyy-MM-dd kk:mm:ss").format(new Date())); + + // verify key buckets (after) + assertFalse(oldServer.getBucketStatic().isKeyOnThisServer(key)); + assertTrue(newServer.getBucketStatic().isKeyOnThisServer(key)); + + // we should be able to free lock1 now, and lock2 should go active, + // indicating that the server side is still working + assertTrue(tl1.free()); + + assertLockAvailable(owner, tl2); + assertEquals(TargetLockWrapper.State.ACTIVE, tl2.getState()); + + // create a third lock + TargetLockWrapper tl3 = oldClient.newTargetLock(key, ownerKey, owner); + assertNull(owner.poll(5, TimeUnit.SECONDS)); // nothing immediately + assertEquals(TargetLockWrapper.State.WAITING, tl3.getState()); + + // insert active objects in Drools session, which is about to be moved + // (if we don't do this, the client objects won't be relocated) + oldClient.getKieSession().insert(new KeywordWrapper(ownerKey, "lmt.owner", owner)); + oldClient.getKieSession().insert(new KeywordWrapper(ownerKey, "lmt.tl2", tl2)); + oldClient.getKieSession().insert(new KeywordWrapper(ownerKey, "lmt.tl3", tl3)); + + // dumping out some state information as part of debugging -- + // I see no reason to remove it now + { + bos = new ByteArrayOutputStream(); + out = new PrintStream(bos, true); + out.println("BEFORE: tl2=" + tl2 + "\ntl3=" + tl3); + oldClient.dumpLocks(out, true); + oldClient.getBucketStatic().dumpAdjuncts(out); + logger.debug(bos.toString()); + } + + // don't need these any more -- we will get them back on the new host + tl1 = tl2 = tl3 = null; + owner = null; + + // verify ownerKey buckets (before) + assertTrue(oldClient.getBucketStatic().isKeyOnThisServer(ownerKey)); + assertFalse(newClient.getBucketStatic().isKeyOnThisServer(ownerKey)); + + // move ownerKey buckets to new host (client side) + bos = new ByteArrayOutputStream(); + out = new PrintStream(bos, true); + leader.getBucketStatic().moveBucket( + out, oldClient.getBucketStatic().bucketNumber(ownerKey), + newClient.getServerStatic().getThisServer().getUuid().toString()); + logger.info(bos.toString()); + + logger.debug("lock migration test2 - before: new isKeyOnThisServer: {}, " + + "old isKeyOnThisServer: {}, time: {}", + newClient.getBucketStatic().isKeyOnThisServer(ownerKey), + oldClient.getBucketStatic().isKeyOnThisServer(ownerKey), + new SimpleDateFormat("yyyy-MM-dd kk:mm:ss").format(new Date())); + + await().atMost(Durations.FIVE_SECONDS) + .with().pollInterval(Durations.ONE_HUNDRED_MILLISECONDS) + .until(() -> newClient.getBucketStatic().isKeyOnThisServer(ownerKey) + && oldClient.getBucketStatic().isKeyOnThisServer(ownerKey) == false); + + logger.debug("lock migration test2 - before: new isKeyOnThisServer: {}, " + + "old isKeyOnThisServer: {}, time: {}", + newClient.getBucketStatic().isKeyOnThisServer(ownerKey), + oldClient.getBucketStatic().isKeyOnThisServer(ownerKey), + new SimpleDateFormat("yyyy-MM-dd kk:mm:ss").format(new Date())); + + // verify ownerKey buckets (before) + assertFalse(oldClient.getBucketStatic().isKeyOnThisServer(ownerKey)); + assertTrue(newClient.getBucketStatic().isKeyOnThisServer(ownerKey)); + + // now, we need to locate 'tl2', 'tl3', and 'owner' in Drools memory + await().atMost(Durations.FIVE_SECONDS) + .with().pollInterval(Durations.ONE_SECOND) + .until(() -> newClient.getKieSession() != null); + KieSession kieSession = newClient.getKieSession(); + for (Object obj : new ArrayList(kieSession.getObjects())) { + if (obj instanceof KeywordWrapper) { + KeywordWrapper kw = (KeywordWrapper)obj; + + if ("lmt.owner".equals(kw.id)) { + owner = kw.getObject(LockOwner.class); + } else if ("lmt.tl2".equals(kw.id)) { + tl2 = kw.getObject(TargetLockWrapper.class); + } else if ("lmt.tl3".equals(kw.id)) { + tl3 = kw.getObject(TargetLockWrapper.class); + } + kieSession.delete(kieSession.getFactHandle(obj)); + } + } + + // make sure we found everything + assertNotNull(tl2); + assertNotNull(tl3); + assertNotNull(owner); + assertFalse(newClient.isForeign(tl2, tl3, owner)); + + // verify the states of 'tl2' and 'tl3' + assertEquals(TargetLockWrapper.State.ACTIVE, tl2.getState()); + assertEquals(TargetLockWrapper.State.WAITING, tl3.getState()); + + // dumping out some state information as part of debugging -- + // I see no reason to remove it now + { + bos = new ByteArrayOutputStream(); + out = new PrintStream(bos, true); + out.println("AFTER: tl2=" + tl2 + "\ntl3=" + tl3); + newClient.dumpLocks(out, true); + newClient.getBucketStatic().dumpAdjuncts(out); + logger.debug(bos.toString()); + } + + // now, we should be able to free 'tl2', and 'tl3' should go active + assertNull(owner.poll(5, TimeUnit.SECONDS)); // nothing immediately + assertTrue(tl2.free()); + + assertLockAvailable(owner, tl3); + assertEquals(TargetLockWrapper.State.ACTIVE, tl3.getState()); + assertTrue(tl3.free()); + } + + private void assertLockAvailable(LockOwner owner, TargetLockWrapper tl) { + AtomicReference objArray = new AtomicReference<>(); + await().atMost(300000, TimeUnit.MILLISECONDS).until(() -> { + objArray.set(owner.poll(5, TimeUnit.SECONDS)); + return objArray.get() != null; + }); + assertArrayEquals(new Object[] {"lockAvailable", tl}, objArray.get()); + } + + private void assertLockUnavailable(LockOwner owner, TargetLockWrapper tl) { + AtomicReference objArray = new AtomicReference<>(); + await().atMost(300000, TimeUnit.MILLISECONDS).until(() -> { + objArray.set(owner.poll(5, TimeUnit.SECONDS)); + return objArray.get() != null; + }); + assertArrayEquals(new Object[] {"lockUnavailable", tl}, objArray.get()); + } + + /** + * Test cleanup of locks that have been abandoned. + */ + @Test + public void abandonedLocks() throws InterruptedException { + // choose adapters + Adapter keyAdapter = Adapter.adapters[3]; + Adapter ownerAdapter = Adapter.adapters[4]; + + // generate compatible keys + String key = keyAdapter.findKey("abandonedLocks.key"); + String ownerKey = ownerAdapter.findKey("abandonedLocks.owner"); + + // receiver of callback notifications + LockOwner owner = new LockOwner(); + + // first lock -- should succeed + TargetLockWrapper tl1 = ownerAdapter.newTargetLock(key, ownerKey, owner); + //assertLockAvailable(owner, tl1); + assertArrayEquals(new Object[] {"lockAvailable", tl1}, + owner.poll(5, TimeUnit.SECONDS)); + + // second lock -- should wait + final TargetLockWrapper tl2 = ownerAdapter.newTargetLock(key, ownerKey, owner); + assertNull(owner.poll(5, TimeUnit.SECONDS)); // nothing immediately + + // abandon first lock, and do a GC cycle -- tl2 should go active + tl1 = null; + System.gc(); + //assertLockAvailable(owner, tl2); + assertArrayEquals(new Object[] {"lockAvailable", tl2}, + owner.poll(5, TimeUnit.SECONDS)); + assertTrue(tl2.isActive()); + assertEquals(TargetLockWrapper.State.ACTIVE, tl2.getState()); + + // free tl2 + assertTrue(tl2.free()); + assertFalse(tl2.isActive()); + assertEquals(TargetLockWrapper.State.FREE, tl2.getState()); + } + + /** + * Test locks within Drools sessions. + */ + @Test + public void locksWithinDrools() throws InterruptedException { + ensureControllersInitialized(); + + // choose adapters + Adapter keyAdapter = Adapter.adapters[3]; + Adapter ownerAdapter = Adapter.adapters[4]; + + // generate compatible keys + final String key = keyAdapter.findKey("locksWithinDrools.key"); + final String ownerKey = ownerAdapter.findKey("locksWithinDrools.owner"); + + // need a 'LockOwner' variant + final LockOwner owner = new LockOwner() { + /** + * {@inheritDoc} + */ + @Override + public void lockAvailable(TargetLockWrapper lock) { + // insert notification in 'LinkedBlockingQueue' + add(new Object[] {"lockAvailable", lock, Thread.currentThread()}); + } + + /** + * {@inheritDoc} + */ + @Override + public void lockUnavailable(TargetLockWrapper lock) { + // insert notification in 'LinkedBlockingQueue' + add(new Object[] {"lockUnavailable", lock, Thread.currentThread()}); + } + }; + + // generate first lock outside of Drools + final TargetLockWrapper tl1 = ownerAdapter.newTargetLock(key, ownerKey, owner); + Object[] response = owner.poll(5, TimeUnit.SECONDS); + assertNotNull(response); + assertEquals(3, response.length); + assertEquals("lockAvailable", response[0]); + assertEquals(tl1, response[1]); + + // now, generate one from within Drools + ownerAdapter.getKieSession().insert(new DroolsRunnable() { + @Override + public void run() { + // create lock, which should block + TargetLockWrapper tl2 = + ownerAdapter.newTargetLock(key, ownerKey, owner); + owner.add(new Object[] {"tl2Data", tl2, Thread.currentThread()}); + } + }); + + // fetch data from Drools thread + response = owner.poll(5, TimeUnit.SECONDS); + assertNotNull(response); + assertEquals(3, response.length); + assertEquals("tl2Data", response[0]); + + TargetLockWrapper tl2 = null; + Thread droolsThread = null; + + if (response[1] instanceof TargetLockWrapper) { + tl2 = (TargetLockWrapper) response[1]; + } + if (response[2] instanceof Thread) { + droolsThread = (Thread) response[2]; + } + + assertNotNull(tl2); + assertNotNull(droolsThread); + + // tl2 should still be waiting + assertNull(owner.poll(5, TimeUnit.SECONDS)); + assertFalse(tl2.isActive()); + assertEquals(TargetLockWrapper.State.WAITING, tl2.getState()); + + // free tl1 + assertTrue(tl1.free()); + + // verify that 'tl2' is now available, + // and the call back ran in the Drools thread + assertArrayEquals(new Object[] {"lockAvailable", tl2, droolsThread}, + owner.poll(5, TimeUnit.SECONDS)); + assertTrue(tl2.isActive()); + assertEquals(TargetLockWrapper.State.ACTIVE, tl2.getState()); + + // free tl2 + assertTrue(tl2.free()); + } + + /** + * Test insertion of objects into Drools memory. + */ + @Test + public void insertDrools() throws InterruptedException { + Adapter adapter1 = Adapter.adapters[1]; + final Adapter adapter2 = Adapter.adapters[2]; + + // check whether we can insert objects locally (adapter1 -> adapter1) + String key1 = adapter1.findKey("insertDrools1-"); + adapter1.insertDrools(new KeywordWrapper(key1, "insertDroolsLocal", null)); + + await().atMost(Durations.TEN_SECONDS) + .with().pollInterval(Durations.ONE_SECOND) + .until(() -> adapter1.getKieSession() != null); + + KieSession kieSession; + boolean found = false; + kieSession = adapter1.getKieSession(); + for (Object obj : new ArrayList(kieSession.getObjects())) { + if (obj instanceof KeywordWrapper + && "insertDroolsLocal".equals(((KeywordWrapper) obj).id)) { + found = true; + kieSession.delete(kieSession.getFactHandle(obj)); + break; + } + } + assertTrue(found); + + // check whether we can insert objects remotely (adapter1 -> adapter2) + String key2 = adapter2.findKey("insertDrools2-"); + adapter1.insertDrools(new KeywordWrapper(key2, "insertDroolsRemote", null)); + + // it would be nice to test for this, rather than sleep + await().atMost(Durations.FIVE_SECONDS) + .with().pollInterval(Durations.ONE_HUNDRED_MILLISECONDS) + .until(() -> adapter2.getKieSession() != null); + + found = false; + kieSession = adapter2.getKieSession(); + for (Object obj : new ArrayList(kieSession.getObjects())) { + if (obj instanceof KeywordWrapper + && "insertDroolsRemote".equals(((KeywordWrapper) obj).id)) { + found = true; + kieSession.delete(kieSession.getFactHandle(obj)); + break; + } + } + assertTrue(found); + } + + /* ============================================================ */ + + /** + * This class implements the 'LockCallback' interface, and + * makes callback responses available via a 'LinkedBlockingQueue'. + */ + public static class LockOwner extends LinkedBlockingQueue + implements TargetLockWrapper.Owner, Serializable { + /** + * Constructor -- initialize the 'LinkedBlockingQueue'. + */ + public LockOwner() { + super(); + } + + /** + * {@inheritDoc} + */ + @Override + public void lockAvailable(TargetLockWrapper lock) { + // insert notification in 'LinkedBlockingQueue' + add(new Object[] {"lockAvailable", lock}); + } + + /** + * {@inheritDoc} + */ + @Override + public void lockUnavailable(TargetLockWrapper lock) { + // insert notification in 'LinkedBlockingQueue' + add(new Object[] {"lockUnavailable", lock}); + } + } + + /* ============================================================ */ + + /** + * This class is used to insert objects in Drools memory to support + * testing. + */ + public static class KeywordWrapper implements Serializable { + // this is the keyword, which determines the associated bucket, + // which then determines when this object is migrated + public String key; + + // this is an identifier, which can be used to select objects + // on the receiving end + public String id; + + // this is the object being wrapped + public Serializable obj; + + /** + * Constructor -- initialize fields. + * + * @param key keyword, which determines the associated bucket + * @param id string identifier, used to match objects from the sending + * to the receiving end + * @param obj the object being wrapped + */ + public KeywordWrapper(String key, String id, Serializable obj) { + this.key = key; + this.id = id; + this.obj = obj; + } + + /** + * This is used to extract objects on the receiving end. If the class + * matches, we get the expected object. If the class does not match, + * we get 'null', and the test should fail. + * + * @param clazz the expected class of the 'obj' field + * @return the object (if 'clazz' matches), 'null' if it does not + */ + public T getObject(Class clazz) { + return clazz.isInstance(obj) ? clazz.cast(obj) : null; + } + } +} diff --git a/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/TestDroolsObject.java b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/TestDroolsObject.java new file mode 100644 index 00000000..43aa5de7 --- /dev/null +++ b/feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/TestDroolsObject.java @@ -0,0 +1,58 @@ +/* + * ============LICENSE_START======================================================= + * feature-server-pool + * ================================================================================ + * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.drools.serverpooltest; + +import java.io.Serializable; +import java.util.Objects; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +/** + * Instances of this class can be inserted into a Drools session, and used + * to test things like message routing and bucket migration. + */ +@Getter +@Setter +@ToString +@EqualsAndHashCode +public class TestDroolsObject implements Serializable { + // determines the bucket number + private String key; + + /** + * Constructor - no key specified. + */ + public TestDroolsObject() { + this.key = null; + } + + /** + * Constructor - initialize the key. + * + * @param key key that is hashed to determine the bucket number + */ + public TestDroolsObject(String key) { + this.key = key; + } +} -- cgit 1.2.3-korg