/* * ============LICENSE_START======================================================= * ONAP * ================================================================================ * Copyright (C) 2018-2021 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= */ package org.onap.policy.drools.pooling; import java.util.List; import java.util.Properties; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import lombok.AccessLevel; import lombok.Getter; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.TopicSource; import org.onap.policy.common.utils.properties.SpecProperties; import org.onap.policy.common.utils.properties.exception.PropertyException; import org.onap.policy.drools.controller.DroolsController; import org.onap.policy.drools.features.DroolsControllerFeatureApi; import org.onap.policy.drools.features.PolicyControllerFeatureApi; import org.onap.policy.drools.features.PolicyEngineFeatureApi; import org.onap.policy.drools.persistence.SystemPersistenceConstants; import org.onap.policy.drools.system.PolicyController; import org.onap.policy.drools.system.PolicyControllerConstants; import org.onap.policy.drools.system.PolicyEngine; import org.onap.policy.drools.util.FeatureEnabledChecker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Controller/session pooling. Multiple hosts may be launched, all servicing the same * controllers/sessions. When this feature is enabled, the requests are divided across the different * hosts, instead of all running on a single, active host. * *

With each controller, there is an * associated DMaaP topic that is used for internal communication between the different hosts * serving the controller. */ public class PoolingFeature implements PolicyEngineFeatureApi, PolicyControllerFeatureApi, DroolsControllerFeatureApi { private static final Logger logger = LoggerFactory.getLogger(PoolingFeature.class); /** * ID of this host. */ @Getter private final String host; /** * Entire set of feature properties, including those specific to various controllers. */ private Properties featProps = null; /** * Maps a controller name to its associated manager. */ private final ConcurrentHashMap ctlr2pool = new ConcurrentHashMap<>(107); /** * Decremented each time a manager enters the Active state. Used by junit tests. */ @Getter(AccessLevel.PROTECTED) private final CountDownLatch activeLatch = new CountDownLatch(1); /** * Topic names passed to beforeOffer(), which are saved for when the beforeInsert() is * called later. As multiple threads can be active within the methods at the same * time, we must keep this in thread local storage. */ private ThreadLocal offerTopics = new ThreadLocal<>(); /** * Constructor. */ public PoolingFeature() { super(); this.host = UUID.randomUUID().toString(); } @Override public int getSequenceNumber() { return 0; } @Override public boolean beforeStart(PolicyEngine engine) { logger.info("initializing {}", PoolingProperties.FEATURE_NAME); featProps = getProperties(PoolingProperties.FEATURE_NAME); // remove any generic pooling topic - always use controller-specific property featProps.remove(PoolingProperties.POOLING_TOPIC); initTopicSources(featProps); initTopicSinks(featProps); return false; } @Override public boolean beforeStart(PolicyController controller) { return doManager(controller, mgr -> { mgr.beforeStart(); return false; }); } /** * Adds the controller and a new pooling manager to {@link #ctlr2pool}. * * @throws PoolingFeatureRtException if an error occurs */ @Override public boolean afterCreate(PolicyController controller) { if (featProps == null) { logger.error("pooling feature properties have not been loaded"); throw new PoolingFeatureRtException(new IllegalStateException("missing pooling feature properties")); } String name = controller.getName(); var specProps = new SpecProperties(PoolingProperties.PREFIX, name, featProps); if (FeatureEnabledChecker.isFeatureEnabled(specProps, PoolingProperties.FEATURE_ENABLED)) { try { // get & validate the properties var props = new PoolingProperties(name, featProps); logger.info("pooling enabled for {}", name); ctlr2pool.computeIfAbsent(name, xxx -> makeManager(host, controller, props, activeLatch)); } catch (PropertyException e) { logger.error("pooling disabled due to exception for {}", name); throw new PoolingFeatureRtException(e); } } else { logger.info("pooling disabled for {}", name); } return false; } @Override public boolean afterStart(PolicyController controller) { return doManager(controller, mgr -> { mgr.afterStart(); return false; }); } @Override public boolean beforeStop(PolicyController controller) { return doManager(controller, mgr -> { mgr.beforeStop(); return false; }); } @Override public boolean afterStop(PolicyController controller) { return doManager(controller, mgr -> { mgr.afterStop(); return false; }); } @Override public boolean afterShutdown(PolicyController controller) { return commonShutdown(controller); } @Override public boolean afterHalt(PolicyController controller) { return commonShutdown(controller); } private boolean commonShutdown(PolicyController controller) { deleteManager(controller); return false; } @Override public boolean beforeLock(PolicyController controller) { return doManager(controller, mgr -> { mgr.beforeLock(); return false; }); } @Override public boolean afterUnlock(PolicyController controller) { return doManager(controller, mgr -> { mgr.afterUnlock(); return false; }); } @Override public boolean beforeOffer(PolicyController controller, CommInfrastructure protocol, String topic2, String event) { /* * As this is invoked a lot, we'll directly call the manager's method instead of using the * functional interface via doManager(). */ PoolingManagerImpl mgr = ctlr2pool.get(controller.getName()); if (mgr == null) { return false; } if (mgr.beforeOffer(topic2, event)) { return true; } offerTopics.set(topic2); return false; } @Override public boolean beforeInsert(DroolsController droolsController, Object fact) { String topic = offerTopics.get(); if (topic == null) { logger.warn("missing arguments for feature-pooling-dmaap in beforeInsert"); return false; } PolicyController controller; try { controller = getController(droolsController); } catch (IllegalArgumentException | IllegalStateException e) { logger.warn("cannot get controller for {} {}", droolsController.getGroupId(), droolsController.getArtifactId(), e); return false; } if (controller == null) { logger.warn("cannot determine controller for {} {}", droolsController.getGroupId(), droolsController.getArtifactId()); return false; } /* * As this is invoked a lot, we'll directly call the manager's method instead of using the * functional interface via doManager(). */ PoolingManagerImpl mgr = ctlr2pool.get(controller.getName()); if (mgr == null) { return false; } return mgr.beforeInsert(topic, fact); } @Override public boolean afterOffer(PolicyController controller, CommInfrastructure protocol, String topic, String event, boolean success) { // clear any stored arguments offerTopics.remove(); return false; } /** * Executes a function using the manager associated with the controller. Catches any exceptions * from the function and re-throws it as a runtime exception. * * @param controller controller * @param func function to be executed * @return {@code true} if the function handled the request, {@code false} otherwise * @throws PoolingFeatureRtException if an error occurs */ private boolean doManager(PolicyController controller, MgrFunc func) { PoolingManagerImpl mgr = ctlr2pool.get(controller.getName()); if (mgr == null) { return false; } try { return func.apply(mgr); } catch (PoolingFeatureException e) { throw new PoolingFeatureRtException(e); } } /** * Deletes the manager associated with a controller. * * @param controller controller * @throws PoolingFeatureRtException if an error occurs */ private void deleteManager(PolicyController controller) { String name = controller.getName(); logger.info("remove feature-pool-dmaap manager for {}", name); ctlr2pool.remove(name); } /** * Function that operates on a manager. */ @FunctionalInterface private static interface MgrFunc { /** * Apply. * * @param mgr manager * @return {@code true} if the request was handled by the manager, {@code false} otherwise * @throws PoolingFeatureException feature exception */ boolean apply(PoolingManagerImpl mgr) throws PoolingFeatureException; } /* * The remaining methods may be overridden by junit tests. */ /** * Get properties. * * @param featName feature name * @return the properties for the specified feature */ protected Properties getProperties(String featName) { return SystemPersistenceConstants.getManager().getProperties(featName); } /** * Makes a pooling manager for a controller. * * @param host name/uuid of this host * @param controller controller * @param props properties to use to configure the manager * @param activeLatch decremented when the manager goes Active * @return a new pooling manager */ protected PoolingManagerImpl makeManager(String host, PolicyController controller, PoolingProperties props, CountDownLatch activeLatch) { return new PoolingManagerImpl(host, controller, props, activeLatch); } /** * Gets the policy controller associated with a drools controller. * * @param droolsController drools controller * @return the policy controller associated with a drools controller */ protected PolicyController getController(DroolsController droolsController) { return PolicyControllerConstants.getFactory().get(droolsController); } /** * Initializes the topic sources. * * @param props properties used to configure the topics * @return the topic sources */ protected List initTopicSources(Properties props) { return TopicEndpointManager.getManager().addTopicSources(props); } /** * Initializes the topic sinks. * * @param props properties used to configure the topics * @return the topic sinks */ protected List initTopicSinks(Properties props) { return TopicEndpointManager.getManager().addTopicSinks(props); } }