/* * ============LICENSE_START======================================================= * ONAP * ================================================================================ * Copyright (C) 2019-2022 AT&T Intellectual Property. All rights reserved. * Modifications Copyright (C) 2021, 2023-2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= */ package org.onap.policy.drools.lifecycle; import com.google.re2j.Pattern; import io.prometheus.client.Counter; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Properties; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import lombok.Getter; import lombok.NonNull; import lombok.Setter; import org.apache.commons.lang3.StringUtils; import org.onap.policy.common.capabilities.Startable; import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; import org.onap.policy.common.endpoints.event.comm.TopicSink; import org.onap.policy.common.endpoints.event.comm.TopicSource; import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient; import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher; import org.onap.policy.common.endpoints.listeners.ScoListener; import org.onap.policy.common.gson.annotation.GsonJsonIgnore; import org.onap.policy.common.utils.coder.StandardCoderObject; import org.onap.policy.common.utils.resources.PrometheusUtils; import org.onap.policy.drools.persistence.SystemPersistenceConstants; import org.onap.policy.drools.policies.DomainMaker; import org.onap.policy.drools.system.PolicyController; import org.onap.policy.drools.system.PolicyEngineConstants; import org.onap.policy.models.pdp.concepts.PdpResponseDetails; import org.onap.policy.models.pdp.concepts.PdpStateChange; import org.onap.policy.models.pdp.concepts.PdpStatus; import org.onap.policy.models.pdp.concepts.PdpUpdate; import org.onap.policy.models.pdp.enums.PdpHealthStatus; import org.onap.policy.models.pdp.enums.PdpMessageType; import org.onap.policy.models.pdp.enums.PdpResponseStatus; import org.onap.policy.models.pdp.enums.PdpState; import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier; import org.onap.policy.models.tosca.authorative.concepts.ToscaPolicy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Lifecycle FSM. */ public class LifecycleFsm implements Startable { /** * Default Status Timer in seconds. */ public static final long DEFAULT_STATUS_TIMER_SECONDS = 120L; private static final Logger logger = LoggerFactory.getLogger(LifecycleFsm.class); private static final Pattern COMMA_SPACE_PAT = Pattern.compile("\\s*,\\s*"); protected static final String CONFIGURATION_PROPERTIES_NAME = "feature-lifecycle"; protected static final String GROUP_NAME = "lifecycle.pdp.group"; protected static final String PDP_TYPE = "lifecycle.pdp.type"; protected static final String MANDATORY_POLICY_TYPES = "lifecycle.pdp.policytypes"; protected static final String DEFAULT_PDP_GROUP = "defaultGroup"; protected static final String DEFAULT_PDP_TYPE = "drools"; protected static final long MIN_STATUS_INTERVAL_SECONDS = 5L; protected static final String PDP_MESSAGE_NAME = "messageName"; protected static final ToscaConceptIdentifier POLICY_TYPE_DROOLS_NATIVE_RULES = new ToscaConceptIdentifier("onap.policies.native.drools.Artifact", "1.0.0"); protected static final ToscaConceptIdentifier POLICY_TYPE_DROOLS_NATIVE_CONTROLLER = new ToscaConceptIdentifier("onap.policies.native.drools.Controller", "1.0.0"); protected static final String PROMETHEUS_NAMESPACE = "pdpd"; protected static final Counter deploymentsCounter = Counter.build().namespace(PROMETHEUS_NAMESPACE).name(PrometheusUtils.POLICY_DEPLOYMENTS_METRIC) .labelNames(PrometheusUtils.STATE_METRIC_LABEL, PrometheusUtils.OPERATION_METRIC_LABEL, PrometheusUtils.STATUS_METRIC_LABEL) .help(PrometheusUtils.POLICY_DEPLOYMENT_HELP) .register(); @Getter protected final Properties properties; @Getter protected TopicSource source; @Getter protected TopicSinkClient client; protected LifecycleState state = new LifecycleStateTerminated(this); @GsonJsonIgnore protected ScheduledExecutorService scheduler = makeExecutor(); @GsonJsonIgnore protected ScheduledFuture statusTask; @GsonJsonIgnore protected MessageTypeDispatcher sourceDispatcher = new MessageTypeDispatcher(PDP_MESSAGE_NAME); @GsonJsonIgnore protected PdpStateChangeFeed stateChangeFeed = new PdpStateChangeFeed(PdpStateChange.class, this); @GsonJsonIgnore protected PdpUpdateFeed updateFeed = new PdpUpdateFeed(PdpUpdate.class, this); @Getter @Setter protected long statusTimerSeconds = DEFAULT_STATUS_TIMER_SECONDS; @Getter private String group; @Getter protected String subGroup; @Getter @Setter protected String pdpType; protected volatile String pdpName; @Getter protected Set mandatoryPolicyTypes = new HashSet<>(); @Getter protected final Map policyTypesMap = new HashMap<>(); @Getter protected final Map policiesMap = new HashMap<>(); /** * Constructor. */ public LifecycleFsm() { properties = SystemPersistenceConstants.getManager().getProperties(CONFIGURATION_PROPERTIES_NAME); setGroup(properties.getProperty(GROUP_NAME, DEFAULT_PDP_GROUP)); setPdpType(properties.getProperty(PDP_TYPE, DEFAULT_PDP_TYPE)); policyTypesMap.put(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER, new PolicyTypeNativeDroolsController(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER, this)); policyTypesMap.put( POLICY_TYPE_DROOLS_NATIVE_RULES, new PolicyTypeNativeArtifactController(POLICY_TYPE_DROOLS_NATIVE_RULES, this)); String commaSeparatedPolicyTypes = properties.getProperty(MANDATORY_POLICY_TYPES); if (!StringUtils.isBlank(commaSeparatedPolicyTypes)) { Collections.addAll(mandatoryPolicyTypes, COMMA_SPACE_PAT.split(commaSeparatedPolicyTypes)); } logger.info("The mandatory Policy Types are {}. Compliance is {}", mandatoryPolicyTypes, isMandatoryPolicyTypesCompliant()); } @GsonJsonIgnore public DomainMaker getDomainMaker() { return PolicyEngineConstants.getManager().getDomainMaker(); } @Override public boolean isAlive() { return client != null && client.getSink().isAlive(); } /** * Returns the PDP Name. */ public String getPdpName() { if (this.pdpName == null) { this.pdpName = PolicyEngineConstants.getManager().getPdpName(); } return this.pdpName; } /** * Current state. */ public PdpState state() { return state.state(); } /** * set group. */ public synchronized void setGroup(String group) { this.group = group; } /** * set subgroup. */ public synchronized void setSubGroup(String subGroup) { this.subGroup = subGroup; } /* ** FSM events - entry points of events into the FSM ** */ @Override public synchronized boolean start() { this.pdpName = PolicyEngineConstants.getManager().getPdpName(); logger.info("lifecycle event: start engine"); return state.start(); } /** * Start a controller event. */ public synchronized void start(@NonNull PolicyController controller) { logger.info("lifecycle event: start controller: {}", controller.getName()); if (!controller.getDrools().isBrained()) { logger.warn("ignoring lifecycle event: start controller: {}", controller); return; } for (ToscaConceptIdentifier id : controller.getPolicyTypes()) { PolicyTypeDroolsController ptDc = (PolicyTypeDroolsController) policyTypesMap.get(id); //NOSONAR if (ptDc == null) { policyTypesMap.put(id, new PolicyTypeDroolsController(id, this, controller)); logger.info("policy-type {} added", id); } else { ptDc.add(controller); } } } /** * Patch a controller event. */ public synchronized void patch(@NonNull PolicyController controller) { logger.info("lifecycle event: patch controller: {}", controller.getName()); if (controller.getDrools().isBrained()) { this.start(controller); } else { this.stop(controller); } } @Override public synchronized boolean stop() { logger.info("lifecycle event: stop engine"); return state.stop(); } /** * Stop a controller event. */ public synchronized void stop(@NonNull PolicyController controller) { logger.info("lifecycle event: stop controller: {}", controller.getName()); List opControllers = policyTypesMap.values().stream() .filter(PolicyTypeDroolsController.class::isInstance) .map(PolicyTypeDroolsController.class::cast) .filter(opController -> opController.getControllers().containsKey(controller.getName())).toList(); for (PolicyTypeDroolsController opController : opControllers) { opController.remove(controller); if (opController.controllers().isEmpty()) { policyTypesMap.remove(opController.getPolicyType()); logger.info("policy-type {} removed", opController.getPolicyType()); } } } @Override public synchronized void shutdown() { logger.info("lifecycle event: shutdown engine"); state.shutdown(); } /** * Status reporting event. * @return true if successful */ public synchronized boolean status() { logger.info("lifecycle event: status"); return state.status(); } public synchronized boolean stateChange(PdpStateChange stateChange) { logger.info("lifecycle event: state-change"); return state.stateChange(stateChange); } public synchronized boolean update(PdpUpdate update) { logger.info("lifecycle event: update"); return state.update(update); } /* FSM State Actions (executed sequentially) */ protected boolean startAction() { if (isAlive()) { return true; } return startIo() && startTimers(); } protected boolean stopAction() { if (!isAlive()) { return true; } boolean successTimers = stopTimers(); boolean successIo = stopIo(); return successTimers && successIo; } protected void shutdownAction() { shutdownIo(); shutdownTimers(); } protected boolean statusAction() { return statusAction(null); } protected boolean statusAction(PdpResponseDetails response) { return statusAction(state(), response); } protected boolean statusAction(PdpState state, PdpResponseDetails response) { if (!isAlive()) { return false; } PdpStatus status = statusPayload(state); if (response != null) { status.setRequestId(response.getResponseTo()); status.setResponse(response); } return client.send(status); } protected synchronized void transitionToAction(@NonNull LifecycleState newState) { state = newState; } protected boolean setStatusIntervalAction(long intervalSeconds) { if (intervalSeconds == statusTimerSeconds || intervalSeconds == 0) { return true; } if (intervalSeconds <= MIN_STATUS_INTERVAL_SECONDS) { logger.warn("interval is too low (< {}): {}", MIN_STATUS_INTERVAL_SECONDS, intervalSeconds); return false; } setStatusTimerSeconds(intervalSeconds); return stopTimers() && startTimers(); } protected List getDeployablePoliciesAction(@NonNull List policies) { List deployPolicies = new ArrayList<>(policies); deployPolicies.removeAll(getActivePolicies()); // Ensure that the sequence of policy deployments is sane to minimize potential errors, // First policies to deploy are the controller related ones, those that affect the lifecycle of // controllers, starting with the ones that affect the existence of the controller (native controller), // second the ones that "brain" the controller with application logic (native artifacts). // Lastly the application specific ones such as operational policies. // group policies by policy types Map> policyTypeGroups = groupPoliciesByPolicyType(deployPolicies); // place native controller policies at the start of the list List orderedDeployableList = new ArrayList<>( policyTypeGroups.getOrDefault(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER.getName(), Collections.emptyList())); // add to the working list the native controller policies orderedDeployableList.addAll( policyTypeGroups.getOrDefault(POLICY_TYPE_DROOLS_NATIVE_RULES.getName(), Collections.emptyList())); // place non-native policies to place at the end of the list orderedDeployableList.addAll(getNonNativePolicies(policyTypeGroups)); return orderedDeployableList; } protected List getUndeployablePoliciesAction(@NonNull List policies) { List undeployPolicies = new ArrayList<>(getActivePolicies()); undeployPolicies.removeAll(policies); if (undeployPolicies.isEmpty()) { return undeployPolicies; } // Ensure that the sequence of policy undeployments is sane to minimize potential errors, // as it is assumed not smart ordering from the policies sent by the PAP. // First policies to undeploy are those that are only of relevance within a drools container, // such as the operational policies. The next set of policies to undeploy are those that // affect the overall PDP-D application support, firstly the ones that supports the // application software wiring (native rules policies), and second those that relate // to the PDP-D controllers lifecycle. // group policies by policy types Map> policyTypeGroups = groupPoliciesByPolicyType(undeployPolicies); // place controller only (non-native policies) at the start of the list of the undeployment list List orderedUndeployableList = getNonNativePolicies(policyTypeGroups); // add to the working list the native rules policies if any orderedUndeployableList.addAll( policyTypeGroups.getOrDefault(POLICY_TYPE_DROOLS_NATIVE_RULES.getName(), Collections.emptyList())); // finally add to the working list native controller policies if any orderedUndeployableList.addAll( policyTypeGroups.getOrDefault(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER.getName(), Collections.emptyList())); return orderedUndeployableList; } protected void deployedPolicyAction(@NonNull ToscaPolicy policy) { policiesMap.computeIfAbsent(policy.getIdentifier(), key -> { // avoid counting reapplies in a second pass when a mix of native and non-native // policies are present. deploymentsCounter.labels(state.state().name(), PrometheusUtils.DEPLOY_OPERATION, PdpResponseStatus.SUCCESS.name()).inc(); return policy; }); } protected void undeployedPolicyAction(@NonNull ToscaPolicy policy) { policiesMap.computeIfPresent(policy.getIdentifier(), (key, value) -> { // avoid counting reapplies in a second pass when a mix of native and non-native // policies are present. deploymentsCounter.labels(state.state().name(), PrometheusUtils.UNDEPLOY_OPERATION, PdpResponseStatus.SUCCESS.name()).inc(); return null; }); } protected void failedDeployPolicyAction(@NonNull ToscaPolicy failedPolicy) { // NOSONAR deploymentsCounter.labels(state.state().name(), PrometheusUtils.DEPLOY_OPERATION, PdpResponseStatus.FAIL.name()).inc(); } protected void failedUndeployPolicyAction(ToscaPolicy failedPolicy) { deploymentsCounter.labels(state.state().name(), PrometheusUtils.UNDEPLOY_OPERATION, PdpResponseStatus.FAIL.name()).inc(); policiesMap.remove(failedPolicy.getIdentifier()); } protected List resetPoliciesAction() { List policies = new ArrayList<>(getActivePolicies()); policiesMap.clear(); return policies; } protected void updatePoliciesAction(List toscaPolicies) { this.scheduler.submit(() -> state.updatePolicies(toscaPolicies)); } protected PolicyTypeController getController(ToscaConceptIdentifier policyType) { return policyTypesMap.get(policyType); } protected Map> groupPoliciesByPolicyType(List deployPolicies) { return deployPolicies.stream() .distinct() .collect(Collectors.groupingBy(policy -> policy.getTypeIdentifier().getName())); } protected List getNonNativePolicies(@NonNull Map> policyTypeGroups) { return policyTypeGroups.entrySet().stream() .filter(entry -> !entry.getKey().equals(POLICY_TYPE_DROOLS_NATIVE_RULES.getName()) && !entry.getKey().equals(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER.getName())) .flatMap(entry -> entry.getValue().stream()).collect(Collectors.toList()); } protected List getNativeArtifactPolicies(@NonNull Map> policyTypeGroups) { return policyTypeGroups.entrySet().stream() .filter(entry -> entry.getKey().equals(POLICY_TYPE_DROOLS_NATIVE_RULES.getName())) .flatMap(entry -> entry.getValue().stream()).collect(Collectors.toList()); } protected List getNativeControllerPolicies(@NonNull Map> policyTypeGroups) { return policyTypeGroups.entrySet().stream() .filter(entry -> entry.getKey().equals(POLICY_TYPE_DROOLS_NATIVE_CONTROLLER.getName())) .flatMap(entry -> entry.getValue().stream()).collect(Collectors.toList()); } /** * Get the policy identifiers. */ public List getPolicyIds(List policies) { return policies.stream() .map(ToscaPolicy::getIdentifier) .distinct() .collect(Collectors.toList()); } protected String getPolicyIdsMessage(List policies) { return getPolicyIds(policies).toString(); } protected List removeByPolicyId(@NonNull List policies, @NonNull List toRemoveList) { policies.removeIf(policy -> toRemoveList.contains(policy.getIdentifier())); return policies; } protected List removeByPolicyId(@NonNull List toRemoveList) { return removeByPolicyId(getActivePolicies(), toRemoveList); } protected List mergePolicies(@NonNull List addPolicies, @NonNull List removePolicies) { if (addPolicies.isEmpty() && removePolicies.isEmpty()) { return getActivePolicies(); } List policies = getActivePolicies(); policies.addAll(addPolicies); return removeByPolicyId(new ArrayList<>(new HashSet<>(policies)), removePolicies); } /** * Do I support the mandatory policy types?. */ protected boolean isMandatoryPolicyTypesCompliant() { return getCurrentPolicyTypes().containsAll(getMandatoryPolicyTypes()); } protected Set getCurrentPolicyTypes() { return getPolicyTypesMap().keySet().stream() .map(ToscaConceptIdentifier::getName).collect(Collectors.toSet()); } protected List getActivePolicies() { return new ArrayList<>(policiesMap.values()); } /* ** Action Helpers ** */ private boolean startIo() { return source() && sink(); } private boolean startTimers() { statusTask = this.scheduler.scheduleAtFixedRate(this::status, 0, statusTimerSeconds, TimeUnit.SECONDS); return !statusTask.isCancelled() && !statusTask.isDone(); } private boolean stopIo() { source.unregister(sourceDispatcher); boolean successSource = source.stop(); boolean successSink = client.getSink().stop(); return successSource && successSink; } private boolean stopTimers() { var success = true; if (statusTask != null) { success = statusTask.cancel(false); } return success; } private void shutdownIo() { client.getSink().shutdown(); source.shutdown(); } private void shutdownTimers() { scheduler.shutdownNow(); } protected PdpStatus statusPayload(@NonNull PdpState state) { var status = new PdpStatus(); status.setName(getPdpName()); status.setPdpGroup(group); status.setPdpSubgroup(subGroup); status.setState(state); status.setHealthy(isAlive() ? PdpHealthStatus.HEALTHY : PdpHealthStatus.NOT_HEALTHY); status.setPdpType(getPdpType()); status.setPolicies(new ArrayList<>(policiesMap.keySet())); return status; } private boolean source() { List sources = TopicEndpointManager.getManager().addTopicSources(properties); if (sources.isEmpty()) { return false; } if (sources.size() != 1) { logger.warn("Lifecycle Manager: unexpected: more than one source configured ({})", sources.size()); } this.source = sources.get(0); this.source.register(this.sourceDispatcher); this.sourceDispatcher.register(PdpMessageType.PDP_STATE_CHANGE.name(), stateChangeFeed); this.sourceDispatcher.register(PdpMessageType.PDP_UPDATE.name(), updateFeed); return source.start(); } private boolean sink() { List sinks = TopicEndpointManager.getManager().addTopicSinks(properties); if (sinks.isEmpty()) { logger.error("Lifecycle Manager sinks have not been configured"); return false; } if (sinks.size() != 1) { logger.warn("Lifecycle Manager: unexpected: more than one sink configured ({})", sinks.size()); } this.client = new TopicSinkClient(sinks.get(0)); return this.client.getSink().start(); } protected boolean isItMe(String name, String group, String subgroup) { if (Objects.equals(name, getPdpName())) { return true; } return name == null && group != null && Objects.equals(group, getGroup()) && Objects.equals(subgroup, getSubGroup()); } /* **** IO listeners ***** */ /** * PDP State Change Message Listener. */ public static class PdpStateChangeFeed extends ScoListener { protected final LifecycleFsm fsm; protected PdpStateChangeFeed(Class clazz, LifecycleFsm fsm) { super(clazz); this.fsm = fsm; } @Override public void onTopicEvent(CommInfrastructure comm, String topic, StandardCoderObject coder, PdpStateChange stateChange) { if (!isMine(stateChange)) { logger.warn("pdp-state-chage from {}:{} is invalid: {}", comm, topic, stateChange); return; } fsm.stateChange(stateChange); } protected boolean isMine(PdpStateChange change) { if (change == null) { return false; } return fsm.isItMe(change.getName(), change.getPdpGroup(), change.getPdpSubgroup()); } } /** * PDP Update Message Listener. */ public static class PdpUpdateFeed extends ScoListener { protected final LifecycleFsm fsm; public PdpUpdateFeed(Class clazz, LifecycleFsm fsm) { super(clazz); this.fsm = fsm; } @Override public void onTopicEvent(CommInfrastructure comm, String topic, StandardCoderObject coder, PdpUpdate update) { if (!isMine(update)) { logger.warn("pdp-update from {}:{} is invalid: {}", comm, topic, update); return; } fsm.update(update); } protected boolean isMine(PdpUpdate update) { if (update == null) { return false; } return fsm.isItMe(update.getName(), update.getPdpGroup(), update.getPdpSubgroup()); } } // these may be overridden by junit tests protected ScheduledExecutorService makeExecutor() { var exec = new ScheduledThreadPoolExecutor(1); exec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); exec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); exec.setRemoveOnCancelPolicy(true); return exec; } }