/** * Copyright 2017-2021 ZTE Corporation. *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at *

* http://www.apache.org/licenses/LICENSE-2.0 *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.onap.holmes.rulemgt; import lombok.extern.slf4j.Slf4j; import org.jvnet.hk2.annotations.Service; import org.onap.holmes.common.api.entity.CorrelationRule; import org.onap.holmes.common.exception.CorrelationException; import org.onap.holmes.common.utils.DbDaoUtil; import org.onap.holmes.rulemgt.bolt.enginebolt.EngineWrapper; import org.onap.holmes.rulemgt.db.CorrelationRuleDao; import org.onap.holmes.rulemgt.tools.EngineTools; import org.onap.holmes.rulemgt.wrapper.RuleMgtWrapper; import org.onap.holmes.rulemgt.wrapper.RuleQueryWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.PostConstruct; import javax.inject.Inject; import java.util.*; import java.util.concurrent.TimeUnit; import static java.util.concurrent.TimeUnit.SECONDS; @Slf4j @Service public class RuleAllocator { private static final Logger LOGGER = LoggerFactory.getLogger(RuleAllocator.class); public final static int ENABLE = 1; public final static int RETRY_TIMES = 5; public final static long RETRY_INTERVAL_SEC = 15; private RuleMgtWrapper ruleMgtWrapper; private RuleQueryWrapper ruleQueryWrapper; private EngineWrapper engineWrapper; private EngineTools engineTools; private CorrelationRuleDao correlationRuleDao; @Inject public RuleAllocator(RuleMgtWrapper ruleMgtWrapper, RuleQueryWrapper ruleQueryWrapper, EngineWrapper engineWrapper, EngineTools engineTools, DbDaoUtil daoUtil) { this.ruleMgtWrapper = ruleMgtWrapper; this.ruleQueryWrapper = ruleQueryWrapper; this.engineWrapper = engineWrapper; this.engineTools = engineTools; correlationRuleDao = daoUtil.getJdbiDaoByOnDemand(CorrelationRuleDao.class); } @PostConstruct private void initialize() { new Timer("RuleAllocatorTimer").schedule(new TimerTask() { public void run() { try { allocateRules(); } catch (Exception e) { LOGGER.error("Failed to reallocate rules.", e); } } }, SECONDS.toMillis(10), SECONDS.toMillis(30)); } public synchronized void allocateRules() throws Exception { List engines = engineTools.getInstanceList(); if (engines == null) { return; } int numOfEngines = engines.size(); LOGGER.info(String.format("There are %d engine instance(s) running currently.", numOfEngines)); List legacyEngineInstances = engineTools.getLegacyEngineInstances(); if (legacyEngineInstances == null) { return; } if (legacyEngineInstances.size() < numOfEngines) { //extend List rules2Allocate = calculateRule(legacyEngineInstances, numOfEngines); List rules2Delete = copyList(rules2Allocate); List newInstanceIds = sortOutNewEngineInstances(engines, legacyEngineInstances); distributeRules(newInstanceIds, rules2Allocate); cleanUpRulesFromEngines(rules2Delete, legacyEngineInstances); } else { //destroy List destroyed = getDestroyedEngines(engines, legacyEngineInstances); distributeRules(getRemainingEngines(engines, destroyed), getRules(destroyed)); } } private List copyList(List rules) { List ret = new ArrayList<>(rules.size()); for (CorrelationRule r : rules) { ret.add((CorrelationRule) r.clone()); } return ret; } // When the engine is expanding, the rules that need to be allocated are calculated. private List calculateRule(List existingEngineIps, int latestEngineInsNum) throws CorrelationException { List enabledRules = ruleQueryWrapper.queryRuleByEnable(ENABLE); int ruleCount = 0; if (enabledRules != null) { ruleCount = enabledRules.size(); } // Average number of rule that's to be allocate into each instance int count = ruleCount / latestEngineInsNum; // The number of remaining rules (to be allocated) after each instance has been allocated with the average number of rules. int remainder = ruleCount % latestEngineInsNum; List ret = new ArrayList<>(); for (String ip : existingEngineIps) { List rules = ruleQueryWrapper.queryRuleByEngineInstance(ip); List tmp = rules.subList(count + (remainder-- / existingEngineIps.size()), rules.size()); ret.addAll(tmp); } return ret; } // Rules that need to be allocated after the engine is destroyed private List getRules(List destroyIpList) throws CorrelationException { List rules = new ArrayList<>(); try { if (destroyIpList != null) { for (String ip : destroyIpList) { rules.addAll(ruleQueryWrapper.queryRuleByEngineInstance(ip)); } } } catch (CorrelationException e) { LOGGER.error("method getRules get data from DB failed !", e); } return rules; } // Extended IP private List sortOutNewEngineInstances(List newIps, List oldIps) { List ret = new ArrayList<>(); for (String ip : newIps) { if (!oldIps.contains(ip)) { ret.add(ip); } } return ret; } // Destroyed IP private List getDestroyedEngines(List latest, List existing) { List ret = new ArrayList<>(); for (String ip : existing) { if (!latest.contains(ip)) { ret.add(ip); } } return ret; } // Residual IP after destruction private List getRemainingEngines(List all, List destroyed) { List ret = new ArrayList<>(); for (String ip : all) { if (!destroyed.contains(ip)) { ret.add(ip); } } return ret; } private void distributeRules(List instanceIps, List rules) throws CorrelationException { List sortedIps = sortIpByRuleNumDesc(instanceIps); for (int i = 0, j = 0; j < rules.size(); i++, j++) { int index = i % sortedIps.size(); String ip = sortedIps.get(index); CorrelationRule rule = rules.get(j); rule.setEngineInstance(ip); allocateRule(rule, ip); } } // Sorted by the number of rules each engine contains, in a descending order. private List sortIpByRuleNumDesc(List ips) { List rules; Map ruleNumOfEngines = new HashMap(); try { for (String ip : ips) { rules = ruleQueryWrapper.queryRuleByEngineInstance(ip); if (rules != null) { ruleNumOfEngines.put(ip, rules.size()); } } } catch (Exception e) { LOGGER.error("getEngineWithLeastRules failed !", e); } List> sortedEntries = new ArrayList<>(ruleNumOfEngines.entrySet()); Collections.sort(sortedEntries, (o1, o2) -> o2.getValue() - o1.getValue()); List ret = new ArrayList<>(); for (Map.Entry entry : sortedEntries) { ret.add(entry.getKey()); } return ret; } private void allocateRule(CorrelationRule rule, String ip) throws CorrelationException { // Retry for a couple of times in case of deployment failure // due to unfinished initialization procedures of engine instances. for (int i = 0; i <= RETRY_TIMES; ++i) { try { ruleMgtWrapper.deployRule2Engine(rule, ip); correlationRuleDao.updateRule(rule); // If the codes reach here, it means everything's okay. There's no need to run the loop more. break; } catch (CorrelationException e) { LOGGER.warn(String.format("Failed to allocate rule <%s> to <%s>. Retry: %d.", rule.getName(), ip, i), e); if (i == RETRY_TIMES) { throw new CorrelationException(String.format("Failed to allocate rule <%s> to <%s>", rule.getName(), ip), e); } try { SECONDS.sleep(RETRY_INTERVAL_SEC * (i + 1)); } catch (InterruptedException interruptedException) { LOGGER.info(interruptedException.getMessage(), interruptedException); } } } } private void cleanUpRulesFromEngines(List rules, List ipList) { try { for (String ip : ipList) { for (CorrelationRule rule : rules) { if (ip.equals(rule.getEngineInstance())) { engineWrapper.deleteRuleFromEngine(rule.getPackageName(), ip); } } } } catch (CorrelationException e) { LOGGER.error("When the engine is extended, deleting rule failed", e); } } }