aboutsummaryrefslogtreecommitdiffstats
path: root/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java
diff options
context:
space:
mode:
Diffstat (limited to 'feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java')
-rw-r--r--feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java159
1 files changed, 90 insertions, 69 deletions
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java
index 748a38f3..dfe211ce 100644
--- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java
+++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java
@@ -26,9 +26,6 @@ import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_BUC
import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_BUCKET_TIME_TO_LIVE;
import static org.onap.policy.drools.serverpool.ServerPoolProperties.getProperty;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.Serializable;
@@ -36,7 +33,6 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
-import java.util.Enumeration;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -53,8 +49,6 @@ import javax.ws.rs.core.Response;
import lombok.AllArgsConstructor;
-import org.drools.core.definitions.InternalKnowledgePackage;
-import org.drools.core.impl.KnowledgeBaseImpl;
import org.kie.api.runtime.KieSession;
import org.kie.api.runtime.rule.FactHandle;
import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
@@ -75,7 +69,6 @@ import org.onap.policy.drools.system.PolicyControllerConstants;
import org.onap.policy.drools.system.PolicyEngine;
import org.onap.policy.drools.system.PolicyEngineConstants;
import org.onap.policy.drools.utils.Pair;
-import org.onap.policy.drools.utils.PropertyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,7 +91,7 @@ public class FeatureServerPool
// used for JSON <-> String conversion
private static StandardCoder coder = new StandardCoder();
- private static final String configFile =
+ private static final String CONFIG_FILE =
"config/feature-server-pool.properties";
/*
@@ -142,6 +135,15 @@ public class FeatureServerPool
private static long droolsTimeoutMillis;
private static String timeToLiveSecond;
+ // HTTP query parameters
+ private static final String QP_KEYWORD = "keyword";
+ private static final String QP_SESSION = "session";
+ private static final String QP_BUCKET = "bucket";
+ private static final String QP_TTL = "ttl";
+ private static final String QP_CONTROLLER = "controller";
+ private static final String QP_PROTOCOL = "protocol";
+ private static final String QP_TOPIC = "topic";
+
/******************************/
/* 'OrderedService' interface */
/******************************/
@@ -166,7 +168,7 @@ public class FeatureServerPool
@Override
public boolean afterStart(PolicyEngine engine) {
logger.info("Starting FeatureServerPool");
- Server.startup(configFile);
+ Server.startup(CONFIG_FILE);
TargetLock.startup();
droolsTimeoutMillis =
getProperty(BUCKET_DROOLS_TIMEOUT, DEFAULT_BUCKET_DROOLS_TIMEOUT);
@@ -251,10 +253,10 @@ public class FeatureServerPool
+ session.getName();
return webTarget
- .queryParam("keyword", keyword)
- .queryParam("session", encodedSessionName)
- .queryParam("bucket", bucketNumber)
- .queryParam("ttl", timeToLiveSecond);
+ .queryParam(QP_KEYWORD, keyword)
+ .queryParam(QP_SESSION, encodedSessionName)
+ .queryParam(QP_BUCKET, bucketNumber)
+ .queryParam(QP_TTL, timeToLiveSecond);
}
@Override
@@ -320,6 +322,20 @@ public class FeatureServerPool
path[path.length - 1] = fieldName;
}
keyword = sco.getString(path);
+ if (keyword != null) {
+ if (conversionFunctionName == null) {
+ // We found a keyword -- we don't need to try other paths,
+ // so we should break out of the loop
+ break;
+ }
+
+ // we have post-processing to do
+ keyword = Keyword.convertKeyword(keyword, conversionFunctionName);
+ if (keyword != null) {
+ // conversion was successful
+ break;
+ }
+ }
}
if (keyword == null) {
@@ -451,13 +467,16 @@ public class FeatureServerPool
}
}
}
- } else if ((ttl -= 1) > 0) {
- /*
- * This host is not the intended destination -- this could happen
- * if it was sent from another site. Forward the message in the
- * same thread.
- */
- forwardInsertDroolsMessage(bucket, keyword, sessionName, ttl, data);
+ } else {
+ ttl -= 1;
+ if (ttl > 0) {
+ /*
+ * This host is not the intended destination -- this could happen
+ * if it was sent from another site. Forward the message in the
+ * same thread.
+ */
+ forwardInsertDroolsMessage(bucket, keyword, sessionName, ttl, data);
+ }
}
}
@@ -496,10 +515,10 @@ public class FeatureServerPool
Entity.entity(new String(data, StandardCharsets.UTF_8),
MediaType.APPLICATION_OCTET_STREAM_TYPE);
webTarget
- .queryParam("keyword", keyword)
- .queryParam("session", sessionName)
- .queryParam("bucket", bucket)
- .queryParam("ttl", ttl)
+ .queryParam(QP_KEYWORD, keyword)
+ .queryParam(QP_SESSION, sessionName)
+ .queryParam(QP_BUCKET, bucket)
+ .queryParam(QP_TTL, ttl)
.request().post(entity);
}
}
@@ -683,17 +702,22 @@ public class FeatureServerPool
@Override
public WebTarget webTarget(WebTarget webTarget) {
return webTarget
- .queryParam("bucket", bucketNumber)
- .queryParam("keyword", keyword)
- .queryParam("controller", controller.getName())
- .queryParam("protocol", protocol.toString())
- .queryParam("topic", topic);
+ .queryParam(QP_BUCKET, bucketNumber)
+ .queryParam(QP_KEYWORD, keyword)
+ .queryParam(QP_CONTROLLER, controller.getName())
+ .queryParam(QP_PROTOCOL, protocol.toString())
+ .queryParam(QP_TOPIC, topic);
}
@Override
public void response(Response response) {
- // TODO: eventually, we will want to do something different
- // based upon success/failure
+ // log a message indicating success/failure
+ int status = response.getStatus();
+ if (status >= 200 && status <= 299) {
+ logger.info("/bucket/topic response code = {}", status);
+ } else {
+ logger.error("/bucket/topic response code = {}", status);
+ }
}
});
}
@@ -749,30 +773,28 @@ public class FeatureServerPool
logger.info("{}: about to fetch data for session {}",
this, session.getFullName());
- kieSession.insert(new DroolsRunnable() {
- @Override
- public void run() {
- List<Object> droolsObjects = new ArrayList<>();
- for (FactHandle fh : kieSession.getFactHandles()) {
- Object obj = kieSession.getObject(fh);
- String keyword = Keyword.lookupKeyword(obj);
- if (keyword != null
- && Bucket.bucketNumber(keyword) == bucketNumber) {
- // bucket matches -- include this object
- droolsObjects.add(obj);
- /*
- * delete this factHandle from Drools memory
- * this classes are used in bucket migration,
- * so the delete is intentional.
- */
- kieSession.delete(fh);
- }
+ DroolsRunnable backupAndRemove = () -> {
+ List<Object> droolsObjects = new ArrayList<>();
+ for (FactHandle fh : kieSession.getFactHandles()) {
+ Object obj = kieSession.getObject(fh);
+ String keyword = Keyword.lookupKeyword(obj);
+ if (keyword != null
+ && Bucket.bucketNumber(keyword) == bucketNumber) {
+ // bucket matches -- include this object
+ droolsObjects.add(obj);
+ /*
+ * delete this factHandle from Drools memory
+ * this classes are used in bucket migration,
+ * so the delete is intentional.
+ */
+ kieSession.delete(fh);
}
-
- // send notification that object list is complete
- droolsObjectsWrapper.complete(droolsObjects);
}
- });
+
+ // send notification that object list is complete
+ droolsObjectsWrapper.complete(droolsObjects);
+ };
+ kieSession.insert(backupAndRemove);
// add pending operation to the list
pendingData.add(new Pair<>(droolsObjectsWrapper, session));
@@ -858,6 +880,7 @@ public class FeatureServerPool
}
} catch (InterruptedException e) {
logger.error("Exception in {}", this, e);
+ Thread.currentThread().interrupt();
}
}
}
@@ -957,24 +980,22 @@ public class FeatureServerPool
final KieSession kieSession = session.getKieSession();
// run the following within the Drools session thread
- kieSession.insert(new DroolsRunnable() {
- @Override
- public void run() {
- try {
- /*
- * Insert all of the objects -- note that this is running
- * in the session thread, so no other rules can fire
- * until all of the objects are inserted.
- */
- for (Object obj : droolsObjects) {
- kieSession.insert(obj);
- }
- } finally {
- // send notification that the inserts have completed
- sessionLatch.countDown();
+ DroolsRunnable doRestore = () -> {
+ try {
+ /*
+ * Insert all of the objects -- note that this is running
+ * in the session thread, so no other rules can fire
+ * until all of the objects are inserted.
+ */
+ for (Object droolsObj : droolsObjects) {
+ kieSession.insert(droolsObj);
}
+ } finally {
+ // send notification that the inserts have completed
+ sessionLatch.countDown();
}
- });
+ };
+ kieSession.insert(doRestore);
return sessionLatch;
} else {
logger.error("{}: Invalid session data for session={}, type={}",