From 001320ed1ecbdf3b2f58d261f008f681da3f4c67 Mon Sep 17 00:00:00 2001
From: "Straubs, Ralph (rs8887)"
Date: Tue, 4 Feb 2020 03:26:30 -0600
Subject: Add feature-server-pool to the ONAP drools-pdp repository.
Issue-ID: POLICY-2351
Change-Id: I8ddde547a73a51c04c8dd9f1d66232e8281599a9
Signed-off-by: Straubs, Ralph (rs8887)
---
api-server-pool-state-control/pom.xml | 46 +
.../control/api/DroolsPdpStateControlApi.java | 40 +
.../api/DroolsPdpStateControlApiConstants.java | 37 +
feature-server-pool/lombok.config | 3 +
feature-server-pool/pom.xml | 178 ++
feature-server-pool/src/assembly/assemble_zip.xml | 75 +
.../feature/config/feature-server-pool.properties | 138 +
.../org/onap/policy/drools/serverpool/Bucket.java | 2495 +++++++++++++++++
.../onap/policy/drools/serverpool/Discovery.java | 354 +++
.../org/onap/policy/drools/serverpool/Events.java | 103 +
.../serverpool/ExtendedObjectInputStream.java | 70 +
.../drools/serverpool/FeatureServerPool.java | 986 +++++++
.../org/onap/policy/drools/serverpool/Keyword.java | 507 ++++
.../org/onap/policy/drools/serverpool/Leader.java | 573 ++++
.../onap/policy/drools/serverpool/MainLoop.java | 186 ++
.../policy/drools/serverpool/RestServerPool.java | 447 ++++
.../org/onap/policy/drools/serverpool/Server.java | 1352 ++++++++++
.../policy/drools/serverpool/ServerPoolApi.java | 79 +
.../drools/serverpool/ServerPoolProperties.java | 332 +++
.../onap/policy/drools/serverpool/TargetLock.java | 2821 ++++++++++++++++++++
.../org/onap/policy/drools/serverpool/Util.java | 181 ++
.../drools/serverpool/persistence/Persistence.java | 875 ++++++
...icy.drools.control.api.DroolsPdpStateControlApi | 1 +
...onap.policy.drools.core.PolicySessionFeatureApi | 2 +
...licy.drools.features.PolicyControllerFeatureApi | 1 +
...p.policy.drools.features.PolicyEngineFeatureApi | 1 +
...org.onap.policy.drools.serverpool.ServerPoolApi | 1 +
.../onap/policy/drools/serverpool/AdapterImpl.java | 456 ++++
.../drools/serverpool/BucketWrapperImpl.java | 173 ++
.../drools/serverpool/ServerWrapperImpl.java | 146 +
.../drools/serverpool/TargetLockWrapperImpl.java | 195 ++
.../onap/policy/drools/serverpooltest/Adapter.java | 353 +++
.../drools/serverpooltest/BlockingClassLoader.java | 176 ++
.../drools/serverpooltest/BucketWrapper.java | 132 +
.../drools/serverpooltest/ServerWrapper.java | 103 +
.../policy/drools/serverpooltest/SimDmaap.java | 327 +++
.../drools/serverpooltest/TargetLockWrapper.java | 98 +
.../onap/policy/drools/serverpooltest/Test1.java | 912 +++++++
.../drools/serverpooltest/TestDroolsObject.java | 58 +
.../resources/TestController-controller.properties | 13 +
.../src/test/resources/drools-artifact-1.1/pom.xml | 31 +
.../src/main/resources/META-INF/kmodule.xml | 25 +
.../src/main/resources/rules.drl | 51 +
.../install/src/files/feature-server-pool.conf | 30 +
pom.xml | 2 +
45 files changed, 15165 insertions(+)
create mode 100644 api-server-pool-state-control/pom.xml
create mode 100644 api-server-pool-state-control/src/main/java/org/onap/policy/drools/control/api/DroolsPdpStateControlApi.java
create mode 100644 api-server-pool-state-control/src/main/java/org/onap/policy/drools/control/api/DroolsPdpStateControlApiConstants.java
create mode 100644 feature-server-pool/lombok.config
create mode 100644 feature-server-pool/pom.xml
create mode 100644 feature-server-pool/src/assembly/assemble_zip.xml
create mode 100644 feature-server-pool/src/main/feature/config/feature-server-pool.properties
create mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java
create mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Discovery.java
create mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Events.java
create mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/ExtendedObjectInputStream.java
create mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java
create mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Keyword.java
create mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Leader.java
create mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/MainLoop.java
create mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/RestServerPool.java
create mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java
create mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/ServerPoolApi.java
create mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/ServerPoolProperties.java
create mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java
create mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Util.java
create mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java
create mode 100644 feature-server-pool/src/main/resources/META-INF/services/org.onap.policy.drools.control.api.DroolsPdpStateControlApi
create mode 100644 feature-server-pool/src/main/resources/META-INF/services/org.onap.policy.drools.core.PolicySessionFeatureApi
create mode 100644 feature-server-pool/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyControllerFeatureApi
create mode 100644 feature-server-pool/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureApi
create mode 100644 feature-server-pool/src/main/resources/META-INF/services/org.onap.policy.drools.serverpool.ServerPoolApi
create mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/AdapterImpl.java
create mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/BucketWrapperImpl.java
create mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/ServerWrapperImpl.java
create mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/TargetLockWrapperImpl.java
create mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/Adapter.java
create mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/BlockingClassLoader.java
create mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/BucketWrapper.java
create mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/ServerWrapper.java
create mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/SimDmaap.java
create mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/TargetLockWrapper.java
create mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/Test1.java
create mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/TestDroolsObject.java
create mode 100644 feature-server-pool/src/test/resources/TestController-controller.properties
create mode 100644 feature-server-pool/src/test/resources/drools-artifact-1.1/pom.xml
create mode 100644 feature-server-pool/src/test/resources/drools-artifact-1.1/src/main/resources/META-INF/kmodule.xml
create mode 100644 feature-server-pool/src/test/resources/drools-artifact-1.1/src/main/resources/rules.drl
create mode 100644 packages/install/src/files/feature-server-pool.conf
diff --git a/api-server-pool-state-control/pom.xml b/api-server-pool-state-control/pom.xml
new file mode 100644
index 00000000..790ea506
--- /dev/null
+++ b/api-server-pool-state-control/pom.xml
@@ -0,0 +1,46 @@
+
+
+
+
+ 4.0.0
+
+
+ org.onap.policy.drools-pdp
+ drools-pdp
+ 1.6.0-SNAPSHOT
+
+
+ api-server-pool-state-control
+
+ api-server-pool-state-control
+ APIs for server pool state control
+
+
+
+ org.onap.policy.drools-pdp
+ policy-core
+ ${project.version}
+ provided
+
+
+
+
\ No newline at end of file
diff --git a/api-server-pool-state-control/src/main/java/org/onap/policy/drools/control/api/DroolsPdpStateControlApi.java b/api-server-pool-state-control/src/main/java/org/onap/policy/drools/control/api/DroolsPdpStateControlApi.java
new file mode 100644
index 00000000..7148f30e
--- /dev/null
+++ b/api-server-pool-state-control/src/main/java/org/onap/policy/drools/control/api/DroolsPdpStateControlApi.java
@@ -0,0 +1,40 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * drools-pdp-state-control-api
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.control.api;
+
+import org.onap.policy.common.utils.services.OrderedService;
+
+public interface DroolsPdpStateControlApi extends OrderedService {
+
+ /**
+ * This method is called when wanting to interrupt the operation of the
+ * drools pdp. It locks the endpoints, stops the message processing
+ * and removes the instance of the drools pdp from the pool.
+ */
+ void shutdown();
+
+ /**
+ * This method is called when wanting to resume the operation of the
+ * drools pdp. It unlocks the endpoints, resumes message processing
+ * and adds the instance of the drools pdp to the pool.
+ */
+ void restart();
+}
diff --git a/api-server-pool-state-control/src/main/java/org/onap/policy/drools/control/api/DroolsPdpStateControlApiConstants.java b/api-server-pool-state-control/src/main/java/org/onap/policy/drools/control/api/DroolsPdpStateControlApiConstants.java
new file mode 100644
index 00000000..c82003f3
--- /dev/null
+++ b/api-server-pool-state-control/src/main/java/org/onap/policy/drools/control/api/DroolsPdpStateControlApiConstants.java
@@ -0,0 +1,37 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * drools-pdp-state-control-api
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.control.api;
+
+import org.onap.policy.common.utils.services.OrderedServiceImpl;
+
+public class DroolsPdpStateControlApiConstants {
+
+ /**
+ * 'FeatureAPI.impl.getList()' returns an ordered list of objects
+ * implementing the 'FeatureAPI' interface.
+ */
+ public static OrderedServiceImpl impl =
+ new OrderedServiceImpl<>(DroolsPdpStateControlApi.class);
+
+ private DroolsPdpStateControlApiConstants() {
+ // do nothing
+ }
+}
diff --git a/feature-server-pool/lombok.config b/feature-server-pool/lombok.config
new file mode 100644
index 00000000..c8811fdb
--- /dev/null
+++ b/feature-server-pool/lombok.config
@@ -0,0 +1,3 @@
+config.stopBubbling = true
+lombok.addLombokGeneratedAnnotation = true
+lombok.nonNull.exceptionType = IllegalArgumentException
\ No newline at end of file
diff --git a/feature-server-pool/pom.xml b/feature-server-pool/pom.xml
new file mode 100644
index 00000000..177f88ae
--- /dev/null
+++ b/feature-server-pool/pom.xml
@@ -0,0 +1,178 @@
+
+
+
+
+ 4.0.0
+
+
+ org.onap.policy.drools-pdp
+ drools-pdp
+ 1.6.0-SNAPSHOT
+
+
+ feature-server-pool
+
+ feature-server-pool
+ Allows multiple DroolsPDP hosts to be active at once
+
+
+
+
+ maven-assembly-plugin
+
+
+ zipfile
+
+ single
+
+ package
+
+ true
+ ${project.artifactId}-${project.version}
+
+ src/assembly/assemble_zip.xml
+
+ false
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+ 3.1.1
+
+
+ copy-dependencies
+
+ copy-dependencies
+
+ prepare-package
+
+ false
+ ${project.build.directory}/assembly/lib
+ false
+ true
+ true
+ false
+ false
+ false
+ runtime
+ true
+
+
+
+
+
+
+
+
+
+ org.onap.policy.drools-pdp
+ policy-core
+ ${project.version}
+ provided
+
+
+
+ org.onap.policy.common
+ policy-endpoints
+ ${policy.common.version}
+ provided
+
+
+
+ org.onap.policy.common
+ utils
+ ${policy.common.version}
+ provided
+
+
+
+ org.onap.policy.drools-pdp
+ policy-management
+ ${project.version}
+ provided
+
+
+
+ com.att.nsa
+ cambriaClient
+ provided
+
+
+
+ org.onap.policy.drools-pdp
+ api-server-pool-state-control
+ ${project.version}
+
+
+
+ org.glassfish.jersey.core
+ jersey-common
+ test
+
+
+
+ org.glassfish.jersey.media
+ jersey-media-json-jackson
+ test
+
+
+
+ com.google.guava
+ guava
+ test
+
+
+
+ org.powermock
+ powermock-api-mockito2
+ test
+
+
+
+ org.powermock
+ powermock-module-junit4
+ test
+
+
+
+ junit
+ junit
+ provided
+
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+
+ org.awaitility
+ awaitility
+ test
+
+
+
+
diff --git a/feature-server-pool/src/assembly/assemble_zip.xml b/feature-server-pool/src/assembly/assemble_zip.xml
new file mode 100644
index 00000000..e735a8d6
--- /dev/null
+++ b/feature-server-pool/src/assembly/assemble_zip.xml
@@ -0,0 +1,75 @@
+
+
+
+
+
+ feature-server-pool-package
+
+ zip
+
+
+ false
+
+
+
+ target
+ lib/feature
+
+ feature-server-pool-${project.version}.jar
+
+
+
+ target/assembly/lib
+ lib/dependencies
+
+ *.jar
+
+
+
+ src/main/feature/config
+ config
+ 0644
+
+
+
+ src/main/feature/bin
+ bin
+ 0744
+
+
+
+ src/main/feature/db
+ db
+ 0744
+
+
+
+ src/main/feature/install
+ install
+ 0744
+
+
+
+
+
diff --git a/feature-server-pool/src/main/feature/config/feature-server-pool.properties b/feature-server-pool/src/main/feature/config/feature-server-pool.properties
new file mode 100644
index 00000000..7be2167f
--- /dev/null
+++ b/feature-server-pool/src/main/feature/config/feature-server-pool.properties
@@ -0,0 +1,138 @@
+###
+# ============LICENSE_START=======================================================
+# feature-server-pool
+# ================================================================================
+# Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+###
+
+# The following properties control the IP address and port that a given
+# server binds to. The default will bind to all interfaces on the host,
+# and choose a port number at random.
+
+server.pool.server.ipAddress=0.0.0.0
+server.pool.server.port=${envd:SERVER_POOL_PORT}
+
+# The following properties determine whether HTTPS is used -- note that HTTPS
+# also requires that the 'java.net.ssl.*' parameters in 'system.properties' be
+# specified, and the key store and trust store be configured, as appropriate.
+server.pool.server.https=${envd:SERVER_POOL_HTTPS}
+server.pool.server.selfSignedCerts=false
+
+# The IP address and port that servers on the geo-redundant site
+# should use to connect to servers on this site.
+server.pool.server.site.ip=${envd:SERVER_POOL_SITE_IP}
+server.pool.server.site.port=${envd:SERVER_POOL_SITE_PORT}
+
+# A comma-separated list of host names -- if an entry is found that refers
+# to an HTTP/HTTPS destination IP address, the host name will used as the
+# destination, instead of the IP address
+server.pool.server.hostlist=${envd:SERVER_POOL_HOST_LIST}
+
+# The servers send 'pings' to each other once per main loop cycle. They
+# also measure the gap between 'pings' from each server, and calculate
+# an allowed time gap based upon this. 'server.pool.server.allowedGap' is the initial
+# allowed gap prior to receiving any pings (default=30 seconds), and
+# 'server.pool.server.adaptiveGapAdjust' is a value that is added to the calculated
+# gap "just in case" (default=5 seconds)
+
+server.pool.server.allowedGap=30000
+server.pool.server.adaptiveGapAdjust=5000
+
+# 'connectTimeout' and 'readTimeout' affect the client end of a REST
+# connection (default=10 seconds each)
+
+server.pool.server.connectTimeout=10000
+server.pool.server.readTimeout=10000
+
+# Each server has a thread pool per remote server, which is used when
+# sending HTTP REST messages -- the following parameters determine the
+# configuration.
+
+server.pool.server.threads.corePoolSize=5
+server.pool.server.threads.maximumPoolSize=10
+server.pool.server.threads.keepAliveTime=5000
+
+# The server pool members use a UEB/DMAAP topic to connect with other
+# servers in the pool. The following set of parameters are passed to
+# the CambriaClient library, and are used in setting up the consumer and
+# publisher ends of the connection. 'discovery.servers' and 'discovery.topic'
+# are the minimum values that need to be specified. The last parameter in
+# this set, 'discovery.publisherLoopCycleTime' isn't passed to the
+# CambriaClient library; instead, it controls how frequently the 'ping'
+# messages are sent out on this channel. Note that only the lead server
+# keeps this channel open long-term.
+
+server.pool.discovery.servers=${envd:SERVER_POOL_DISCOVERY_SERVERS}
+server.pool.discovery.topic=${envd:SERVER_POOL_DISCOVERY_TOPIC}
+server.pool.discovery.username=${envd:SERVER_POOL_DISCOVERY_USERNAME}
+server.pool.discovery.password=${envd:SERVER_POOL_DISCOVERY_PASSWORD}
+server.pool.discovery.https=${envd:DMAAP_USE_HTTPS}
+server.pool.discovery.apiKey=
+server.pool.discovery.apiSecret=
+#server.pool.discovery.publisherSocketTimeout=5000
+#server.pool.discovery.consumerSocketTimeout=70000
+server.pool.discovery.fetchTimeout=60000
+server.pool.discovery.fetchLimit=100
+server.pool.discovery.selfSignedCertificates=false
+server.pool.discovery.publisherLoopCycleTime=5000
+
+# The 'leader.*' parameters affect behavior during an election. The value of
+# 'mainLoop.cycle' determines the actual time delay. 'leader.stableIdCycles'
+# is the required minimum number of "stable" cycles before voting can start
+# (in this case, "stable" means no servers added or failing). Once voting has
+# started, "leader.stableVotingCycles' is the minimum number of "stable"
+# cycles needed before declaring a winner (in this case, "stable" means no
+# votes changing).
+
+server.pool.leader.stableIdleCycles=5
+server.pool.leader.stableVotingCycles=5
+
+# The value of 'mainLoop.cycle' (default = 1 second) determines how frequently
+# various events occur, such as the sending of 'ping' messages, and the
+# duration of a "cycle" while voting for a lead server.
+
+server.pool.mainLoop.cycle=1000
+
+# 'keyword.path' is used when looking for "keywords" within JSON messages.
+# The first keyword located is hashed to determine which bucket to route
+# through.
+
+keyword.path=requestID,CommonHeader.RequestID,body.output.common-header.request-id,result-info.request-id:uuid
+# 'keyword..lookup' is used to locate "keywords" within objects.
+# The 'value' field contains a list of method calls or field names separated
+# by '.' that are used to locate the keyword
+# (e.g. 'method1().field2.method3()')
+
+keyword.org.onap.policy.m2.base.Transaction.lookup=getRequestID()
+keyword.org.onap.policy.controlloop.ControlLoopEvent.lookup=requestID
+keyword.org.onap.policy.appclcm.LcmRequestWrapper.lookup=getBody().getCommonHeader().getRequestId()
+keyword.org.onap.policy.appclcm.LcmResponseWrapper.lookup=getBody().getCommonHeader().getRequestId()
+keyword.org.onap.policy.drools.serverpool.TargetLock.lookup=getOwnerKey()
+
+# The following properties affect distributed 'TargetLock' behavior.
+#
+# server.pool.lock.ttl - controls how many hops a 'TargetLock' message can take
+# server.pool.lock.audit.period - how frequently should the audit run?
+# server.pool.lock.audit.gracePeriod - how long to wait after bucket reassignments
+# before running the audit again
+# server.pool.lock.audit.retryDelay - mismatches can occur due to the transient nature
+# of the lock state: this property controls how long to wait before
+# trying again
+
+server.pool.lock.ttl=3
+server.pool.lock.audit.period=300000
+server.pool.lock.audit.gracePeriod=60000
+server.pool.lock.audit.retryDelay=5000
\ No newline at end of file
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java
new file mode 100644
index 00000000..2236506e
--- /dev/null
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java
@@ -0,0 +1,2495 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-server-pool
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.serverpool;
+
+import static org.onap.policy.drools.serverpool.ServerPoolProperties.BUCKET_CONFIRMED_TIMEOUT;
+import static org.onap.policy.drools.serverpool.ServerPoolProperties.BUCKET_TIME_TO_LIVE;
+import static org.onap.policy.drools.serverpool.ServerPoolProperties.BUCKET_UNCONFIRMED_GRACE_PERIOD;
+import static org.onap.policy.drools.serverpool.ServerPoolProperties.BUCKET_UNCONFIRMED_TIMEOUT;
+import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_BUCKET_CONFIRMED_TIMEOUT;
+import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_BUCKET_TIME_TO_LIVE;
+import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_BUCKET_UNCONFIRMED_GRACE_PERIOD;
+import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_BUCKET_UNCONFIRMED_TIMEOUT;
+import static org.onap.policy.drools.serverpool.ServerPoolProperties.getProperty;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Base64;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedTransferQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The server pool uses an algorithmic way to map things like transactions
+ * (identified by a 'requestID') and locks (identified by a string key)
+ * into a server handling that transaction or lock. It does this by mapping
+ * the string name into one of a set of predefined hash buckets, with each
+ * bucket being assigned to one of the active servers.
+ * In other words:
+ * string key -> hash bucket (fixed mapping, known to all servers)
+ * hash bucket -> server (assignments may change when servers go up or down,
+ * but remains fairly static when the system is stable)
+ * With this approach, there is no global dynamic table that needs to be
+ * updated as transactions, or other objects come and go.
+ * Each instance of class 'Bucket' corresponds to one of the hash buckets,
+ * there are static methods that provide the overall abstraction, as well
+ * as some supporting classes.
+ */
+
+@Getter
+@Setter
+public class Bucket {
+ private static Logger logger = LoggerFactory.getLogger(Bucket.class);
+
+ /*
+ * Listener class to handle state changes that may lead to
+ * reassignments of buckets
+ */
+ private static EventHandler eventHandler = new EventHandler();
+
+ // Used to hash keywords into buckets
+ private static MessageDigest messageDigest;
+
+ static {
+ // register Listener class
+ Events.register(eventHandler);
+
+ // create MD5 MessageDigest -- used to hash keywords
+ try {
+ messageDigest = MessageDigest.getInstance("MD5");
+ } catch (NoSuchAlgorithmException e) {
+ throw new ExceptionInInitializerError(e);
+ }
+ }
+
+ /*
+ * Values extracted from properties
+ */
+
+ private static String timeToLive;
+ private static long confirmedTimeout;
+ private static long unconfirmedTimeout;
+ private static long unconfirmedGracePeriod;
+
+ /*
+ * Tags for encoding of bucket data
+ */
+ private static final int END_OF_PARAMETERS_TAG = 0;
+ private static final int OWNER_UPDATE = 1;
+ private static final int OWNER_NULL = 2;
+ private static final int PRIMARY_BACKUP_UPDATE = 3;
+ private static final int PRIMARY_BACKUP_NULL = 4;
+ private static final int SECONDARY_BACKUP_UPDATE = 5;
+ private static final int SECONDARY_BACKUP_NULL = 6;
+
+ // This is the table itself -- the current size is fixed at 1024 buckets
+ public static final int BUCKETCOUNT = 1024;
+ private static Bucket[] indexToBucket = new Bucket[BUCKETCOUNT];
+
+ static {
+ // create hash bucket entries, but there are no assignments yet
+ for (int i = 0 ; i < indexToBucket.length ; i += 1) {
+ Bucket bucket = new Bucket(i);
+ indexToBucket[i] = bucket;
+ }
+ }
+
+ // this is a list of all objects registered for the 'Backup' interface
+ private static List backupList = new LinkedList<>();
+
+ // 'rebalance' is a non-null value when rebalancing is in progress
+ private static Object rebalanceLock = new Object();
+ private static Rebalance rebalance = null;
+
+ // bucket number
+ private volatile int index;
+
+ // owner of the bucket -- this is the host where messages should be directed
+ private volatile Server owner = null;
+
+ // this host will take over as the owner if the current owner goes down,
+ // and may also contain backup data to support persistence
+ private volatile Server primaryBackup = null;
+
+ // this is a secondary backup host, which can be used if both owner and
+ // primary backup go out in quick succession
+ private volatile Server secondaryBackup = null;
+
+ // when we are in a transient state, certain events are forwarded to
+ // this object
+ private volatile State state = null;
+
+ // storage for additional data
+ private Map, Object> adjuncts = new HashMap, Object>();
+
+ // BACKUP data (only buckets for where we are the owner, or a backup)
+
+ // TBD: need fields for outgoing queues for application message transfers
+
+ /**
+ * This method triggers registration of 'eventHandler', and also extracts
+ * property values.
+ */
+ static void startup() {
+ int intTimeToLive =
+ getProperty(BUCKET_TIME_TO_LIVE, DEFAULT_BUCKET_TIME_TO_LIVE);
+ timeToLive = String.valueOf(intTimeToLive);
+ confirmedTimeout =
+ getProperty(BUCKET_CONFIRMED_TIMEOUT, DEFAULT_BUCKET_CONFIRMED_TIMEOUT);
+ unconfirmedTimeout =
+ getProperty(BUCKET_UNCONFIRMED_TIMEOUT,
+ DEFAULT_BUCKET_UNCONFIRMED_TIMEOUT);
+ unconfirmedGracePeriod =
+ getProperty(BUCKET_UNCONFIRMED_GRACE_PERIOD,
+ DEFAULT_BUCKET_UNCONFIRMED_GRACE_PERIOD);
+ }
+
+ /**
+ * Constructor -- called when building the 'indexToBucket' table.
+ *
+ * @param index the bucket number
+ */
+ private Bucket(int index) {
+ this.index = index;
+ }
+
+ /**
+ * This method converts a String keyword into the corresponding bucket
+ * number.
+ *
+ * @param value the keyword to be converted
+ * @return the bucket number
+ */
+ public static int bucketNumber(String value) {
+ /*
+ * It would be possible to create a new 'MessageDigest' instance each
+ * It would be possible to create a new 'MessageDigest' instance each
+ * time this method is called, and avoid the need for synchronization.
+ * However, past experience has taught me that this might involve a
+ * considerable amount of computation, due to internal table
+ * initialization, so it shouldn't be done this way for performance
+ * reasons.
+ * If we start running into blocking issues because there are too many
+ * simultaneous calls to this method, we can initialize an array of these
+ * objects, and iterate over them using an AtomicInteger index.
+ */
+ synchronized (messageDigest) {
+ /*
+ * Note that we only need the first two bytes of this, even though
+ * 16 bytes are produced. There may be other operations that can be
+ * used to more efficiently map keyword -> hash bucket. The only
+ * issue is the same algorithm must be used on all servers, and it
+ * should produce a fairly even distribution across all of the buckets.
+ */
+ byte[] digest = messageDigest.digest(value.getBytes());
+ return ((Byte.toUnsignedInt(digest[0]) << 8)
+ | Byte.toUnsignedInt(digest[1])) & 0x3ff;
+ }
+ }
+
+ /**
+ * Fetch the server associated with a particular bucket number.
+ *
+ * @param bucketNumber a bucket number in the range 0-1023
+ * @return the Server that currently handles the bucket,
+ * or 'null' if none is currently assigned
+ */
+ public static Server bucketToServer(int bucketNumber) {
+ Bucket bucket = indexToBucket[bucketNumber];
+ return bucket.getOwner();
+ }
+
+ /**
+ * Fetch the bucket object associated with a particular bucket number.
+ *
+ * @param bucketNumber a bucket number in the range 0-1023
+ * @return the Bucket associated with this bucket number
+ */
+ public static Bucket getBucket(int bucketNumber) {
+ return indexToBucket[bucketNumber];
+ }
+
+ /**
+ * Fetch the bucket object associated with a particular keyword.
+ *
+ * @param value the keyword to be converted
+ * @return the Bucket associated with this keyword
+ */
+ public static Bucket getBucket(String value) {
+ return indexToBucket[bucketNumber(value)];
+ }
+
+ /**
+ * Determine if the associated key is assigned to the current server.
+ *
+ * @param key the keyword to be hashed
+ * @return 'true' if the associated bucket is assigned to this server,
+ * 'false' if not
+ */
+ public static boolean isKeyOnThisServer(String key) {
+ int bucketNumber = bucketNumber(key);
+ Bucket bucket = indexToBucket[bucketNumber];
+ return bucket.getOwner() == Server.getThisServer();
+ }
+
+ /**
+ * This method is called to start a 'rebalance' operation in a background
+ * thread, but it only does this on the lead server. Being balanced means
+ * the following:
+ * 1) Each server owns approximately the same number of buckets
+ * 2) If any server were to fail, and the designated primaries take over
+ * for all of that server's buckets, all remaining servers would still
+ * own approximately the same number of buckets.
+ * 3) If any two servers were to fail, and the designated primaries were
+ * to take over for the failed server's buckets (secondaries would take
+ * for buckets where the owner and primary are OOS), all remaining
+ * servers would still own approximately the same number of buckets.
+ * 4) Each server should have approximately the same number of
+ * (primary-backup + secondary-backup) buckets that it is responsible for.
+ * 5) The primary backup for each bucket must be on the same site as the
+ * owner, and the secondary backup must be on a different site.
+ */
+ private static void rebalance() {
+ if (Leader.getLeader() == Server.getThisServer()) {
+ Rebalance rb = new Rebalance();
+ synchronized (rebalanceLock) {
+ // the most recent 'Rebalance' instance is the only valid one
+ rebalance = rb;
+ }
+
+ new Thread("BUCKET REBALANCER") {
+ @Override
+ public void run() {
+ /*
+ * copy bucket and host data,
+ * generating a temporary internal table.
+ */
+ rb.copyData();
+
+ /*
+ * allocate owners for all buckets without an owner,
+ * and rebalance bucket owners, if necessary --
+ * this takes card of item #1, above.
+ */
+ rb.allocateBuckets();
+
+ /*
+ * make sure that primary backups always have the same site
+ * as the owner, and secondary backups always have a different
+ * site -- this takes care of #5, above.
+ */
+ rb.checkSiteValues();
+
+ /*
+ * adjust primary backup lists to take care of item #2, above
+ * (taking #5 into account).
+ */
+ rb.rebalancePrimaryBackups();
+
+ /*
+ * allocate secondary backups, and take care of items
+ * #3 and #4, above (taking #5 into account).
+ */
+ rb.rebalanceSecondaryBackups();
+
+ try {
+ synchronized (rebalanceLock) {
+ /*
+ * if another 'Rebalance' instance has started in the
+ * mean time, don't do the update.
+ */
+ if (rebalance == rb) {
+ /*
+ * build a message containing all of the updated bucket
+ * information, process it internally in this host
+ * (lead server), and send it out to others in the
+ * "notify list".
+ */
+ rb.generateBucketMessage();
+ rebalance = null;
+ }
+ }
+ } catch (IOException e) {
+ logger.error("Exception in Rebalance.generateBucketMessage",
+ e);
+ }
+ }
+ }.start();
+ }
+ }
+
+ /**
+ * Handle an incoming /bucket/update REST message.
+ *
+ * @param data base64-encoded data, containing all bucket updates
+ */
+ static void updateBucket(byte[] data) {
+ final byte[] packet = Base64.getDecoder().decode(data);
+ Runnable task = () -> {
+ try {
+ /*
+ * process the packet, handling any updates
+ */
+ if (updateBucketInternal(packet)) {
+ /*
+ * updates have occurred -- forward this packet to
+ * all servers in the "notify list"
+ */
+ logger.info("One or more bucket updates occurred");
+ Entity entity =
+ Entity.entity(new String(data, StandardCharsets.UTF_8),
+ MediaType.APPLICATION_OCTET_STREAM_TYPE);
+ for (Server server : Server.getNotifyList()) {
+ server.post("bucket/update", entity);
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Exception in Bucket.updateBucket", e);
+ }
+ };
+ MainLoop.queueWork(task);
+ }
+
+ /**
+ * This method supports the 'updateBucket' method, and runs entirely within
+ * the 'MainLoop' thread.
+ */
+ private static boolean updateBucketInternal(byte[] packet) throws IOException {
+ boolean changes = false;
+
+ ByteArrayInputStream bis = new ByteArrayInputStream(packet);
+ DataInputStream dis = new DataInputStream(bis);
+
+ // the packet contains a sequence of bucket updates
+ while (dis.available() != 0) {
+ // first parameter = bucket number
+ int index = dis.readUnsignedShort();
+
+ // locate the corresponding 'Bucket' object
+ Bucket bucket = indexToBucket[index];
+
+ // indicates whether changes occurred to the bucket
+ boolean bucketChanges = false;
+
+ /*
+ * the remainder of the information for this bucket consists of
+ * a sequence of ' [ ]' followed by the tag
+ * value 'END_OF_PARAMETERS_TAG'.
+ */
+ int tag;
+ while ((tag = dis.readUnsignedByte()) != END_OF_PARAMETERS_TAG) {
+ switch (tag) {
+ case OWNER_UPDATE: {
+ // -- owner UUID specified
+ bucketChanges = updateBucketInternalOwnerUpdate(bucket, dis, index);
+ break;
+ }
+
+ case OWNER_NULL: {
+ // -- owner UUID should be set to 'null'
+ if (bucket.getOwner() != null) {
+ logger.info("Bucket {} owner: {}->null",
+ index, bucket.getOwner());
+ bucketChanges = true;
+ synchronized (bucket) {
+ bucket.setOwner(null);
+ bucket.setState(null);
+ }
+ }
+ break;
+ }
+
+ case PRIMARY_BACKUP_UPDATE: {
+ // --
+ // primary backup UUID specified
+ Server newPrimaryBackup =
+ Server.getServer(Util.readUuid(dis));
+ if (bucket.primaryBackup != newPrimaryBackup) {
+ logger.info("Bucket {} primary backup: {}->{}", index,
+ bucket.primaryBackup, newPrimaryBackup);
+ bucketChanges = true;
+ bucket.primaryBackup = newPrimaryBackup;
+ }
+ break;
+ }
+
+ case PRIMARY_BACKUP_NULL: {
+ // --
+ // primary backup should be set to 'null'
+ if (bucket.primaryBackup != null) {
+ logger.info("Bucket {} primary backup: {}->null",
+ index, bucket.primaryBackup);
+ bucketChanges = true;
+ bucket.primaryBackup = null;
+ }
+ break;
+ }
+
+ case SECONDARY_BACKUP_UPDATE: {
+ // --
+ // secondary backup UUID specified
+ Server newSecondaryBackup =
+ Server.getServer(Util.readUuid(dis));
+ if (bucket.secondaryBackup != newSecondaryBackup) {
+ logger.info("Bucket {} secondary backup: {}->{}", index,
+ bucket.secondaryBackup, newSecondaryBackup);
+ bucketChanges = true;
+ bucket.secondaryBackup = newSecondaryBackup;
+ }
+ break;
+ }
+
+ case SECONDARY_BACKUP_NULL: {
+ // --
+ // secondary backup should be set to 'null'
+ if (bucket.secondaryBackup != null) {
+ logger.info("Bucket {} secondary backup: {}->null",
+ index, bucket.secondaryBackup);
+ bucketChanges = true;
+ bucket.secondaryBackup = null;
+ }
+ break;
+ }
+
+ default:
+ logger.error("Illegal tag: {}", tag);
+ break;
+ }
+ }
+ if (bucketChanges) {
+ // give audit a chance to run
+ changes = true;
+ bucket.stateChanged();
+ }
+ }
+ return changes;
+ }
+
+ /**
+ * Update bucket owner information.
+ *
+ * @param bucket the bucket in process
+ * @param dis data input stream contains the update
+ * @param index the bucket number
+ * @return a value indicate bucket changes
+ */
+ private static boolean updateBucketInternalOwnerUpdate(Bucket bucket, DataInputStream dis,
+ int index) throws IOException {
+ boolean bucketChanges = false;
+ Server newOwner = Server.getServer(Util.readUuid(dis));
+ if (bucket.getOwner() != newOwner) {
+ logger.info("Bucket {} owner: {}->{}",
+ index, bucket.getOwner(), newOwner);
+ bucketChanges = true;
+
+ Server thisServer = Server.getThisServer();
+ Server oldOwner = bucket.getOwner();
+ bucket.setOwner(newOwner);
+ if (thisServer == oldOwner) {
+ // the current server is the old owner
+ if (bucket.getState() == null) {
+ bucket.state = bucket.new OldOwner(newOwner);
+ }
+ } else if (thisServer == newOwner) {
+ // the current server the new owner
+ if (bucket.getState() == null) {
+ bucket.state = bucket.new NewOwner(true, oldOwner);
+ } else {
+ // new owner has been confirmed
+ // orig bucket.state.newOwner();
+ bucket.state.newOwner();
+ }
+ }
+ }
+ return bucketChanges;
+ }
+
+ /**
+ * Forward a message to the specified bucket number. If the bucket is
+ * in a transient state (the value of 'state' is not 'null'), the handling
+ * is determined by that state.
+ *
+ * @param bucketNumber the bucket number determined by extracting the
+ * keyword from 'message'
+ * @param message the message to be forwarded/processed
+ * @return a value of 'true' indicates the message has been "handled"
+ * (forwarded or queued), and 'false' indicates it has not, and needs
+ * to be handled locally.
+ */
+ public static boolean forward(int bucketNumber, Message message) {
+ Bucket bucket = indexToBucket[bucketNumber];
+ Server server;
+
+ synchronized (bucket) {
+ if (bucket.state != null) {
+ // we are in a transient state -- the handling is state-specific
+ return bucket.state.forward(message);
+ }
+ server = bucket.getOwner();
+ }
+
+ if (server == null || server == Server.getThisServer()) {
+ // this needs to be processed locally
+ return false;
+ } else {
+ // send message to remote server
+ message.sendToServer(server, bucketNumber);
+ return true;
+ }
+ }
+
+ /**
+ * This is a convenience method, which forwards a message through the
+ * bucket associated with the specified keyword.
+ *
+ * @param keyword the keyword extracted from 'message'
+ * keyword from 'message'
+ * @param message the message to be forwarded/processed
+ * @return a value of 'true' indicates the message has been "handled"
+ * (forwarded or queued), and 'false' indicates it has not, and needs
+ * to be handled locally.
+ */
+ public static boolean forward(String keyword, Message message) {
+ return forward(bucketNumber(keyword), message);
+ }
+
+ /**
+ * Forward a message to the specified bucket number. If the bucket is
+ * in a transient state (the value of 'state' is not 'null'), the handling
+ * is determined by that state. This is a variant of the 'forward' method,
+ * which handles local processing, instead of just returning 'false'.
+ *
+ * @param bucketNumber the bucket number determined by extracting the
+ * keyword from 'message'
+ * @param message the message to be forwarded/processed
+ */
+ public static void forwardAndProcess(int bucketNumber, Message message) {
+ if (!forward(bucketNumber, message)) {
+ message.process();
+ }
+ }
+
+ /**
+ * Forward a message to the specified bucket number. If the bucket is
+ * in a transient state (the value of 'state' is not 'null'), the handling
+ * is determined by that state. This is a variant of the 'forward' method,
+ * which handles local processing, instead of just returning 'false'.
+ *
+ * @param keyword the keyword extracted from 'message'
+ * keyword from 'message'
+ * @param message the message to be forwarded/processed
+ */
+ public static void forwardAndProcess(String keyword, Message message) {
+ forwardAndProcess(bucketNumber(keyword), message);
+ }
+
+ /**
+ * Handle an incoming /cmd/dumpBuckets REST message.
+ *
+ * @param out the 'PrintStream' to use for displaying information
+ */
+ public static void dumpBuckets(final PrintStream out) {
+ /*
+ * we aren't really doing a 'Rebalance' here, but the 'copyData' method
+ * is useful for extracting the data, and determining the buckets
+ * associated with each server.
+ */
+ Rebalance rb = new Rebalance();
+ rb.copyData();
+
+ /*
+ * this method is not accessing anything in the 'Server' or 'Bucket'
+ * table, so it doesn't need to run within the 'MainLoop' thread.
+ */
+ rb.dumpBucketsInternal(out);
+ }
+
+ /**
+ * Handle an incoming /cmd/bucketMessage REST message -- this is only
+ * used for testing the routing of messages between servers.
+ *
+ * @param out the 'PrintStream' to use for displaying information
+ * @param keyword the keyword that is hashed to select the bucket number
+ * @param message the message to send to the remote end
+ * @throws IOException when error occurred
+ */
+ public static void bucketMessage(
+ final PrintStream out, final String keyword, String message) throws IOException {
+
+ if (keyword == null) {
+ out.println("'keyword' is mandatory");
+ return;
+ }
+ if (message == null) {
+ message = "Message generated at " + new Date();
+ }
+ final int bucketNumber = bucketNumber(keyword);
+ Server server = bucketToServer(bucketNumber);
+
+ if (server == null) {
+ /*
+ * selected bucket has no server assigned -- this should only be a
+ * transient situation, until 'rebalance' is run.
+ */
+ out.println("Bucket is " + bucketNumber + ", which has no owner");
+ } else if (server == Server.getThisServer()) {
+ /*
+ * the selected bucket is associated with this particular server --
+ * no forwarding is needed.
+ */
+ out.println("Bucket is " + bucketNumber
+ + ", which is owned by this server: " + server.getUuid());
+ } else {
+ /*
+ * the selected bucket is assigned to a different server -- forward
+ * the message.
+ */
+ out.println("Bucket is " + bucketNumber + ": sending from\n"
+ + " " + Server.getThisServer().getUuid() + " to \n"
+ + " " + server.getUuid());
+
+ // do a POST call of /bucket/bucketResponse to the remoote server
+ Entity entity =
+ Entity.entity(new String(message.getBytes(), StandardCharsets.UTF_8),
+ MediaType.TEXT_PLAIN);
+
+ /*
+ * the POST itself runs in a server-specific thread, and
+ * 'responseQueue' is used to pass back the response.
+ */
+ final LinkedTransferQueue responseQueue =
+ new LinkedTransferQueue<>();
+
+ server.post("bucket/bucketResponse", entity, new Server.PostResponse() {
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public WebTarget webTarget(WebTarget webTarget) {
+ // we need to include the 'bucket' and 'keyword' parameters
+ // in the POST that we are sending out
+ return webTarget
+ .queryParam("bucket", bucketNumber)
+ .queryParam("keyword", keyword);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void response(Response response) {
+ // this is the POST response --
+ // pass it back to the calling thread
+ responseQueue.put(response);
+ }
+ });
+
+ try {
+ // this is the calling thread -- wait for the POST response
+ Response response = responseQueue.poll(60, TimeUnit.SECONDS);
+ if (response == null) {
+ out.println("Timed out waiting for a response");
+ } else {
+ out.println("Received response code " + response.getStatus());
+ out.println("Entity = " + response.readEntity(String.class));
+ }
+ } catch (InterruptedException e) {
+ out.println(e);
+ throw new IOException(e);
+ }
+ }
+ }
+
+ /**
+ * Handle an incoming /bucket/bucketResponse REST message -- this runs on
+ * the destination host, and is the continuation of an operation triggered
+ * by the /cmd/bucketMessage REST message running on the originating host.
+ *
+ * @param out the 'PrintStream' to use for passing back information
+ * in a human-readable form
+ * @param bucket the bucket number, which should be owned by this host
+ * if we are in sync with the sending host, and didn't get caught
+ * in a transient state
+ * @param keyword the keyword selected on the originating end, which should
+ * hash to 'bucket'
+ * @param message the message selected on the originating end
+ */
+ public static void bucketResponse(
+ final PrintStream out, int bucket, String keyword, byte[] message) {
+
+ Server thisServer = Server.getThisServer();
+ Server server = bucketToServer(bucket);
+
+ if (server != thisServer) {
+ /*
+ * this isn't expected, and either indicates we are out-of-sync with
+ * pthe originating server, or this operation was triggered while in
+ * a transient state.
+ */
+ out.println("ERROR: " + thisServer.toString() + ": bucket " + bucket
+ + "is owned by\n " + server);
+ } else {
+ /*
+ * As expected, we are the owner of this bucket. Print out a message,
+ * which will be returned to the originating host, and displayed.
+ */
+ out.println(thisServer.toString() + ":\n"
+ + " bucket = " + bucket
+ + "\n keyword = " + keyword
+ + "\n message = " + new String(message));
+ }
+ }
+
+ /**
+ * Handle an incoming /cmd/moveBucket REST message -- this is only
+ * used for testing bucket migration. It only works on the lead server.
+ *
+ * @param out the 'PrintStream' to use for displaying information
+ * @param bucketNumber the bucket number to be moved
+ * @param newHostUuid the UUID of the destination host (if 'null', a
+ * destination host will be chosen at random)
+ */
+ public static void moveBucket(PrintStream out, int bucketNumber, String newHostUuid) {
+ Server leader = Leader.getLeader();
+ if (leader != Server.getThisServer()) {
+ out.println("This is not the lead server");
+ return;
+ }
+
+ if (bucketNumber < 0 || bucketNumber >= indexToBucket.length) {
+ out.println("Bucket number out of range");
+ return;
+ }
+
+ Rebalance rb = new Rebalance();
+ rb.copyData();
+
+ TestBucket bucket = rb.buckets[bucketNumber];
+ TestServer oldHost = bucket.owner;
+
+ if (oldHost == rb.nullServer) {
+ out.println("Bucket " + bucketNumber + " is currently unassigned");
+ return;
+ }
+
+ TestServer newHost = null;
+
+ if (newHostUuid != null) {
+ // the UUID of a destination host has been specified
+ newHost = rb.testServers.get(UUID.fromString(newHostUuid));
+ if (newHost == null) {
+ out.println("Can't locate UUID " + newHostUuid);
+ return;
+ }
+ } else {
+ /*
+ * Choose a destination host at random, other than the current owner.
+ * Step a random count in the range of 1 to (n-1) relative to the
+ * current host.
+ */
+ UUID key = oldHost.uuid;
+ for (int count = new Random().nextInt(rb.testServers.size() - 1) ;
+ count >= 0 ; count -= 1) {
+ key = rb.testServers.higherKey(key);
+ if (key == null) {
+ // wrap to the beginning of the list
+ key = rb.testServers.firstKey();
+ }
+ }
+ newHost = rb.testServers.get(key);
+ }
+ out.println("Moving bucket " + bucketNumber + " from "
+ + oldHost + " to " + newHost);
+
+ /*
+ * update the owner, and ensure that the primary and secondary backup
+ * remain different from the owner.
+ */
+ bucket.setOwner(newHost);
+ if (newHost == bucket.primaryBackup) {
+ out.println("Moving primary back from " + newHost + " to " + oldHost);
+ bucket.setPrimaryBackup(oldHost);
+ } else if (newHost == bucket.secondaryBackup) {
+ out.println("Moving secondary back from " + newHost
+ + " to " + oldHost);
+ bucket.setSecondaryBackup(oldHost);
+ }
+
+ try {
+ /*
+ * build a message containing all of the updated bucket
+ * information, process it internally in this host
+ * (lead server), and send it out to others in the
+ * "notify list".
+ */
+ rb.generateBucketMessage();
+ } catch (IOException e) {
+ logger.error("Exception in Rebalance.generateBucketMessage",
+ e);
+ }
+ }
+
+ /**
+ * This method is called when an incoming /bucket/sessionData message is
+ * received from the old owner of the bucket, which presumably means that
+ * we are the new owner of the bucket.
+ *
+ * @param bucketNumber the bucket number
+ * @param dest the UUID of the intended destination
+ * @param ttl similar to IP time-to-live -- it controls the number of hops
+ * the message may take
+ * @param data serialized data associated with this bucket, encoded using
+ * base64
+ */
+
+ static void sessionData(int bucketNumber, UUID dest, int ttl, byte[] data) {
+ logger.info("Bucket.sessionData: bucket={}, data length={}",
+ bucketNumber, data.length);
+
+ if (dest != null && !dest.equals(Server.getThisServer().getUuid())) {
+ // the message needs to be forwarded to the intended destination
+ Server server;
+ WebTarget webTarget;
+
+ if ((ttl -= 1) > 0
+ && (server = Server.getServer(dest)) != null
+ && (webTarget = server.getWebTarget("bucket/sessionData")) != null) {
+ logger.info("Forwarding 'bucket/sessionData' to uuid {}",
+ server.getUuid());
+ Entity entity =
+ Entity.entity(new String(data, StandardCharsets.UTF_8),
+ MediaType.APPLICATION_OCTET_STREAM_TYPE);
+ Response response =
+ webTarget
+ .queryParam("bucket", bucketNumber)
+ .queryParam("dest", dest)
+ .queryParam("ttl", String.valueOf(ttl))
+ .request().post(entity);
+ logger.info("/bucket/sessionData response code = {}",
+ response.getStatus());
+ } else {
+ logger.error("Couldn't forward 'bucket/sessionData' to uuid {}, ttl={}",
+ dest, ttl);
+ }
+ return;
+ }
+
+ byte[] decodedData = Base64.getDecoder().decode(data);
+ Bucket bucket = indexToBucket[bucketNumber];
+
+ logger.info("Bucket.sessionData: decoded data length = {}",
+ decodedData.length);
+
+ if (bucket.state == null) {
+ /*
+ * We received the serialized data prior to being notified
+ * that we are the owner -- this happens sometimes. Behave as
+ * though we are the new owner, but intidate it is unconfirmed.
+ */
+ logger.info("Bucket {} session data received unexpectedly",
+ bucketNumber);
+ bucket.state = bucket.new NewOwner(false, bucket.getOwner());
+ }
+ bucket.state.bulkSerializedData(decodedData);
+ }
+
+ /**
+ * This method is called whenever the bucket's state has changed in a
+ * way that it should be audited.
+ */
+ private synchronized void stateChanged() {
+ if (state != null) {
+ return;
+ }
+ // the audit should be run
+ Server thisServer = Server.getThisServer();
+ boolean isOwner = (thisServer == owner);
+ boolean isBackup = (!isOwner && (thisServer == primaryBackup
+ || thisServer == secondaryBackup));
+
+ // invoke 'TargetLock' directly
+ TargetLock.auditBucket(this, isOwner, isBackup);
+ for (ServerPoolApi feature : ServerPoolApi.impl.getList()) {
+ feature.auditBucket(this, isOwner, isBackup);
+ }
+ }
+
+ /**
+ * Returns an adjunct of the specified class
+ * (it is created if it doesn't exist).
+ *
+ * @param clazz this is the class of the adjunct
+ * @return an adjunct of the specified class ('null' may be returned if
+ * the 'newInstance' method is unable to create the adjunct)
+ */
+ public T getAdjunct(Class clazz) {
+ synchronized (adjuncts) {
+ // look up the adjunct in the table
+ Object adj = adjuncts.get(clazz);
+ if (adj == null) {
+ // lookup failed -- create one
+ try {
+ // create the adjunct (may trigger an exception)
+ adj = clazz.newInstance();
+
+ // update the table
+ adjuncts.put(clazz, adj);
+ } catch (Exception e) {
+ logger.error("Can't create adjunct of {}", clazz, e);
+ }
+ }
+ return clazz.cast(adj);
+ }
+ }
+
+ /**
+ * Returns an adjunct of the specified class.
+ *
+ * @param clazz this is the class of the adjunct
+ * @return an adjunct of the specified class, if it exists,
+ * and 'null' if it does not
+ */
+ public T getAdjunctDontCreate(Class clazz) {
+ synchronized (adjuncts) {
+ // look up the adjunct in the table
+ return clazz.cast(adjuncts.get(clazz));
+ }
+ }
+
+ /**
+ * Explicitly create an adjunct -- this is useful when the adjunct
+ * initialization requires that some parameters be passed.
+ *
+ * @param adj this is the adjunct to insert into the table
+ * @return the previous adjunct of this type ('null' if none)
+ */
+ public Object putAdjunct(Object adj) {
+ synchronized (adjuncts) {
+ Class clazz = adj.getClass();
+ return adjuncts.put(clazz, adj);
+ }
+ }
+
+ /**
+ * Remove an adjunct.
+ *
+ * @param clazz this is the class of adjuncts to remove
+ * @return the object, if found, and 'null' if not
+ */
+ public T removeAdjunct(Class clazz) {
+ synchronized (adjuncts) {
+ // remove the adjunct in the table
+ return clazz.cast(adjuncts.remove(clazz));
+ }
+ }
+
+ /**
+ * Dump out all buckets with adjuncts.
+ *
+ * @param out the 'PrintStream' to use for displaying information
+ */
+ public static void dumpAdjuncts(PrintStream out) {
+ boolean noneFound = true;
+ String format = "%6s %s\n";
+
+ for (Bucket bucket : indexToBucket) {
+ synchronized (bucket.adjuncts) {
+ if (bucket.adjuncts.size() != 0) {
+ if (noneFound) {
+ out.printf(format, "Bucket", "Adjunct Classes");
+ out.printf(format, "------", "---------------");
+ noneFound = false;
+ }
+ boolean first = true;
+ for (Class> clazz : bucket.adjuncts.keySet()) {
+ if (first) {
+ out.printf(format, bucket.index, clazz.getName());
+ first = false;
+ } else {
+ out.printf(format, "", clazz.getName());
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /* ============================================================ */
+
+ /**
+ * There is a single instance of this class (Bucket.eventHandler), which
+ * is registered to listen for notifications of state transitions. Note
+ * that all of these methods are running within the 'MainLoop' thread.
+ */
+ private static class EventHandler implements Events {
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void serverFailed(Server server) {
+ // remove this server from any bucket where it is referenced
+
+ Server thisServer = Server.getThisServer();
+ for (Bucket bucket : indexToBucket) {
+ synchronized (bucket) {
+ boolean changes = false;
+ if (bucket.getOwner() == server) {
+ // the failed server owns this bucket --
+ // move to the primary backup
+ bucket.setOwner(bucket.getPrimaryBackup());
+ bucket.primaryBackup = null;
+ changes = true;
+
+ if (bucket.getOwner() == null) {
+ // bucket owner is still null -- presumably, we had no
+ // primary backup, so use the secondary backup instead
+ bucket.setOwner(bucket.getSecondaryBackup());
+ bucket.setSecondaryBackup(null);
+ }
+ }
+ if (bucket.getPrimaryBackup() == server) {
+ // the failed server was a primary backup to this bucket --
+ // mark the entry as 'null'
+ bucket.setPrimaryBackup(null);
+ changes = true;
+ }
+ if (bucket.getSecondaryBackup() == server) {
+ // the failed server was a secondary backup to this bucket --
+ // mark the entry as 'null'
+ bucket.setSecondaryBackup(null);
+ changes = true;
+ }
+
+ if (bucket.owner == thisServer && bucket.state == null) {
+ // the current server is the new owner
+ bucket.setState(bucket.new NewOwner(false, null));
+ changes = true;
+ }
+
+ if (changes) {
+ // may give audits a chance to run
+ bucket.stateChanged();
+ }
+ }
+ }
+
+ // trigger a rebalance (only happens if we are the lead server)
+ rebalance();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void newLeader(Server server) {
+ // trigger a rebalance (only happens if we are the new lead server)
+ rebalance();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void leaderConfirmed(Server server) {
+ // trigger a rebalance (only happens if we are the lead server)
+ rebalance();
+ }
+ }
+
+ /* ============================================================ */
+
+ /**
+ * Instances of this class are created as part of the 'rebalance'
+ * operation on the lead server, or as part of a 'dumpBuckets' operation
+ * on any server.
+ * Each instance of this class corresponds to a 'Bucket' instance.
+ */
+ private static class TestBucket implements Comparable {
+ // bucket number
+ int index;
+
+ // owner of the bucket
+ TestServer owner;
+
+ // primary backup for this bucket
+
+ TestServer primaryBackup;
+
+ // secondary backup for this bucket
+ TestServer secondaryBackup;
+
+ /**
+ * Constructor -- initialize the 'TestBucket' instance.
+ *
+ * @param index the bucket number
+ */
+ TestBucket(int index) {
+ this.index = index;
+ }
+
+ /**
+ * Update the owner of a bucket, which also involves updating the
+ * backward links in the 'TestServer' instances.
+ *
+ * @param newOwner the new owner of the bucket
+ */
+ void setOwner(TestServer newOwner) {
+ if (owner != newOwner) {
+ // the 'owner' field does need to be changed
+ if (owner != null) {
+ // remove this bucket from the 'buckets' list of the old owner
+ owner.buckets.remove(this);
+ }
+ if (newOwner != null) {
+ // add this bucket to the 'buckets' list of the new owner
+ newOwner.buckets.add(this);
+ }
+ // update the 'owner' field in the bucket
+ owner = newOwner;
+ }
+ }
+
+ /**
+ * Update the primary backup of a bucket, which also involves updating
+ * the backward links in the 'TestServer' instances.
+ *
+ * @param newPrimaryBackup the new primary of the bucket
+ */
+ void setPrimaryBackup(TestServer newPrimaryBackup) {
+ if (primaryBackup != newPrimaryBackup) {
+ // the 'primaryBackup' field does need to be changed
+ if (primaryBackup != null) {
+ // remove this bucket from the 'buckets' list of the
+ // old primary backup
+ primaryBackup.primaryBackupBuckets.remove(this);
+ }
+ if (newPrimaryBackup != null) {
+ // add this bucket to the 'buckets' list of the
+ // new primary backup
+ newPrimaryBackup.primaryBackupBuckets.add(this);
+ }
+ // update the 'primaryBackup' field in the bucket
+ primaryBackup = newPrimaryBackup;
+ }
+ }
+
+ /**
+ * Update the secondary backup of a bucket, which also involves updating
+ * the backward links in the 'TestServer' instances.
+ *
+ * @param newSecondaryBackup the new secondary of the bucket
+ */
+ void setSecondaryBackup(TestServer newSecondaryBackup) {
+ if (secondaryBackup != newSecondaryBackup) {
+ // the 'secondaryBackup' field does need to be changed
+ if (secondaryBackup != null) {
+ // remove this bucket from the 'buckets' list of the
+ // old secondary backup
+ secondaryBackup.secondaryBackupBuckets.remove(this);
+ }
+ if (newSecondaryBackup != null) {
+ // add this bucket to the 'buckets' list of the
+ // new secondary backup
+ newSecondaryBackup.secondaryBackupBuckets.add(this);
+ }
+ // update the 'secondaryBackup' field in the bucket
+ secondaryBackup = newSecondaryBackup;
+ }
+ }
+
+ /*==================================*/
+ /* Comparable interface */
+ /*==================================*/
+
+ /**
+ * Compare two 'TestBucket' instances, for use in a 'TreeSet'.
+ *
+ * @param other the other 'TestBucket' instance to compare to
+ */
+ @Override
+ public int compareTo(TestBucket other) {
+ return index - other.index;
+ }
+
+ /**
+ * Return a string representation of this 'TestBucket' instance.
+ *
+ * @return a string representation of this 'TestBucket' instance
+ */
+ @Override
+ public String toString() {
+ return "TestBucket[" + index + "]";
+ }
+ }
+
+ /* ============================================================ */
+
+ /**
+ * Instances of this class are created as part of the 'rebalance'
+ * operation on the lead server, or as part of a 'dumpBuckets' operation
+ * on any server.
+ * Each instance of this class corresponds to a 'Server' instance.
+ * Unlike the actual 'Server' instances, each 'TestServer' instance
+ * contains back links to all of the buckets it is associated with.
+ */
+ private static class TestServer {
+ // unique id for this server
+ // (matches the one in the corresponding 'Server' entry)
+ final UUID uuid;
+
+ // site socket information (matches 'Server' entry)
+ final InetSocketAddress siteSocketAddress;
+
+ // the set of all 'TestBucket' instances,
+ // where this 'TestServer' is listed as 'owner'
+ final TreeSet buckets = new TreeSet<>();
+
+ // the set of all 'TestBucket' instances,
+ // where this 'TestServer' is listed as 'primaryBackup'
+ final TreeSet primaryBackupBuckets = new TreeSet<>();
+
+ // the set of all 'TestBucket' instances,
+ // where this 'TestServer' is listed as 'secondaryBackup'
+ final TreeSet secondaryBackupBuckets = new TreeSet<>();
+
+ /**
+ * Constructor.
+ *
+ * @param uuid uuid of this 'TestServer' instance
+ * @param siteSocketAddress matches the value in the corresponding 'Server'
+ */
+ TestServer(UUID uuid, InetSocketAddress siteSocketAddress) {
+ this.uuid = uuid;
+ this.siteSocketAddress = siteSocketAddress;
+ }
+
+ /**
+ * Return a string representation of this 'TestServer' instance.
+ *
+ * @return a string representation of this 'TestServer' instance
+ */
+ @Override
+ public String toString() {
+ return "TestServer[" + uuid + "]";
+ }
+ }
+
+ /* ============================================================ */
+
+ /**
+ * This class supports the 'rebalance' operation. Each instance is a wrapper
+ * around a 'TestServer' instance, as it would be if another specific
+ * server failed.
+ */
+ private static class AdjustedTestServer
+ implements Comparable {
+ TestServer server;
+
+ // simulated fail on this server
+ TestServer failedServer;
+
+ // expected bucket count if 'failedServer' fails
+ int bucketCount;
+
+ // total number of primary backup buckets used by this host
+ int primaryBackupBucketCount;
+
+ // total number of secondary backup buckets used by this host
+ int secondaryBackupBucketCount;
+
+ /**
+ * Constructor.
+ *
+ * @param server the server this 'AdjustedTestServer' instance represents
+ * @param failedServer the server going through a simulated failure --
+ * the 'bucketCount' value is adjusted based upon this
+ */
+ AdjustedTestServer(TestServer server, TestServer failedServer) {
+ this.server = server;
+ this.failedServer = failedServer;
+
+ this.bucketCount = server.buckets.size();
+ this.primaryBackupBucketCount = server.primaryBackupBuckets.size();
+ this.secondaryBackupBucketCount = server.secondaryBackupBuckets.size();
+
+ // need to adjust 'bucketCount' for the case where the current
+ // host fails
+ for (TestBucket bucket : server.primaryBackupBuckets) {
+ if (bucket.owner == failedServer) {
+ bucketCount += 1;
+ // TBD: should 'primaryBackupBucketCount' be decremented?
+ }
+ }
+
+ // need to adjust 'bucketCount' for the case where the current
+ // host fails
+ for (TestBucket bucket : server.secondaryBackupBuckets) {
+ if (bucket.owner == failedServer) {
+ bucketCount += 1;
+ // TBD: should 'secondaryBackupBucketCount' be decremented?
+ }
+ }
+ }
+
+ /********************************************/
+ /* Comparable interface */
+ /********************************************/
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public int compareTo(AdjustedTestServer other) {
+ /*
+ * Comparison order:
+ * 1) minimal expected bucket count if current host fails
+ * (differences of 1 are treated as a match)
+ * 2) minimal backup bucket count
+ * 3) UUID order
+ */
+ int rval = bucketCount - other.bucketCount;
+ if (rval <= 1 && rval >= -1) {
+ rval = (primaryBackupBucketCount + secondaryBackupBucketCount)
+ - (other.primaryBackupBucketCount
+ + other.secondaryBackupBucketCount);
+ if (rval == 0) {
+ rval = -Util.uuidComparator.compare(server.uuid, other.server.uuid);
+ }
+ }
+ return rval;
+ }
+ }
+
+ /* ============================================================ */
+
+ /**
+ * This class is primarily used to do a 'Rebalance' operation on the
+ * lead server, which is then distributed to all of the other servers.
+ * Part of it is also useful for implementing the /cmd/dumpBuckets
+ * REST message handler.
+ */
+ private static class Rebalance {
+ // this table resembles the 'Bucket.indexToBucket' table
+ TestBucket[] buckets = new TestBucket[indexToBucket.length];
+
+ // this table resembles the 'Server.servers' table
+ TreeMap testServers = new TreeMap<>(Util.uuidComparator);
+
+ /* this is a special server instance, which is allocated any
+ * owned, primary, or secondary buckets that haven't been allocated to
+ * any of the real servers
+ */
+ TestServer nullServer = new TestServer(null, null);
+
+ /**
+ * Copy all of the bucket data in the 'buckets' table, and also return
+ * a copy of the 'Server.servers' table
+ */
+ void copyData() {
+ // will contain a copy of the 'Bucket' table
+ final Bucket[] bucketSnapshot = new Bucket[indexToBucket.length];
+
+ /*
+ * This method is running within the 'MainLoop' thread,
+ * and builds a copy of the 'Bucket' table, as well as the
+ * list of active servers -- these can then be examined
+ * in a different thread, without potentially distrupting
+ * the 'MainLoop' thread.
+ *
+ * @return 0 (the return value is not significant at present)
+ */
+ Callable callable = () -> {
+ // copy the 'Bucket' table
+ for (int i = 0 ; i < indexToBucket.length; i += 1) {
+ // makes a snapshot of the bucket information
+ Bucket bucket = indexToBucket[i];
+
+ Bucket tmpBucket = new Bucket(i);
+ tmpBucket.setOwner(bucket.getOwner());
+ tmpBucket.setPrimaryBackup(bucket.getPrimaryBackup());
+ tmpBucket.setSecondaryBackup(bucket.getSecondaryBackup());
+ bucketSnapshot[i] = tmpBucket;
+ }
+
+ /*
+ * At this point, 'bucketSnapshot' and 'servers' should be
+ * complete. The next step is to create a 'TestServer' entry
+ * that matches each 'Server' entry.
+ */
+ for (Server server : Server.getServers()) {
+ UUID uuid = server.getUuid();
+ testServers.put(uuid, new TestServer(uuid, server.getSiteSocketAddress()));
+ }
+
+ return 0;
+ };
+ FutureTask ft = new FutureTask(callable);
+ MainLoop.queueWork(ft);
+ try {
+ ft.get(60, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ logger.error("Exception in Rebalance.copyData", e);
+ return;
+ }
+
+ /*
+ * Now, create a 'TestBucket' table that mirrors the 'Bucket' table.
+ * Unlike the standard 'Bucket' and 'Server' tables, the 'TestServer'
+ * entries contain links referring back to the 'TestBucket' entries.
+ * This information is useful when rebalancing.
+ */
+ for (Bucket bucket : bucketSnapshot) {
+ int index = bucket.index;
+ TestBucket testBucket = new TestBucket(index);
+
+ // populate the 'owner' field
+ if (bucket.getOwner() != null) {
+ testBucket.setOwner(testServers.get(bucket.getOwner().getUuid()));
+ } else {
+ testBucket.setOwner(nullServer);
+ }
+
+ // populate the 'primaryBackup' field
+ if (bucket.primaryBackup != null) {
+ testBucket.setPrimaryBackup(
+ testServers.get(bucket.primaryBackup.getUuid()));
+ } else {
+ testBucket.setPrimaryBackup(nullServer);
+ }
+
+ // populate the 'secondaryBackup' field
+ if (bucket.secondaryBackup != null) {
+ testBucket.setSecondaryBackup(
+ testServers.get(bucket.secondaryBackup.getUuid()));
+ } else {
+ testBucket.setSecondaryBackup(nullServer);
+ }
+ buckets[index] = testBucket;
+ }
+ }
+
+ /**
+ * Allocate unowned 'TestBucket' entries across all of the 'TestServer'
+ * entries. When every 'TestBucket' has an owner, rebalance as needed,
+ * so the 'TestServer' entry with the most buckets has at most one more
+ * bucket than the 'TestServer' entry with the least.
+ */
+ void allocateBuckets() {
+ /*
+ * the following 'Comparator' is used to control the order of the
+ * 'needBuckets' TreeSet: those with the fewest buckets allocated are
+ * at the head of the list.
+ */
+ Comparator bucketCount = new Comparator() {
+ @Override
+ public int compare(TestServer s1, TestServer s2) {
+ int rval = s1.buckets.size() - s2.buckets.size();
+ if (rval == 0) {
+ rval = Util.uuidComparator.compare(s1.uuid, s2.uuid);
+ }
+ return rval;
+ }
+ };
+
+ // sort servers according to the order in which they can
+ // take on ownership of buckets
+ TreeSet needBuckets = new TreeSet<>(bucketCount);
+ for (TestServer ts : testServers.values()) {
+ needBuckets.add(ts);
+ }
+
+ // go through all of the unowned buckets, and allocate them
+ for (TestBucket bucket : new LinkedList(nullServer.buckets)) {
+ // take first entry off of sorted server list
+ TestServer ts = needBuckets.first();
+ needBuckets.remove(ts);
+
+ // add this bucket to the 'buckets' list
+ bucket.setOwner(ts);
+
+ // place it back in the list, possibly in a new position
+ // (it's attributes have changed)
+ needBuckets.add(ts);
+ }
+ nullServer.buckets.clear();
+
+ // there may still be rebalancing needed -- no host should contain
+ // 2 or more buckets more than any other host
+ for ( ; ; ) {
+ TestServer first = needBuckets.first();
+ TestServer last = needBuckets.last();
+
+ if (last.buckets.size() - first.buckets.size() <= 1) {
+ // no more rebalancing needed
+ break;
+ }
+
+ // remove both from sorted list
+ needBuckets.remove(first);
+ needBuckets.remove(last);
+
+ // take one bucket from 'last', and assign it to 'first'
+ last.buckets.first().setOwner(first);
+
+ // place back in sorted list
+ needBuckets.add(first);
+ needBuckets.add(last);
+ }
+ }
+
+ /**
+ * Make sure that the primary backups have the same site as the owner,
+ * and the secondary backups have a different site.
+ */
+ void checkSiteValues() {
+ for (TestBucket bucket : buckets) {
+ if (bucket.owner != null) {
+ InetSocketAddress siteSocketAddress =
+ bucket.owner.siteSocketAddress;
+ TestServer primaryBackup = bucket.primaryBackup;
+ TestServer secondaryBackup = bucket.secondaryBackup;
+
+ validateSiteOwner(bucket, siteSocketAddress,
+ primaryBackup, secondaryBackup);
+ }
+ }
+ }
+
+ /**
+ * Validate primary site owner and secondary site owner are valid.
+ * @param bucket TestBucket
+ * @param siteSocketAddress site socket address
+ * @param primaryBackup primary backups
+ * @param secondaryBackup secondary backups
+ */
+ private void validateSiteOwner(TestBucket bucket, InetSocketAddress siteSocketAddress,
+ TestServer primaryBackup, TestServer secondaryBackup) {
+ if (primaryBackup != null
+ && !Objects.equals(siteSocketAddress,
+ primaryBackup.siteSocketAddress)) {
+ /**
+ * primary backup is from the wrong site -- see if we can
+ * use the secondary.
+ */
+ if (secondaryBackup != null
+ && Objects.equals(siteSocketAddress,
+ secondaryBackup.siteSocketAddress)) {
+ // swap primary and secondary
+ bucket.setPrimaryBackup(secondaryBackup);
+ bucket.setSecondaryBackup(primaryBackup);
+ } else {
+ // just invalidate primary backup
+ bucket.setPrimaryBackup(null);
+ }
+ } else if (secondaryBackup != null
+ && Objects.equals(siteSocketAddress,
+ secondaryBackup.siteSocketAddress)) {
+ // secondary backup is from the wrong site
+ bucket.setSecondaryBackup(null);
+ if (primaryBackup == null) {
+ // we can use this as the primary
+ bucket.setPrimaryBackup(secondaryBackup);
+ }
+ }
+ }
+
+ /**
+ * Allocate and rebalance the primary backups.
+ */
+ void rebalancePrimaryBackups() {
+ for (TestServer failedServer : testServers.values()) {
+ /*
+ * to allocate primary backups for this server,
+ * simulate a failure, and balance the backup hosts
+ */
+
+ // get siteSocketAddress from server
+ InetSocketAddress siteSocketAddress = failedServer.siteSocketAddress;
+
+ // populate a 'TreeSet' of 'AdjustedTestServer' instances based
+ // the failure of 'failedServer'
+ TreeSet adjustedTestServers =
+ new TreeSet();
+ for (TestServer server : testServers.values()) {
+ if (server == failedServer
+ || !Objects.equals(siteSocketAddress,
+ server.siteSocketAddress)) {
+ continue;
+ }
+ adjustedTestServers.add(new AdjustedTestServer(server, failedServer));
+ }
+
+ if (adjustedTestServers.isEmpty()) {
+ // this is presumably the only server -- there is no other server
+ // to act as primary backup, and no rebalancing can occur
+ continue;
+ }
+
+ // we need a backup host for each bucket
+ for (TestBucket bucket : failedServer.buckets) {
+ if (bucket.primaryBackup == null
+ || bucket.primaryBackup == nullServer) {
+ // need a backup host for this bucket -- remove the first
+ // entry from 'adjustedTestServers', which is most favored
+ AdjustedTestServer backupHost = adjustedTestServers.first();
+ adjustedTestServers.remove(backupHost);
+
+ // update add this bucket to the list
+ bucket.setPrimaryBackup(backupHost.server);
+
+ // update counts in 'AdjustedTestServer'
+ backupHost.bucketCount += 1;
+ backupHost.primaryBackupBucketCount += 1;
+
+ // place it back in the table, possibly in a new position
+ // (it's attributes have changed)
+ adjustedTestServers.add(backupHost);
+ }
+ }
+
+ // TBD: Is additional rebalancing needed?
+ }
+ }
+
+ /**
+ * Allocate and rebalance the secondary backups.
+ */
+ void rebalanceSecondaryBackups() {
+ for (TestServer failedServer : testServers.values()) {
+ /*
+ * to allocate secondary backups for this server,
+ * simulate a failure, and balance the backup hosts
+ */
+
+ // get siteSocketAddress from server
+ InetSocketAddress siteSocketAddress = failedServer.siteSocketAddress;
+
+ // populate a 'TreeSet' of 'AdjustedTestServer' instances based
+ // the failure of 'failedServer'
+ TreeSet adjustedTestServers =
+ new TreeSet();
+ for (TestServer server : testServers.values()) {
+ if (server == failedServer
+ || Objects.equals(siteSocketAddress,
+ server.siteSocketAddress)) {
+ continue;
+ }
+ adjustedTestServers.add(new AdjustedTestServer(server, failedServer));
+ }
+
+ if (adjustedTestServers.isEmpty()) {
+ // this is presumably the only server -- there is no other server
+ // to act as secondary backup, and no rebalancing can occur
+ continue;
+ }
+
+ // we need a backup host for each bucket
+ for (TestBucket bucket : failedServer.buckets) {
+ if (bucket.secondaryBackup == null
+ || bucket.secondaryBackup == nullServer) {
+ // need a backup host for this bucket -- remove the first
+ // entry from 'adjustedTestServers', which is most favored
+ AdjustedTestServer backupHost = adjustedTestServers.first();
+ adjustedTestServers.remove(backupHost);
+
+ // update add this bucket to the list
+ bucket.setSecondaryBackup(backupHost.server);
+
+ // update counts in 'AdjustedTestServer'
+ backupHost.bucketCount += 1;
+ backupHost.secondaryBackupBucketCount += 1;
+
+ // place it back in the table, possibly in a new position
+ // (it's attributes have changed)
+ adjustedTestServers.add(backupHost);
+ }
+ }
+
+ // TBD: Is additional rebalancing needed?
+ }
+ }
+
+ /**
+ * Generate a message with all of the bucket updates, process it locally,
+ * and send it to all servers in the "Notify List".
+ */
+ void generateBucketMessage() throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+
+ // go through the entire 'buckets' table
+ for (int i = 0 ; i < buckets.length ; i += 1) {
+ // fetch the 'TestBucket' associated with index 'i'
+ TestBucket testBucket = buckets[i];
+
+ /*
+ * Get the UUID of the owner, primary backup, and secondary backup
+ * for this bucket. If the associated value does not exist, 'null'
+ * is used.
+ */
+ UUID newOwner = null;
+ UUID newPrimary = null;
+ UUID newSecondary = null;
+
+ if (testBucket.owner != nullServer && testBucket.owner != null) {
+ newOwner = testBucket.owner.uuid;
+ }
+ if (testBucket.primaryBackup != nullServer
+ && testBucket.primaryBackup != null) {
+ newPrimary = testBucket.primaryBackup.uuid;
+ }
+ if (testBucket.secondaryBackup != nullServer
+ && testBucket.secondaryBackup != null) {
+ newSecondary = testBucket.secondaryBackup.uuid;
+ }
+
+ // write bucket number
+ dos.writeShort(i);
+
+ // 'owner' field
+ if (newOwner != null) {
+ dos.writeByte(OWNER_UPDATE);
+ Util.writeUuid(dos, newOwner);
+ } else {
+ dos.writeByte(OWNER_NULL);
+ }
+
+ // 'primaryBackup' field
+ if (newPrimary != null) {
+ dos.writeByte(PRIMARY_BACKUP_UPDATE);
+ Util.writeUuid(dos, newPrimary);
+ } else {
+ dos.writeByte(PRIMARY_BACKUP_NULL);
+ }
+
+ // 'secondaryBackup' field
+ if (newSecondary != null) {
+ dos.writeByte(SECONDARY_BACKUP_UPDATE);
+ Util.writeUuid(dos, newSecondary);
+ } else {
+ dos.writeByte(SECONDARY_BACKUP_NULL);
+ }
+
+ dos.writeByte(END_OF_PARAMETERS_TAG);
+ }
+
+ // get the unencoded 'packet'
+ final byte[] packet = bos.toByteArray();
+
+ // create an 'Entity' containing the encoded packet
+ final Entity entity =
+ Entity.entity(new String(Base64.getEncoder().encode(packet),
+ StandardCharsets.UTF_8), MediaType.APPLICATION_OCTET_STREAM_TYPE);
+ /**
+ * This method is running within the 'MainLoop' thread.
+ */
+ Runnable task = () -> {
+ try {
+ /*
+ * update the buckets on this host,
+ * which is presumably the lead server.
+ */
+ Bucket.updateBucketInternal(packet);
+ } catch (Exception e) {
+ logger.error("Exception updating buckets", e);
+ }
+
+ // send a message to all servers on the notify list
+ for (Server server : Server.getNotifyList()) {
+ server.post("bucket/update", entity);
+ }
+ };
+ MainLoop.queueWork(task);
+ }
+
+ /**
+ * Supports the '/cmd/dumpBuckets' REST message -- this isn't part of
+ * a 'rebalance' operation, but it turned out to be a convenient way
+ * to dump out the bucket table.
+ *
+ * @param out the output stream
+ */
+ private void dumpBucketsInternal(PrintStream out) {
+ // xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx xxxxxxxxx *
+ // UUID Type Buckets
+ String format = "%-36s %-9s %5s %s\n";
+
+ int totalOwner = 0;
+ int totalPrimary = 0;
+ int totalSecondary = 0;
+
+ out.printf(format, "UUID", "Type", "Count", "Buckets");
+ out.printf(format, "----", "----", "-----", "-------");
+ for (TestServer ts : testServers.values()) {
+ // dump out 'owned' bucket information
+ if (ts.buckets.isEmpty()) {
+ // no buckets owned by this server
+ out.printf(format, ts.uuid, "Owned", 0, "");
+ } else {
+ // dump out primary buckets information
+ totalOwner +=
+ dumpBucketsSegment(out, format, ts.buckets, ts.uuid.toString(), "Owned");
+ }
+ // optionally dump out primary buckets information
+ totalPrimary +=
+ dumpBucketsSegment(out, format, ts.primaryBackupBuckets, "", "Primary");
+ // optionally dump out secondary buckets information
+ totalSecondary +=
+ dumpBucketsSegment(out, format, ts.secondaryBackupBuckets, "", "Secondary");
+ }
+
+ if (!nullServer.buckets.isEmpty()
+ || !nullServer.primaryBackupBuckets.isEmpty()
+ || !nullServer.secondaryBackupBuckets.isEmpty()) {
+ /*
+ * There are some owned, primary, or secondary buckets that are
+ * unassigned. It is displayed in a manner similar to buckets that
+ * do have a server, but the UUID field is marked as 'UNASSIGNED'
+ * in the first line of the display.
+ */
+ String uuidField = "UNASSIGNED";
+
+ // optionally dump out unassigned owned buckets information
+ if (dumpBucketsSegment(out, format, nullServer.buckets,
+ uuidField, "Owned") != 0) {
+ uuidField = "";
+ }
+ // optionally dump out unassigned primary backup buckets information
+ if (dumpBucketsSegment(out, format, nullServer.primaryBackupBuckets,
+ uuidField, "Primary") != 0) {
+ uuidField = "";
+ }
+ // optionally dump out unassigned secondary backup buckets information
+ dumpBucketsSegment(out, format, nullServer.secondaryBackupBuckets,
+ uuidField, "Secondary");
+ }
+ out.println("\nTotal assigned: owner = " + totalOwner
+ + ", primary = " + totalPrimary
+ + ", secondary = " + totalSecondary);
+ }
+
+ /**
+ * Supports the 'dumpBucketsInternal' command, and indirectly, the
+ * '/cmd/dumpBuckets' REST message. It formats one segment of bucket data
+ * (owned, primary backup, or secondary backup), and dumps out the
+ * associated bucket data in segments of 8. Note: if the size of 'buckets'
+ * is 0, nothing is displayed.
+ *
+ * @param out the output stream
+ * @param format the message format string
+ * @param buckets the entire set of buckets to be displayed
+ * @param uuid string to display under the 'UUID' header
+ * @param segmentDescription string to display under the 'Type' header
+ * @return the size of the 'buckets' set
+ */
+ private static int dumpBucketsSegment(
+ PrintStream out, String format, TreeSet buckets,
+ String uuid, String segmentDescription) {
+
+ int size = buckets.size();
+ if (size != 0) {
+ // generate a linked list of the bucket data to display
+ LinkedList data = new LinkedList();
+ StringBuilder sb = new StringBuilder();
+ int count = 8;
+
+ for (TestBucket bucket : buckets) {
+ if (sb.length() != 0) {
+ // this is not the first bucket in the line --
+ // prepend a space
+ sb.append(' ');
+ }
+
+ // add the bucket number
+ sb.append(String.format("%4s", bucket.index));
+ if ((count -= 1) <= 0) {
+ // filled up a row --
+ // add it to the list, and start a new line
+ data.add(sb.toString());
+ sb = new StringBuilder();
+ count = 8;
+ }
+ }
+ if (sb.length() != 0) {
+ // there is a partial line remaining -- add it to the list
+ data.add(sb.toString());
+ }
+
+ /*
+ * The first line displayed includes the UUID and size information,
+ * and the first line of bucket data (owned, primary, or secondary).
+ * The remaining lines of bucket data are displayed alone,
+ * without any UUID or size information.
+ */
+ out.printf(format, uuid, segmentDescription, buckets.size(),
+ data.removeFirst());
+ while (!data.isEmpty()) {
+ out.printf(format, "", "", "", data.removeFirst());
+ }
+ }
+ return size;
+ }
+ }
+
+ /* ============================================================ */
+
+ /**
+ * This interface is an abstraction for all messages that are routed
+ * through buckets. It exists, so that messages may be queued while
+ * bucket migration is taking place, and it makes it possible to support
+ * multiple types of messages (routed UEB/DMAAP messages, or lock messages)
+ */
+ public static interface Message {
+ /**
+ * Process the current message -- this may mean delivering it locally,
+ * or forwarding it.
+ */
+ public void process();
+
+ /**
+ * Send the message to another host for processing.
+ *
+ * @param server the destination host (although it could end up being
+ * forwarded again)
+ * @param bucketNumber the bucket number determined by extracting the
+ * current message's keyword
+ */
+ public void sendToServer(Server server, int bucketNumber);
+ }
+
+ /* ============================================================ */
+
+ /**
+ * This interface implements a type of backup; for example, there is one
+ * for backing up Drools objects within sessions, and another for
+ * backing up lock data.
+ */
+ public static interface Backup {
+ /**
+ * This method is called to add a 'Backup' instance to the registered list.
+ *
+ * @param backup an object implementing the 'Backup' interface
+ */
+ public static void register(Backup backup) {
+ synchronized (backupList) {
+ if (!backupList.contains(backup)) {
+ backupList.add(backup);
+ }
+ }
+ }
+
+ /**
+ * Generate Serializable backup data for the specified bucket.
+ *
+ * @param bucketNumber the bucket number to back up
+ * @return a Serializable object containing backkup data
+ */
+ public Restore generate(int bucketNumber);
+ }
+
+ /* ============================================================ */
+
+ /**
+ * Objects implementing this interface may be serialized, and restored
+ * on a different host.
+ */
+ public static interface Restore extends Serializable {
+ /**
+ * Restore from deserialized data.
+ *
+ * @param bucketNumber the bucket number being restored
+ */
+ void restore(int bucketNumber);
+ }
+
+ /* ============================================================ */
+
+ /**
+ * This interface corresponds to a transient state within a Bucket.
+ */
+ private interface State {
+ /**
+ * This method allows state-specific handling of the
+ * 'Bucket.forward()' methods
+ *
+ * @param message the message to be forwarded/processed
+ * @return a value of 'true' indicates the message has been "handled"
+ * (forwarded or queued), and 'false' indicates it has not, and needs
+ * to be handled locally.
+ */
+ boolean forward(Message message);
+
+ /**
+ * This method indicates that the current server is the new owner
+ * of the current bucket.
+ */
+ void newOwner();
+
+ /**
+ * This method indicates that serialized data has been received,
+ * presumably from the old owner of the bucket. The data could correspond
+ * to Drools objects within sessions, as well as global locks.
+ *
+ * @param data serialized data associated with this bucket (at present,
+ * this is assumed to be complete, all within a single message)
+ */
+ void bulkSerializedData(byte[] data);
+ }
+
+ /* ============================================================ */
+
+ /**
+ * Each state instance is associated with a bucket, and is used when
+ * that bucket is in a transient state where it is the new owner of a
+ * bucket, or is presumed to be the new owner, based upon other events
+ * that have occurred.
+ */
+ private class NewOwner extends Thread implements State {
+ /*
+ * this value is 'true' if we have explicitly received a 'newOwner'
+ * indication, and 'false' if there was another trigger for entering this
+ * transient state (e.g. receiving serialized data)
+ */
+ boolean confirmed;
+
+ // when 'System.currentTimeMillis()' reaches this value, we time out
+ long endTime;
+
+ // If not 'null', we are queueing messages for this bucket;
+ // otherwise, we are sending them through.
+ Queue messages = new ConcurrentLinkedQueue<>();
+
+ // this is used to signal the thread that we have data available
+ CountDownLatch dataAvailable = new CountDownLatch(1);
+
+ // this is the data
+ byte[] data = null;
+
+ // this is the old owner of the bucket
+ Server oldOwner;
+
+ /**
+ * Constructor - a transient state, where we are expecting to receive
+ * bulk data from the old owner.
+ *
+ * @param confirmed 'true' if we were explicitly notified that we
+ * are the new owner of the bucket, 'false' if not
+ */
+ NewOwner(boolean confirmed, Server oldOwner) {
+ super("New Owner for Bucket " + index);
+ this.confirmed = confirmed;
+ this.oldOwner = oldOwner;
+ if (oldOwner == null) {
+ // we aren't expecting any data -- this is indicated by 0-length data
+ bulkSerializedData(new byte[0]);
+ }
+ endTime = System.currentTimeMillis()
+ + (confirmed ? confirmedTimeout : unconfirmedTimeout);
+ start();
+ }
+
+ /**
+ * Return the 'confirmed' indicator.
+ *
+ * @return the 'confirmed' indicator
+ */
+ private boolean getConfirmed() {
+ synchronized (Bucket.this) {
+ return confirmed;
+ }
+ }
+
+ /**
+ * This returns the timeout delay, which will always be less than or
+ * equal to 1 second. This allows us to periodically check whether the
+ * old server is still active.
+ *
+ * @return the timeout delay, which is the difference between the
+ * 'endTime' value and the current time or 1 second
+ * (whichever is less)
+ */
+ private long getTimeout() {
+ long lclEndTime;
+ synchronized (Bucket.this) {
+ lclEndTime = endTime;
+ }
+ return Math.min(lclEndTime - System.currentTimeMillis(), 1000L);
+ }
+
+ /**
+ * Return the current value of the 'data' field.
+ *
+ * @return the current value of the 'data' field
+ */
+ private byte[] getData() {
+ synchronized (Bucket.this) {
+ return data;
+ }
+ }
+
+ /*********************/
+ /* 'State' interface */
+ /*********************/
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean forward(Message message) {
+ // the caller of this method is synchronized on 'Bucket.this'
+ if (messages != null && Thread.currentThread() != this) {
+ // just queue the message
+ messages.add(message);
+ return true;
+ } else {
+ /*
+ * Either:
+ *
+ * 1) We are in a grace period, where 'state' is still set, but
+ * we are no longer forwarding messages.
+ * 2) We are calling 'message.process()' from this thread
+ * in the 'finally' block of 'NewOwner.run()'.
+ *
+ * In either case, messages should be processed locally.
+ */
+ return false;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void newOwner() {
+ // the caller of this method is synchronized on 'Bucket.this'
+ if (!confirmed) {
+ confirmed = true;
+ endTime += (confirmedTimeout - unconfirmedTimeout);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void bulkSerializedData(byte[] data) {
+ // the caller of this method is synchronized on 'Bucket.this'
+ if (this.data == null) {
+ this.data = data;
+ dataAvailable.countDown();
+ }
+ }
+
+ /**********************/
+ /* 'Thread' interface */
+ /**********************/
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void run() {
+ logger.info("{}: 'run' method invoked", this);
+ try {
+ byte[] lclData;
+ long delay;
+
+ while ((lclData = getData()) == null
+ && oldOwner.isActive()
+ && (delay = getTimeout()) > 0) {
+ // ignore return value -- 'data' will indicate the result
+ dataAvailable.await(delay, TimeUnit.MILLISECONDS);
+ }
+ if (lclData == null) {
+ // no data available -- log an error, and abort
+ if (getConfirmed()) {
+ // we never received the data, but we are the new owner
+ logger.error("{}: never received session data", this);
+ } else {
+ /*
+ * no data received, and it was never confirmed --
+ * assume that the forwarded message that triggered this was
+ * erroneus
+ */
+ logger.error("{}: no confirmation or data received -- aborting", this);
+ return;
+ }
+ } else {
+ logger.info("{}: {} bytes of data available",
+ this, lclData.length);
+ }
+
+ // if we reach this point, this server is the new owner
+ if (lclData == null || lclData.length == 0) {
+ // see if any features can do the restore
+ for (ServerPoolApi feature : ServerPoolApi.impl.getList()) {
+ feature.restoreBucket(Bucket.this);
+ }
+ } else {
+ // deserialize data
+ Object obj = Util.deserialize(lclData);
+ restoreBucketData(obj);
+ }
+ } catch (Exception e) {
+ logger.error("Exception in {}", this, e);
+ } finally {
+ /*
+ * cleanly leave state -- we want to make sure that messages
+ * are processed in order, so the queue needs to remain until
+ * it is empty
+ */
+ logger.info("{}: entering cleanup state", this);
+ for ( ; ; ) {
+ Message message = messages.poll();
+ if (message == null) {
+ // no messages left, but this could change
+ synchronized (Bucket.this) {
+ message = messages.poll();
+ if (message == null) {
+ // no messages left
+ if (state == this) {
+ if (owner == Server.getThisServer()) {
+ // we can now exit the state
+ state = null;
+ stateChanged();
+ } else {
+ /*
+ * We need a grace period before we can
+ * remove the 'state' value (this can happen
+ * if we receive and process the bulk data
+ * before receiving official confirmation
+ * that we are owner of the bucket.
+ */
+ messages = null;
+ }
+ }
+ break;
+ }
+ }
+ }
+ // this doesn't work -- it ends up right back in the queue
+ // if 'messages' is defined
+ message.process();
+ }
+ if (messages == null) {
+ // this indicates we need to enter a grace period before cleanup,
+ try {
+ logger.info("{}: entering grace period before terminating",
+ this);
+ Thread.sleep(unconfirmedGracePeriod);
+ } catch (InterruptedException e) {
+ // we are exiting in any case
+ Thread.currentThread().interrupt();
+ } finally {
+ synchronized (Bucket.this) {
+ // Do we need to confirm that we really are the owner?
+ // What does it mean if we are not?
+ if (state == this) {
+ state = null;
+ stateChanged();
+ }
+ }
+ }
+ }
+ logger.info("{}: exiting cleanup state", this);
+ }
+ }
+
+ /**
+ * Return a useful value to display in log messages.
+ *
+ * @return a useful value to display in log messages
+ */
+ public String toString() {
+ return "Bucket.NewOwner(" + index + ")";
+ }
+ }
+
+ /**
+ * Restore bucket data.
+ *
+ * @param obj deserialized bucket data
+ */
+ private void restoreBucketData(Object obj) {
+ if (obj instanceof List) {
+ for (Object entry : (List>)obj) {
+ if (entry instanceof Restore) {
+ // entry-specific 'restore' operation
+ ((Restore)entry).restore(this.index);
+ } else {
+ logger.error("{}: Expected '{}' but got '{}'",
+ this, Restore.class.getName(),
+ entry.getClass().getName());
+ }
+ }
+ } else {
+ logger.error("{}: expected 'List' but got '{}'",
+ this, obj.getClass().getName());
+ }
+ }
+
+ /* ============================================================ */
+
+ /**
+ * Each state instance is associated with a bucket, and is used when
+ * that bucket is in a transient state where it is the old owner of
+ * a bucket, and the data is being transferred to the new owner.
+ */
+ private class OldOwner extends Thread implements State {
+ Server newOwner;
+
+ OldOwner(Server newOwner) {
+ super("Old Owner for Bucket " + index);
+ this.newOwner = newOwner;
+ start();
+ }
+
+ /*********************/
+ /* 'State' interface */
+ /*********************/
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean forward(Message message) {
+ // forward message to new owner
+ message.sendToServer(newOwner, index);
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void newOwner() {
+ // shouldn't happen -- just log an error
+ logger.error("{}: 'newOwner()' shouldn't be called in this state", this);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void bulkSerializedData(byte[] data) {
+ // shouldn't happen -- just log an error
+ logger.error("{}: 'bulkSerializedData()' shouldn't be called in this state", this);
+ }
+
+ /**********************/
+ /* 'Thread' interface */
+ /**********************/
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void run() {
+ logger.info("{}: 'run' method invoked", this);
+ try {
+ // go through all of the entries in the list, collecting restore data
+ List restoreData = new LinkedList<>();
+ for (Backup backup : backupList) {
+ Restore restore = backup.generate(index);
+ if (restore != null) {
+ restoreData.add(restore);
+ }
+ }
+
+ // serialize all of the objects,
+ // and send what we have to the new owner
+ Entity entity = Entity.entity(
+ new String(Base64.getEncoder().encode(Util.serialize(restoreData))),
+ MediaType.APPLICATION_OCTET_STREAM_TYPE);
+ newOwner.post("bucket/sessionData", entity, new Server.PostResponse() {
+ @Override
+ public WebTarget webTarget(WebTarget webTarget) {
+ return webTarget
+ .queryParam("bucket", index)
+ .queryParam("dest", newOwner.getUuid())
+ .queryParam("ttl", timeToLive);
+ }
+
+ @Override
+ public void response(Response response) {
+ logger.info("/bucket/sessionData response code = {}",
+ response.getStatus());
+ }
+ });
+ } catch (Exception e) {
+ logger.error("Exception in {}", this, e);
+ } finally {
+ synchronized (Bucket.this) {
+ // restore the state
+ if (state == this) {
+ state = null;
+ stateChanged();
+ }
+ }
+ }
+ }
+
+ /**
+ * Return a useful value to display in log messages.
+ *
+ * @return a useful value to display in log messages
+ */
+ public String toString() {
+ return "Bucket.OldOwner(" + index + ")";
+ }
+ }
+}
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Discovery.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Discovery.java
new file mode 100644
index 00000000..c507e97d
--- /dev/null
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Discovery.java
@@ -0,0 +1,354 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-server-pool
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.serverpool;
+
+import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_DISCOVERY_FETCH_LIMIT;
+import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_DISCOVERY_FETCH_TIMEOUT;
+import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_DISCOVER_PUBLISHER_LOOP_CYCLE_TIME;
+import static org.onap.policy.drools.serverpool.ServerPoolProperties.DISCOVERY_ALLOW_SELF_SIGNED_CERTIFICATES;
+import static org.onap.policy.drools.serverpool.ServerPoolProperties.DISCOVERY_API_KEY;
+import static org.onap.policy.drools.serverpool.ServerPoolProperties.DISCOVERY_API_SECRET;
+import static org.onap.policy.drools.serverpool.ServerPoolProperties.DISCOVERY_FETCH_LIMIT;
+import static org.onap.policy.drools.serverpool.ServerPoolProperties.DISCOVERY_FETCH_TIMEOUT;
+import static org.onap.policy.drools.serverpool.ServerPoolProperties.DISCOVERY_HTTPS;
+import static org.onap.policy.drools.serverpool.ServerPoolProperties.DISCOVERY_PASSWORD;
+import static org.onap.policy.drools.serverpool.ServerPoolProperties.DISCOVERY_SERVERS;
+import static org.onap.policy.drools.serverpool.ServerPoolProperties.DISCOVERY_TOPIC;
+import static org.onap.policy.drools.serverpool.ServerPoolProperties.DISCOVERY_USERNAME;
+import static org.onap.policy.drools.serverpool.ServerPoolProperties.DISCOVER_PUBLISHER_LOOP_CYCLE_TIME;
+import static org.onap.policy.drools.serverpool.ServerPoolProperties.getProperty;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.TopicListener;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class makes use of UEB/DMAAP to discover other servers in the pool.
+ * The discovery processes ordinarily run only on the lead server, but they
+ * run on other servers up until the point that they determine who the
+ * leader is.
+ */
+public class Discovery implements TopicListener {
+ private static Logger logger = LoggerFactory.getLogger(Discovery.class);
+
+ // used for JSON <-> String conversion
+ private static StandardCoder coder = new StandardCoder();
+
+ private static Discovery discovery = null;
+
+ private volatile Publisher publisherThread = null;
+
+ private List consumers = null;
+ private List publishers = null;
+
+ private Discovery() {
+ // we want to modify the properties we send to 'TopicManager'
+ PropBuilder builder = new PropBuilder(ServerPoolProperties.getProperties());
+ builder.convert(DISCOVERY_SERVERS, null,
+ PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
+ builder.convert(DISCOVERY_USERNAME, null,
+ PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
+ builder.convert(DISCOVERY_PASSWORD, null,
+ PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
+ builder.convert(DISCOVERY_HTTPS, null,
+ PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
+ builder.convert(DISCOVERY_API_KEY, null,
+ PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
+ builder.convert(DISCOVERY_API_SECRET, null,
+ PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
+ builder.convert(DISCOVERY_FETCH_TIMEOUT,
+ String.valueOf(DEFAULT_DISCOVERY_FETCH_TIMEOUT),
+ PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
+ builder.convert(DISCOVERY_FETCH_LIMIT,
+ String.valueOf(DEFAULT_DISCOVERY_FETCH_LIMIT),
+ PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
+ builder.convert(DISCOVERY_ALLOW_SELF_SIGNED_CERTIFICATES, null,
+ PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
+ Properties prop = builder.finish();
+ logger.debug("Discovery converted properties: {}", prop);
+
+ consumers = TopicEndpointManager.getManager().addTopicSources(prop);
+ publishers = TopicEndpointManager.getManager().addTopicSinks(prop);
+
+ if (consumers.isEmpty()) {
+ logger.error("No consumer topics");
+ }
+ if (publishers.isEmpty()) {
+ logger.error("No publisher topics");
+ }
+ logger.debug("Discovery: {} consumers, {} publishers",
+ consumers.size(), publishers.size());
+ }
+
+ /**
+ * Start all consumers and publishers, and start the publisher thread.
+ */
+ static synchronized void startDiscovery() {
+ if (discovery == null) {
+ discovery = new Discovery();
+ }
+ discovery.start();
+ }
+
+ /**
+ * Stop all consumers and publishers, and stop the publisher thread.
+ */
+ static synchronized void stopDiscovery() {
+ if (discovery != null) {
+ discovery.stop();
+ }
+ }
+
+ /**
+ * Start all consumers and publishers, and start the publisher thread.
+ */
+ private void start() {
+ for (TopicSource consumer : consumers) {
+ consumer.register(this);
+ consumer.start();
+ }
+ for (TopicSink publisher : publishers) {
+ publisher.start();
+ }
+ if (publisherThread == null) {
+ // send thread wasn't running -- start it
+ publisherThread = new Publisher();
+ publisherThread.start();
+ }
+ }
+
+ /**
+ * Stop all consumers and publishers, and stop the publisher thread.
+ */
+ private void stop() {
+ publisherThread = null;
+ for (TopicSink publisher : publishers) {
+ publisher.stop();
+ }
+ for (TopicSource consumer : consumers) {
+ consumer.unregister(this);
+ consumer.stop();
+ }
+ }
+
+ /*===========================*/
+ /* 'TopicListener' interface */
+ /*===========================*/
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void onTopicEvent(CommInfrastructure infra, String topic, String event) {
+ /*
+ * a JSON message has been received -- it should contain
+ * a single string parameter 'pingData', which contains the
+ * same format base64-encoded message that 'Server'
+ * instances periodically exchange
+ */
+ LinkedHashMap map = new LinkedHashMap<>();
+ try {
+ map = coder.decode(event, LinkedHashMap.class);
+ String message = map.get("pingData");
+ Server.adminRequest(message.getBytes(StandardCharsets.UTF_8));
+ logger.info("Received a message, server count={}", Server.getServerCount());
+ } catch (CoderException e) {
+ logger.error("Can't decode message: {}", e);
+ }
+ }
+
+ /* ============================================================ */
+
+ /**
+ * This class is used to convert internal 'discovery.*' properties to
+ * properties that 'TopicEndpointManager' can use.
+ */
+ private static class PropBuilder {
+ // properties being incrementally modified
+ Properties prop;
+
+ // value from 'discovery.topic' parameter
+ String topic;
+
+ // 'true' only if both 'discovery.topic' and 'discovery.servers'
+ // has been defined
+ boolean doConversion = false;
+
+ // contains "ueb.source.topics" or "dmaap.source.topics"
+ String sourceTopicsName = null;
+
+ // contains ".source.topics." ( = ueb|dmaap)
+ String sourcePrefix = null;
+
+ // contains "ueb.sink.topics" or "dmaap.sink.topics"
+ String sinkTopicsName = null;
+
+ // contains ".sink.topics." ( = ueb|dmaap)
+ String sinkPrefix = null;
+
+ /**
+ * Constructor - decide whether we are going to do conversion or not,
+ * and initialize accordingly.
+ *
+ * @param prop the initial list of properties
+ */
+ PropBuilder(Properties prop) {
+ this.prop = new Properties(prop);
+ this.topic = prop.getProperty(DISCOVERY_TOPIC);
+ String servers = prop.getProperty(DISCOVERY_SERVERS);
+ if (topic != null && servers != null) {
+ // we do have property conversion to do
+ doConversion = true;
+ String type = topic.contains(".") ? "dmaap" : "ueb";
+ sourceTopicsName = type + ".source.topics";
+ sourcePrefix = sourceTopicsName + "." + topic;
+ sinkTopicsName = type + ".sink.topics";
+ sinkPrefix = sinkTopicsName + "." + topic;
+ }
+ }
+
+ /**
+ * If we are doing conversion, convert an internal property
+ * to something that 'TopicEndpointManager' can use.
+ *
+ * @param intName server pool property name (e.g. "discovery.servers")
+ * @param defaultValue value to use if property 'intName' is not specified
+ * @param extSuffix TopicEndpointManager suffix, including leading "."
+ */
+ void convert(String intName, String defaultValue, String extSuffix) {
+ if (doConversion) {
+ String value = prop.getProperty(intName, defaultValue);
+ if (value != null) {
+ prop.setProperty(sourcePrefix + extSuffix, value);
+ prop.setProperty(sinkPrefix + extSuffix, value);
+ }
+ }
+ }
+
+ /**
+ * Generate/update the '*.source.topics' and '*.sink.topics' parameters.
+ *
+ * @return the updated properties list
+ */
+ Properties finish() {
+ if (doConversion) {
+ String currentValue = prop.getProperty(sourceTopicsName);
+ if (currentValue == null) {
+ // '*.source.topics' is not defined -- set it
+ prop.setProperty(sourceTopicsName, topic);
+ } else {
+ // '*.source.topics' is defined -- append to it
+ prop.setProperty(sourceTopicsName, currentValue + "," + topic);
+ }
+ currentValue = prop.getProperty(sinkTopicsName);
+ if (currentValue == null) {
+ // '*.sink.topics' is not defined -- set it
+ prop.setProperty(sinkTopicsName, topic);
+ } else {
+ // '*.sink.topics' is defined -- append to it
+ prop.setProperty(sinkTopicsName, currentValue + "," + topic);
+ }
+ }
+ return prop;
+ }
+ }
+
+ /* ============================================================ */
+
+ /**
+ * This is the sender thread, which periodically sends out 'ping' messages.
+ */
+ private class Publisher extends Thread {
+ /**
+ * Constructor -- read in the properties, and initialze 'publisher'.
+ */
+ Publisher() {
+ super("Discovery Publisher Thread");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void run() {
+ // this loop will terminate once 'publisher' is set to 'null',
+ // or some other 'Publisher' instance replaces it
+ long cycleTime = getProperty(DISCOVER_PUBLISHER_LOOP_CYCLE_TIME,
+ DEFAULT_DISCOVER_PUBLISHER_LOOP_CYCLE_TIME);
+ while (this == publisherThread) {
+ try {
+ // wait 5 seconds (default)
+ Thread.sleep(cycleTime);
+
+ // generate a 'ping' message
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+
+ // write the 'ping' data for this server
+ Server thisServer = Server.getThisServer();
+ thisServer.writeServerData(dos);
+ String encodedData =
+ new String(Base64.getEncoder().encode(bos.toByteArray()));
+
+ // base64-encoded value is passed as JSON parameter 'pingData'
+ LinkedHashMap map = new LinkedHashMap<>();
+ map.put("pingData", encodedData);
+ String jsonString = new Gson().toJson(map, Map.class);
+ for (TopicSink publisher : publishers) {
+ publisher.send(jsonString);
+ }
+ } catch (InterruptedException e) {
+ logger.error("Exception in Discovery.Publisher.run():", e);
+ return;
+ } catch (Exception e) {
+ logger.error("Exception in Discovery.Publisher.run():", e);
+ // grace period -- we don't want to get UEB upset at us
+ try {
+ Thread.sleep(15000);
+ } catch (InterruptedException e2) {
+ logger.error("Discovery.Publisher sleep interrupted");
+ }
+ return;
+ }
+ }
+ }
+ }
+}
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Events.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Events.java
new file mode 100644
index 00000000..176d39ac
--- /dev/null
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Events.java
@@ -0,0 +1,103 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-server-pool
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.serverpool;
+
+import java.util.Collection;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * This interface is used to distribute notifications of various system
+ * events, such as new 'Server' instances, or a server failing.
+ */
+public interface Events {
+ // set of listeners receiving event notifications
+ static final Queue listeners =
+ new ConcurrentLinkedQueue<>();
+
+ /**
+ * add a listener to the set of listeners receiving events.
+ *
+ * @param handler the listener
+ */
+ public static void register(Events handler) {
+ // if it is already here, remove it first
+ listeners.remove(handler);
+
+ // add to the end of the queue
+ listeners.add(handler);
+ }
+
+ /**
+ * remove a listener from the set of listeners.
+ */
+ public static boolean unregister(Events handler) {
+ return listeners.remove(handler);
+ }
+
+ public static Collection getListeners() {
+ return listeners;
+ }
+
+ /* ============================================================ */
+
+ /**
+ * Notification that a new server has been discovered.
+ *
+ * @param server this is the new server
+ */
+ public default void newServer(Server server) {
+ }
+
+ /**
+ * Notification that a server has failed.
+ *
+ * @param server this is the server that failed
+ */
+ public default void serverFailed(Server server) {
+ }
+
+ /**
+ * Notification that a new lead server has been selected.
+ *
+ * @param server this is the new lead server
+ */
+ public default void newLeader(Server server) {
+ }
+
+ /**
+ * Notification that the lead server has gone down.
+ *
+ * @param server the lead server that failed
+ */
+ public default void leaderFailed(Server server) {
+ }
+
+ /**
+ * Notification that a new selection just completed, but the same
+ * leader has been chosen (this may be in response to a new server
+ * joining earlier).
+ *
+ * @param server the current leader, which has been confirmed
+ */
+ public default void leaderConfirmed(Server server) {
+ }
+}
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/ExtendedObjectInputStream.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/ExtendedObjectInputStream.java
new file mode 100644
index 00000000..5ec6f341
--- /dev/null
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/ExtendedObjectInputStream.java
@@ -0,0 +1,70 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-server-pool
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.serverpool;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
+
+/**
+ * This class provides an 'ObjectInputStream' variant that uses the
+ * specified 'ClassLoader' instance.
+ */
+public class ExtendedObjectInputStream extends ObjectInputStream {
+ // the 'ClassLoader' to use when doing class lookups
+ private ClassLoader classLoader;
+
+ /**
+ * Constructor -- invoke the superclass, and save the 'ClassLoader'.
+ *
+ * @param in input stream to read from
+ * @param classLoader 'ClassLoader' to use when doing class lookups
+ */
+ public ExtendedObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
+ super(in);
+ this.classLoader = classLoader;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected Class> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
+
+ // Standard ClassLoader implementations first attempt to load classes
+ // via the parent class loader, and then attempt to load it using the
+ // current class loader if that fails. For some reason, Drools container
+ // class loaders define a different order -- in theory, this is only a
+ // problem if different versions of the same class are accessible through
+ // different class loaders, which is exactly what happens in some Junit
+ // tests.
+ //
+ // This change restores the order, at least when deserializing objects
+ // into a Drools container.
+ try {
+ // try the parent class loader first
+ return classLoader.getParent().loadClass(desc.getName());
+ } catch (ClassNotFoundException e) {
+ return classLoader.loadClass(desc.getName());
+ }
+ }
+}
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java
new file mode 100644
index 00000000..748a38f3
--- /dev/null
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java
@@ -0,0 +1,986 @@
+/*
+ * ============LICENSE_START=======================================================
+ * feature-server-pool
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.serverpool;
+
+import static org.onap.policy.drools.serverpool.ServerPoolProperties.BUCKET_DROOLS_TIMEOUT;
+import static org.onap.policy.drools.serverpool.ServerPoolProperties.BUCKET_TIME_TO_LIVE;
+import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_BUCKET_DROOLS_TIMEOUT;
+import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_BUCKET_TIME_TO_LIVE;
+import static org.onap.policy.drools.serverpool.ServerPoolProperties.getProperty;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import lombok.AllArgsConstructor;
+
+import org.drools.core.definitions.InternalKnowledgePackage;
+import org.drools.core.impl.KnowledgeBaseImpl;
+import org.kie.api.runtime.KieSession;
+import org.kie.api.runtime.rule.FactHandle;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicListener;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.common.utils.coder.StandardCoder;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.onap.policy.drools.control.api.DroolsPdpStateControlApi;
+import org.onap.policy.drools.core.DroolsRunnable;
+import org.onap.policy.drools.core.PolicyContainer;
+import org.onap.policy.drools.core.PolicySession;
+import org.onap.policy.drools.core.PolicySessionFeatureApi;
+import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
+import org.onap.policy.drools.features.PolicyControllerFeatureApi;
+import org.onap.policy.drools.features.PolicyEngineFeatureApi;
+import org.onap.policy.drools.system.PolicyController;
+import org.onap.policy.drools.system.PolicyControllerConstants;
+import org.onap.policy.drools.system.PolicyEngine;
+import org.onap.policy.drools.system.PolicyEngineConstants;
+import org.onap.policy.drools.utils.Pair;
+import org.onap.policy.drools.utils.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * This class hooks the server pool implementation into DroolsPDP.
+ *
+ *
PolicyEngineFeatureApi
- the afterStart hook is where we initialize.
+ *
PolicyControllerFeatureApi
- the beforeOffer hook is used to look
+ * at incoming topic messages, and decide whether to process them
+ * on this host, or forward to another host.
+ *
+ */
+public class FeatureServerPool
+ implements PolicyEngineFeatureApi, PolicySessionFeatureApi,
+ PolicyControllerFeatureApi, DroolsPdpStateControlApi {
+ private static Logger logger =
+ LoggerFactory.getLogger(FeatureServerPool.class);
+
+ // used for JSON <-> String conversion
+ private static StandardCoder coder = new StandardCoder();
+
+ private static final String configFile =
+ "config/feature-server-pool.properties";
+
+ /*
+ * Properties used when searching for keyword entries
+ *
+ * The following types are supported:
+ *
+ * 1) keyword..path=
+ * 2) keyword.path=
+ * 3) ueb.source.topics..keyword=
+ * 4) ueb.source.topics.keyword=
+ * 5) dmaap.source.topics..keyword=
+ * 6) dmaap.source.topics.keyword=
+ *
+ * 1, 3, and 5 are functionally equivalent
+ * 2, 4, and 6 are functionally equivalent
+ */
+
+ static final String KEYWORD_PROPERTY_START_1 = "keyword.";
+ static final String KEYWORD_PROPERTY_END_1 = ".path";
+ static final String KEYWORD_PROPERTY_START_2 = "ueb.source.topics.";
+ static final String KEYWORD_PROPERTY_END_2 = ".keyword";
+ static final String KEYWORD_PROPERTY_START_3 = "dmaap.source.topics.";
+ static final String KEYWORD_PROPERTY_END_3 = ".keyword";
+
+ /*
+ * maps topic names to a keyword table derived from (above)
+ *
+ * Example : requestID,CommonHeader.RequestID
+ *
+ * Table generated from this example has length 2:
+ * table[0] = {"requestID"}
+ * table[1] = {"CommonHeader", "RequestID"}
+ */
+ private static HashMap topicToPaths = new HashMap<>();
+
+ // this table is used for any topics that aren't in 'topicToPaths'
+ private static String[][] defaultPaths = new String[0][];
+
+ // extracted from properties
+ private static long droolsTimeoutMillis;
+ private static String timeToLiveSecond;
+
+ /******************************/
+ /* 'OrderedService' interface */
+ /******************************/
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public int getSequenceNumber() {
+ // we need to make sure we have an early position in 'selectThreadModel'
+ // (in case there is feature that provides a thread model)
+ return -1000000;
+ }
+
+ /**************************************/
+ /* 'PolicyEngineFeatureApi' interface */
+ /**************************************/
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean afterStart(PolicyEngine engine) {
+ logger.info("Starting FeatureServerPool");
+ Server.startup(configFile);
+ TargetLock.startup();
+ droolsTimeoutMillis =
+ getProperty(BUCKET_DROOLS_TIMEOUT, DEFAULT_BUCKET_DROOLS_TIMEOUT);
+ int intTimeToLive =
+ getProperty(BUCKET_TIME_TO_LIVE, DEFAULT_BUCKET_TIME_TO_LIVE);
+ timeToLiveSecond = String.valueOf(intTimeToLive);
+ buildKeywordTable();
+ Bucket.Backup.register(new DroolsSessionBackup());
+ Bucket.Backup.register(new TargetLock.LockBackup());
+ return false;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public PolicyResourceLockManager beforeCreateLockManager(
+ PolicyEngine engine, Properties properties) {
+
+ return TargetLock.getLockFactory();
+ }
+
+ /*=====================================*/
+ /* 'PolicySessionFeatureApi' interface */
+ /*=====================================*/
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean insertDrools(
+ final PolicySession session, final Object object) {
+
+ final String keyword = Keyword.lookupKeyword(object);
+ if (keyword == null) {
+ // no keyword was found, so we process locally
+ KieSession kieSession = session.getKieSession();
+ if (kieSession != null) {
+ kieSession.insert(object);
+ }
+ return true;
+ }
+
+ /*
+ * 'keyword' determines the destination host,
+ * which may be local or remote
+ */
+ Bucket.forwardAndProcess(keyword, new Bucket.Message() {
+ @Override
+ public void process() {
+ // if we reach this point, we process locally
+ KieSession kieSession = session.getKieSession();
+ if (kieSession != null) {
+ kieSession.insert(object);
+ }
+ }
+
+ @Override
+ public void sendToServer(Server server, int bucketNumber) {
+ // this object needs to sent to a remote host --
+ // first, serialize the object
+ byte[] data = null;
+ try {
+ data = Util.serialize(object);
+ } catch (IOException e) {
+ logger.error("insertDrools: can't serialize object of {}",
+ object.getClass(), e);
+ return;
+ }
+
+ // construct the message to insert remotely
+ Entity entity = Entity.entity(
+ new String(Base64.getEncoder().encode(data), StandardCharsets.UTF_8),
+ MediaType.APPLICATION_OCTET_STREAM_TYPE);
+ server.post("session/insertDrools", entity,
+ new Server.PostResponse() {
+ @Override
+ public WebTarget webTarget(WebTarget webTarget) {
+ PolicyContainer pc = session.getPolicyContainer();
+ String encodedSessionName =
+ pc.getGroupId() + ":" + pc.getArtifactId() + ":"
+ + session.getName();
+
+ return webTarget
+ .queryParam("keyword", keyword)
+ .queryParam("session", encodedSessionName)
+ .queryParam("bucket", bucketNumber)
+ .queryParam("ttl", timeToLiveSecond);
+ }
+
+ @Override
+ public void response(Response response) {
+ logger.info("/session/insertDrools response code = {}",
+ response.getStatus());
+ }
+ });
+ }
+ });
+ return true;
+ }
+
+ /******************************************/
+ /* 'PolicyControllerFeatureApi' interface */
+ /******************************************/
+
+ /**
+ * This method is called from 'AggregatedPolicyController.onTopicEvent',
+ * and provides a way to intercept the message before it is decoded and
+ * delivered to a local Drools session.
+ *
+ * @param controller the PolicyController instance receiving the message
+ * @param protocol communication infrastructure type
+ * @param topic topic name
+ * @param event event message as a string
+ * @return 'false' if this event should be processed locally, or 'true'
+ * if the message has been forwarded to a remote host, so local
+ * processing should be bypassed
+ */
+ @Override
+ public boolean beforeOffer(final PolicyController controller,
+ final CommInfrastructure protocol,
+ final String topic,
+ final String event) {
+ // choose the table, based upon the topic
+ String[][] table = topicToPaths.getOrDefault(topic, defaultPaths);
+
+ // build a JSON object from the event
+ StandardCoderObject sco;
+
+ try {
+ sco = coder.decode(event, StandardCoderObject.class);
+ } catch (CoderException e) {
+ return false;
+ }
+ String keyword = null;
+
+ for (String[] path : table) {
+ /*
+ * Each entry in 'table' is a String[] containing an encoding
+ * of a possible keyword field. Suppose the value is 'a.b.c.d.e' --
+ * 'path' would be encoded as 'String[] {"a", "b", "c", "d", "e"}'
+ */
+ String fieldName = path[path.length - 1];
+ String conversionFunctionName = null;
+ int index = fieldName.indexOf(':');
+
+ if (index > 0) {
+ conversionFunctionName = fieldName.substring(index + 1);
+ fieldName = fieldName.substring(0, index);
+ path = Arrays.copyOf(path, path.length);
+ path[path.length - 1] = fieldName;
+ }
+ keyword = sco.getString(path);
+ }
+
+ if (keyword == null) {
+ // couldn't find any keywords -- just process this message locally
+ logger.warn("Can't locate bucket keyword within message");
+ return false;
+ }
+
+ /*
+ * build a message object implementing the 'Bucket.Message' interface --
+ * it will be processed locally, forwarded, or queued based upon the
+ * current state.
+ */
+ TopicMessage message =
+ new TopicMessage(keyword, controller, protocol, topic, event);
+ int bucketNumber = Bucket.bucketNumber(keyword);
+ if (Bucket.forward(bucketNumber, message)) {
+ // message was queued or forwarded -- abort local processing
+ return true;
+ }
+
+ /*
+ * the bucket happens to be assigned to this server, and wasn't queued --
+ * return 'false', so it will be processed locally
+ */
+ logger.info("Keyword={}, bucket={} -- owned by this server",
+ keyword, bucketNumber);
+ return false;
+ }
+
+ /**
+ * Incoming topic message has been forwarded from a remote host.
+ *
+ * @param bucketNumber the bucket number calculated on the remote host
+ * @param keyword the keyword associated with the message
+ * @param controllerName the controller the message was directed to
+ * on the remote host
+ * @param protocol String value of the 'Topic.CommInfrastructure' value
+ * (UEB, DMAAP, NOOP, or REST -- NOOP and REST shouldn't be used
+ * here)
+ * @param topic the UEB/DMAAP topic name
+ * @param event this is the JSON message
+ */
+ static void topicMessage(
+ int bucketNumber, String keyword, String controllerName,
+ String protocol, String topic, String event) {
+
+ // @formatter:off
+ logger.info("Incoming topic message: Keyword={}, bucket={}\n"
+ + " controller = {}\n"
+ + " topic = {}",
+ keyword, bucketNumber, controllerName, topic);
+ // @formatter:on
+
+ // locate the 'PolicyController'
+ PolicyController controller = PolicyControllerConstants.getFactory().get(controllerName);
+ if (controller == null) {
+ /*
+ * This controller existed on the sender's host, but doesn't exist
+ * on the destination. This is a problem -- we are counting on all
+ * hosts being configured with the same controllers.
+ */
+ logger.error("Can't locate controller '{}' for incoming topic message",
+ controllerName);
+ } else if (controller instanceof TopicListener) {
+ /*
+ * This is the destination host -- repeat the 'onTopicEvent'
+ * method (the one that invoked 'beforeOffer' on the originating host).
+ * Note that this message could be forwarded again if the sender's
+ * bucket table was somehow different from ours -- perhaps there was
+ * an update in progress.
+ *
+ * TBD: it would be nice to limit the number of hops, in case we
+ * somehow have a loop.
+ */
+ ((TopicListener)controller).onTopicEvent(
+ CommInfrastructure.valueOf(protocol), topic, event);
+ } else {
+ /*
+ * This 'PolicyController' was also a 'TopicListener' on the sender's
+ * host -- it isn't on this host, and we are counting on them being
+ * config
+ */
+ logger.error("Controller {} is not a TopicListener", controllerName);
+ }
+ }
+
+ /**
+ * An incoming '/session/insertDrools' message was received.
+ *
+ * @param keyword the keyword associated with the incoming object
+ * @param sessionName encoded session name(groupId:artifactId:droolsSession)
+ * @param bucket the bucket associated with keyword
+ * @param ttl similar to IP time-to-live -- it controls the number of hops
+ * the message may take
+ * @param data base64-encoded serialized data for the object
+ */
+ static void incomingInsertDrools(
+ String keyword, String sessionName, int bucket, int ttl, byte[] data) {
+
+ logger.info("Incoming insertDrools: keyword={}, session={}, bucket={}, ttl={}",
+ keyword, sessionName, bucket, ttl);
+
+ if (Bucket.isKeyOnThisServer(keyword)) {
+ // process locally
+
+ // [0]="" [1]="", [2]=""
+ String[] nameSegments = sessionName.split(":");
+
+ // locate the 'PolicyContainer' and 'PolicySession'
+ PolicySession policySession = locatePolicySession(nameSegments);
+
+ if (policySession == null) {
+ logger.error("incomingInsertDrools: Can't find PolicySession={}",
+ sessionName);
+ } else {
+ KieSession kieSession = policySession.getKieSession();
+ if (kieSession != null) {
+ try {
+ // deserialization needs to use the correct class loader
+ Object obj = Util.deserialize(
+ Base64.getDecoder().decode(data),
+ policySession.getPolicyContainer().getClassLoader());
+ kieSession.insert(obj);
+ } catch (IOException | ClassNotFoundException
+ | IllegalArgumentException e) {
+ logger.error("incomingInsertDrools: failed to read data "
+ + "for session '{}'", sessionName, e);
+ }
+ }
+ }
+ } else if ((ttl -= 1) > 0) {
+ /*
+ * This host is not the intended destination -- this could happen
+ * if it was sent from another site. Forward the message in the
+ * same thread.
+ */
+ forwardInsertDroolsMessage(bucket, keyword, sessionName, ttl, data);
+ }
+ }
+
+ /**
+ * step through all 'PolicyContainer' instances looking
+ * for a matching 'artifactId' & 'groupId'.
+ * @param nameSegments name portion from sessionName
+ * @return policySession match artifactId and groupId
+ */
+ private static PolicySession locatePolicySession(String[] nameSegments) {
+ PolicySession policySession = null;
+ if (nameSegments.length == 3) {
+ for (PolicyContainer pc : PolicyContainer.getPolicyContainers()) {
+ if (nameSegments[1].equals(pc.getArtifactId())
+ && nameSegments[0].equals(pc.getGroupId())) {
+ policySession = pc.getPolicySession(nameSegments[2]);
+ break;
+ }
+ }
+ }
+ return policySession;
+ }
+
+ /**
+ * Forward the insertDrools message in the same thread.
+ */
+ private static void forwardInsertDroolsMessage(int bucket, String keyword,
+ String sessionName, int ttl, byte[] data) {
+ Server server = Bucket.bucketToServer(bucket);
+ WebTarget webTarget = server.getWebTarget("session/insertDrools");
+ if (webTarget != null) {
+ logger.info("Forwarding 'session/insertDrools' "
+ + "(key={},session={},bucket={},ttl={})",
+ keyword, sessionName, bucket, ttl);
+ Entity entity =
+ Entity.entity(new String(data, StandardCharsets.UTF_8),
+ MediaType.APPLICATION_OCTET_STREAM_TYPE);
+ webTarget
+ .queryParam("keyword", keyword)
+ .queryParam("session", sessionName)
+ .queryParam("bucket", bucket)
+ .queryParam("ttl", ttl)
+ .request().post(entity);
+ }
+ }
+
+ /**
+ * This method builds the table that is used to locate the appropriate
+ * keywords within incoming JSON messages (e.g. 'requestID'). The
+ * associated values are then mapped into bucket numbers.
+ */
+ private static void buildKeywordTable() {
+ Properties prop = ServerPoolProperties.getProperties();
+
+ // iterate over all of the properties, picking out those we are
+ // interested in
+ for (String name : prop.stringPropertyNames()) {
+ String topic = null;
+ String begin;
+ String end;
+
+ if (name.startsWith(KEYWORD_PROPERTY_START_1)
+ && name.endsWith(KEYWORD_PROPERTY_END_1)) {
+ // 1) keyword..path=
+ // 2) keyword.path=
+ begin = KEYWORD_PROPERTY_START_1;
+ end = KEYWORD_PROPERTY_END_1;
+ } else if (name.startsWith(KEYWORD_PROPERTY_START_2)
+ && name.endsWith(KEYWORD_PROPERTY_END_2)) {
+ // 3) ueb.source.topics..keyword=
+ // 4) ueb.source.topics.keyword=
+ begin = KEYWORD_PROPERTY_START_2;
+ end = KEYWORD_PROPERTY_END_2;
+ } else if (name.startsWith(KEYWORD_PROPERTY_START_3)
+ && name.endsWith(KEYWORD_PROPERTY_END_3)) {
+ // 5) dmaap.source.topics..keyword=
+ // 6) dmaap.source.topics.keyword=
+ begin = KEYWORD_PROPERTY_START_3;
+ end = KEYWORD_PROPERTY_END_3;
+ } else {
+ // we aren't interested in this property
+ continue;
+ }
+
+ int beginIndex = begin.length();
+ int endIndex = name.length() - end.length();
+ if (beginIndex < endIndex) {
+ // is specified, so this table is limited to this
+ // specific topic
+ topic = name.substring(beginIndex, endIndex);
+ }
+
+ // now, process the value
+ // Example: requestID,CommonHeader.RequestID
+ String[] commaSeparatedEntries = prop.getProperty(name).split(",");
+ String[][] paths = new String[commaSeparatedEntries.length][];
+ for (int i = 0 ; i < commaSeparatedEntries.length ; i += 1) {
+ paths[i] = commaSeparatedEntries[i].split("\\.");
+ }
+
+ if (topic == null) {
+ // these paths are used for any topics not explicitly
+ // in the 'topicToPaths' table
+ defaultPaths = paths;
+ } else {
+ // these paths are specific to 'topic'
+ topicToPaths.put(topic, paths);
+ }
+ }
+ }
+
+ /*======================================*/
+ /* 'DroolsPdpStateControlApi' interface */
+ /*======================================*/
+
+ /*
+ * Stop the processing of messages and server pool participation(non-Javadoc)
+ * Note: This is not static because it should only be used if feature-server-pool
+ * has been enabled.
+ * (non-Javadoc)
+ * @see org.onap.policy.drools.control.api.DroolsPdpStateControlApi#shutdown()
+ */
+ @Override
+ public void shutdown() {
+ PolicyEngineConstants.getManager().deactivate();
+ Server.shutdown();
+ }
+
+ /*
+ * Stop the processing of messages and server pool participation(non-Javadoc)
+ * Note: This is not static because it should only be used if feature-server-pool
+ * has been enabled.
+ * (non-Javadoc)
+ * @see org.onap.policy.drools.control.api.DroolsPdpStateControlApi#restart()
+ */
+ @Override
+ public void restart() {
+ MainLoop.startThread();
+ Discovery.startDiscovery();
+ PolicyEngineConstants.getManager().activate();
+ }
+
+ /* ============================================================ */
+
+ /**
+ * This class implements the 'Bucket.Message' interface for UEB/DMAAP
+ * messages.
+ */
+ @AllArgsConstructor
+ private static class TopicMessage implements Bucket.Message {
+ /*
+ * the keyword associated with this message
+ * (which determines the bucket number).
+ */
+ private final String keyword;
+
+ // the controller receiving this message
+ private final PolicyController controller;
+
+ // enumeration: UEB or DMAAP
+ private final CommInfrastructure protocol;
+
+ // UEB/DMAAP topic
+ private final String topic;
+
+ // JSON message as a String
+ private final String event;
+
+ /**
+ * Process this message locally using 'TopicListener.onTopicEvent'
+ * (the 'PolicyController' instance is assumed to implement
+ * the 'TopicListener' interface as well).
+ */
+ @Override
+ public void process() {
+ if (controller instanceof TopicListener) {
+ /*
+ * This is the destination host -- repeat the 'onTopicEvent' method
+ * (the one that invoked 'beforeOffer' on the originating host).
+ * Note that this message could be forwarded again if the sender's
+ * bucket table was somehow different from ours -- perhaps there was
+ * an update in progress.
+ *
+ * TBD: it would be nice to limit the number of hops, in case we
+ * somehow have a loop.
+ */
+ ((TopicListener)controller).onTopicEvent(protocol, topic, event);
+ } else {
+ /*
+ * This 'PolicyController' was also a 'TopicListener' on the sender's
+ * host -- it isn't on this host, and we are counting on them being
+ * configured the same way.
+ */
+ logger.error("Controller {} is not a TopicListener",
+ controller.getName());
+ }
+ }
+
+ /**
+ * Send this message to a remote server for processing (presumably, it
+ * is the destination host).
+ *
+ * @param server the Server instance to send the message to
+ * @param bucketNumber the bucket number to send it to
+ */
+ @Override
+ public void sendToServer(Server server, int bucketNumber) {
+ // if we reach this point, we have determined the remote server
+ // that should process this message
+
+ // @formatter:off
+ logger.info("Outgoing topic message: Keyword={}, bucket={}\n"
+ + " controller = {}"
+ + " topic = {}"
+ + " sender = {}"
+ + " receiver = {}",
+ keyword, bucketNumber, controller.getName(), topic,
+ Server.getThisServer().getUuid(), server.getUuid());
+ // @formatter:on
+
+ Entity entity = Entity.entity(event, MediaType.APPLICATION_JSON);
+ server.post("bucket/topic", entity, new Server.PostResponse() {
+ @Override
+ public WebTarget webTarget(WebTarget webTarget) {
+ return webTarget
+ .queryParam("bucket", bucketNumber)
+ .queryParam("keyword", keyword)
+ .queryParam("controller", controller.getName())
+ .queryParam("protocol", protocol.toString())
+ .queryParam("topic", topic);
+ }
+
+ @Override
+ public void response(Response response) {
+ // TODO: eventually, we will want to do something different
+ // based upon success/failure
+ }
+ });
+ }
+ }
+
+ /* ============================================================ */
+
+ /**
+ * Backup data associated with a Drools session.
+ */
+ static class DroolsSessionBackup implements Bucket.Backup {
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Bucket.Restore generate(int bucketNumber) {
+ // Go through all of the Drools sessions, and generate backup data.
+ // If there is no data to backup for this bucket, return 'null'
+
+ DroolsSessionRestore restore = new DroolsSessionRestore();
+ return restore.backup(bucketNumber) ? restore : null;
+ }
+ }
+
+ /* ============================================================ */
+
+ /**
+ * This class is used to generate and restore backup Drools data.
+ */
+ static class DroolsSessionRestore implements Bucket.Restore, Serializable {
+ // backup data for all Drools sessions on this host
+ private final List sessions = new LinkedList<>();
+
+ /**
+ * {@inheritDoc}
+ */
+ boolean backup(int bucketNumber) {
+ /*
+ * There may be multiple Drools sessions being backed up at the same
+ * time. There is one 'Pair' in the list for each session being
+ * backed up.
+ */
+ LinkedList>, PolicySession>>
+ pendingData = new LinkedList<>();
+ for (PolicyContainer pc : PolicyContainer.getPolicyContainers()) {
+ for (PolicySession session : pc.getPolicySessions()) {
+ // Wraps list of objects, to be populated in the session
+ final CompletableFuture> droolsObjectsWrapper =
+ new CompletableFuture<>();
+
+ // 'KieSessionObject'
+ final KieSession kieSession = session.getKieSession();
+
+ logger.info("{}: about to fetch data for session {}",
+ this, session.getFullName());
+ kieSession.insert(new DroolsRunnable() {
+ @Override
+ public void run() {
+ List