From 2381590fc66aa088e0b016befaef09d21e618100 Mon Sep 17 00:00:00 2001
From: Taka Cho
Date: Fri, 19 Feb 2021 09:00:43 -0500
Subject: deprecate server-pool in droolspdp
deprecate server-pool in droolspdp
Issue-ID: POLICY-3079
Change-Id: Id25aea57cc4d119bba73de22c6dc77ab9a56c95f
Signed-off-by: Taka Cho
---
api-server-pool-state-control/pom.xml | 46 -
.../control/api/DroolsPdpStateControlApi.java | 40 -
.../api/DroolsPdpStateControlApiConstants.java | 37 -
feature-server-pool/lombok.config | 3 -
feature-server-pool/pom.xml | 177 --
feature-server-pool/src/assembly/assemble_zip.xml | 75 -
.../feature/config/feature-server-pool.properties | 142 -
.../org/onap/policy/drools/serverpool/Bucket.java | 2579 ------------------
.../onap/policy/drools/serverpool/Discovery.java | 352 ---
.../org/onap/policy/drools/serverpool/Events.java | 108 -
.../serverpool/ExtendedObjectInputStream.java | 70 -
.../drools/serverpool/FeatureServerPool.java | 1027 -------
.../org/onap/policy/drools/serverpool/Keyword.java | 500 ----
.../org/onap/policy/drools/serverpool/Leader.java | 594 ----
.../onap/policy/drools/serverpool/MainLoop.java | 195 --
.../policy/drools/serverpool/RestServerPool.java | 437 ---
.../org/onap/policy/drools/serverpool/Server.java | 1361 ----------
.../policy/drools/serverpool/ServerPoolApi.java | 82 -
.../drools/serverpool/ServerPoolProperties.java | 338 ---
.../onap/policy/drools/serverpool/TargetLock.java | 2852 --------------------
.../org/onap/policy/drools/serverpool/Util.java | 183 --
.../drools/serverpool/persistence/Persistence.java | 899 ------
...icy.drools.control.api.DroolsPdpStateControlApi | 1 -
...onap.policy.drools.core.PolicySessionFeatureApi | 2 -
...licy.drools.features.PolicyControllerFeatureApi | 1 -
...p.policy.drools.features.PolicyEngineFeatureApi | 1 -
...org.onap.policy.drools.serverpool.ServerPoolApi | 1 -
.../onap/policy/drools/serverpool/AdapterImpl.java | 455 ----
.../drools/serverpool/BucketWrapperImpl.java | 172 --
.../drools/serverpool/ServerWrapperImpl.java | 145 -
.../drools/serverpool/TargetLockWrapperImpl.java | 197 --
.../onap/policy/drools/serverpooltest/Adapter.java | 356 ---
.../drools/serverpooltest/BlockingClassLoader.java | 176 --
.../drools/serverpooltest/BucketWrapper.java | 132 -
.../drools/serverpooltest/ServerWrapper.java | 103 -
.../policy/drools/serverpooltest/SimDmaap.java | 327 ---
.../drools/serverpooltest/TargetLockWrapper.java | 98 -
.../onap/policy/drools/serverpooltest/Test1.java | 916 -------
.../drools/serverpooltest/TestDroolsObject.java | 58 -
.../resources/TestController-controller.properties | 13 -
.../src/test/resources/drools-artifact-1.1/pom.xml | 31 -
.../src/main/resources/META-INF/kmodule.xml | 25 -
.../src/main/resources/rules.drl | 51 -
.../resources/feature-server-pool-test.properties | 142 -
.../src/test/resources/logback-test.xml | 32 -
.../install/src/files/feature-server-pool.conf | 30 -
pom.xml | 2 -
47 files changed, 15564 deletions(-)
delete mode 100644 api-server-pool-state-control/pom.xml
delete mode 100644 api-server-pool-state-control/src/main/java/org/onap/policy/drools/control/api/DroolsPdpStateControlApi.java
delete mode 100644 api-server-pool-state-control/src/main/java/org/onap/policy/drools/control/api/DroolsPdpStateControlApiConstants.java
delete mode 100644 feature-server-pool/lombok.config
delete mode 100644 feature-server-pool/pom.xml
delete mode 100644 feature-server-pool/src/assembly/assemble_zip.xml
delete mode 100644 feature-server-pool/src/main/feature/config/feature-server-pool.properties
delete mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java
delete mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Discovery.java
delete mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Events.java
delete mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/ExtendedObjectInputStream.java
delete mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java
delete mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Keyword.java
delete mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Leader.java
delete mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/MainLoop.java
delete mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/RestServerPool.java
delete mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Server.java
delete mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/ServerPoolApi.java
delete mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/ServerPoolProperties.java
delete mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/TargetLock.java
delete mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Util.java
delete mode 100644 feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/persistence/Persistence.java
delete mode 100644 feature-server-pool/src/main/resources/META-INF/services/org.onap.policy.drools.control.api.DroolsPdpStateControlApi
delete mode 100644 feature-server-pool/src/main/resources/META-INF/services/org.onap.policy.drools.core.PolicySessionFeatureApi
delete mode 100644 feature-server-pool/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyControllerFeatureApi
delete mode 100644 feature-server-pool/src/main/resources/META-INF/services/org.onap.policy.drools.features.PolicyEngineFeatureApi
delete mode 100644 feature-server-pool/src/main/resources/META-INF/services/org.onap.policy.drools.serverpool.ServerPoolApi
delete mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/AdapterImpl.java
delete mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/BucketWrapperImpl.java
delete mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/ServerWrapperImpl.java
delete mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpool/TargetLockWrapperImpl.java
delete mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/Adapter.java
delete mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/BlockingClassLoader.java
delete mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/BucketWrapper.java
delete mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/ServerWrapper.java
delete mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/SimDmaap.java
delete mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/TargetLockWrapper.java
delete mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/Test1.java
delete mode 100644 feature-server-pool/src/test/java/org/onap/policy/drools/serverpooltest/TestDroolsObject.java
delete mode 100644 feature-server-pool/src/test/resources/TestController-controller.properties
delete mode 100644 feature-server-pool/src/test/resources/drools-artifact-1.1/pom.xml
delete mode 100644 feature-server-pool/src/test/resources/drools-artifact-1.1/src/main/resources/META-INF/kmodule.xml
delete mode 100644 feature-server-pool/src/test/resources/drools-artifact-1.1/src/main/resources/rules.drl
delete mode 100644 feature-server-pool/src/test/resources/feature-server-pool-test.properties
delete mode 100644 feature-server-pool/src/test/resources/logback-test.xml
delete mode 100644 packages/install/src/files/feature-server-pool.conf
diff --git a/api-server-pool-state-control/pom.xml b/api-server-pool-state-control/pom.xml
deleted file mode 100644
index 7627951c..00000000
--- a/api-server-pool-state-control/pom.xml
+++ /dev/null
@@ -1,46 +0,0 @@
-
-
-
-
- 4.0.0
-
-
- org.onap.policy.drools-pdp
- drools-pdp
- 1.8.0-SNAPSHOT
-
-
- api-server-pool-state-control
-
- api-server-pool-state-control
- APIs for server pool state control
-
-
-
- org.onap.policy.drools-pdp
- policy-core
- ${project.version}
- provided
-
-
-
-
\ No newline at end of file
diff --git a/api-server-pool-state-control/src/main/java/org/onap/policy/drools/control/api/DroolsPdpStateControlApi.java b/api-server-pool-state-control/src/main/java/org/onap/policy/drools/control/api/DroolsPdpStateControlApi.java
deleted file mode 100644
index 7148f30e..00000000
--- a/api-server-pool-state-control/src/main/java/org/onap/policy/drools/control/api/DroolsPdpStateControlApi.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * drools-pdp-state-control-api
- * ================================================================================
- * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.control.api;
-
-import org.onap.policy.common.utils.services.OrderedService;
-
-public interface DroolsPdpStateControlApi extends OrderedService {
-
- /**
- * This method is called when wanting to interrupt the operation of the
- * drools pdp. It locks the endpoints, stops the message processing
- * and removes the instance of the drools pdp from the pool.
- */
- void shutdown();
-
- /**
- * This method is called when wanting to resume the operation of the
- * drools pdp. It unlocks the endpoints, resumes message processing
- * and adds the instance of the drools pdp to the pool.
- */
- void restart();
-}
diff --git a/api-server-pool-state-control/src/main/java/org/onap/policy/drools/control/api/DroolsPdpStateControlApiConstants.java b/api-server-pool-state-control/src/main/java/org/onap/policy/drools/control/api/DroolsPdpStateControlApiConstants.java
deleted file mode 100644
index df88bf30..00000000
--- a/api-server-pool-state-control/src/main/java/org/onap/policy/drools/control/api/DroolsPdpStateControlApiConstants.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*-
- * ============LICENSE_START=======================================================
- * drools-pdp-state-control-api
- * ================================================================================
- * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.control.api;
-
-import org.onap.policy.common.utils.services.OrderedServiceImpl;
-
-public class DroolsPdpStateControlApiConstants {
-
- /**
- * 'FeatureAPI.impl.getList()' returns an ordered list of objects
- * implementing the 'FeatureAPI' interface.
- */
- public static final OrderedServiceImpl impl =
- new OrderedServiceImpl<>(DroolsPdpStateControlApi.class);
-
- private DroolsPdpStateControlApiConstants() {
- // do nothing
- }
-}
diff --git a/feature-server-pool/lombok.config b/feature-server-pool/lombok.config
deleted file mode 100644
index c8811fdb..00000000
--- a/feature-server-pool/lombok.config
+++ /dev/null
@@ -1,3 +0,0 @@
-config.stopBubbling = true
-lombok.addLombokGeneratedAnnotation = true
-lombok.nonNull.exceptionType = IllegalArgumentException
\ No newline at end of file
diff --git a/feature-server-pool/pom.xml b/feature-server-pool/pom.xml
deleted file mode 100644
index b24ffbec..00000000
--- a/feature-server-pool/pom.xml
+++ /dev/null
@@ -1,177 +0,0 @@
-
-
-
-
- 4.0.0
-
-
- org.onap.policy.drools-pdp
- drools-pdp
- 1.8.0-SNAPSHOT
-
-
- feature-server-pool
-
- feature-server-pool
- Allows multiple DroolsPDP hosts to be active at once
-
-
-
-
- maven-assembly-plugin
-
-
- zipfile
-
- single
-
- package
-
- true
- ${project.artifactId}-${project.version}
-
- src/assembly/assemble_zip.xml
-
- false
-
-
-
-
-
- org.apache.maven.plugins
- maven-dependency-plugin
-
-
- copy-dependencies
-
- copy-dependencies
-
- prepare-package
-
- false
- ${project.build.directory}/assembly/lib
- false
- true
- true
- false
- false
- false
- runtime
- true
-
-
-
-
-
-
-
-
-
- org.onap.policy.drools-pdp
- policy-core
- ${project.version}
- provided
-
-
-
- org.onap.policy.common
- policy-endpoints
- ${policy.common.version}
- provided
-
-
-
- org.onap.policy.common
- utils
- ${policy.common.version}
- provided
-
-
-
- org.onap.policy.drools-pdp
- policy-management
- ${project.version}
- provided
-
-
-
- com.att.nsa
- cambriaClient
- provided
-
-
-
- org.projectlombok
- lombok
- provided
-
-
-
- org.onap.policy.drools-pdp
- api-server-pool-state-control
- ${project.version}
-
-
-
- org.glassfish.jersey.core
- jersey-common
- test
-
-
-
- com.google.guava
- guava
- test
-
-
-
- org.powermock
- powermock-api-mockito2
- test
-
-
-
- org.powermock
- powermock-module-junit4
- test
-
-
-
- junit
- junit
- provided
-
-
-
- org.assertj
- assertj-core
- test
-
-
-
- org.awaitility
- awaitility
- test
-
-
-
-
diff --git a/feature-server-pool/src/assembly/assemble_zip.xml b/feature-server-pool/src/assembly/assemble_zip.xml
deleted file mode 100644
index e735a8d6..00000000
--- a/feature-server-pool/src/assembly/assemble_zip.xml
+++ /dev/null
@@ -1,75 +0,0 @@
-
-
-
-
-
- feature-server-pool-package
-
- zip
-
-
- false
-
-
-
- target
- lib/feature
-
- feature-server-pool-${project.version}.jar
-
-
-
- target/assembly/lib
- lib/dependencies
-
- *.jar
-
-
-
- src/main/feature/config
- config
- 0644
-
-
-
- src/main/feature/bin
- bin
- 0744
-
-
-
- src/main/feature/db
- db
- 0744
-
-
-
- src/main/feature/install
- install
- 0744
-
-
-
-
-
diff --git a/feature-server-pool/src/main/feature/config/feature-server-pool.properties b/feature-server-pool/src/main/feature/config/feature-server-pool.properties
deleted file mode 100644
index 00380294..00000000
--- a/feature-server-pool/src/main/feature/config/feature-server-pool.properties
+++ /dev/null
@@ -1,142 +0,0 @@
-###
-# ============LICENSE_START=======================================================
-# feature-server-pool
-# ================================================================================
-# Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
-# ================================================================================
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ============LICENSE_END=========================================================
-###
-
-# The following properties control the IP address and port that a given
-# server binds to. The default will bind to all interfaces on the host,
-# and choose a port number at random.
-
-server.pool.server.ipAddress=${envd:SERVER_POOL_SERVER_IP}
-server.pool.server.port=${envd:SERVER_POOL_PORT:20000}
-
-# The following properties determine whether HTTPS is used -- note that HTTPS
-# also requires that the 'java.net.ssl.*' parameters in 'system.properties' be
-# specified, and the key store and trust store be configured, as appropriate.
-server.pool.server.https=${envd:SERVER_POOL_HTTPS}
-server.pool.server.selfSignedCerts=false
-
-# The IP address and port that servers on the geo-redundant site
-# should use to connect to servers on this site.
-server.pool.server.site.ip=${envd:SERVER_POOL_SITE_IP}
-server.pool.server.site.port=${envd:SERVER_POOL_SITE_PORT}
-
-# A comma-separated list of host names -- if an entry is found that refers
-# to an HTTP/HTTPS destination IP address, the host name will used as the
-# destination, instead of the IP address
-server.pool.server.hostlist=${envd:SERVER_POOL_HOST_LIST}
-
-# The servers send 'pings' to each other once per main loop cycle. They
-# also measure the gap between 'pings' from each server, and calculate
-# an allowed time gap based upon this. 'server.pool.server.allowedGap' is the initial
-# allowed gap prior to receiving any pings (default=30 seconds), and
-# 'server.pool.server.adaptiveGapAdjust' is a value that is added to the calculated
-# gap "just in case" (default=5 seconds)
-
-server.pool.server.allowedGap=30000
-server.pool.server.adaptiveGapAdjust=5000
-
-# 'connectTimeout' and 'readTimeout' affect the client end of a REST
-# connection (default=10 seconds each)
-
-server.pool.server.connectTimeout=10000
-server.pool.server.readTimeout=10000
-
-# Each server has a thread pool per remote server, which is used when
-# sending HTTP REST messages -- the following parameters determine the
-# configuration.
-
-server.pool.server.threads.corePoolSize=5
-server.pool.server.threads.maximumPoolSize=10
-server.pool.server.threads.keepAliveTime=5000
-
-# The server pool members use a UEB/DMAAP topic to connect with other
-# servers in the pool. The following set of parameters are passed to
-# the CambriaClient library, and are used in setting up the consumer and
-# publisher ends of the connection. 'discovery.servers' and 'discovery.topic'
-# are the minimum values that need to be specified. The last parameter in
-# this set, 'discovery.publisherLoopCycleTime' isn't passed to the
-# CambriaClient library; instead, it controls how frequently the 'ping'
-# messages are sent out on this channel. Note that only the lead server
-# keeps this channel open long-term.
-
-server.pool.discovery.servers=${envd:SERVER_POOL_DISCOVERY_SERVERS}
-server.pool.discovery.port=${envd:SERVER_POOL_DISCOVERY_PORT:3904}
-server.pool.discovery.topic=${envd:SERVER_POOL_DISCOVERY_TOPIC:DISCOVERY-TOPIC}
-server.pool.discovery.username=${envd:SERVER_POOL_DISCOVERY_USERNAME}
-server.pool.discovery.password=${envd:SERVER_POOL_DISCOVERY_PASSWORD}
-server.pool.discovery.https=${envd:DMAAP_USE_HTTPS}
-server.pool.discovery.apiKey=
-server.pool.discovery.apiSecret=
-#server.pool.discovery.publisherSocketTimeout=5000
-#server.pool.discovery.consumerSocketTimeout=70000
-server.pool.discovery.fetchTimeout=60000
-server.pool.discovery.fetchLimit=100
-server.pool.discovery.selfSignedCertificates=false
-server.pool.discovery.publisherLoopCycleTime=5000
-
-# The 'leader.*' parameters affect behavior during an election. The value of
-# 'mainLoop.cycle' determines the actual time delay. 'leader.stableIdCycles'
-# is the required minimum number of "stable" cycles before voting can start
-# (in this case, "stable" means no servers added or failing). Once voting has
-# started, "leader.stableVotingCycles' is the minimum number of "stable"
-# cycles needed before declaring a winner (in this case, "stable" means no
-# votes changing).
-
-server.pool.leader.stableIdleCycles=5
-server.pool.leader.stableVotingCycles=5
-
-# The value of 'mainLoop.cycle' (default = 1 second) determines how frequently
-# various events occur, such as the sending of 'ping' messages, and the
-# duration of a "cycle" while voting for a lead server.
-
-server.pool.mainLoop.cycle=1000
-
-# 'keyword.path' is used when looking for "keywords" within JSON messages.
-# The first keyword located is hashed to determine which bucket to route
-# through.
-
-keyword.path=requestID,CommonHeader.RequestID,body.output.common-header.request-id,result-info.request-id:uuid
-# 'keyword..lookup' is used to locate "keywords" within objects.
-# The 'value' field contains a list of method calls or field names separated
-# by '.' that are used to locate the keyword
-# (e.g. 'method1().field2.method3()')
-
-keyword.java.lang.String.lookup=toString()
-keyword.org.onap.policy.m2.base.Transaction.lookup=getRequestID()
-keyword.org.onap.policy.controlloop.ControlLoopEvent.lookup=requestID
-keyword.org.onap.policy.appclcm.LcmRequestWrapper.lookup=getBody().getCommonHeader().getRequestId()
-keyword.org.onap.policy.appclcm.LcmResponseWrapper.lookup=getBody().getCommonHeader().getRequestId()
-keyword.org.onap.policy.drools.serverpool.TargetLock.lookup=getOwnerKey()
-keyword.org.onap.policy.drools.serverpooltest.TestDroolsObject.lookup=getKey()
-keyword.org.onap.policy.drools.serverpooltest.Test1$KeywordWrapper.lookup=key
-
-# The following properties affect distributed 'TargetLock' behavior.
-#
-# server.pool.lock.ttl - controls how many hops a 'TargetLock' message can take
-# server.pool.lock.audit.period - how frequently should the audit run?
-# server.pool.lock.audit.gracePeriod - how long to wait after bucket reassignments
-# before running the audit again
-# server.pool.lock.audit.retryDelay - mismatches can occur due to the transient nature
-# of the lock state: this property controls how long to wait before
-# trying again
-
-server.pool.lock.ttl=3
-server.pool.lock.audit.period=300000
-server.pool.lock.audit.gracePeriod=60000
-server.pool.lock.audit.retryDelay=5000
\ No newline at end of file
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java
deleted file mode 100644
index a1afebc9..00000000
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Bucket.java
+++ /dev/null
@@ -1,2579 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * feature-server-pool
- * ================================================================================
- * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.serverpool;
-
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.BUCKET_CONFIRMED_TIMEOUT;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.BUCKET_TIME_TO_LIVE;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.BUCKET_UNCONFIRMED_GRACE_PERIOD;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.BUCKET_UNCONFIRMED_TIMEOUT;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_BUCKET_CONFIRMED_TIMEOUT;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_BUCKET_TIME_TO_LIVE;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_BUCKET_UNCONFIRMED_GRACE_PERIOD;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_BUCKET_UNCONFIRMED_TIMEOUT;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.getProperty;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.io.Serializable;
-import java.net.InetSocketAddress;
-import java.nio.charset.StandardCharsets;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Base64;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Queue;
-import java.util.Random;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.LinkedTransferQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.Setter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The server pool uses an algorithmic way to map things like transactions
- * (identified by a 'requestID') and locks (identified by a string key)
- * into a server handling that transaction or lock. It does this by mapping
- * the string name into one of a set of predefined hash buckets, with each
- * bucket being assigned to one of the active servers.
- * In other words:
- * string key -> hash bucket (fixed mapping, known to all servers)
- * hash bucket -> server (assignments may change when servers go up or down,
- * but remains fairly static when the system is stable)
- * With this approach, there is no global dynamic table that needs to be
- * updated as transactions, or other objects come and go.
- * Each instance of class 'Bucket' corresponds to one of the hash buckets,
- * there are static methods that provide the overall abstraction, as well
- * as some supporting classes.
- */
-
-@Getter
-@Setter
-public class Bucket {
- private static Logger logger = LoggerFactory.getLogger(Bucket.class);
-
- /*
- * Listener class to handle state changes that may lead to
- * reassignments of buckets
- */
- private static EventHandler eventHandler = new EventHandler();
-
- // Used to hash keywords into buckets
- private static MessageDigest messageDigest;
-
- static {
- // register Listener class
- Events.register(eventHandler);
-
- // create MD5 MessageDigest -- used to hash keywords
- try {
- // disabling sonar, as the digest is only used to hash keywords - it isn't
- // used for security purposes
- messageDigest = MessageDigest.getInstance("MD5"); // NOSONAR
- } catch (NoSuchAlgorithmException e) {
- throw new ExceptionInInitializerError(e);
- }
- }
-
- /*
- * Values extracted from properties
- */
-
- private static String timeToLive;
- private static long confirmedTimeout;
- private static long unconfirmedTimeout;
- private static long unconfirmedGracePeriod;
-
- /*
- * Tags for encoding of bucket data
- */
- private static final int END_OF_PARAMETERS_TAG = 0;
- private static final int OWNER_UPDATE = 1;
- private static final int OWNER_NULL = 2;
- private static final int PRIMARY_BACKUP_UPDATE = 3;
- private static final int PRIMARY_BACKUP_NULL = 4;
- private static final int SECONDARY_BACKUP_UPDATE = 5;
- private static final int SECONDARY_BACKUP_NULL = 6;
-
- // This is the table itself -- the current size is fixed at 1024 buckets
- public static final int BUCKETCOUNT = 1024;
- private static Bucket[] indexToBucket = new Bucket[BUCKETCOUNT];
-
- static {
- // create hash bucket entries, but there are no assignments yet
- for (int i = 0; i < indexToBucket.length; i += 1) {
- Bucket bucket = new Bucket(i);
- indexToBucket[i] = bucket;
- }
- }
-
- // this is a list of all objects registered for the 'Backup' interface
- private static List backupList = new LinkedList<>();
-
- // 'rebalance' is a non-null value when rebalancing is in progress
- private static Object rebalanceLock = new Object();
- private static Rebalance rebalance = null;
-
- // bucket number
- private volatile int index;
-
- // owner of the bucket -- this is the host where messages should be directed
- private volatile Server owner = null;
-
- // this host will take over as the owner if the current owner goes down,
- // and may also contain backup data to support persistence
- private volatile Server primaryBackup = null;
-
- // this is a secondary backup host, which can be used if both owner and
- // primary backup go out in quick succession
- private volatile Server secondaryBackup = null;
-
- // when we are in a transient state, certain events are forwarded to
- // this object
- private volatile State state = null;
-
- // storage for additional data
- private Map, Object> adjuncts = new HashMap<>();
-
- // HTTP query parameters
- private static final String QP_BUCKET = "bucket";
- private static final String QP_KEYWORD = "keyword";
- private static final String QP_DEST = "dest";
- private static final String QP_TTL = "ttl";
- private static final String OWNED_STR = "Owned";
-
- // BACKUP data (only buckets for where we are the owner, or a backup)
-
- // TBD: need fields for outgoing queues for application message transfers
-
- /**
- * This method triggers registration of 'eventHandler', and also extracts
- * property values.
- */
- static void startup() {
- int intTimeToLive =
- getProperty(BUCKET_TIME_TO_LIVE, DEFAULT_BUCKET_TIME_TO_LIVE);
- timeToLive = String.valueOf(intTimeToLive);
- confirmedTimeout =
- getProperty(BUCKET_CONFIRMED_TIMEOUT, DEFAULT_BUCKET_CONFIRMED_TIMEOUT);
- unconfirmedTimeout =
- getProperty(BUCKET_UNCONFIRMED_TIMEOUT,
- DEFAULT_BUCKET_UNCONFIRMED_TIMEOUT);
- unconfirmedGracePeriod =
- getProperty(BUCKET_UNCONFIRMED_GRACE_PERIOD,
- DEFAULT_BUCKET_UNCONFIRMED_GRACE_PERIOD);
- }
-
- /**
- * Constructor -- called when building the 'indexToBucket' table.
- *
- * @param index the bucket number
- */
- private Bucket(int index) {
- this.index = index;
- }
-
- /**
- * This method converts a String keyword into the corresponding bucket
- * number.
- *
- * @param value the keyword to be converted
- * @return the bucket number
- */
- public static int bucketNumber(String value) {
- /*
- * It would be possible to create a new 'MessageDigest' instance each
- * It would be possible to create a new 'MessageDigest' instance each
- * time this method is called, and avoid the need for synchronization.
- * However, past experience has taught me that this might involve a
- * considerable amount of computation, due to internal table
- * initialization, so it shouldn't be done this way for performance
- * reasons.
- * If we start running into blocking issues because there are too many
- * simultaneous calls to this method, we can initialize an array of these
- * objects, and iterate over them using an AtomicInteger index.
- */
- synchronized (messageDigest) {
- /*
- * Note that we only need the first two bytes of this, even though
- * 16 bytes are produced. There may be other operations that can be
- * used to more efficiently map keyword -> hash bucket. The only
- * issue is the same algorithm must be used on all servers, and it
- * should produce a fairly even distribution across all of the buckets.
- */
- byte[] digest = messageDigest.digest(value.getBytes());
- return ((Byte.toUnsignedInt(digest[0]) << 8)
- | Byte.toUnsignedInt(digest[1])) & 0x3ff;
- }
- }
-
- /**
- * Fetch the server associated with a particular bucket number.
- *
- * @param bucketNumber a bucket number in the range 0-1023
- * @return the Server that currently handles the bucket,
- * or 'null' if none is currently assigned
- */
- public static Server bucketToServer(int bucketNumber) {
- Bucket bucket = indexToBucket[bucketNumber];
- return bucket.getOwner();
- }
-
- /**
- * Fetch the bucket object associated with a particular bucket number.
- *
- * @param bucketNumber a bucket number in the range 0-1023
- * @return the Bucket associated with this bucket number
- */
- public static Bucket getBucket(int bucketNumber) {
- return indexToBucket[bucketNumber];
- }
-
- /**
- * Fetch the bucket object associated with a particular keyword.
- *
- * @param value the keyword to be converted
- * @return the Bucket associated with this keyword
- */
- public static Bucket getBucket(String value) {
- return indexToBucket[bucketNumber(value)];
- }
-
- /**
- * Determine if the associated key is assigned to the current server.
- *
- * @param key the keyword to be hashed
- * @return 'true' if the associated bucket is assigned to this server,
- * 'false' if not
- */
- public static boolean isKeyOnThisServer(String key) {
- int bucketNumber = bucketNumber(key);
- Bucket bucket = indexToBucket[bucketNumber];
- return bucket.getOwner() == Server.getThisServer();
- }
-
- /**
- * Handle an incoming /bucket/update REST message.
- *
- * @param data base64-encoded data, containing all bucket updates
- */
- static void updateBucket(byte[] data) {
- final byte[] packet = Base64.getDecoder().decode(data);
- Runnable task = () -> {
- try {
- /*
- * process the packet, handling any updates
- */
- if (updateBucketInternal(packet)) {
- /*
- * updates have occurred -- forward this packet to
- * all servers in the "notify list"
- */
- logger.info("One or more bucket updates occurred");
- Entity entity =
- Entity.entity(new String(data, StandardCharsets.UTF_8),
- MediaType.APPLICATION_OCTET_STREAM_TYPE);
- for (Server server : Server.getNotifyList()) {
- server.post("bucket/update", entity);
- }
- }
- } catch (Exception e) {
- logger.error("Exception in Bucket.updateBucket", e);
- }
- };
- MainLoop.queueWork(task);
- }
-
- /**
- * This method supports the 'updateBucket' method, and runs entirely within
- * the 'MainLoop' thread.
- */
- private static boolean updateBucketInternal(byte[] packet) throws IOException {
- boolean changes = false;
-
- ByteArrayInputStream bis = new ByteArrayInputStream(packet);
- DataInputStream dis = new DataInputStream(bis);
-
- // the packet contains a sequence of bucket updates
- while (dis.available() != 0) {
- // first parameter = bucket number
- int index = dis.readUnsignedShort();
-
- // locate the corresponding 'Bucket' object
- Bucket bucket = indexToBucket[index];
-
- // indicates whether changes occurred to the bucket
- boolean bucketChanges = false;
-
- /*
- * the remainder of the information for this bucket consists of
- * a sequence of ' [ ]' followed by the tag
- * value 'END_OF_PARAMETERS_TAG'.
- */
- int tag;
- while ((tag = dis.readUnsignedByte()) != END_OF_PARAMETERS_TAG) {
- switch (tag) {
- case OWNER_UPDATE:
- // -- owner UUID specified
- bucketChanges = updateBucketInternalOwnerUpdate(bucket, dis, index);
- break;
-
- case OWNER_NULL:
- // -- owner UUID should be set to 'null'
- bucketChanges = nullifyOwner(index, bucket, bucketChanges);
- break;
-
- case PRIMARY_BACKUP_UPDATE:
- // --
- // primary backup UUID specified
- bucketChanges = updatePrimaryBackup(dis, index, bucket, bucketChanges);
- break;
-
- case PRIMARY_BACKUP_NULL:
- // --
- // primary backup should be set to 'null'
- bucketChanges = nullifyPrimaryBackup(index, bucket, bucketChanges);
- break;
-
- case SECONDARY_BACKUP_UPDATE:
- // --
- // secondary backup UUID specified
- bucketChanges = updateSecondaryBackup(dis, index, bucket, bucketChanges);
- break;
-
- case SECONDARY_BACKUP_NULL:
- // --
- // secondary backup should be set to 'null'
- bucketChanges = nullifySecondaryBackup(index, bucket, bucketChanges);
- break;
-
- default:
- logger.error("Illegal tag: {}", tag);
- break;
- }
- }
- if (bucketChanges) {
- // give audit a chance to run
- changes = true;
- bucket.stateChanged();
- }
- }
- return changes;
- }
-
- private static boolean nullifyOwner(int index, Bucket bucket, boolean bucketChanges) {
- if (bucket.getOwner() != null) {
- logger.info("Bucket {} owner: {}->null",
- index, bucket.getOwner());
- bucketChanges = true;
- bucket.nullifyOwner();
- }
- return bucketChanges;
- }
-
- private synchronized void nullifyOwner() {
- setOwner(null);
- setState(null);
- }
-
- /**
- * Gets the set of backups.
- *
- * @return the set of backups
- */
- public synchronized Set getBackups() {
- /*
- * For some reason, the junit tests break if Set.of() is used, so we'll stick with
- * the long way for now.
- */
- Set backups = new HashSet<>();
- backups.add(getPrimaryBackup());
- backups.add(getSecondaryBackup());
- return backups;
- }
-
- private static boolean updatePrimaryBackup(DataInputStream dis, int index, Bucket bucket, boolean bucketChanges)
- throws IOException {
- Server newPrimaryBackup =
- Server.getServer(Util.readUuid(dis));
- if (bucket.primaryBackup != newPrimaryBackup) {
- logger.info("Bucket {} primary backup: {}->{}", index,
- bucket.primaryBackup, newPrimaryBackup);
- bucketChanges = true;
- bucket.primaryBackup = newPrimaryBackup;
- }
- return bucketChanges;
- }
-
- private static boolean nullifyPrimaryBackup(int index, Bucket bucket, boolean bucketChanges) {
- if (bucket.primaryBackup != null) {
- logger.info("Bucket {} primary backup: {}->null",
- index, bucket.primaryBackup);
- bucketChanges = true;
- bucket.primaryBackup = null;
- }
- return bucketChanges;
- }
-
- private static boolean updateSecondaryBackup(DataInputStream dis, int index, Bucket bucket, boolean bucketChanges)
- throws IOException {
- Server newSecondaryBackup =
- Server.getServer(Util.readUuid(dis));
- if (bucket.secondaryBackup != newSecondaryBackup) {
- logger.info("Bucket {} secondary backup: {}->{}", index,
- bucket.secondaryBackup, newSecondaryBackup);
- bucketChanges = true;
- bucket.secondaryBackup = newSecondaryBackup;
- }
- return bucketChanges;
- }
-
- private static boolean nullifySecondaryBackup(int index, Bucket bucket, boolean bucketChanges) {
- if (bucket.secondaryBackup != null) {
- logger.info("Bucket {} secondary backup: {}->null",
- index, bucket.secondaryBackup);
- bucketChanges = true;
- bucket.secondaryBackup = null;
- }
- return bucketChanges;
- }
-
- /**
- * Update bucket owner information.
- *
- * @param bucket the bucket in process
- * @param dis data input stream contains the update
- * @param index the bucket number
- * @return a value indicate bucket changes
- */
- private static boolean updateBucketInternalOwnerUpdate(Bucket bucket, DataInputStream dis,
- int index) throws IOException {
- boolean bucketChanges = false;
- Server newOwner = Server.getServer(Util.readUuid(dis));
- if (bucket.getOwner() != newOwner) {
- logger.info("Bucket {} owner: {}->{}",
- index, bucket.getOwner(), newOwner);
- bucketChanges = true;
-
- Server thisServer = Server.getThisServer();
- Server oldOwner = bucket.getOwner();
- bucket.setOwner(newOwner);
- if (thisServer == oldOwner) {
- // the current server is the old owner
- if (bucket.getState() == null) {
- bucket.state = bucket.new OldOwner(newOwner);
- }
- } else if (thisServer == newOwner) {
- // the current server the new owner
- if (bucket.getState() == null) {
- bucket.state = bucket.new NewOwner(true, oldOwner);
- } else {
- // new owner has been confirmed
- bucket.state.newOwner();
- }
- }
- }
- return bucketChanges;
- }
-
- /**
- * Forward a message to the specified bucket number. If the bucket is
- * in a transient state (the value of 'state' is not 'null'), the handling
- * is determined by that state.
- *
- * @param bucketNumber the bucket number determined by extracting the
- * keyword from 'message'
- * @param message the message to be forwarded/processed
- * @return a value of 'true' indicates the message has been "handled"
- * (forwarded or queued), and 'false' indicates it has not, and needs
- * to be handled locally.
- */
- public static boolean forward(int bucketNumber, Message message) {
- Bucket bucket = indexToBucket[bucketNumber];
- Server server;
-
- synchronized (bucket) {
- if (bucket.state != null) {
- // we are in a transient state -- the handling is state-specific
- return bucket.state.forward(message);
- }
- server = bucket.getOwner();
- }
-
- if (server == null || server == Server.getThisServer()) {
- // this needs to be processed locally
- return false;
- } else {
- // send message to remote server
- message.sendToServer(server, bucketNumber);
- return true;
- }
- }
-
- /**
- * This is a convenience method, which forwards a message through the
- * bucket associated with the specified keyword.
- *
- * @param keyword the keyword extracted from 'message'
- * keyword from 'message'
- * @param message the message to be forwarded/processed
- * @return a value of 'true' indicates the message has been "handled"
- * (forwarded or queued), and 'false' indicates it has not, and needs
- * to be handled locally.
- */
- public static boolean forward(String keyword, Message message) {
- return forward(bucketNumber(keyword), message);
- }
-
- /**
- * Forward a message to the specified bucket number. If the bucket is
- * in a transient state (the value of 'state' is not 'null'), the handling
- * is determined by that state. This is a variant of the 'forward' method,
- * which handles local processing, instead of just returning 'false'.
- *
- * @param bucketNumber the bucket number determined by extracting the
- * keyword from 'message'
- * @param message the message to be forwarded/processed
- */
- public static void forwardAndProcess(int bucketNumber, Message message) {
- if (!forward(bucketNumber, message)) {
- message.process();
- }
- }
-
- /**
- * Forward a message to the specified bucket number. If the bucket is
- * in a transient state (the value of 'state' is not 'null'), the handling
- * is determined by that state. This is a variant of the 'forward' method,
- * which handles local processing, instead of just returning 'false'.
- *
- * @param keyword the keyword extracted from 'message'
- * keyword from 'message'
- * @param message the message to be forwarded/processed
- */
- public static void forwardAndProcess(String keyword, Message message) {
- forwardAndProcess(bucketNumber(keyword), message);
- }
-
- /**
- * Handle an incoming /cmd/dumpBuckets REST message.
- *
- * @param out the 'PrintStream' to use for displaying information
- */
- public static void dumpBuckets(final PrintStream out) {
- /*
- * we aren't really doing a 'Rebalance' here, but the 'copyData' method
- * is useful for extracting the data, and determining the buckets
- * associated with each server.
- */
- Rebalance rb = new Rebalance();
- rb.copyData();
-
- /*
- * this method is not accessing anything in the 'Server' or 'Bucket'
- * table, so it doesn't need to run within the 'MainLoop' thread.
- */
- rb.dumpBucketsInternal(out);
- }
-
- /**
- * Handle an incoming /cmd/bucketMessage REST message -- this is only
- * used for testing the routing of messages between servers.
- *
- * @param out the 'PrintStream' to use for displaying information
- * @param keyword the keyword that is hashed to select the bucket number
- * @param message the message to send to the remote end
- * @throws IOException when error occurred
- */
- public static void bucketMessage(
- final PrintStream out, final String keyword, String message) {
-
- if (keyword == null) {
- out.println("'keyword' is mandatory");
- return;
- }
- if (message == null) {
- message = "Message generated at " + new Date();
- }
- final int bucketNumber = bucketNumber(keyword);
- Server server = bucketToServer(bucketNumber);
-
- if (server == null) {
- /*
- * selected bucket has no server assigned -- this should only be a
- * transient situation, until 'rebalance' is run.
- */
- out.format("Bucket is %d, which has no owner%n", bucketNumber);
- } else if (server == Server.getThisServer()) {
- /*
- * the selected bucket is associated with this particular server --
- * no forwarding is needed.
- */
- out.format("Bucket is %d, which is owned by this server: %s%n",
- bucketNumber, server.getUuid());
- } else {
- /*
- * the selected bucket is assigned to a different server -- forward
- * the message.
- */
- out.format("Bucket is %d: sending from%n"
- + " %s to%n"
- + " %s%n",
- bucketNumber, Server.getThisServer().getUuid(), server.getUuid());
-
- // do a POST call of /bucket/bucketResponse to the remoote server
- Entity entity =
- Entity.entity(new String(message.getBytes(), StandardCharsets.UTF_8),
- MediaType.TEXT_PLAIN);
-
- /*
- * the POST itself runs in a server-specific thread, and
- * 'responseQueue' is used to pass back the response.
- */
- final LinkedTransferQueue responseQueue =
- new LinkedTransferQueue<>();
-
- server.post("bucket/bucketResponse", entity, new Server.PostResponse() {
- /**
- * {@inheritDoc}
- */
- @Override
- public WebTarget webTarget(WebTarget webTarget) {
- // we need to include the 'bucket' and 'keyword' parameters
- // in the POST that we are sending out
- return webTarget
- .queryParam(QP_BUCKET, bucketNumber)
- .queryParam(QP_KEYWORD, keyword);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void response(Response response) {
- // this is the POST response --
- // pass it back to the calling thread
- responseQueue.put(response);
- }
- });
-
- try {
- // this is the calling thread -- wait for the POST response
- Response response = responseQueue.poll(60, TimeUnit.SECONDS);
- if (response == null) {
- out.println("Timed out waiting for a response");
- } else {
- out.format("Received response code %s%nEntity = %s%n",
- response.getStatus(), response.readEntity(String.class));
- }
- } catch (InterruptedException e) {
- out.println(e);
- Thread.currentThread().interrupt();
- }
- }
- }
-
- /**
- * Handle an incoming /bucket/bucketResponse REST message -- this runs on
- * the destination host, and is the continuation of an operation triggered
- * by the /cmd/bucketMessage REST message running on the originating host.
- *
- * @param out the 'PrintStream' to use for passing back information
- * in a human-readable form
- * @param bucket the bucket number, which should be owned by this host
- * if we are in sync with the sending host, and didn't get caught
- * in a transient state
- * @param keyword the keyword selected on the originating end, which should
- * hash to 'bucket'
- * @param message the message selected on the originating end
- */
- public static void bucketResponse(
- final PrintStream out, int bucket, String keyword, byte[] message) {
-
- Server thisServer = Server.getThisServer();
- Server server = bucketToServer(bucket);
-
- if (server != thisServer) {
- /*
- * this isn't expected, and either indicates we are out-of-sync with
- * pthe originating server, or this operation was triggered while in
- * a transient state.
- */
- out.println("ERROR: " + thisServer.toString() + ": bucket " + bucket
- + "is owned by\n " + server);
- } else {
- /*
- * As expected, we are the owner of this bucket. Print out a message,
- * which will be returned to the originating host, and displayed.
- */
- out.println(thisServer.toString() + ":\n"
- + " bucket = " + bucket
- + "\n keyword = " + keyword
- + "\n message = " + new String(message));
- }
- }
-
- /**
- * Handle an incoming /cmd/moveBucket REST message -- this is only
- * used for testing bucket migration. It only works on the lead server.
- *
- * @param out the 'PrintStream' to use for displaying information
- * @param bucketNumber the bucket number to be moved
- * @param newHostUuid the UUID of the destination host (if 'null', a
- * destination host will be chosen at random)
- */
- public static void moveBucket(PrintStream out, int bucketNumber, String newHostUuid) {
- Server leader = Leader.getLeader();
- if (leader != Server.getThisServer()) {
- out.println("This is not the lead server");
- return;
- }
-
- if (bucketNumber < 0 || bucketNumber >= indexToBucket.length) {
- out.println("Bucket number out of range");
- return;
- }
-
- Rebalance rb = new Rebalance();
- rb.copyData();
-
- TestBucket bucket = rb.buckets[bucketNumber];
- TestServer oldHost = bucket.owner;
-
- if (oldHost == rb.nullServer) {
- out.println("Bucket " + bucketNumber + " is currently unassigned");
- return;
- }
-
- TestServer newHost = null;
-
- if (newHostUuid != null) {
- // the UUID of a destination host has been specified
- newHost = rb.testServers.get(UUID.fromString(newHostUuid));
- if (newHost == null) {
- out.println("Can't locate UUID " + newHostUuid);
- return;
- }
- } else {
- /*
- * Choose a destination host at random, other than the current owner.
- * Step a random count in the range of 1 to (n-1) relative to the
- * current host.
- */
- UUID key = oldHost.uuid;
- /*
- * Disabling sonar, because this Random() is not used for security purposes.
- */
- int randomStart = new Random().nextInt(rb.testServers.size() - 1); // NOSONAR
- for (int count = randomStart; count >= 0; count -= 1) {
- key = rb.testServers.higherKey(key);
- if (key == null) {
- // wrap to the beginning of the list
- key = rb.testServers.firstKey();
- }
- }
- newHost = rb.testServers.get(key);
- }
- out.println("Moving bucket " + bucketNumber + " from "
- + oldHost + " to " + newHost);
-
- updateOwner(out, bucket, oldHost, newHost);
-
- try {
- /*
- * build a message containing all of the updated bucket
- * information, process it internally in this host
- * (lead server), and send it out to others in the
- * "notify list".
- */
- rb.generateBucketMessage();
- } catch (IOException e) {
- logger.error("Exception in Rebalance.generateBucketMessage",
- e);
- }
- }
-
- private static void updateOwner(PrintStream out, TestBucket bucket, TestServer oldHost, TestServer newHost) {
- /*
- * update the owner, and ensure that the primary and secondary backup
- * remain different from the owner.
- */
- bucket.setOwner(newHost);
- if (newHost == bucket.primaryBackup) {
- out.println("Moving primary back from " + newHost + " to " + oldHost);
- bucket.setPrimaryBackup(oldHost);
- } else if (newHost == bucket.secondaryBackup) {
- out.println("Moving secondary back from " + newHost
- + " to " + oldHost);
- bucket.setSecondaryBackup(oldHost);
- }
- }
-
- /**
- * This method is called when an incoming /bucket/sessionData message is
- * received from the old owner of the bucket, which presumably means that
- * we are the new owner of the bucket.
- *
- * @param bucketNumber the bucket number
- * @param dest the UUID of the intended destination
- * @param ttl similar to IP time-to-live -- it controls the number of hops
- * the message may take
- * @param data serialized data associated with this bucket, encoded using
- * base64
- */
-
- static void sessionData(int bucketNumber, UUID dest, int ttl, byte[] data) {
- logger.info("Bucket.sessionData: bucket={}, data length={}",
- bucketNumber, data.length);
-
- if (dest != null && !dest.equals(Server.getThisServer().getUuid())) {
- // the message needs to be forwarded to the intended destination
- Server server;
- WebTarget webTarget;
-
- ttl -= 1;
- if (ttl > 0
- && (server = Server.getServer(dest)) != null
- && (webTarget = server.getWebTarget("bucket/sessionData")) != null) {
- logger.info("Forwarding 'bucket/sessionData' to uuid {}",
- server.getUuid());
- Entity entity =
- Entity.entity(new String(data, StandardCharsets.UTF_8),
- MediaType.APPLICATION_OCTET_STREAM_TYPE);
- Response response =
- webTarget
- .queryParam(QP_BUCKET, bucketNumber)
- .queryParam(QP_DEST, dest)
- .queryParam(QP_TTL, String.valueOf(ttl))
- .request().post(entity);
- logger.info("/bucket/sessionData response code = {}",
- response.getStatus());
- } else {
- logger.error("Couldn't forward 'bucket/sessionData' to uuid {}, ttl={}",
- dest, ttl);
- }
- return;
- }
-
- byte[] decodedData = Base64.getDecoder().decode(data);
- Bucket bucket = indexToBucket[bucketNumber];
-
- logger.info("Bucket.sessionData: decoded data length = {}",
- decodedData.length);
-
- if (bucket.state == null) {
- /*
- * We received the serialized data prior to being notified
- * that we are the owner -- this happens sometimes. Behave as
- * though we are the new owner, but intidate it is unconfirmed.
- */
- logger.info("Bucket {} session data received unexpectedly",
- bucketNumber);
- bucket.state = bucket.new NewOwner(false, bucket.getOwner());
- }
- bucket.state.bulkSerializedData(decodedData);
- }
-
- /**
- * This method is called whenever the bucket's state has changed in a
- * way that it should be audited.
- */
- private synchronized void stateChanged() {
- if (state != null) {
- return;
- }
- // the audit should be run
- Server thisServer = Server.getThisServer();
- boolean isOwner = (thisServer == owner);
- boolean isBackup = (!isOwner && (thisServer == primaryBackup
- || thisServer == secondaryBackup));
-
- // invoke 'TargetLock' directly
- TargetLock.auditBucket(this, isOwner);
- for (ServerPoolApi feature : ServerPoolApi.impl.getList()) {
- feature.auditBucket(this, isOwner, isBackup);
- }
- }
-
- /**
- * Returns an adjunct of the specified class
- * (it is created if it doesn't exist).
- *
- * @param clazz this is the class of the adjunct
- * @return an adjunct of the specified class ('null' may be returned if
- * the 'newInstance' method is unable to create the adjunct)
- */
- public T getAdjunct(Class clazz) {
- Object adj = adjuncts.computeIfAbsent(clazz, key -> {
- try {
- // create the adjunct, if needed
- return clazz.getDeclaredConstructor().newInstance();
- } catch (Exception e) {
- logger.error("Can't create adjunct of {}", clazz, e);
- return null;
- }
- });
- return clazz.cast(adj);
- }
-
- /**
- * Returns an adjunct of the specified class.
- *
- * @param clazz this is the class of the adjunct
- * @return an adjunct of the specified class, if it exists,
- * and 'null' if it does not
- */
- public T getAdjunctDontCreate(Class clazz) {
- synchronized (adjuncts) {
- // look up the adjunct in the table
- return clazz.cast(adjuncts.get(clazz));
- }
- }
-
- /**
- * Explicitly create an adjunct -- this is useful when the adjunct
- * initialization requires that some parameters be passed.
- *
- * @param adj this is the adjunct to insert into the table
- * @return the previous adjunct of this type ('null' if none)
- */
- public Object putAdjunct(Object adj) {
- synchronized (adjuncts) {
- Class> clazz = adj.getClass();
- return adjuncts.put(clazz, adj);
- }
- }
-
- /**
- * Remove an adjunct.
- *
- * @param clazz this is the class of adjuncts to remove
- * @return the object, if found, and 'null' if not
- */
- public T removeAdjunct(Class clazz) {
- synchronized (adjuncts) {
- // remove the adjunct in the table
- return clazz.cast(adjuncts.remove(clazz));
- }
- }
-
- /**
- * Dump out all buckets with adjuncts.
- *
- * @param out the 'PrintStream' to use for displaying information
- */
- public static void dumpAdjuncts(PrintStream out) {
- boolean noneFound = true;
- String format = "%6s %s\n";
-
- for (Bucket bucket : indexToBucket) {
- synchronized (bucket.adjuncts) {
- if (bucket.adjuncts.size() != 0) {
- if (noneFound) {
- out.printf(format, "Bucket", "Adjunct Classes");
- out.printf(format, "------", "---------------");
- noneFound = false;
- }
- boolean first = true;
- for (Class> clazz : bucket.adjuncts.keySet()) {
- if (first) {
- out.printf(format, bucket.index, clazz.getName());
- first = false;
- } else {
- out.printf(format, "", clazz.getName());
- }
- }
- }
- }
- }
- }
-
- /* ============================================================ */
-
- /**
- * There is a single instance of this class (Bucket.eventHandler), which
- * is registered to listen for notifications of state transitions. Note
- * that all of these methods are running within the 'MainLoop' thread.
- */
- private static class EventHandler extends Events {
- /**
- * {@inheritDoc}
- */
- @Override
- public void serverFailed(Server server) {
- // remove this server from any bucket where it is referenced
-
- Server thisServer = Server.getThisServer();
- for (Bucket bucket : indexToBucket) {
- synchronized (bucket) {
- boolean changes = false;
- if (bucket.getOwner() == server) {
- // the failed server owns this bucket --
- // move to the primary backup
- bucket.setOwner(bucket.getPrimaryBackup());
- bucket.primaryBackup = null;
- changes = true;
-
- if (bucket.getOwner() == null) {
- // bucket owner is still null -- presumably, we had no
- // primary backup, so use the secondary backup instead
- bucket.setOwner(bucket.getSecondaryBackup());
- bucket.setSecondaryBackup(null);
- }
- }
- if (bucket.getPrimaryBackup() == server) {
- // the failed server was a primary backup to this bucket --
- // mark the entry as 'null'
- bucket.setPrimaryBackup(null);
- changes = true;
- }
- if (bucket.getSecondaryBackup() == server) {
- // the failed server was a secondary backup to this bucket --
- // mark the entry as 'null'
- bucket.setSecondaryBackup(null);
- changes = true;
- }
-
- if (bucket.owner == thisServer && bucket.state == null) {
- // the current server is the new owner
- bucket.setState(bucket.new NewOwner(false, null));
- changes = true;
- }
-
- if (changes) {
- // may give audits a chance to run
- bucket.stateChanged();
- }
- }
- }
-
- // trigger a rebalance (only happens if we are the lead server)
- rebalance();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void newLeader(Server server) {
- // trigger a rebalance (only happens if we are the new lead server)
- rebalance();
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void leaderConfirmed(Server server) {
- // trigger a rebalance (only happens if we are the lead server)
- rebalance();
- }
-
- /**
- * This method is called to start a 'rebalance' operation in a background
- * thread, but it only does this on the lead server. Being balanced means
- * the following:
- * 1) Each server owns approximately the same number of buckets
- * 2) If any server were to fail, and the designated primaries take over
- * for all of that server's buckets, all remaining servers would still
- * own approximately the same number of buckets.
- * 3) If any two servers were to fail, and the designated primaries were
- * to take over for the failed server's buckets (secondaries would take
- * for buckets where the owner and primary are OOS), all remaining
- * servers would still own approximately the same number of buckets.
- * 4) Each server should have approximately the same number of
- * (primary-backup + secondary-backup) buckets that it is responsible for.
- * 5) The primary backup for each bucket must be on the same site as the
- * owner, and the secondary backup must be on a different site.
- */
- private void rebalance() {
- if (Leader.getLeader() == Server.getThisServer()) {
- Rebalance rb = new Rebalance();
- synchronized (rebalanceLock) {
- // the most recent 'Rebalance' instance is the only valid one
- rebalance = rb;
- }
-
- new Thread("BUCKET REBALANCER") {
- @Override
- public void run() {
- /*
- * copy bucket and host data,
- * generating a temporary internal table.
- */
- rb.copyData();
-
- /*
- * allocate owners for all buckets without an owner,
- * and rebalance bucket owners, if necessary --
- * this takes card of item #1, above.
- */
- rb.allocateBuckets();
-
- /*
- * make sure that primary backups always have the same site
- * as the owner, and secondary backups always have a different
- * site -- this takes care of #5, above.
- */
- rb.checkSiteValues();
-
- /*
- * adjust primary backup lists to take care of item #2, above
- * (taking #5 into account).
- */
- rb.rebalancePrimaryBackups();
-
- /*
- * allocate secondary backups, and take care of items
- * #3 and #4, above (taking #5 into account).
- */
- rb.rebalanceSecondaryBackups();
-
- try {
- synchronized (rebalanceLock) {
- /*
- * if another 'Rebalance' instance has started in the
- * mean time, don't do the update.
- */
- if (rebalance == rb) {
- /*
- * build a message containing all of the updated bucket
- * information, process it internally in this host
- * (lead server), and send it out to others in the
- * "notify list".
- */
- rb.generateBucketMessage();
- rebalance = null;
- }
- }
- } catch (IOException e) {
- logger.error("Exception in Rebalance.generateBucketMessage",
- e);
- }
- }
- }.start();
- }
- }
- }
-
- /* ============================================================ */
-
- /**
- * Instances of this class are created as part of the 'rebalance'
- * operation on the lead server, or as part of a 'dumpBuckets' operation
- * on any server.
- * Each instance of this class corresponds to a 'Bucket' instance.
- */
- @EqualsAndHashCode
- private static class TestBucket implements Comparable {
- // bucket number
- int index;
-
- // owner of the bucket
- TestServer owner;
-
- // primary backup for this bucket
-
- TestServer primaryBackup;
-
- // secondary backup for this bucket
- TestServer secondaryBackup;
-
- /**
- * Constructor -- initialize the 'TestBucket' instance.
- *
- * @param index the bucket number
- */
- TestBucket(int index) {
- this.index = index;
- }
-
- /**
- * Update the owner of a bucket, which also involves updating the
- * backward links in the 'TestServer' instances.
- *
- * @param newOwner the new owner of the bucket
- */
- void setOwner(TestServer newOwner) {
- if (owner != newOwner) {
- // the 'owner' field does need to be changed
- if (owner != null) {
- // remove this bucket from the 'buckets' list of the old owner
- owner.buckets.remove(this);
- }
- if (newOwner != null) {
- // add this bucket to the 'buckets' list of the new owner
- newOwner.buckets.add(this);
- }
- // update the 'owner' field in the bucket
- owner = newOwner;
- }
- }
-
- /**
- * Update the primary backup of a bucket, which also involves updating
- * the backward links in the 'TestServer' instances.
- *
- * @param newPrimaryBackup the new primary of the bucket
- */
- void setPrimaryBackup(TestServer newPrimaryBackup) {
- if (primaryBackup != newPrimaryBackup) {
- // the 'primaryBackup' field does need to be changed
- if (primaryBackup != null) {
- // remove this bucket from the 'buckets' list of the
- // old primary backup
- primaryBackup.primaryBackupBuckets.remove(this);
- }
- if (newPrimaryBackup != null) {
- // add this bucket to the 'buckets' list of the
- // new primary backup
- newPrimaryBackup.primaryBackupBuckets.add(this);
- }
- // update the 'primaryBackup' field in the bucket
- primaryBackup = newPrimaryBackup;
- }
- }
-
- /**
- * Update the secondary backup of a bucket, which also involves updating
- * the backward links in the 'TestServer' instances.
- *
- * @param newSecondaryBackup the new secondary of the bucket
- */
- void setSecondaryBackup(TestServer newSecondaryBackup) {
- if (secondaryBackup != newSecondaryBackup) {
- // the 'secondaryBackup' field does need to be changed
- if (secondaryBackup != null) {
- // remove this bucket from the 'buckets' list of the
- // old secondary backup
- secondaryBackup.secondaryBackupBuckets.remove(this);
- }
- if (newSecondaryBackup != null) {
- // add this bucket to the 'buckets' list of the
- // new secondary backup
- newSecondaryBackup.secondaryBackupBuckets.add(this);
- }
- // update the 'secondaryBackup' field in the bucket
- secondaryBackup = newSecondaryBackup;
- }
- }
-
- /*==================================*/
- /* Comparable interface */
- /*==================================*/
-
- /**
- * Compare two 'TestBucket' instances, for use in a 'TreeSet'.
- *
- * @param other the other 'TestBucket' instance to compare to
- */
- @Override
- public int compareTo(TestBucket other) {
- return index - other.index;
- }
-
- /**
- * Return a string representation of this 'TestBucket' instance.
- *
- * @return a string representation of this 'TestBucket' instance
- */
- @Override
- public String toString() {
- return "TestBucket[" + index + "]";
- }
- }
-
- /* ============================================================ */
-
- /**
- * Instances of this class are created as part of the 'rebalance'
- * operation on the lead server, or as part of a 'dumpBuckets' operation
- * on any server.
- * Each instance of this class corresponds to a 'Server' instance.
- * Unlike the actual 'Server' instances, each 'TestServer' instance
- * contains back links to all of the buckets it is associated with.
- */
- private static class TestServer {
- // unique id for this server
- // (matches the one in the corresponding 'Server' entry)
- final UUID uuid;
-
- // site socket information (matches 'Server' entry)
- final InetSocketAddress siteSocketAddress;
-
- // the set of all 'TestBucket' instances,
- // where this 'TestServer' is listed as 'owner'
- final TreeSet buckets = new TreeSet<>();
-
- // the set of all 'TestBucket' instances,
- // where this 'TestServer' is listed as 'primaryBackup'
- final TreeSet primaryBackupBuckets = new TreeSet<>();
-
- // the set of all 'TestBucket' instances,
- // where this 'TestServer' is listed as 'secondaryBackup'
- final TreeSet secondaryBackupBuckets = new TreeSet<>();
-
- /**
- * Constructor.
- *
- * @param uuid uuid of this 'TestServer' instance
- * @param siteSocketAddress matches the value in the corresponding 'Server'
- */
- TestServer(UUID uuid, InetSocketAddress siteSocketAddress) {
- this.uuid = uuid;
- this.siteSocketAddress = siteSocketAddress;
- }
-
- /**
- * Return a string representation of this 'TestServer' instance.
- *
- * @return a string representation of this 'TestServer' instance
- */
- @Override
- public String toString() {
- return "TestServer[" + uuid + "]";
- }
- }
-
- /* ============================================================ */
-
- /**
- * This class supports the 'rebalance' operation. Each instance is a wrapper
- * around a 'TestServer' instance, as it would be if another specific
- * server failed.
- */
- @EqualsAndHashCode
- private static class AdjustedTestServer
- implements Comparable {
- TestServer server;
-
- // simulated fail on this server
- TestServer failedServer;
-
- // expected bucket count if 'failedServer' fails
- int bucketCount;
-
- // total number of primary backup buckets used by this host
- int primaryBackupBucketCount;
-
- // total number of secondary backup buckets used by this host
- int secondaryBackupBucketCount;
-
- /**
- * Constructor.
- *
- * @param server the server this 'AdjustedTestServer' instance represents
- * @param failedServer the server going through a simulated failure --
- * the 'bucketCount' value is adjusted based upon this
- */
- AdjustedTestServer(TestServer server, TestServer failedServer) {
- this.server = server;
- this.failedServer = failedServer;
-
- this.bucketCount = server.buckets.size();
- this.primaryBackupBucketCount = server.primaryBackupBuckets.size();
- this.secondaryBackupBucketCount = server.secondaryBackupBuckets.size();
-
- // need to adjust 'bucketCount' for the case where the current
- // host fails
- for (TestBucket bucket : server.primaryBackupBuckets) {
- if (bucket.owner == failedServer) {
- bucketCount += 1;
- // TBD: should 'primaryBackupBucketCount' be decremented?
- }
- }
-
- // need to adjust 'bucketCount' for the case where the current
- // host fails
- for (TestBucket bucket : server.secondaryBackupBuckets) {
- if (bucket.owner == failedServer) {
- bucketCount += 1;
- // TBD: should 'secondaryBackupBucketCount' be decremented?
- }
- }
- }
-
- /* ****************************************** */
- /* Comparable interface */
- /* ****************************************** */
-
- /**
- * {@inheritDoc}
- */
- @Override
- public int compareTo(AdjustedTestServer other) {
- /*
- * Comparison order:
- * 1) minimal expected bucket count if current host fails
- * (differences of 1 are treated as a match)
- * 2) minimal backup bucket count
- * 3) UUID order
- */
- int rval = bucketCount - other.bucketCount;
- if (rval <= 1 && rval >= -1) {
- rval = (primaryBackupBucketCount + secondaryBackupBucketCount)
- - (other.primaryBackupBucketCount
- + other.secondaryBackupBucketCount);
- if (rval == 0) {
- rval = -Util.uuidComparator.compare(server.uuid, other.server.uuid);
- }
- }
- return rval;
- }
- }
-
- /* ============================================================ */
-
- /**
- * This class is primarily used to do a 'Rebalance' operation on the
- * lead server, which is then distributed to all of the other servers.
- * Part of it is also useful for implementing the /cmd/dumpBuckets
- * REST message handler.
- */
- private static class Rebalance {
- // this table resembles the 'Bucket.indexToBucket' table
- TestBucket[] buckets = new TestBucket[indexToBucket.length];
-
- // this table resembles the 'Server.servers' table
- TreeMap testServers = new TreeMap<>(Util.uuidComparator);
-
- /* this is a special server instance, which is allocated any
- * owned, primary, or secondary buckets that haven't been allocated to
- * any of the real servers
- */
- TestServer nullServer = new TestServer(null, null);
-
- /**
- * Copy all of the bucket data in the 'buckets' table, and also return
- * a copy of the 'Server.servers' table
- */
- void copyData() {
- // will contain a copy of the 'Bucket' table
- final Bucket[] bucketSnapshot = new Bucket[indexToBucket.length];
-
- /*
- * This method is running within the 'MainLoop' thread,
- * and builds a copy of the 'Bucket' table, as well as the
- * list of active servers -- these can then be examined
- * in a different thread, without potentially distrupting
- * the 'MainLoop' thread.
- *
- * @return 0 (the return value is not significant at present)
- */
- Callable callable = () -> {
- // copy the 'Bucket' table
- for (int i = 0; i < indexToBucket.length; i += 1) {
- // makes a snapshot of the bucket information
- Bucket bucket = indexToBucket[i];
-
- Bucket tmpBucket = new Bucket(i);
- tmpBucket.setOwner(bucket.getOwner());
- tmpBucket.setPrimaryBackup(bucket.getPrimaryBackup());
- tmpBucket.setSecondaryBackup(bucket.getSecondaryBackup());
- bucketSnapshot[i] = tmpBucket;
- }
-
- /*
- * At this point, 'bucketSnapshot' and 'servers' should be
- * complete. The next step is to create a 'TestServer' entry
- * that matches each 'Server' entry.
- */
- for (Server server : Server.getServers()) {
- UUID uuid = server.getUuid();
- testServers.put(uuid, new TestServer(uuid, server.getSiteSocketAddress()));
- }
-
- return 0;
- };
- FutureTask ft = new FutureTask<>(callable);
- MainLoop.queueWork(ft);
- try {
- ft.get(60, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- logger.error("Interrupted", e);
- Thread.currentThread().interrupt();
- return;
- } catch (ExecutionException | TimeoutException e) {
- logger.error("Exception in Rebalance.copyData", e);
- return;
- }
-
- makeTestBucket(bucketSnapshot);
- }
-
- private void makeTestBucket(final Bucket[] bucketSnapshot) {
- /*
- * Now, create a 'TestBucket' table that mirrors the 'Bucket' table.
- * Unlike the standard 'Bucket' and 'Server' tables, the 'TestServer'
- * entries contain links referring back to the 'TestBucket' entries.
- * This information is useful when rebalancing.
- */
- for (Bucket bucket : bucketSnapshot) {
- int index = bucket.index;
- TestBucket testBucket = new TestBucket(index);
-
- // populate the 'owner' field
- if (bucket.getOwner() != null) {
- testBucket.setOwner(testServers.get(bucket.getOwner().getUuid()));
- } else {
- testBucket.setOwner(nullServer);
- }
-
- // populate the 'primaryBackup' field
- if (bucket.primaryBackup != null) {
- testBucket.setPrimaryBackup(
- testServers.get(bucket.primaryBackup.getUuid()));
- } else {
- testBucket.setPrimaryBackup(nullServer);
- }
-
- // populate the 'secondaryBackup' field
- if (bucket.secondaryBackup != null) {
- testBucket.setSecondaryBackup(
- testServers.get(bucket.secondaryBackup.getUuid()));
- } else {
- testBucket.setSecondaryBackup(nullServer);
- }
- buckets[index] = testBucket;
- }
- }
-
- /**
- * Allocate unowned 'TestBucket' entries across all of the 'TestServer'
- * entries. When every 'TestBucket' has an owner, rebalance as needed,
- * so the 'TestServer' entry with the most buckets has at most one more
- * bucket than the 'TestServer' entry with the least.
- */
- void allocateBuckets() {
- /*
- * the following 'Comparator' is used to control the order of the
- * 'needBuckets' TreeSet: those with the fewest buckets allocated are
- * at the head of the list.
- */
- Comparator bucketCount = (s1, s2) -> {
- int rval = s1.buckets.size() - s2.buckets.size();
- if (rval == 0) {
- rval = Util.uuidComparator.compare(s1.uuid, s2.uuid);
- }
- return rval;
- };
-
- // sort servers according to the order in which they can
- // take on ownership of buckets
- TreeSet needBuckets = new TreeSet<>(bucketCount);
- for (TestServer ts : testServers.values()) {
- needBuckets.add(ts);
- }
-
- // go through all of the unowned buckets, and allocate them
- for (TestBucket bucket : new LinkedList(nullServer.buckets)) {
- // take first entry off of sorted server list
- TestServer ts = needBuckets.first();
- needBuckets.remove(ts);
-
- // add this bucket to the 'buckets' list
- bucket.setOwner(ts);
-
- // place it back in the list, possibly in a new position
- // (it's attributes have changed)
- needBuckets.add(ts);
- }
- nullServer.buckets.clear();
-
- // there may still be rebalancing needed -- no host should contain
- // 2 or more buckets more than any other host
- for ( ; ; ) {
- TestServer first = needBuckets.first();
- TestServer last = needBuckets.last();
-
- if (last.buckets.size() - first.buckets.size() <= 1) {
- // no more rebalancing needed
- break;
- }
-
- // remove both from sorted list
- needBuckets.remove(first);
- needBuckets.remove(last);
-
- // take one bucket from 'last', and assign it to 'first'
- last.buckets.first().setOwner(first);
-
- // place back in sorted list
- needBuckets.add(first);
- needBuckets.add(last);
- }
- }
-
- /**
- * Make sure that the primary backups have the same site as the owner,
- * and the secondary backups have a different site.
- */
- void checkSiteValues() {
- for (TestBucket bucket : buckets) {
- if (bucket.owner != null) {
- InetSocketAddress siteSocketAddress =
- bucket.owner.siteSocketAddress;
- TestServer primaryBackup = bucket.primaryBackup;
- TestServer secondaryBackup = bucket.secondaryBackup;
-
- validateSiteOwner(bucket, siteSocketAddress,
- primaryBackup, secondaryBackup);
- }
- }
- }
-
- /**
- * Validate primary site owner and secondary site owner are valid.
- * @param bucket TestBucket
- * @param siteSocketAddress site socket address
- * @param primaryBackup primary backups
- * @param secondaryBackup secondary backups
- */
- private void validateSiteOwner(TestBucket bucket, InetSocketAddress siteSocketAddress,
- TestServer primaryBackup, TestServer secondaryBackup) {
- if (primaryBackup != null
- && !Objects.equals(siteSocketAddress,
- primaryBackup.siteSocketAddress)) {
- /*
- * primary backup is from the wrong site -- see if we can
- * use the secondary.
- */
- if (secondaryBackup != null
- && Objects.equals(siteSocketAddress,
- secondaryBackup.siteSocketAddress)) {
- // swap primary and secondary
- bucket.setPrimaryBackup(secondaryBackup);
- bucket.setSecondaryBackup(primaryBackup);
- } else {
- // just invalidate primary backup
- bucket.setPrimaryBackup(null);
- }
- } else if (secondaryBackup != null
- && Objects.equals(siteSocketAddress,
- secondaryBackup.siteSocketAddress)) {
- // secondary backup is from the wrong site
- bucket.setSecondaryBackup(null);
- if (primaryBackup == null) {
- // we can use this as the primary
- bucket.setPrimaryBackup(secondaryBackup);
- }
- }
- }
-
- /**
- * Allocate and rebalance the primary backups.
- */
- void rebalancePrimaryBackups() {
- for (TestServer failedServer : testServers.values()) {
- /*
- * to allocate primary backups for this server,
- * simulate a failure, and balance the backup hosts
- */
-
- // get siteSocketAddress from server
- InetSocketAddress siteSocketAddress = failedServer.siteSocketAddress;
-
- // populate a 'TreeSet' of 'AdjustedTestServer' instances based
- // the failure of 'failedServer'
- TreeSet adjustedTestServers = new TreeSet<>();
- for (TestServer server : testServers.values()) {
- if (server == failedServer
- || !Objects.equals(siteSocketAddress,
- server.siteSocketAddress)) {
- continue;
- }
- adjustedTestServers.add(new AdjustedTestServer(server, failedServer));
- }
-
- if (adjustedTestServers.isEmpty()) {
- // this is presumably the only server -- there is no other server
- // to act as primary backup, and no rebalancing can occur
- continue;
- }
-
- // we need a backup host for each bucket
- for (TestBucket bucket : failedServer.buckets) {
- if (bucket.primaryBackup == null
- || bucket.primaryBackup == nullServer) {
- // need a backup host for this bucket -- remove the first
- // entry from 'adjustedTestServers', which is most favored
- AdjustedTestServer backupHost = adjustedTestServers.first();
- adjustedTestServers.remove(backupHost);
-
- // update add this bucket to the list
- bucket.setPrimaryBackup(backupHost.server);
-
- // update counts in 'AdjustedTestServer'
- backupHost.bucketCount += 1;
- backupHost.primaryBackupBucketCount += 1;
-
- // place it back in the table, possibly in a new position
- // (it's attributes have changed)
- adjustedTestServers.add(backupHost);
- }
- }
-
- // TBD: Is additional rebalancing needed?
- }
- }
-
- /**
- * Allocate and rebalance the secondary backups.
- */
- void rebalanceSecondaryBackups() {
- for (TestServer failedServer : testServers.values()) {
- /*
- * to allocate secondary backups for this server,
- * simulate a failure, and balance the backup hosts
- */
-
- // get siteSocketAddress from server
- InetSocketAddress siteSocketAddress = failedServer.siteSocketAddress;
-
- // populate a 'TreeSet' of 'AdjustedTestServer' instances based
- // the failure of 'failedServer'
- TreeSet adjustedTestServers =
- new TreeSet<>();
- for (TestServer server : testServers.values()) {
- if (server == failedServer
- || Objects.equals(siteSocketAddress,
- server.siteSocketAddress)) {
- continue;
- }
- adjustedTestServers.add(new AdjustedTestServer(server, failedServer));
- }
-
- if (adjustedTestServers.isEmpty()) {
- // this is presumably the only server -- there is no other server
- // to act as secondary backup, and no rebalancing can occur
- continue;
- }
-
- // we need a backup host for each bucket
- for (TestBucket bucket : failedServer.buckets) {
- if (bucket.secondaryBackup == null
- || bucket.secondaryBackup == nullServer) {
- // need a backup host for this bucket -- remove the first
- // entry from 'adjustedTestServers', which is most favored
- AdjustedTestServer backupHost = adjustedTestServers.first();
- adjustedTestServers.remove(backupHost);
-
- // update add this bucket to the list
- bucket.setSecondaryBackup(backupHost.server);
-
- // update counts in 'AdjustedTestServer'
- backupHost.bucketCount += 1;
- backupHost.secondaryBackupBucketCount += 1;
-
- // place it back in the table, possibly in a new position
- // (it's attributes have changed)
- adjustedTestServers.add(backupHost);
- }
- }
-
- // TBD: Is additional rebalancing needed?
- }
- }
-
- /**
- * Generate a message with all of the bucket updates, process it locally,
- * and send it to all servers in the "Notify List".
- */
- void generateBucketMessage() throws IOException {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
-
- // go through the entire 'buckets' table
- for (int i = 0; i < buckets.length; i += 1) {
- // fetch the 'TestBucket' associated with index 'i'
- TestBucket testBucket = buckets[i];
-
- /*
- * Get the UUID of the owner, primary backup, and secondary backup
- * for this bucket. If the associated value does not exist, 'null'
- * is used.
- */
- UUID newOwner = null;
- UUID newPrimary = null;
- UUID newSecondary = null;
-
- if (testBucket.owner != nullServer && testBucket.owner != null) {
- newOwner = testBucket.owner.uuid;
- }
- if (testBucket.primaryBackup != nullServer
- && testBucket.primaryBackup != null) {
- newPrimary = testBucket.primaryBackup.uuid;
- }
- if (testBucket.secondaryBackup != nullServer
- && testBucket.secondaryBackup != null) {
- newSecondary = testBucket.secondaryBackup.uuid;
- }
-
- // write bucket number
- dos.writeShort(i);
-
- // 'owner' field
- writeOwner(dos, newOwner);
-
- // 'primaryBackup' field
- writePrimary(dos, newPrimary);
-
- // 'secondaryBackup' field
- writeSecondary(dos, newSecondary);
-
- dos.writeByte(END_OF_PARAMETERS_TAG);
- }
-
- // get the unencoded 'packet'
- final byte[] packet = bos.toByteArray();
-
- // create an 'Entity' containing the encoded packet
- final Entity entity =
- Entity.entity(new String(Base64.getEncoder().encode(packet),
- StandardCharsets.UTF_8), MediaType.APPLICATION_OCTET_STREAM_TYPE);
- /*
- * This method is running within the 'MainLoop' thread.
- */
- Runnable task = () -> {
- try {
- /*
- * update the buckets on this host,
- * which is presumably the lead server.
- */
- Bucket.updateBucketInternal(packet);
- } catch (Exception e) {
- logger.error("Exception updating buckets", e);
- }
-
- // send a message to all servers on the notify list
- for (Server server : Server.getNotifyList()) {
- server.post("bucket/update", entity);
- }
- };
- MainLoop.queueWork(task);
- }
-
- private void writeOwner(DataOutputStream dos, UUID newOwner) throws IOException {
- if (newOwner != null) {
- dos.writeByte(OWNER_UPDATE);
- Util.writeUuid(dos, newOwner);
- } else {
- dos.writeByte(OWNER_NULL);
- }
- }
-
- private void writePrimary(DataOutputStream dos, UUID newPrimary) throws IOException {
- if (newPrimary != null) {
- dos.writeByte(PRIMARY_BACKUP_UPDATE);
- Util.writeUuid(dos, newPrimary);
- } else {
- dos.writeByte(PRIMARY_BACKUP_NULL);
- }
- }
-
- private void writeSecondary(DataOutputStream dos, UUID newSecondary) throws IOException {
- if (newSecondary != null) {
- dos.writeByte(SECONDARY_BACKUP_UPDATE);
- Util.writeUuid(dos, newSecondary);
- } else {
- dos.writeByte(SECONDARY_BACKUP_NULL);
- }
- }
-
- /**
- * Supports the '/cmd/dumpBuckets' REST message -- this isn't part of
- * a 'rebalance' operation, but it turned out to be a convenient way
- * to dump out the bucket table.
- *
- * @param out the output stream
- */
- private void dumpBucketsInternal(PrintStream out) {
- // xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx xxxxxxxxx *
- // UUID Type Buckets
- String format = "%-36s %-9s %5s %s\n";
-
- int totalOwner = 0;
- int totalPrimary = 0;
- int totalSecondary = 0;
-
- out.printf(format, "UUID", "Type", "Count", "Buckets");
- out.printf(format, "----", "----", "-----", "-------");
- for (TestServer ts : testServers.values()) {
- // dump out 'owned' bucket information
- if (ts.buckets.isEmpty()) {
- // no buckets owned by this server
- out.printf(format, ts.uuid, OWNED_STR, 0, "");
- } else {
- // dump out primary buckets information
- totalOwner +=
- dumpBucketsSegment(out, format, ts.buckets, ts.uuid.toString(), OWNED_STR);
- }
- // optionally dump out primary buckets information
- totalPrimary +=
- dumpBucketsSegment(out, format, ts.primaryBackupBuckets, "", "Primary");
- // optionally dump out secondary buckets information
- totalSecondary +=
- dumpBucketsSegment(out, format, ts.secondaryBackupBuckets, "", "Secondary");
- }
-
- if (!nullServer.buckets.isEmpty()
- || !nullServer.primaryBackupBuckets.isEmpty()
- || !nullServer.secondaryBackupBuckets.isEmpty()) {
- /*
- * There are some owned, primary, or secondary buckets that are
- * unassigned. It is displayed in a manner similar to buckets that
- * do have a server, but the UUID field is marked as 'UNASSIGNED'
- * in the first line of the display.
- */
- String uuidField = "UNASSIGNED";
-
- // optionally dump out unassigned owned buckets information
- if (dumpBucketsSegment(out, format, nullServer.buckets,
- uuidField, OWNED_STR) != 0) {
- uuidField = "";
- }
- // optionally dump out unassigned primary backup buckets information
- if (dumpBucketsSegment(out, format, nullServer.primaryBackupBuckets,
- uuidField, "Primary") != 0) {
- uuidField = "";
- }
- // optionally dump out unassigned secondary backup buckets information
- dumpBucketsSegment(out, format, nullServer.secondaryBackupBuckets,
- uuidField, "Secondary");
- }
- out.println("\nTotal assigned: owner = " + totalOwner
- + ", primary = " + totalPrimary
- + ", secondary = " + totalSecondary);
- }
-
- /**
- * Supports the 'dumpBucketsInternal' command, and indirectly, the
- * '/cmd/dumpBuckets' REST message. It formats one segment of bucket data
- * (owned, primary backup, or secondary backup), and dumps out the
- * associated bucket data in segments of 8. Note: if the size of 'buckets'
- * is 0, nothing is displayed.
- *
- * @param out the output stream
- * @param format the message format string
- * @param buckets the entire set of buckets to be displayed
- * @param uuid string to display under the 'UUID' header
- * @param segmentDescription string to display under the 'Type' header
- * @return the size of the 'buckets' set
- */
- private static int dumpBucketsSegment(
- PrintStream out, String format, TreeSet buckets,
- String uuid, String segmentDescription) {
-
- int size = buckets.size();
- if (size != 0) {
- // generate a linked list of the bucket data to display
- LinkedList data = new LinkedList<>();
- StringBuilder sb = new StringBuilder();
- int count = 8;
-
- for (TestBucket bucket : buckets) {
- if (sb.length() != 0) {
- // this is not the first bucket in the line --
- // prepend a space
- sb.append(' ');
- }
-
- // add the bucket number
- sb.append(String.format("%4s", bucket.index));
- count -= 1;
- if (count <= 0) {
- // filled up a row --
- // add it to the list, and start a new line
- data.add(sb.toString());
- sb = new StringBuilder();
- count = 8;
- }
- }
- if (sb.length() != 0) {
- // there is a partial line remaining -- add it to the list
- data.add(sb.toString());
- }
-
- /*
- * The first line displayed includes the UUID and size information,
- * and the first line of bucket data (owned, primary, or secondary).
- * The remaining lines of bucket data are displayed alone,
- * without any UUID or size information.
- */
- out.printf(format, uuid, segmentDescription, buckets.size(),
- data.removeFirst());
- while (!data.isEmpty()) {
- out.printf(format, "", "", "", data.removeFirst());
- }
- }
- return size;
- }
- }
-
- /* ============================================================ */
-
- /**
- * This interface is an abstraction for all messages that are routed
- * through buckets. It exists, so that messages may be queued while
- * bucket migration is taking place, and it makes it possible to support
- * multiple types of messages (routed UEB/DMAAP messages, or lock messages)
- */
- public static interface Message {
- /**
- * Process the current message -- this may mean delivering it locally,
- * or forwarding it.
- */
- public void process();
-
- /**
- * Send the message to another host for processing.
- *
- * @param server the destination host (although it could end up being
- * forwarded again)
- * @param bucketNumber the bucket number determined by extracting the
- * current message's keyword
- */
- public void sendToServer(Server server, int bucketNumber);
- }
-
- /* ============================================================ */
-
- /**
- * This interface implements a type of backup; for example, there is one
- * for backing up Drools objects within sessions, and another for
- * backing up lock data.
- */
- public static interface Backup {
- /**
- * This method is called to add a 'Backup' instance to the registered list.
- *
- * @param backup an object implementing the 'Backup' interface
- */
- public static void register(Backup backup) {
- synchronized (backupList) {
- if (!backupList.contains(backup)) {
- backupList.add(backup);
- }
- }
- }
-
- /**
- * Generate Serializable backup data for the specified bucket.
- *
- * @param bucketNumber the bucket number to back up
- * @return a Serializable object containing backkup data
- */
- public Restore generate(int bucketNumber);
- }
-
- /* ============================================================ */
-
- /**
- * Objects implementing this interface may be serialized, and restored
- * on a different host.
- */
- public static interface Restore extends Serializable {
- /**
- * Restore from deserialized data.
- *
- * @param bucketNumber the bucket number being restored
- */
- void restore(int bucketNumber);
- }
-
- /* ============================================================ */
-
- /**
- * This interface corresponds to a transient state within a Bucket.
- */
- private interface State {
- /**
- * This method allows state-specific handling of the
- * 'Bucket.forward()' methods
- *
- * @param message the message to be forwarded/processed
- * @return a value of 'true' indicates the message has been "handled"
- * (forwarded or queued), and 'false' indicates it has not, and needs
- * to be handled locally.
- */
- boolean forward(Message message);
-
- /**
- * This method indicates that the current server is the new owner
- * of the current bucket.
- */
- void newOwner();
-
- /**
- * This method indicates that serialized data has been received,
- * presumably from the old owner of the bucket. The data could correspond
- * to Drools objects within sessions, as well as global locks.
- *
- * @param data serialized data associated with this bucket (at present,
- * this is assumed to be complete, all within a single message)
- */
- void bulkSerializedData(byte[] data);
- }
-
- /* ============================================================ */
-
- /**
- * Each state instance is associated with a bucket, and is used when
- * that bucket is in a transient state where it is the new owner of a
- * bucket, or is presumed to be the new owner, based upon other events
- * that have occurred.
- */
- private class NewOwner extends Thread implements State {
- /*
- * this value is 'true' if we have explicitly received a 'newOwner'
- * indication, and 'false' if there was another trigger for entering this
- * transient state (e.g. receiving serialized data)
- */
- boolean confirmed;
-
- // when 'System.currentTimeMillis()' reaches this value, we time out
- long endTime;
-
- // If not 'null', we are queueing messages for this bucket
- // otherwise, we are sending them through.
- Queue messages = new ConcurrentLinkedQueue<>();
-
- // this is used to signal the thread that we have data available
- CountDownLatch dataAvailable = new CountDownLatch(1);
-
- // this is the data
- byte[] data = null;
-
- // this is the old owner of the bucket
- Server oldOwner;
-
- /**
- * Constructor - a transient state, where we are expecting to receive
- * bulk data from the old owner.
- *
- * @param confirmed 'true' if we were explicitly notified that we
- * are the new owner of the bucket, 'false' if not
- */
- NewOwner(boolean confirmed, Server oldOwner) {
- super("New Owner for Bucket " + index);
- this.confirmed = confirmed;
- this.oldOwner = oldOwner;
- if (oldOwner == null) {
- // we aren't expecting any data -- this is indicated by 0-length data
- bulkSerializedData(new byte[0]);
- }
- endTime = System.currentTimeMillis()
- + (confirmed ? confirmedTimeout : unconfirmedTimeout);
- start();
- }
-
- /**
- * Return the 'confirmed' indicator.
- *
- * @return the 'confirmed' indicator
- */
- private boolean getConfirmed() {
- synchronized (Bucket.this) {
- return confirmed;
- }
- }
-
- /**
- * This returns the timeout delay, which will always be less than or
- * equal to 1 second. This allows us to periodically check whether the
- * old server is still active.
- *
- * @return the timeout delay, which is the difference between the
- * 'endTime' value and the current time or 1 second
- * (whichever is less)
- */
- private long getTimeout() {
- long lclEndTime;
- synchronized (Bucket.this) {
- lclEndTime = endTime;
- }
- return Math.min(lclEndTime - System.currentTimeMillis(), 1000L);
- }
-
- /**
- * Return the current value of the 'data' field.
- *
- * @return the current value of the 'data' field
- */
- private byte[] getData() {
- synchronized (Bucket.this) {
- return data;
- }
- }
-
- /* ******************* */
- /* 'State' interface */
- /* ******************* */
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean forward(Message message) {
- // the caller of this method is synchronized on 'Bucket.this'
- if (messages != null && Thread.currentThread() != this) {
- // just queue the message
- messages.add(message);
- return true;
- } else {
- /*
- * Either:
- *
- * 1) We are in a grace period, where 'state' is still set, but
- * we are no longer forwarding messages.
- * 2) We are calling 'message.process()' from this thread
- * in the 'finally' block of 'NewOwner.run()'.
- *
- * In either case, messages should be processed locally.
- */
- return false;
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void newOwner() {
- // the caller of this method is synchronized on 'Bucket.this'
- if (!confirmed) {
- confirmed = true;
- endTime += (confirmedTimeout - unconfirmedTimeout);
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void bulkSerializedData(byte[] data) {
- // the caller of this method is synchronized on 'Bucket.this'
- if (this.data == null) {
- this.data = data;
- dataAvailable.countDown();
- }
- }
-
- /* ******************** */
- /* 'Thread' interface */
- /* ******************** */
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void run() {
- logger.info("{}: 'run' method invoked", this);
- try {
- byte[] lclData;
- long delay;
-
- while ((lclData = getData()) == null
- && oldOwner.isActive()
- && (delay = getTimeout()) > 0) {
- // ignore return value -- 'data' will indicate the result
- if (!dataAvailable.await(delay, TimeUnit.MILLISECONDS)) {
- logger.error("CountDownLatch await time reached");
- }
- }
- if (lclData == null) {
- // no data available -- log an error, and abort
- if (getConfirmed()) {
- // we never received the data, but we are the new owner
- logger.error("{}: never received session data", this);
- } else {
- /*
- * no data received, and it was never confirmed --
- * assume that the forwarded message that triggered this was
- * erroneus
- */
- logger.error("{}: no confirmation or data received -- aborting", this);
- return;
- }
- } else {
- logger.info("{}: {} bytes of data available",
- this, lclData.length);
- }
-
- // if we reach this point, this server is the new owner
- if (lclData == null || lclData.length == 0) {
- // see if any features can do the restore
- for (ServerPoolApi feature : ServerPoolApi.impl.getList()) {
- feature.restoreBucket(Bucket.this);
- }
- } else {
- // deserialize data
- Object obj = Util.deserialize(lclData);
- restoreBucketData(obj);
- }
- } catch (Exception e) {
- logger.error("Exception in {}", this, e);
- } finally {
- runCleanup();
- }
- }
-
- private void runCleanup() {
- /*
- * cleanly leave state -- we want to make sure that messages
- * are processed in order, so the queue needs to remain until
- * it is empty
- */
- logger.info("{}: entering cleanup state", this);
- for ( ; ; ) {
- Message message = messages.poll();
- if (message == null) {
- // no messages left, but this could change
- synchronized (Bucket.this) {
- message = messages.poll();
- if (message == null) {
- // no messages left
- noMoreMessages();
- break;
- }
- }
- }
- // this doesn't work -- it ends up right back in the queue
- // if 'messages' is defined
- message.process();
- }
- if (messages == null) {
- // this indicates we need to enter a grace period before cleanup,
- sleepBeforeCleanup();
- }
- logger.info("{}: exiting cleanup state", this);
- }
-
- private void noMoreMessages() {
- if (state == this) {
- if (owner == Server.getThisServer()) {
- // we can now exit the state
- state = null;
- stateChanged();
- } else {
- /*
- * We need a grace period before we can
- * remove the 'state' value (this can happen
- * if we receive and process the bulk data
- * before receiving official confirmation
- * that we are owner of the bucket.
- */
- messages = null;
- }
- }
- }
-
- private void sleepBeforeCleanup() {
- try {
- logger.info("{}: entering grace period before terminating",
- this);
- Thread.sleep(unconfirmedGracePeriod);
- } catch (InterruptedException e) {
- // we are exiting in any case
- Thread.currentThread().interrupt();
- } finally {
- synchronized (Bucket.this) {
- // Do we need to confirm that we really are the owner?
- // What does it mean if we are not?
- if (state == this) {
- state = null;
- stateChanged();
- }
- }
- }
- }
-
- /**
- * Return a useful value to display in log messages.
- *
- * @return a useful value to display in log messages
- */
- public String toString() {
- return "Bucket.NewOwner(" + index + ")";
- }
-
- /**
- * Restore bucket data.
- *
- * @param obj deserialized bucket data
- */
- private void restoreBucketData(Object obj) {
- if (obj instanceof List) {
- for (Object entry : (List>) obj) {
- if (entry instanceof Restore) {
- // entry-specific 'restore' operation
- ((Restore) entry).restore(Bucket.this.index);
- } else {
- logger.error("{}: Expected '{}' but got '{}'",
- this, Restore.class.getName(),
- entry.getClass().getName());
- }
- }
- } else {
- logger.error("{}: expected 'List' but got '{}'",
- this, obj.getClass().getName());
- }
- }
- }
-
-
- /* ============================================================ */
-
- /**
- * Each state instance is associated with a bucket, and is used when
- * that bucket is in a transient state where it is the old owner of
- * a bucket, and the data is being transferred to the new owner.
- */
- private class OldOwner extends Thread implements State {
- Server newOwner;
-
- OldOwner(Server newOwner) {
- super("Old Owner for Bucket " + index);
- this.newOwner = newOwner;
- start();
- }
-
- /* ******************* */
- /* 'State' interface */
- /* ******************* */
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean forward(Message message) {
- // forward message to new owner
- message.sendToServer(newOwner, index);
- return true;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void newOwner() {
- // shouldn't happen -- just log an error
- logger.error("{}: 'newOwner()' shouldn't be called in this state", this);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void bulkSerializedData(byte[] data) {
- // shouldn't happen -- just log an error
- logger.error("{}: 'bulkSerializedData()' shouldn't be called in this state", this);
- }
-
- /* ******************** */
- /* 'Thread' interface */
- /* ******************** */
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void run() {
- logger.info("{}: 'run' method invoked", this);
- try {
- // go through all of the entries in the list, collecting restore data
- List restoreData = new LinkedList<>();
- for (Backup backup : backupList) {
- Restore restore = backup.generate(index);
- if (restore != null) {
- restoreData.add(restore);
- }
- }
-
- // serialize all of the objects,
- // and send what we have to the new owner
- Entity entity = Entity.entity(
- new String(Base64.getEncoder().encode(Util.serialize(restoreData))),
- MediaType.APPLICATION_OCTET_STREAM_TYPE);
- newOwner.post("bucket/sessionData", entity, new Server.PostResponse() {
- @Override
- public WebTarget webTarget(WebTarget webTarget) {
- return webTarget
- .queryParam(QP_BUCKET, index)
- .queryParam(QP_DEST, newOwner.getUuid())
- .queryParam(QP_TTL, timeToLive);
- }
-
- @Override
- public void response(Response response) {
- logger.info("/bucket/sessionData response code = {}",
- response.getStatus());
- }
- });
- } catch (Exception e) {
- logger.error("Exception in {}", this, e);
- } finally {
- synchronized (Bucket.this) {
- // restore the state
- if (state == this) {
- state = null;
- stateChanged();
- }
- }
- }
- }
-
- /**
- * Return a useful value to display in log messages.
- *
- * @return a useful value to display in log messages
- */
- public String toString() {
- return "Bucket.OldOwner(" + index + ")";
- }
- }
-}
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Discovery.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Discovery.java
deleted file mode 100644
index a53fb4d1..00000000
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Discovery.java
+++ /dev/null
@@ -1,352 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * feature-server-pool
- * ================================================================================
- * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.serverpool;
-
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_DISCOVERY_FETCH_LIMIT;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_DISCOVERY_FETCH_TIMEOUT;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_DISCOVER_PUBLISHER_LOOP_CYCLE_TIME;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.DISCOVERY_ALLOW_SELF_SIGNED_CERTIFICATES;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.DISCOVERY_API_KEY;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.DISCOVERY_API_SECRET;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.DISCOVERY_FETCH_LIMIT;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.DISCOVERY_FETCH_TIMEOUT;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.DISCOVERY_HTTPS;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.DISCOVERY_PASSWORD;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.DISCOVERY_SERVERS;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.DISCOVERY_TOPIC;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.DISCOVERY_USERNAME;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.DISCOVER_PUBLISHER_LOOP_CYCLE_TIME;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.getProperty;
-
-import com.google.gson.Gson;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.Base64;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
-import org.onap.policy.common.endpoints.event.comm.TopicListener;
-import org.onap.policy.common.endpoints.event.comm.TopicSink;
-import org.onap.policy.common.endpoints.event.comm.TopicSource;
-import org.onap.policy.common.endpoints.properties.PolicyEndPointProperties;
-import org.onap.policy.common.utils.coder.CoderException;
-import org.onap.policy.common.utils.coder.StandardCoder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class makes use of UEB/DMAAP to discover other servers in the pool.
- * The discovery processes ordinarily run only on the lead server, but they
- * run on other servers up until the point that they determine who the
- * leader is.
- */
-public class Discovery implements TopicListener {
- private static Logger logger = LoggerFactory.getLogger(Discovery.class);
-
- // used for JSON <-> String conversion
- private static StandardCoder coder = new StandardCoder();
-
- private static Discovery discovery = null;
-
- private volatile Publisher publisherThread = null;
-
- private List consumers = null;
- private List publishers = null;
-
- private Discovery() {
- // we want to modify the properties we send to 'TopicManager'
- PropBuilder builder = new PropBuilder(ServerPoolProperties.getProperties());
- builder.convert(DISCOVERY_SERVERS, null,
- PolicyEndPointProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
- builder.convert(DISCOVERY_USERNAME, null,
- PolicyEndPointProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
- builder.convert(DISCOVERY_PASSWORD, null,
- PolicyEndPointProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
- builder.convert(DISCOVERY_HTTPS, null,
- PolicyEndPointProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
- builder.convert(DISCOVERY_API_KEY, null,
- PolicyEndPointProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
- builder.convert(DISCOVERY_API_SECRET, null,
- PolicyEndPointProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
- builder.convert(DISCOVERY_FETCH_TIMEOUT,
- String.valueOf(DEFAULT_DISCOVERY_FETCH_TIMEOUT),
- PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
- builder.convert(DISCOVERY_FETCH_LIMIT,
- String.valueOf(DEFAULT_DISCOVERY_FETCH_LIMIT),
- PolicyEndPointProperties.PROPERTY_TOPIC_SOURCE_FETCH_LIMIT_SUFFIX);
- builder.convert(DISCOVERY_ALLOW_SELF_SIGNED_CERTIFICATES, null,
- PolicyEndPointProperties.PROPERTY_ALLOW_SELF_SIGNED_CERTIFICATES_SUFFIX);
- Properties prop = builder.finish();
- logger.debug("Discovery converted properties: {}", prop);
-
- consumers = TopicEndpointManager.getManager().addTopicSources(prop);
- publishers = TopicEndpointManager.getManager().addTopicSinks(prop);
-
- if (consumers.isEmpty()) {
- logger.error("No consumer topics");
- }
- if (publishers.isEmpty()) {
- logger.error("No publisher topics");
- }
- logger.debug("Discovery: {} consumers, {} publishers",
- consumers.size(), publishers.size());
- }
-
- /**
- * Start all consumers and publishers, and start the publisher thread.
- */
- static synchronized void startDiscovery() {
- if (discovery == null) {
- discovery = new Discovery();
- }
- discovery.start();
- }
-
- /**
- * Stop all consumers and publishers, and stop the publisher thread.
- */
- static synchronized void stopDiscovery() {
- if (discovery != null) {
- discovery.stop();
- }
- }
-
- /**
- * Start all consumers and publishers, and start the publisher thread.
- */
- private void start() {
- for (TopicSource consumer : consumers) {
- consumer.register(this);
- consumer.start();
- }
- for (TopicSink publisher : publishers) {
- publisher.start();
- }
- if (publisherThread == null) {
- // send thread wasn't running -- start it
- publisherThread = new Publisher();
- publisherThread.start();
- }
- }
-
- /**
- * Stop all consumers and publishers, and stop the publisher thread.
- */
- private void stop() {
- publisherThread = null;
- for (TopicSink publisher : publishers) {
- publisher.stop();
- }
- for (TopicSource consumer : consumers) {
- consumer.unregister(this);
- consumer.stop();
- }
- }
-
- /*===========================*/
- /* 'TopicListener' interface */
- /*===========================*/
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void onTopicEvent(CommInfrastructure infra, String topic, String event) {
- /*
- * a JSON message has been received -- it should contain
- * a single string parameter 'pingData', which contains the
- * same format base64-encoded message that 'Server'
- * instances periodically exchange
- */
- try {
- @SuppressWarnings("unchecked")
- LinkedHashMap map = coder.decode(event, LinkedHashMap.class);
- String message = map.get("pingData");
- Server.adminRequest(message.getBytes(StandardCharsets.UTF_8));
- logger.info("Received a message, server count={}", Server.getServerCount());
- } catch (CoderException e) {
- logger.error("Can't decode message", e);
- }
- }
-
- /* ============================================================ */
-
- /**
- * This class is used to convert internal 'discovery.*' properties to
- * properties that 'TopicEndpointManager' can use.
- */
- private static class PropBuilder {
- // properties being incrementally modified
- Properties prop;
-
- // value from 'discovery.topic' parameter
- String topic;
-
- // 'true' only if both 'discovery.topic' and 'discovery.servers'
- // has been defined
- boolean doConversion = false;
-
- // contains "ueb.source.topics" or "dmaap.source.topics"
- String sourceTopicsName = null;
-
- // contains ".source.topics." ( = ueb|dmaap)
- String sourcePrefix = null;
-
- // contains "ueb.sink.topics" or "dmaap.sink.topics"
- String sinkTopicsName = null;
-
- // contains ".sink.topics." ( = ueb|dmaap)
- String sinkPrefix = null;
-
- /**
- * Constructor - decide whether we are going to do conversion or not,
- * and initialize accordingly.
- *
- * @param prop the initial list of properties
- */
- PropBuilder(Properties prop) {
- this.prop = new Properties(prop);
- this.topic = prop.getProperty(DISCOVERY_TOPIC);
- String servers = prop.getProperty(DISCOVERY_SERVERS);
- if (topic != null && servers != null) {
- // we do have property conversion to do
- doConversion = true;
- String type = topic.contains(".") ? "dmaap" : "ueb";
- sourceTopicsName = type + ".source.topics";
- sourcePrefix = sourceTopicsName + "." + topic;
- sinkTopicsName = type + ".sink.topics";
- sinkPrefix = sinkTopicsName + "." + topic;
- }
- }
-
- /**
- * If we are doing conversion, convert an internal property
- * to something that 'TopicEndpointManager' can use.
- *
- * @param intName server pool property name (e.g. "discovery.servers")
- * @param defaultValue value to use if property 'intName' is not specified
- * @param extSuffix TopicEndpointManager suffix, including leading "."
- */
- void convert(String intName, String defaultValue, String extSuffix) {
- if (doConversion) {
- String value = prop.getProperty(intName, defaultValue);
- if (value != null) {
- prop.setProperty(sourcePrefix + extSuffix, value);
- prop.setProperty(sinkPrefix + extSuffix, value);
- }
- }
- }
-
- /**
- * Generate/update the '*.source.topics' and '*.sink.topics' parameters.
- *
- * @return the updated properties list
- */
- Properties finish() {
- if (doConversion) {
- String currentValue = prop.getProperty(sourceTopicsName);
- if (currentValue == null) {
- // '*.source.topics' is not defined -- set it
- prop.setProperty(sourceTopicsName, topic);
- } else {
- // '*.source.topics' is defined -- append to it
- prop.setProperty(sourceTopicsName, currentValue + "," + topic);
- }
- currentValue = prop.getProperty(sinkTopicsName);
- if (currentValue == null) {
- // '*.sink.topics' is not defined -- set it
- prop.setProperty(sinkTopicsName, topic);
- } else {
- // '*.sink.topics' is defined -- append to it
- prop.setProperty(sinkTopicsName, currentValue + "," + topic);
- }
- }
- return prop;
- }
- }
-
- /* ============================================================ */
-
- /**
- * This is the sender thread, which periodically sends out 'ping' messages.
- */
- private class Publisher extends Thread {
- /**
- * Constructor -- read in the properties, and initialze 'publisher'.
- */
- Publisher() {
- super("Discovery Publisher Thread");
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void run() {
- // this loop will terminate once 'publisher' is set to 'null',
- // or some other 'Publisher' instance replaces it
- long cycleTime = getProperty(DISCOVER_PUBLISHER_LOOP_CYCLE_TIME,
- DEFAULT_DISCOVER_PUBLISHER_LOOP_CYCLE_TIME);
- while (this == publisherThread) {
- try {
- // wait 5 seconds (default)
- Thread.sleep(cycleTime);
-
- // generate a 'ping' message
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
-
- // write the 'ping' data for this server
- Server thisServer = Server.getThisServer();
- thisServer.writeServerData(dos);
- String encodedData =
- new String(Base64.getEncoder().encode(bos.toByteArray()));
-
- // base64-encoded value is passed as JSON parameter 'pingData'
- LinkedHashMap map = new LinkedHashMap<>();
- map.put("pingData", encodedData);
- String jsonString = new Gson().toJson(map, Map.class);
- for (TopicSink publisher : publishers) {
- publisher.send(jsonString);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- logger.error("Exception in Discovery.Publisher.run():", e);
- return;
- } catch (Exception e) {
- logger.error("Exception in Discovery.Publisher.run():", e);
- // grace period -- we don't want to get UEB upset at us
- try {
- Thread.sleep(15000);
- } catch (InterruptedException e2) {
- Thread.currentThread().interrupt();
- logger.error("Discovery.Publisher sleep interrupted");
- }
- return;
- }
- }
- }
- }
-}
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Events.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Events.java
deleted file mode 100644
index d2ea1a5c..00000000
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/Events.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * feature-server-pool
- * ================================================================================
- * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.serverpool;
-
-import java.util.Collection;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- * This interface is used to distribute notifications of various system
- * events, such as new 'Server' instances, or a server failing.
- */
-public class Events {
- // set of listeners receiving event notifications
- private static final Queue listeners =
- new ConcurrentLinkedQueue<>();
-
- /**
- * add a listener to the set of listeners receiving events.
- *
- * @param handler the listener
- */
- public static void register(Events handler) {
- // if it is already here, remove it first
- listeners.remove(handler);
-
- // add to the end of the queue
- listeners.add(handler);
- }
-
- /**
- * remove a listener from the set of listeners.
- */
- public static boolean unregister(Events handler) {
- return listeners.remove(handler);
- }
-
- public static Collection getListeners() {
- return listeners;
- }
-
- /* ============================================================ */
-
- /**
- * Notification that a new server has been discovered.
- *
- * @param server this is the new server
- */
- public void newServer(Server server) {
- // do nothing
- }
-
- /**
- * Notification that a server has failed.
- *
- * @param server this is the server that failed
- */
- public void serverFailed(Server server) {
- // do nothing
- }
-
- /**
- * Notification that a new lead server has been selected.
- *
- * @param server this is the new lead server
- */
- public void newLeader(Server server) {
- // do nothing
- }
-
- /**
- * Notification that the lead server has gone down.
- *
- * @param server the lead server that failed
- */
- public void leaderFailed(Server server) {
- // do nothing
- }
-
- /**
- * Notification that a new selection just completed, but the same
- * leader has been chosen (this may be in response to a new server
- * joining earlier).
- *
- * @param server the current leader, which has been confirmed
- */
- public void leaderConfirmed(Server server) {
- // do nothing
- }
-}
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/ExtendedObjectInputStream.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/ExtendedObjectInputStream.java
deleted file mode 100644
index 5ec6f341..00000000
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/ExtendedObjectInputStream.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * feature-server-pool
- * ================================================================================
- * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.serverpool;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectStreamClass;
-
-/**
- * This class provides an 'ObjectInputStream' variant that uses the
- * specified 'ClassLoader' instance.
- */
-public class ExtendedObjectInputStream extends ObjectInputStream {
- // the 'ClassLoader' to use when doing class lookups
- private ClassLoader classLoader;
-
- /**
- * Constructor -- invoke the superclass, and save the 'ClassLoader'.
- *
- * @param in input stream to read from
- * @param classLoader 'ClassLoader' to use when doing class lookups
- */
- public ExtendedObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
- super(in);
- this.classLoader = classLoader;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- protected Class> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
-
- // Standard ClassLoader implementations first attempt to load classes
- // via the parent class loader, and then attempt to load it using the
- // current class loader if that fails. For some reason, Drools container
- // class loaders define a different order -- in theory, this is only a
- // problem if different versions of the same class are accessible through
- // different class loaders, which is exactly what happens in some Junit
- // tests.
- //
- // This change restores the order, at least when deserializing objects
- // into a Drools container.
- try {
- // try the parent class loader first
- return classLoader.getParent().loadClass(desc.getName());
- } catch (ClassNotFoundException e) {
- return classLoader.loadClass(desc.getName());
- }
- }
-}
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java
deleted file mode 100644
index 064af79e..00000000
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java
+++ /dev/null
@@ -1,1027 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * feature-server-pool
- * ================================================================================
- * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2020 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.serverpool;
-
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.BUCKET_DROOLS_TIMEOUT;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.BUCKET_TIME_TO_LIVE;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_BUCKET_DROOLS_TIMEOUT;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_BUCKET_TIME_TO_LIVE;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.getProperty;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Base64;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import lombok.AllArgsConstructor;
-import org.apache.commons.lang3.tuple.Pair;
-import org.kie.api.runtime.KieSession;
-import org.kie.api.runtime.rule.FactHandle;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.event.comm.TopicListener;
-import org.onap.policy.common.utils.coder.CoderException;
-import org.onap.policy.common.utils.coder.StandardCoder;
-import org.onap.policy.common.utils.coder.StandardCoderObject;
-import org.onap.policy.drools.control.api.DroolsPdpStateControlApi;
-import org.onap.policy.drools.core.DroolsRunnable;
-import org.onap.policy.drools.core.PolicyContainer;
-import org.onap.policy.drools.core.PolicySession;
-import org.onap.policy.drools.core.PolicySessionFeatureApi;
-import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
-import org.onap.policy.drools.features.PolicyControllerFeatureApi;
-import org.onap.policy.drools.features.PolicyEngineFeatureApi;
-import org.onap.policy.drools.system.PolicyController;
-import org.onap.policy.drools.system.PolicyControllerConstants;
-import org.onap.policy.drools.system.PolicyEngine;
-import org.onap.policy.drools.system.PolicyEngineConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- * This class hooks the server pool implementation into DroolsPDP.
- *
- *
PolicyEngineFeatureApi
- the afterStart hook is where we initialize.
- *
PolicyControllerFeatureApi
- the beforeOffer hook is used to look
- * at incoming topic messages, and decide whether to process them
- * on this host, or forward to another host.
- *
- */
-public class FeatureServerPool
- implements PolicyEngineFeatureApi, PolicySessionFeatureApi,
- PolicyControllerFeatureApi, DroolsPdpStateControlApi {
- private static Logger logger =
- LoggerFactory.getLogger(FeatureServerPool.class);
-
- // used for JSON <-> String conversion
- private static StandardCoder coder = new StandardCoder();
-
- private static final String CONFIG_FILE =
- "config/feature-server-pool.properties";
-
- /*
- * Properties used when searching for keyword entries
- *
- * The following types are supported:
- *
- * 1) keyword..path=
- * 2) keyword.path=
- * 3) ueb.source.topics..keyword=
- * 4) ueb.source.topics.keyword=
- * 5) dmaap.source.topics..keyword=
- * 6) dmaap.source.topics.keyword=
- *
- * 1, 3, and 5 are functionally equivalent
- * 2, 4, and 6 are functionally equivalent
- */
-
- static final String KEYWORD_PROPERTY_START_1 = "keyword.";
- static final String KEYWORD_PROPERTY_END_1 = ".path";
- static final String KEYWORD_PROPERTY_START_2 = "ueb.source.topics.";
- static final String KEYWORD_PROPERTY_END_2 = ".keyword";
- static final String KEYWORD_PROPERTY_START_3 = "dmaap.source.topics.";
- static final String KEYWORD_PROPERTY_END_3 = ".keyword";
-
- /*
- * maps topic names to a keyword table derived from (above)
- *
- * Example : requestID,CommonHeader.RequestID
- *
- * Table generated from this example has length 2:
- * table 0 is "requestID"
- * table 1 is "CommonHeader", "RequestID"
- */
- private static HashMap topicToPaths = new HashMap<>();
-
- // this table is used for any topics that aren't in 'topicToPaths'
- private static String[][] defaultPaths = new String[0][];
-
- // extracted from properties
- private static long droolsTimeoutMillis;
- private static String timeToLiveSecond;
-
- // HTTP query parameters
- private static final String QP_KEYWORD = "keyword";
- private static final String QP_SESSION = "session";
- private static final String QP_BUCKET = "bucket";
- private static final String QP_TTL = "ttl";
- private static final String QP_CONTROLLER = "controller";
- private static final String QP_PROTOCOL = "protocol";
- private static final String QP_TOPIC = "topic";
-
- /* **************************** */
- /* 'OrderedService' interface */
- /* **************************** */
-
- /**
- * {@inheritDoc}
- */
- @Override
- public int getSequenceNumber() {
- // we need to make sure we have an early position in 'selectThreadModel'
- // (in case there is feature that provides a thread model)
- return -1000000;
- }
-
- /* ************************************ */
- /* 'PolicyEngineFeatureApi' interface */
- /* ************************************ */
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean afterStart(PolicyEngine engine) {
- logger.info("Starting FeatureServerPool");
- Server.startup(CONFIG_FILE);
- TargetLock.startup();
- setDroolsTimeoutMillis(
- getProperty(BUCKET_DROOLS_TIMEOUT, DEFAULT_BUCKET_DROOLS_TIMEOUT));
- int intTimeToLive =
- getProperty(BUCKET_TIME_TO_LIVE, DEFAULT_BUCKET_TIME_TO_LIVE);
- setTimeToLiveSecond(String.valueOf(intTimeToLive));
- buildKeywordTable();
- Bucket.Backup.register(new DroolsSessionBackup());
- Bucket.Backup.register(new TargetLock.LockBackup());
- return false;
- }
-
- private static void setDroolsTimeoutMillis(long timeoutMs) {
- droolsTimeoutMillis = timeoutMs;
- }
-
- private static void setTimeToLiveSecond(String ttlSec) {
- timeToLiveSecond = ttlSec;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public PolicyResourceLockManager beforeCreateLockManager(
- PolicyEngine engine, Properties properties) {
-
- return TargetLock.getLockFactory();
- }
-
- /*=====================================*/
- /* 'PolicySessionFeatureApi' interface */
- /*=====================================*/
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean insertDrools(final PolicySession session, final Object object) { // NOSONAR
- // sonar complained that the method always returns the same value. However,
- // we prefer the code be structured this way, thus disabled sonar
-
- final String keyword = Keyword.lookupKeyword(object);
- if (keyword == null) {
- // no keyword was found, so we process locally
- KieSession kieSession = session.getKieSession();
- if (kieSession != null) {
- kieSession.insert(object);
- }
- return true;
- }
-
- /*
- * 'keyword' determines the destination host,
- * which may be local or remote
- */
- Bucket.forwardAndProcess(keyword, new Bucket.Message() {
- @Override
- public void process() {
- // if we reach this point, we process locally
- KieSession kieSession = session.getKieSession();
- if (kieSession != null) {
- kieSession.insert(object);
- }
- }
-
- @Override
- public void sendToServer(Server server, int bucketNumber) {
- // this object needs to sent to a remote host --
- // first, serialize the object
- byte[] data = null;
- try {
- data = Util.serialize(object);
- } catch (IOException e) {
- logger.error("insertDrools: can't serialize object of {}",
- object.getClass(), e);
- return;
- }
-
- // construct the message to insert remotely
- Entity entity = Entity.entity(
- new String(Base64.getEncoder().encode(data), StandardCharsets.UTF_8),
- MediaType.APPLICATION_OCTET_STREAM_TYPE);
- server.post("session/insertDrools", entity,
- new Server.PostResponse() {
- @Override
- public WebTarget webTarget(WebTarget webTarget) {
- PolicyContainer pc = session.getPolicyContainer();
- String encodedSessionName =
- pc.getGroupId() + ":" + pc.getArtifactId() + ":"
- + session.getName();
-
- return webTarget
- .queryParam(QP_KEYWORD, keyword)
- .queryParam(QP_SESSION, encodedSessionName)
- .queryParam(QP_BUCKET, bucketNumber)
- .queryParam(QP_TTL, timeToLiveSecond);
- }
-
- @Override
- public void response(Response response) {
- logger.info("/session/insertDrools response code = {}",
- response.getStatus());
- }
- });
- }
- });
- return true;
- }
-
- /* **************************************** */
- /* 'PolicyControllerFeatureApi' interface */
- /* **************************************** */
-
- /**
- * This method is called from 'AggregatedPolicyController.onTopicEvent',
- * and provides a way to intercept the message before it is decoded and
- * delivered to a local Drools session.
- *
- * @param controller the PolicyController instance receiving the message
- * @param protocol communication infrastructure type
- * @param topic topic name
- * @param event event message as a string
- * @return 'false' if this event should be processed locally, or 'true'
- * if the message has been forwarded to a remote host, so local
- * processing should be bypassed
- */
- @Override
- public boolean beforeOffer(final PolicyController controller,
- final CommInfrastructure protocol,
- final String topic,
- final String event) {
- // choose the table, based upon the topic
- String[][] table = topicToPaths.getOrDefault(topic, defaultPaths);
-
- // build a JSON object from the event
- StandardCoderObject sco;
-
- try {
- sco = coder.decode(event, StandardCoderObject.class);
- } catch (CoderException e) {
- return false;
- }
- String keyword = null;
-
- for (String[] path : table) {
- /*
- * Each entry in 'table' is a String[] containing an encoding
- * of a possible keyword field. Suppose the value is 'a.b.c.d.e' --
- * 'path' would be encoded as 'String[] {"a", "b", "c", "d", "e"}'
- */
- String fieldName = path[path.length - 1];
- String conversionFunctionName = null;
- int index = fieldName.indexOf(':');
-
- if (index > 0) {
- conversionFunctionName = fieldName.substring(index + 1);
- fieldName = fieldName.substring(0, index);
- path = Arrays.copyOf(path, path.length);
- path[path.length - 1] = fieldName;
- }
- keyword = sco.getString((Object[]) path);
-
- if (keyword != null) {
- if (conversionFunctionName != null) {
- keyword = Keyword.convertKeyword(keyword, conversionFunctionName);
- }
- if (keyword != null) {
- break;
- }
- }
- }
-
- if (keyword == null) {
- // couldn't find any keywords -- just process this message locally
- logger.warn("Can't locate bucket keyword within message");
- return false;
- }
-
- /*
- * build a message object implementing the 'Bucket.Message' interface --
- * it will be processed locally, forwarded, or queued based upon the
- * current state.
- */
- TopicMessage message =
- new TopicMessage(keyword, controller, protocol, topic, event);
- int bucketNumber = Bucket.bucketNumber(keyword);
- if (Bucket.forward(bucketNumber, message)) {
- // message was queued or forwarded -- abort local processing
- return true;
- }
-
- /*
- * the bucket happens to be assigned to this server, and wasn't queued --
- * return 'false', so it will be processed locally
- */
- logger.info("Keyword={}, bucket={} -- owned by this server",
- keyword, bucketNumber);
- return false;
- }
-
- /**
- * Incoming topic message has been forwarded from a remote host.
- *
- * @param bucketNumber the bucket number calculated on the remote host
- * @param keyword the keyword associated with the message
- * @param controllerName the controller the message was directed to
- * on the remote host
- * @param protocol String value of the 'Topic.CommInfrastructure' value
- * (UEB, DMAAP, NOOP, or REST -- NOOP and REST shouldn't be used
- * here)
- * @param topic the UEB/DMAAP topic name
- * @param event this is the JSON message
- */
- static void topicMessage(
- int bucketNumber, String keyword, String controllerName,
- String protocol, String topic, String event) {
-
- // @formatter:off
- logger.info("Incoming topic message: Keyword={}, bucket={}\n"
- + " controller = {}\n"
- + " topic = {}",
- keyword, bucketNumber, controllerName, topic);
- // @formatter:on
-
- // locate the 'PolicyController'
- PolicyController controller = PolicyControllerConstants.getFactory().get(controllerName);
- if (controller == null) {
- /*
- * This controller existed on the sender's host, but doesn't exist
- * on the destination. This is a problem -- we are counting on all
- * hosts being configured with the same controllers.
- */
- logger.error("Can't locate controller '{}' for incoming topic message",
- controllerName);
- } else if (controller instanceof TopicListener) {
- /*
- * This is the destination host -- repeat the 'onTopicEvent'
- * method (the one that invoked 'beforeOffer' on the originating host).
- * Note that this message could be forwarded again if the sender's
- * bucket table was somehow different from ours -- perhaps there was
- * an update in progress.
- *
- * TBD: it would be nice to limit the number of hops, in case we
- * somehow have a loop.
- */
- ((TopicListener) controller).onTopicEvent(
- CommInfrastructure.valueOf(protocol), topic, event);
- } else {
- /*
- * This 'PolicyController' was also a 'TopicListener' on the sender's
- * host -- it isn't on this host, and we are counting on them being
- * config
- */
- logger.error("Controller {} is not a TopicListener", controllerName);
- }
- }
-
- /**
- * An incoming '/session/insertDrools' message was received.
- *
- * @param keyword the keyword associated with the incoming object
- * @param sessionName encoded session name(groupId:artifactId:droolsSession)
- * @param bucket the bucket associated with keyword
- * @param ttl similar to IP time-to-live -- it controls the number of hops
- * the message may take
- * @param data base64-encoded serialized data for the object
- */
- static void incomingInsertDrools(
- String keyword, String sessionName, int bucket, int ttl, byte[] data) {
-
- logger.info("Incoming insertDrools: keyword={}, session={}, bucket={}, ttl={}",
- keyword, sessionName, bucket, ttl);
-
- if (Bucket.isKeyOnThisServer(keyword)) {
- // process locally
-
- // [0]="" [1]="", [2]=""
- String[] nameSegments = sessionName.split(":");
-
- // locate the 'PolicyContainer' and 'PolicySession'
- PolicySession policySession = locatePolicySession(nameSegments);
-
- if (policySession == null) {
- logger.error("incomingInsertDrools: Can't find PolicySession={}",
- sessionName);
- } else {
- KieSession kieSession = policySession.getKieSession();
- if (kieSession != null) {
- try {
- // deserialization needs to use the correct class loader
- Object obj = Util.deserialize(
- Base64.getDecoder().decode(data),
- policySession.getPolicyContainer().getClassLoader());
- kieSession.insert(obj);
- } catch (IOException | ClassNotFoundException
- | IllegalArgumentException e) {
- logger.error("incomingInsertDrools: failed to read data "
- + "for session '{}'", sessionName, e);
- }
- }
- }
- } else {
- ttl -= 1;
- if (ttl > 0) {
- /*
- * This host is not the intended destination -- this could happen
- * if it was sent from another site. Forward the message in the
- * same thread.
- */
- forwardInsertDroolsMessage(bucket, keyword, sessionName, ttl, data);
- }
- }
- }
-
- /**
- * step through all 'PolicyContainer' instances looking
- * for a matching 'artifactId' & 'groupId'.
- * @param nameSegments name portion from sessionName
- * @return policySession match artifactId and groupId
- */
- private static PolicySession locatePolicySession(String[] nameSegments) {
- PolicySession policySession = null;
- if (nameSegments.length == 3) {
- for (PolicyContainer pc : PolicyContainer.getPolicyContainers()) {
- if (nameSegments[1].equals(pc.getArtifactId())
- && nameSegments[0].equals(pc.getGroupId())) {
- policySession = pc.getPolicySession(nameSegments[2]);
- break;
- }
- }
- }
- return policySession;
- }
-
- /**
- * Forward the insertDrools message in the same thread.
- */
- private static void forwardInsertDroolsMessage(int bucket, String keyword,
- String sessionName, int ttl, byte[] data) {
- Server server = Bucket.bucketToServer(bucket);
- WebTarget webTarget = server.getWebTarget("session/insertDrools");
- if (webTarget != null) {
- logger.info("Forwarding 'session/insertDrools' "
- + "(key={},session={},bucket={},ttl={})",
- keyword, sessionName, bucket, ttl);
- Entity entity =
- Entity.entity(new String(data, StandardCharsets.UTF_8),
- MediaType.APPLICATION_OCTET_STREAM_TYPE);
- webTarget
- .queryParam(QP_KEYWORD, keyword)
- .queryParam(QP_SESSION, sessionName)
- .queryParam(QP_BUCKET, bucket)
- .queryParam(QP_TTL, ttl)
- .request().post(entity);
- }
- }
-
- /**
- * This method builds the table that is used to locate the appropriate
- * keywords within incoming JSON messages (e.g. 'requestID'). The
- * associated values are then mapped into bucket numbers.
- */
- private static void buildKeywordTable() {
- Properties prop = ServerPoolProperties.getProperties();
-
- // iterate over all of the properties, picking out those we are
- // interested in
- for (String name : prop.stringPropertyNames()) {
- String topic = null;
- String begin;
- String end;
-
- if (name.startsWith(KEYWORD_PROPERTY_START_1)
- && name.endsWith(KEYWORD_PROPERTY_END_1)) {
- // 1) keyword..path=
- // 2) keyword.path=
- begin = KEYWORD_PROPERTY_START_1;
- end = KEYWORD_PROPERTY_END_1;
- } else if (name.startsWith(KEYWORD_PROPERTY_START_2)
- && name.endsWith(KEYWORD_PROPERTY_END_2)) {
- // 3) ueb.source.topics..keyword=
- // 4) ueb.source.topics.keyword=
- begin = KEYWORD_PROPERTY_START_2;
- end = KEYWORD_PROPERTY_END_2;
- } else if (name.startsWith(KEYWORD_PROPERTY_START_3)
- && name.endsWith(KEYWORD_PROPERTY_END_3)) {
- // 5) dmaap.source.topics..keyword=
- // 6) dmaap.source.topics.keyword=
- begin = KEYWORD_PROPERTY_START_3;
- end = KEYWORD_PROPERTY_END_3;
- } else {
- // we aren't interested in this property
- continue;
- }
-
- topic = detmTopic(name, begin, end);
-
- // now, process the value
- // Example: requestID,CommonHeader.RequestID
- String[][] paths = splitPaths(prop, name);
-
- if (topic == null) {
- // these paths are used for any topics not explicitly
- // in the 'topicToPaths' table
- defaultPaths = paths;
- } else {
- // these paths are specific to 'topic'
- topicToPaths.put(topic, paths);
- }
- }
- }
-
- private static String detmTopic(String name, String begin, String end) {
- int beginIndex = begin.length();
- int endIndex = name.length() - end.length();
- if (beginIndex < endIndex) {
- // is specified, so this table is limited to this
- // specific topic
- return name.substring(beginIndex, endIndex);
- }
-
- return null;
- }
-
- private static String[][] splitPaths(Properties prop, String name) {
- String[] commaSeparatedEntries = prop.getProperty(name).split(",");
- String[][] paths = new String[commaSeparatedEntries.length][];
- for (int i = 0; i < commaSeparatedEntries.length; i += 1) {
- paths[i] = commaSeparatedEntries[i].split("\\.");
- }
-
- return paths;
- }
-
- /*======================================*/
- /* 'DroolsPdpStateControlApi' interface */
- /*======================================*/
-
- /*
- * Stop the processing of messages and server pool participation(non-Javadoc)
- * Note: This is not static because it should only be used if feature-server-pool
- * has been enabled.
- * (non-Javadoc)
- * @see org.onap.policy.drools.control.api.DroolsPdpStateControlApi#shutdown()
- */
- @Override
- public void shutdown() {
- PolicyEngineConstants.getManager().deactivate();
- Server.shutdown();
- }
-
- /*
- * Stop the processing of messages and server pool participation(non-Javadoc)
- * Note: This is not static because it should only be used if feature-server-pool
- * has been enabled.
- * (non-Javadoc)
- * @see org.onap.policy.drools.control.api.DroolsPdpStateControlApi#restart()
- */
- @Override
- public void restart() {
- MainLoop.startThread();
- Discovery.startDiscovery();
- PolicyEngineConstants.getManager().activate();
- }
-
- /* ============================================================ */
-
- /**
- * This class implements the 'Bucket.Message' interface for UEB/DMAAP
- * messages.
- */
- @AllArgsConstructor
- private static class TopicMessage implements Bucket.Message {
- /*
- * the keyword associated with this message
- * (which determines the bucket number).
- */
- private final String keyword;
-
- // the controller receiving this message
- private final PolicyController controller;
-
- // enumeration: UEB or DMAAP
- private final CommInfrastructure protocol;
-
- // UEB/DMAAP topic
- private final String topic;
-
- // JSON message as a String
- private final String event;
-
- /**
- * Process this message locally using 'TopicListener.onTopicEvent'
- * (the 'PolicyController' instance is assumed to implement
- * the 'TopicListener' interface as well).
- */
- @Override
- public void process() {
- if (controller instanceof TopicListener) {
- /*
- * This is the destination host -- repeat the 'onTopicEvent' method
- * (the one that invoked 'beforeOffer' on the originating host).
- * Note that this message could be forwarded again if the sender's
- * bucket table was somehow different from ours -- perhaps there was
- * an update in progress.
- *
- * TBD: it would be nice to limit the number of hops, in case we
- * somehow have a loop.
- */
- ((TopicListener) controller).onTopicEvent(protocol, topic, event);
- } else {
- /*
- * This 'PolicyController' was also a 'TopicListener' on the sender's
- * host -- it isn't on this host, and we are counting on them being
- * configured the same way.
- */
- logger.error("Controller {} is not a TopicListener",
- controller.getName());
- }
- }
-
- /**
- * Send this message to a remote server for processing (presumably, it
- * is the destination host).
- *
- * @param server the Server instance to send the message to
- * @param bucketNumber the bucket number to send it to
- */
- @Override
- public void sendToServer(Server server, int bucketNumber) {
- // if we reach this point, we have determined the remote server
- // that should process this message
-
- // @formatter:off
- logger.info("Outgoing topic message: Keyword={}, bucket={}\n"
- + " controller = {}"
- + " topic = {}"
- + " sender = {}"
- + " receiver = {}",
- keyword, bucketNumber, controller.getName(), topic,
- Server.getThisServer().getUuid(), server.getUuid());
- // @formatter:on
-
- Entity entity = Entity.entity(event, MediaType.APPLICATION_JSON);
- server.post("bucket/topic", entity, new Server.PostResponse() {
- @Override
- public WebTarget webTarget(WebTarget webTarget) {
- return webTarget
- .queryParam(QP_BUCKET, bucketNumber)
- .queryParam(QP_KEYWORD, keyword)
- .queryParam(QP_CONTROLLER, controller.getName())
- .queryParam(QP_PROTOCOL, protocol.toString())
- .queryParam(QP_TOPIC, topic);
- }
-
- @Override
- public void response(Response response) {
- // log a message indicating success/failure
- int status = response.getStatus();
- if (status >= 200 && status <= 299) {
- logger.info("/bucket/topic response code = {}", status);
- } else {
- logger.error("/bucket/topic response code = {}", status);
- }
- }
- });
- }
- }
-
- /* ============================================================ */
-
- /**
- * Backup data associated with a Drools session.
- */
- static class DroolsSessionBackup implements Bucket.Backup {
- /**
- * {@inheritDoc}
- */
- @Override
- public Bucket.Restore generate(int bucketNumber) {
- // Go through all of the Drools sessions, and generate backup data.
- // If there is no data to backup for this bucket, return 'null'
-
- DroolsSessionRestore restore = new DroolsSessionRestore();
- return restore.backup(bucketNumber) ? restore : null;
- }
- }
-
- /* ============================================================ */
-
- /**
- * This class is used to generate and restore backup Drools data.
- */
- static class DroolsSessionRestore implements Bucket.Restore, Serializable {
- private static final long serialVersionUID = 1L;
-
- // backup data for all Drools sessions on this host
- private final List sessions = new LinkedList<>();
-
- /**
- * {@inheritDoc}
- */
- boolean backup(int bucketNumber) {
- /*
- * There may be multiple Drools sessions being backed up at the same
- * time. There is one 'Pair' in the list for each session being
- * backed up.
- */
- LinkedList>, PolicySession>>
- pendingData = new LinkedList<>();
- for (PolicyContainer pc : PolicyContainer.getPolicyContainers()) {
- for (PolicySession session : pc.getPolicySessions()) {
- // Wraps list of objects, to be populated in the session
- final CompletableFuture> droolsObjectsWrapper =
- new CompletableFuture<>();
-
- // 'KieSessionObject'
- final KieSession kieSession = session.getKieSession();
-
- logger.info("{}: about to fetch data for session {}",
- this, session.getFullName());
- DroolsRunnable backupAndRemove = () -> {
- List