diff options
Diffstat (limited to 'feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java')
-rw-r--r-- | feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java | 159 |
1 files changed, 90 insertions, 69 deletions
diff --git a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java index 748a38f3..dfe211ce 100644 --- a/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java +++ b/feature-server-pool/src/main/java/org/onap/policy/drools/serverpool/FeatureServerPool.java @@ -26,9 +26,6 @@ import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_BUC import static org.onap.policy.drools.serverpool.ServerPoolProperties.DEFAULT_BUCKET_TIME_TO_LIVE; import static org.onap.policy.drools.serverpool.ServerPoolProperties.getProperty; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; - import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.Serializable; @@ -36,7 +33,6 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; -import java.util.Enumeration; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -53,8 +49,6 @@ import javax.ws.rs.core.Response; import lombok.AllArgsConstructor; -import org.drools.core.definitions.InternalKnowledgePackage; -import org.drools.core.impl.KnowledgeBaseImpl; import org.kie.api.runtime.KieSession; import org.kie.api.runtime.rule.FactHandle; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; @@ -75,7 +69,6 @@ import org.onap.policy.drools.system.PolicyControllerConstants; import org.onap.policy.drools.system.PolicyEngine; import org.onap.policy.drools.system.PolicyEngineConstants; import org.onap.policy.drools.utils.Pair; -import org.onap.policy.drools.utils.PropertyUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,7 +91,7 @@ public class FeatureServerPool // used for JSON <-> String conversion private static StandardCoder coder = new StandardCoder(); - private static final String configFile = + private static final String CONFIG_FILE = "config/feature-server-pool.properties"; /* @@ -142,6 +135,15 @@ public class FeatureServerPool private static long droolsTimeoutMillis; private static String timeToLiveSecond; + // HTTP query parameters + private static final String QP_KEYWORD = "keyword"; + private static final String QP_SESSION = "session"; + private static final String QP_BUCKET = "bucket"; + private static final String QP_TTL = "ttl"; + private static final String QP_CONTROLLER = "controller"; + private static final String QP_PROTOCOL = "protocol"; + private static final String QP_TOPIC = "topic"; + /******************************/ /* 'OrderedService' interface */ /******************************/ @@ -166,7 +168,7 @@ public class FeatureServerPool @Override public boolean afterStart(PolicyEngine engine) { logger.info("Starting FeatureServerPool"); - Server.startup(configFile); + Server.startup(CONFIG_FILE); TargetLock.startup(); droolsTimeoutMillis = getProperty(BUCKET_DROOLS_TIMEOUT, DEFAULT_BUCKET_DROOLS_TIMEOUT); @@ -251,10 +253,10 @@ public class FeatureServerPool + session.getName(); return webTarget - .queryParam("keyword", keyword) - .queryParam("session", encodedSessionName) - .queryParam("bucket", bucketNumber) - .queryParam("ttl", timeToLiveSecond); + .queryParam(QP_KEYWORD, keyword) + .queryParam(QP_SESSION, encodedSessionName) + .queryParam(QP_BUCKET, bucketNumber) + .queryParam(QP_TTL, timeToLiveSecond); } @Override @@ -320,6 +322,20 @@ public class FeatureServerPool path[path.length - 1] = fieldName; } keyword = sco.getString(path); + if (keyword != null) { + if (conversionFunctionName == null) { + // We found a keyword -- we don't need to try other paths, + // so we should break out of the loop + break; + } + + // we have post-processing to do + keyword = Keyword.convertKeyword(keyword, conversionFunctionName); + if (keyword != null) { + // conversion was successful + break; + } + } } if (keyword == null) { @@ -451,13 +467,16 @@ public class FeatureServerPool } } } - } else if ((ttl -= 1) > 0) { - /* - * This host is not the intended destination -- this could happen - * if it was sent from another site. Forward the message in the - * same thread. - */ - forwardInsertDroolsMessage(bucket, keyword, sessionName, ttl, data); + } else { + ttl -= 1; + if (ttl > 0) { + /* + * This host is not the intended destination -- this could happen + * if it was sent from another site. Forward the message in the + * same thread. + */ + forwardInsertDroolsMessage(bucket, keyword, sessionName, ttl, data); + } } } @@ -496,10 +515,10 @@ public class FeatureServerPool Entity.entity(new String(data, StandardCharsets.UTF_8), MediaType.APPLICATION_OCTET_STREAM_TYPE); webTarget - .queryParam("keyword", keyword) - .queryParam("session", sessionName) - .queryParam("bucket", bucket) - .queryParam("ttl", ttl) + .queryParam(QP_KEYWORD, keyword) + .queryParam(QP_SESSION, sessionName) + .queryParam(QP_BUCKET, bucket) + .queryParam(QP_TTL, ttl) .request().post(entity); } } @@ -683,17 +702,22 @@ public class FeatureServerPool @Override public WebTarget webTarget(WebTarget webTarget) { return webTarget - .queryParam("bucket", bucketNumber) - .queryParam("keyword", keyword) - .queryParam("controller", controller.getName()) - .queryParam("protocol", protocol.toString()) - .queryParam("topic", topic); + .queryParam(QP_BUCKET, bucketNumber) + .queryParam(QP_KEYWORD, keyword) + .queryParam(QP_CONTROLLER, controller.getName()) + .queryParam(QP_PROTOCOL, protocol.toString()) + .queryParam(QP_TOPIC, topic); } @Override public void response(Response response) { - // TODO: eventually, we will want to do something different - // based upon success/failure + // log a message indicating success/failure + int status = response.getStatus(); + if (status >= 200 && status <= 299) { + logger.info("/bucket/topic response code = {}", status); + } else { + logger.error("/bucket/topic response code = {}", status); + } } }); } @@ -749,30 +773,28 @@ public class FeatureServerPool logger.info("{}: about to fetch data for session {}", this, session.getFullName()); - kieSession.insert(new DroolsRunnable() { - @Override - public void run() { - List<Object> droolsObjects = new ArrayList<>(); - for (FactHandle fh : kieSession.getFactHandles()) { - Object obj = kieSession.getObject(fh); - String keyword = Keyword.lookupKeyword(obj); - if (keyword != null - && Bucket.bucketNumber(keyword) == bucketNumber) { - // bucket matches -- include this object - droolsObjects.add(obj); - /* - * delete this factHandle from Drools memory - * this classes are used in bucket migration, - * so the delete is intentional. - */ - kieSession.delete(fh); - } + DroolsRunnable backupAndRemove = () -> { + List<Object> droolsObjects = new ArrayList<>(); + for (FactHandle fh : kieSession.getFactHandles()) { + Object obj = kieSession.getObject(fh); + String keyword = Keyword.lookupKeyword(obj); + if (keyword != null + && Bucket.bucketNumber(keyword) == bucketNumber) { + // bucket matches -- include this object + droolsObjects.add(obj); + /* + * delete this factHandle from Drools memory + * this classes are used in bucket migration, + * so the delete is intentional. + */ + kieSession.delete(fh); } - - // send notification that object list is complete - droolsObjectsWrapper.complete(droolsObjects); } - }); + + // send notification that object list is complete + droolsObjectsWrapper.complete(droolsObjects); + }; + kieSession.insert(backupAndRemove); // add pending operation to the list pendingData.add(new Pair<>(droolsObjectsWrapper, session)); @@ -858,6 +880,7 @@ public class FeatureServerPool } } catch (InterruptedException e) { logger.error("Exception in {}", this, e); + Thread.currentThread().interrupt(); } } } @@ -957,24 +980,22 @@ public class FeatureServerPool final KieSession kieSession = session.getKieSession(); // run the following within the Drools session thread - kieSession.insert(new DroolsRunnable() { - @Override - public void run() { - try { - /* - * Insert all of the objects -- note that this is running - * in the session thread, so no other rules can fire - * until all of the objects are inserted. - */ - for (Object obj : droolsObjects) { - kieSession.insert(obj); - } - } finally { - // send notification that the inserts have completed - sessionLatch.countDown(); + DroolsRunnable doRestore = () -> { + try { + /* + * Insert all of the objects -- note that this is running + * in the session thread, so no other rules can fire + * until all of the objects are inserted. + */ + for (Object droolsObj : droolsObjects) { + kieSession.insert(droolsObj); } + } finally { + // send notification that the inserts have completed + sessionLatch.countDown(); } - }); + }; + kieSession.insert(doRestore); return sessionLatch; } else { logger.error("{}: Invalid session data for session={}, type={}", |