aboutsummaryrefslogtreecommitdiffstats
path: root/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java')
-rw-r--r--feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java1027
1 files changed, 0 insertions, 1027 deletions
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java
deleted file mode 100644
index 064af79e..00000000
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java
+++ /dev/null
@@ -1,1027 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * feature-server-pool
- * ================================================================================
- * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
- * Modifications Copyright (C) 2020 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.policy.drools.serverpool;
-
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.BUCKET_DROOLS_TIMEOUT;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.BUCKET_TIME_TO_LIVE;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_BUCKET_DROOLS_TIMEOUT;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_BUCKET_TIME_TO_LIVE;
-import static org.onap.policy.drools.serverpool.ServerPoolProperties.getProperty;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Base64;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import javax.ws.rs.client.Entity;
-import javax.ws.rs.client.WebTarget;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import lombok.AllArgsConstructor;
-import org.apache.commons.lang3.tuple.Pair;
-import org.kie.api.runtime.KieSession;
-import org.kie.api.runtime.rule.FactHandle;
-import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
-import org.onap.policy.common.endpoints.event.comm.TopicListener;
-import org.onap.policy.common.utils.coder.CoderException;
-import org.onap.policy.common.utils.coder.StandardCoder;
-import org.onap.policy.common.utils.coder.StandardCoderObject;
-import org.onap.policy.drools.control.api.DroolsPdpStateControlApi;
-import org.onap.policy.drools.core.DroolsRunnable;
-import org.onap.policy.drools.core.PolicyContainer;
-import org.onap.policy.drools.core.PolicySession;
-import org.onap.policy.drools.core.PolicySessionFeatureApi;
-import org.onap.policy.drools.core.lock.PolicyResourceLockManager;
-import org.onap.policy.drools.features.PolicyControllerFeatureApi;
-import org.onap.policy.drools.features.PolicyEngineFeatureApi;
-import org.onap.policy.drools.system.PolicyController;
-import org.onap.policy.drools.system.PolicyControllerConstants;
-import org.onap.policy.drools.system.PolicyEngine;
-import org.onap.policy.drools.system.PolicyEngineConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * </p>
- * This class hooks the server pool implementation into DroolsPDP.
- * <dl>
- * <dt>PolicyEngineFeatureApi</dt><dd> - the <i>afterStart</i> hook is where we initialize.</dd>
- * <dt>PolicyControllerFeatureApi</dt><dd> - the <i>beforeOffer</i> hook is used to look
- * at incoming topic messages, and decide whether to process them
- * on this host, or forward to another host.</dd>
- * </dl>
- */
-public class FeatureServerPool
- implements PolicyEngineFeatureApi, PolicySessionFeatureApi,
- PolicyControllerFeatureApi, DroolsPdpStateControlApi {
- private static Logger logger =
- LoggerFactory.getLogger(FeatureServerPool.class);
-
- // used for JSON <-> String conversion
- private static StandardCoder coder = new StandardCoder();
-
- private static final String CONFIG_FILE =
- "config/feature-server-pool.properties";
-
- /*
- * Properties used when searching for keyword entries
- *
- * The following types are supported:
- *
- * 1) keyword.<topic>.path=<field-list>
- * 2) keyword.path=<field-list>
- * 3) ueb.source.topics.<topic>.keyword=<field-list>
- * 4) ueb.source.topics.keyword=<field-list>
- * 5) dmaap.source.topics.<topic>.keyword=<field-list>
- * 6) dmaap.source.topics.keyword=<field-list>
- *
- * 1, 3, and 5 are functionally equivalent
- * 2, 4, and 6 are functionally equivalent
- */
-
- static final String KEYWORD_PROPERTY_START_1 = "keyword.";
- static final String KEYWORD_PROPERTY_END_1 = ".path";
- static final String KEYWORD_PROPERTY_START_2 = "ueb.source.topics.";
- static final String KEYWORD_PROPERTY_END_2 = ".keyword";
- static final String KEYWORD_PROPERTY_START_3 = "dmaap.source.topics.";
- static final String KEYWORD_PROPERTY_END_3 = ".keyword";
-
- /*
- * maps topic names to a keyword table derived from <field-list> (above)
- *
- * Example <field-list>: requestID,CommonHeader.RequestID
- *
- * Table generated from this example has length 2:
- * table 0 is "requestID"
- * table 1 is "CommonHeader", "RequestID"
- */
- private static HashMap<String, String[][]> topicToPaths = new HashMap<>();
-
- // this table is used for any topics that aren't in 'topicToPaths'
- private static String[][] defaultPaths = new String[0][];
-
- // extracted from properties
- private static long droolsTimeoutMillis;
- private static String timeToLiveSecond;
-
- // HTTP query parameters
- private static final String QP_KEYWORD = "keyword";
- private static final String QP_SESSION = "session";
- private static final String QP_BUCKET = "bucket";
- private static final String QP_TTL = "ttl";
- private static final String QP_CONTROLLER = "controller";
- private static final String QP_PROTOCOL = "protocol";
- private static final String QP_TOPIC = "topic";
-
- /* **************************** */
- /* 'OrderedService' interface */
- /* **************************** */
-
- /**
- * {@inheritDoc}
- */
- @Override
- public int getSequenceNumber() {
- // we need to make sure we have an early position in 'selectThreadModel'
- // (in case there is feature that provides a thread model)
- return -1000000;
- }
-
- /* ************************************ */
- /* 'PolicyEngineFeatureApi' interface */
- /* ************************************ */
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean afterStart(PolicyEngine engine) {
- logger.info("Starting FeatureServerPool");
- Server.startup(CONFIG_FILE);
- TargetLock.startup();
- setDroolsTimeoutMillis(
- getProperty(BUCKET_DROOLS_TIMEOUT, DEFAULT_BUCKET_DROOLS_TIMEOUT));
- int intTimeToLive =
- getProperty(BUCKET_TIME_TO_LIVE, DEFAULT_BUCKET_TIME_TO_LIVE);
- setTimeToLiveSecond(String.valueOf(intTimeToLive));
- buildKeywordTable();
- Bucket.Backup.register(new DroolsSessionBackup());
- Bucket.Backup.register(new TargetLock.LockBackup());
- return false;
- }
-
- private static void setDroolsTimeoutMillis(long timeoutMs) {
- droolsTimeoutMillis = timeoutMs;
- }
-
- private static void setTimeToLiveSecond(String ttlSec) {
- timeToLiveSecond = ttlSec;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public PolicyResourceLockManager beforeCreateLockManager(
- PolicyEngine engine, Properties properties) {
-
- return TargetLock.getLockFactory();
- }
-
- /*=====================================*/
- /* 'PolicySessionFeatureApi' interface */
- /*=====================================*/
-
- /**
- * {@inheritDoc}
- */
- @Override
- public boolean insertDrools(final PolicySession session, final Object object) { // NOSONAR
- // sonar complained that the method always returns the same value. However,
- // we prefer the code be structured this way, thus disabled sonar
-
- final String keyword = Keyword.lookupKeyword(object);
- if (keyword == null) {
- // no keyword was found, so we process locally
- KieSession kieSession = session.getKieSession();
- if (kieSession != null) {
- kieSession.insert(object);
- }
- return true;
- }
-
- /*
- * 'keyword' determines the destination host,
- * which may be local or remote
- */
- Bucket.forwardAndProcess(keyword, new Bucket.Message() {
- @Override
- public void process() {
- // if we reach this point, we process locally
- KieSession kieSession = session.getKieSession();
- if (kieSession != null) {
- kieSession.insert(object);
- }
- }
-
- @Override
- public void sendToServer(Server server, int bucketNumber) {
- // this object needs to sent to a remote host --
- // first, serialize the object
- byte[] data = null;
- try {
- data = Util.serialize(object);
- } catch (IOException e) {
- logger.error("insertDrools: can't serialize object of {}",
- object.getClass(), e);
- return;
- }
-
- // construct the message to insert remotely
- Entity<String> entity = Entity.entity(
- new String(Base64.getEncoder().encode(data), StandardCharsets.UTF_8),
- MediaType.APPLICATION_OCTET_STREAM_TYPE);
- server.post("session/insertDrools", entity,
- new Server.PostResponse() {
- @Override
- public WebTarget webTarget(WebTarget webTarget) {
- PolicyContainer pc = session.getPolicyContainer();
- String encodedSessionName =
- pc.getGroupId() + ":" + pc.getArtifactId() + ":"
- + session.getName();
-
- return webTarget
- .queryParam(QP_KEYWORD, keyword)
- .queryParam(QP_SESSION, encodedSessionName)
- .queryParam(QP_BUCKET, bucketNumber)
- .queryParam(QP_TTL, timeToLiveSecond);
- }
-
- @Override
- public void response(Response response) {
- logger.info("/session/insertDrools response code = {}",
- response.getStatus());
- }
- });
- }
- });
- return true;
- }
-
- /* **************************************** */
- /* 'PolicyControllerFeatureApi' interface */
- /* **************************************** */
-
- /**
- * This method is called from 'AggregatedPolicyController.onTopicEvent',
- * and provides a way to intercept the message before it is decoded and
- * delivered to a local Drools session.
- *
- * @param controller the PolicyController instance receiving the message
- * @param protocol communication infrastructure type
- * @param topic topic name
- * @param event event message as a string
- * @return 'false' if this event should be processed locally, or 'true'
- * if the message has been forwarded to a remote host, so local
- * processing should be bypassed
- */
- @Override
- public boolean beforeOffer(final PolicyController controller,
- final CommInfrastructure protocol,
- final String topic,
- final String event) {
- // choose the table, based upon the topic
- String[][] table = topicToPaths.getOrDefault(topic, defaultPaths);
-
- // build a JSON object from the event
- StandardCoderObject sco;
-
- try {
- sco = coder.decode(event, StandardCoderObject.class);
- } catch (CoderException e) {
- return false;
- }
- String keyword = null;
-
- for (String[] path : table) {
- /*
- * Each entry in 'table' is a String[] containing an encoding
- * of a possible keyword field. Suppose the value is 'a.b.c.d.e' --
- * 'path' would be encoded as 'String[] {"a", "b", "c", "d", "e"}'
- */
- String fieldName = path[path.length - 1];
- String conversionFunctionName = null;
- int index = fieldName.indexOf(':');
-
- if (index > 0) {
- conversionFunctionName = fieldName.substring(index + 1);
- fieldName = fieldName.substring(0, index);
- path = Arrays.copyOf(path, path.length);
- path[path.length - 1] = fieldName;
- }
- keyword = sco.getString((Object[]) path);
-
- if (keyword != null) {
- if (conversionFunctionName != null) {
- keyword = Keyword.convertKeyword(keyword, conversionFunctionName);
- }
- if (keyword != null) {
- break;
- }
- }
- }
-
- if (keyword == null) {
- // couldn't find any keywords -- just process this message locally
- logger.warn("Can't locate bucket keyword within message");
- return false;
- }
-
- /*
- * build a message object implementing the 'Bucket.Message' interface --
- * it will be processed locally, forwarded, or queued based upon the
- * current state.
- */
- TopicMessage message =
- new TopicMessage(keyword, controller, protocol, topic, event);
- int bucketNumber = Bucket.bucketNumber(keyword);
- if (Bucket.forward(bucketNumber, message)) {
- // message was queued or forwarded -- abort local processing
- return true;
- }
-
- /*
- * the bucket happens to be assigned to this server, and wasn't queued --
- * return 'false', so it will be processed locally
- */
- logger.info("Keyword={}, bucket={} -- owned by this server",
- keyword, bucketNumber);
- return false;
- }
-
- /**
- * Incoming topic message has been forwarded from a remote host.
- *
- * @param bucketNumber the bucket number calculated on the remote host
- * @param keyword the keyword associated with the message
- * @param controllerName the controller the message was directed to
- * on the remote host
- * @param protocol String value of the 'Topic.CommInfrastructure' value
- * (UEB, DMAAP, NOOP, or REST -- NOOP and REST shouldn't be used
- * here)
- * @param topic the UEB/DMAAP topic name
- * @param event this is the JSON message
- */
- static void topicMessage(
- int bucketNumber, String keyword, String controllerName,
- String protocol, String topic, String event) {
-
- // @formatter:off
- logger.info("Incoming topic message: Keyword={}, bucket={}\n"
- + " controller = {}\n"
- + " topic = {}",
- keyword, bucketNumber, controllerName, topic);
- // @formatter:on
-
- // locate the 'PolicyController'
- PolicyController controller = PolicyControllerConstants.getFactory().get(controllerName);
- if (controller == null) {
- /*
- * This controller existed on the sender's host, but doesn't exist
- * on the destination. This is a problem -- we are counting on all
- * hosts being configured with the same controllers.
- */
- logger.error("Can't locate controller '{}' for incoming topic message",
- controllerName);
- } else if (controller instanceof TopicListener) {
- /*
- * This is the destination host -- repeat the 'onTopicEvent'
- * method (the one that invoked 'beforeOffer' on the originating host).
- * Note that this message could be forwarded again if the sender's
- * bucket table was somehow different from ours -- perhaps there was
- * an update in progress.
- *
- * TBD: it would be nice to limit the number of hops, in case we
- * somehow have a loop.
- */
- ((TopicListener) controller).onTopicEvent(
- CommInfrastructure.valueOf(protocol), topic, event);
- } else {
- /*
- * This 'PolicyController' was also a 'TopicListener' on the sender's
- * host -- it isn't on this host, and we are counting on them being
- * config
- */
- logger.error("Controller {} is not a TopicListener", controllerName);
- }
- }
-
- /**
- * An incoming '/session/insertDrools' message was received.
- *
- * @param keyword the keyword associated with the incoming object
- * @param sessionName encoded session name(groupId:artifactId:droolsSession)
- * @param bucket the bucket associated with keyword
- * @param ttl similar to IP time-to-live -- it controls the number of hops
- * the message may take
- * @param data base64-encoded serialized data for the object
- */
- static void incomingInsertDrools(
- String keyword, String sessionName, int bucket, int ttl, byte[] data) {
-
- logger.info("Incoming insertDrools: keyword={}, session={}, bucket={}, ttl={}",
- keyword, sessionName, bucket, ttl);
-
- if (Bucket.isKeyOnThisServer(keyword)) {
- // process locally
-
- // [0]="<groupId>" [1]="<artifactId>", [2]="<sessionName>"
- String[] nameSegments = sessionName.split(":");
-
- // locate the 'PolicyContainer' and 'PolicySession'
- PolicySession policySession = locatePolicySession(nameSegments);
-
- if (policySession == null) {
- logger.error("incomingInsertDrools: Can't find PolicySession={}",
- sessionName);
- } else {
- KieSession kieSession = policySession.getKieSession();
- if (kieSession != null) {
- try {
- // deserialization needs to use the correct class loader
- Object obj = Util.deserialize(
- Base64.getDecoder().decode(data),
- policySession.getPolicyContainer().getClassLoader());
- kieSession.insert(obj);
- } catch (IOException | ClassNotFoundException
- | IllegalArgumentException e) {
- logger.error("incomingInsertDrools: failed to read data "
- + "for session '{}'", sessionName, e);
- }
- }
- }
- } else {
- ttl -= 1;
- if (ttl > 0) {
- /*
- * This host is not the intended destination -- this could happen
- * if it was sent from another site. Forward the message in the
- * same thread.
- */
- forwardInsertDroolsMessage(bucket, keyword, sessionName, ttl, data);
- }
- }
- }
-
- /**
- * step through all 'PolicyContainer' instances looking
- * for a matching 'artifactId' & 'groupId'.
- * @param nameSegments name portion from sessionName
- * @return policySession match artifactId and groupId
- */
- private static PolicySession locatePolicySession(String[] nameSegments) {
- PolicySession policySession = null;
- if (nameSegments.length == 3) {
- for (PolicyContainer pc : PolicyContainer.getPolicyContainers()) {
- if (nameSegments[1].equals(pc.getArtifactId())
- && nameSegments[0].equals(pc.getGroupId())) {
- policySession = pc.getPolicySession(nameSegments[2]);
- break;
- }
- }
- }
- return policySession;
- }
-
- /**
- * Forward the insertDrools message in the same thread.
- */
- private static void forwardInsertDroolsMessage(int bucket, String keyword,
- String sessionName, int ttl, byte[] data) {
- Server server = Bucket.bucketToServer(bucket);
- WebTarget webTarget = server.getWebTarget("session/insertDrools");
- if (webTarget != null) {
- logger.info("Forwarding 'session/insertDrools' "
- + "(key={},session={},bucket={},ttl={})",
- keyword, sessionName, bucket, ttl);
- Entity<String> entity =
- Entity.entity(new String(data, StandardCharsets.UTF_8),
- MediaType.APPLICATION_OCTET_STREAM_TYPE);
- webTarget
- .queryParam(QP_KEYWORD, keyword)
- .queryParam(QP_SESSION, sessionName)
- .queryParam(QP_BUCKET, bucket)
- .queryParam(QP_TTL, ttl)
- .request().post(entity);
- }
- }
-
- /**
- * This method builds the table that is used to locate the appropriate
- * keywords within incoming JSON messages (e.g. 'requestID'). The
- * associated values are then mapped into bucket numbers.
- */
- private static void buildKeywordTable() {
- Properties prop = ServerPoolProperties.getProperties();
-
- // iterate over all of the properties, picking out those we are
- // interested in
- for (String name : prop.stringPropertyNames()) {
- String topic = null;
- String begin;
- String end;
-
- if (name.startsWith(KEYWORD_PROPERTY_START_1)
- && name.endsWith(KEYWORD_PROPERTY_END_1)) {
- // 1) keyword.<topic>.path=<field-list>
- // 2) keyword.path=<field-list>
- begin = KEYWORD_PROPERTY_START_1;
- end = KEYWORD_PROPERTY_END_1;
- } else if (name.startsWith(KEYWORD_PROPERTY_START_2)
- && name.endsWith(KEYWORD_PROPERTY_END_2)) {
- // 3) ueb.source.topics.<topic>.keyword=<field-list>
- // 4) ueb.source.topics.keyword=<field-list>
- begin = KEYWORD_PROPERTY_START_2;
- end = KEYWORD_PROPERTY_END_2;
- } else if (name.startsWith(KEYWORD_PROPERTY_START_3)
- && name.endsWith(KEYWORD_PROPERTY_END_3)) {
- // 5) dmaap.source.topics.<topic>.keyword=<field-list>
- // 6) dmaap.source.topics.keyword=<field-list>
- begin = KEYWORD_PROPERTY_START_3;
- end = KEYWORD_PROPERTY_END_3;
- } else {
- // we aren't interested in this property
- continue;
- }
-
- topic = detmTopic(name, begin, end);
-
- // now, process the value
- // Example: requestID,CommonHeader.RequestID
- String[][] paths = splitPaths(prop, name);
-
- if (topic == null) {
- // these paths are used for any topics not explicitly
- // in the 'topicToPaths' table
- defaultPaths = paths;
- } else {
- // these paths are specific to 'topic'
- topicToPaths.put(topic, paths);
- }
- }
- }
-
- private static String detmTopic(String name, String begin, String end) {
- int beginIndex = begin.length();
- int endIndex = name.length() - end.length();
- if (beginIndex < endIndex) {
- // <topic> is specified, so this table is limited to this
- // specific topic
- return name.substring(beginIndex, endIndex);
- }
-
- return null;
- }
-
- private static String[][] splitPaths(Properties prop, String name) {
- String[] commaSeparatedEntries = prop.getProperty(name).split(",");
- String[][] paths = new String[commaSeparatedEntries.length][];
- for (int i = 0; i < commaSeparatedEntries.length; i += 1) {
- paths[i] = commaSeparatedEntries[i].split("\\.");
- }
-
- return paths;
- }
-
- /*======================================*/
- /* 'DroolsPdpStateControlApi' interface */
- /*======================================*/
-
- /*
- * Stop the processing of messages and server pool participation(non-Javadoc)
- * Note: This is not static because it should only be used if feature-server-pool
- * has been enabled.
- * (non-Javadoc)
- * @see org.onap.policy.drools.control.api.DroolsPdpStateControlApi#shutdown()
- */
- @Override
- public void shutdown() {
- PolicyEngineConstants.getManager().deactivate();
- Server.shutdown();
- }
-
- /*
- * Stop the processing of messages and server pool participation(non-Javadoc)
- * Note: This is not static because it should only be used if feature-server-pool
- * has been enabled.
- * (non-Javadoc)
- * @see org.onap.policy.drools.control.api.DroolsPdpStateControlApi#restart()
- */
- @Override
- public void restart() {
- MainLoop.startThread();
- Discovery.startDiscovery();
- PolicyEngineConstants.getManager().activate();
- }
-
- /* ============================================================ */
-
- /**
- * This class implements the 'Bucket.Message' interface for UEB/DMAAP
- * messages.
- */
- @AllArgsConstructor
- private static class TopicMessage implements Bucket.Message {
- /*
- * the keyword associated with this message
- * (which determines the bucket number).
- */
- private final String keyword;
-
- // the controller receiving this message
- private final PolicyController controller;
-
- // enumeration: UEB or DMAAP
- private final CommInfrastructure protocol;
-
- // UEB/DMAAP topic
- private final String topic;
-
- // JSON message as a String
- private final String event;
-
- /**
- * Process this message locally using 'TopicListener.onTopicEvent'
- * (the 'PolicyController' instance is assumed to implement
- * the 'TopicListener' interface as well).
- */
- @Override
- public void process() {
- if (controller instanceof TopicListener) {
- /*
- * This is the destination host -- repeat the 'onTopicEvent' method
- * (the one that invoked 'beforeOffer' on the originating host).
- * Note that this message could be forwarded again if the sender's
- * bucket table was somehow different from ours -- perhaps there was
- * an update in progress.
- *
- * TBD: it would be nice to limit the number of hops, in case we
- * somehow have a loop.
- */
- ((TopicListener) controller).onTopicEvent(protocol, topic, event);
- } else {
- /*
- * This 'PolicyController' was also a 'TopicListener' on the sender's
- * host -- it isn't on this host, and we are counting on them being
- * configured the same way.
- */
- logger.error("Controller {} is not a TopicListener",
- controller.getName());
- }
- }
-
- /**
- * Send this message to a remote server for processing (presumably, it
- * is the destination host).
- *
- * @param server the Server instance to send the message to
- * @param bucketNumber the bucket number to send it to
- */
- @Override
- public void sendToServer(Server server, int bucketNumber) {
- // if we reach this point, we have determined the remote server
- // that should process this message
-
- // @formatter:off
- logger.info("Outgoing topic message: Keyword={}, bucket={}\n"
- + " controller = {}"
- + " topic = {}"
- + " sender = {}"
- + " receiver = {}",
- keyword, bucketNumber, controller.getName(), topic,
- Server.getThisServer().getUuid(), server.getUuid());
- // @formatter:on
-
- Entity<String> entity = Entity.entity(event, MediaType.APPLICATION_JSON);
- server.post("bucket/topic", entity, new Server.PostResponse() {
- @Override
- public WebTarget webTarget(WebTarget webTarget) {
- return webTarget
- .queryParam(QP_BUCKET, bucketNumber)
- .queryParam(QP_KEYWORD, keyword)
- .queryParam(QP_CONTROLLER, controller.getName())
- .queryParam(QP_PROTOCOL, protocol.toString())
- .queryParam(QP_TOPIC, topic);
- }
-
- @Override
- public void response(Response response) {
- // log a message indicating success/failure
- int status = response.getStatus();
- if (status >= 200 && status <= 299) {
- logger.info("/bucket/topic response code = {}", status);
- } else {
- logger.error("/bucket/topic response code = {}", status);
- }
- }
- });
- }
- }
-
- /* ============================================================ */
-
- /**
- * Backup data associated with a Drools session.
- */
- static class DroolsSessionBackup implements Bucket.Backup {
- /**
- * {@inheritDoc}
- */
- @Override
- public Bucket.Restore generate(int bucketNumber) {
- // Go through all of the Drools sessions, and generate backup data.
- // If there is no data to backup for this bucket, return 'null'
-
- DroolsSessionRestore restore = new DroolsSessionRestore();
- return restore.backup(bucketNumber) ? restore : null;
- }
- }
-
- /* ============================================================ */
-
- /**
- * This class is used to generate and restore backup Drools data.
- */
- static class DroolsSessionRestore implements Bucket.Restore, Serializable {
- private static final long serialVersionUID = 1L;
-
- // backup data for all Drools sessions on this host
- private final List<SingleSession> sessions = new LinkedList<>();
-
- /**
- * {@inheritDoc}
- */
- boolean backup(int bucketNumber) {
- /*
- * There may be multiple Drools sessions being backed up at the same
- * time. There is one 'Pair' in the list for each session being
- * backed up.
- */
- LinkedList<Pair<CompletableFuture<List<Object>>, PolicySession>>
- pendingData = new LinkedList<>();
- for (PolicyContainer pc : PolicyContainer.getPolicyContainers()) {
- for (PolicySession session : pc.getPolicySessions()) {
- // Wraps list of objects, to be populated in the session
- final CompletableFuture<List<Object>> droolsObjectsWrapper =
- new CompletableFuture<>();
-
- // 'KieSessionObject'
- final KieSession kieSession = session.getKieSession();
-
- logger.info("{}: about to fetch data for session {}",
- this, session.getFullName());
- DroolsRunnable backupAndRemove = () -> {
- List<Object> droolsObjects = new ArrayList<>();
- for (FactHandle fh : kieSession.getFactHandles()) {
- Object obj = kieSession.getObject(fh);
- String keyword = Keyword.lookupKeyword(obj);
- if (keyword != null
- && Bucket.bucketNumber(keyword) == bucketNumber) {
- // bucket matches -- include this object
- droolsObjects.add(obj);
- /*
- * delete this factHandle from Drools memory
- * this classes are used in bucket migration,
- * so the delete is intentional.
- */
- kieSession.delete(fh);
- }
- }
-
- // send notification that object list is complete
- droolsObjectsWrapper.complete(droolsObjects);
- };
- kieSession.insert(backupAndRemove);
-
- // add pending operation to the list
- pendingData.add(Pair.of(droolsObjectsWrapper, session));
- }
- }
-
- /*
- * data copying can start as soon as we receive results
- * from pending sessions (there may not be any)
- */
- copyDataFromSession(pendingData);
- return !sessions.isEmpty();
- }
-
- /**
- * Copy data from pending sessions.
- * @param pendingData a list of policy sessions
- */
- private void copyDataFromSession(List<Pair<CompletableFuture<List<Object>>, PolicySession>>
- pendingData) {
- long endTime = System.currentTimeMillis() + droolsTimeoutMillis;
-
- for (Pair<CompletableFuture<List<Object>>, PolicySession> pair :
- pendingData) {
- PolicySession session = pair.getRight();
- long delay = endTime - System.currentTimeMillis();
- if (delay < 0) {
- /*
- * we have already reached the time limit, so we will
- * only process data that has already been received
- */
- delay = 0;
- }
- try {
- List<Object> droolsObjects =
- pair.getLeft().get(delay, TimeUnit.MILLISECONDS);
-
- // if we reach this point, session data read has completed
- logger.info("{}: session={}, got {} object(s)",
- this, session.getFullName(),
- droolsObjects.size());
- if (!droolsObjects.isEmpty()) {
- sessions.add(new SingleSession(session, droolsObjects));
- }
- } catch (TimeoutException e) {
- logger.error("{}: Timeout waiting for data from session {}",
- this, session.getFullName());
- } catch (Exception e) {
- logger.error("{}: Exception writing output data", this, e);
- }
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public void restore(int bucketNumber) {
- /*
- * There may be multiple Drools sessions being restored at the same
- * time. There is one entry in 'sessionLatches' for each session
- * being restored.
- */
- LinkedList<CountDownLatch> sessionLatches = new LinkedList<>();
- for (SingleSession session : sessions) {
- try {
- CountDownLatch sessionLatch = session.restore();
- if (sessionLatch != null) {
- // there is a restore in progress -- add it to the list
- sessionLatches.add(sessionLatch);
- }
- } catch (IOException | ClassNotFoundException e) {
- logger.error("Exception in {}", this, e);
- }
- }
-
- // wait for all sessions to be updated
- try {
- for (CountDownLatch sessionLatch : sessionLatches) {
- if (!sessionLatch.await(droolsTimeoutMillis, TimeUnit.MILLISECONDS)) {
- logger.error("{}: timed out waiting for session latch", this);
- }
- }
- } catch (InterruptedException e) {
- logger.error("Exception in {}", this, e);
- Thread.currentThread().interrupt();
- }
- }
- }
-
- /* ============================================================ */
-
- /**
- * Each instance of this class corresponds to a Drools session that has
- * been backed up, or is being restored.
- */
- static class SingleSession implements Serializable {
- private static final long serialVersionUID = 1L;
-
- // the group id associated with the Drools container
- String groupId;
-
- // the artifact id associated with the Drools container
- String artifactId;
-
- // the session name within the Drools container
- String sessionName;
-
- // serialized data associated with this session (and bucket)
- byte[] data;
-
- /**
- * Constructor - initialize the 'SingleSession' instance, so it can
- * be serialized.
- *
- * @param session the Drools session being backed up
- * @param droolsObjects the Drools objects from this session associated
- * with the bucket currently being backed up
- */
- SingleSession(PolicySession session, List<Object> droolsObjects) throws IOException {
- // 'groupId' and 'artifactId' are set from the 'PolicyContainer'
- PolicyContainer pc = session.getPolicyContainer();
- groupId = pc.getGroupId();
- artifactId = pc.getArtifactId();
-
- // 'sessionName' is set from the 'PolicySession'
- sessionName = session.getName();
-
- /*
- * serialize the Drools objects -- we serialize them here, because they
- * need to be deserialized within the scope of the Drools session
- */
- data = Util.serialize(droolsObjects);
- }
-
- CountDownLatch restore() throws IOException, ClassNotFoundException {
- PolicySession session = null;
-
- // locate the 'PolicyContainer', and 'PolicySession'
- for (PolicyContainer pc : PolicyContainer.getPolicyContainers()) {
- if (artifactId.equals(pc.getArtifactId())
- && groupId.equals(pc.getGroupId())) {
- session = pc.getPolicySession(sessionName);
- return insertSessionData(session, new ByteArrayInputStream(data));
- }
- }
- logger.error("{}: unable to locate session name {}", this, sessionName);
- return null;
- }
-
- /**
- * Deserialize session data, and insert the objects into the session
- * from within the Drools session thread.
- *
- * @param session the associated PolicySession instance
- * @param bis the data to be deserialized
- * @return a CountDownLatch, which will indicate when the operation has
- * completed (null in case of failure)
- * @throws IOException IO errors while creating or reading from
- * the object stream
- * @throws ClassNotFoundException class not found during deserialization
- */
- private CountDownLatch insertSessionData(PolicySession session, ByteArrayInputStream bis)
- throws IOException, ClassNotFoundException {
- ClassLoader classLoader = session.getPolicyContainer().getClassLoader();
- ExtendedObjectInputStream ois =
- new ExtendedObjectInputStream(bis, classLoader);
-
- /*
- * associate the current thread with the session,
- * and deserialize
- */
- session.setPolicySession();
- Object obj = ois.readObject();
-
- if (obj instanceof List) {
- final List<?> droolsObjects = (List<?>) obj;
- logger.info("{}: session={}, got {} object(s)",
- this, session.getFullName(), droolsObjects.size());
-
- // signal when session update is complete
- final CountDownLatch sessionLatch = new CountDownLatch(1);
-
- // 'KieSession' object
- final KieSession kieSession = session.getKieSession();
-
- // run the following within the Drools session thread
- DroolsRunnable doRestore = () -> {
- try {
- /*
- * Insert all of the objects -- note that this is running
- * in the session thread, so no other rules can fire
- * until all of the objects are inserted.
- */
- for (Object droolsObj : droolsObjects) {
- kieSession.insert(droolsObj);
- }
- } finally {
- // send notification that the inserts have completed
- sessionLatch.countDown();
- }
- };
- kieSession.insert(doRestore);
- ois.close();
- return sessionLatch;
- } else {
- ois.close();
- logger.error("{}: Invalid session data for session={}, type={}",
- this, session.getFullName(), obj.getClass().getName());
- }
- return null;
- }
- }
-}