diff options
Diffstat (limited to 'rulemgt/src/main/java/org/onap')
-rw-r--r-- | rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/EngineInsQueryTool.java (renamed from rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/EngineIpList.java) | 25 | ||||
-rw-r--r-- | rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/MsbQuery.java | 14 | ||||
-rw-r--r-- | rulemgt/src/main/java/org/onap/holmes/rulemgt/send/RuleAllocation.java | 230 | ||||
-rw-r--r-- | rulemgt/src/main/java/org/onap/holmes/rulemgt/send/RuleAllocator.java | 228 |
4 files changed, 246 insertions, 251 deletions
diff --git a/rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/EngineIpList.java b/rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/EngineInsQueryTool.java index 2e91993..00947bf 100644 --- a/rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/EngineIpList.java +++ b/rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/EngineInsQueryTool.java @@ -1,12 +1,12 @@ /** * Copyright 2017 ZTE Corporation. - * + * <p> * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,6 +17,7 @@ package org.onap.holmes.rulemgt.msb; import java.io.IOException; + import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpGet; @@ -35,22 +36,18 @@ import java.util.List; @Service @Slf4j -public class EngineIpList { +public class EngineInsQueryTool { - private String[] msbAddrInfo; - private String ip; - private String port; private String url; @PostConstruct - public void init(){ - msbAddrInfo = MicroServiceConfig.getMsbIpAndPort(); - ip = msbAddrInfo[0]; - port = msbAddrInfo[1]; - url = "http://" + ip + ":" + port + "/api/microservices/v1/services/holmes-engine-mgmt/version/v1" ; + public void init() { + String[] msbAddrInfo = MicroServiceConfig.getMsbIpAndPort(); + url = String.format("http://%s:%s/api/microservices/v1/services/holmes-engine-mgmt/version/v1", + msbAddrInfo[0], msbAddrInfo[1]); } - public List<String> getServiceCount()throws Exception{ + public List<String> getInstanceList() throws Exception { String response; HttpGet httpGet = new HttpGet(url); try (CloseableHttpClient httpClient = HttpsUtils.getHttpClient(HttpsUtils.DEFUALT_TIMEOUT)) { diff --git a/rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/MsbQuery.java b/rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/MsbQuery.java index 2f440b6..51ed0f2 100644 --- a/rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/MsbQuery.java +++ b/rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/MsbQuery.java @@ -18,7 +18,7 @@ package org.onap.holmes.rulemgt.msb; import lombok.extern.slf4j.Slf4j; import org.glassfish.hk2.api.ServiceLocator; import org.onap.holmes.common.dropwizard.ioc.utils.ServiceLocatorHolder; -import org.onap.holmes.rulemgt.send.RuleAllocation; +import org.onap.holmes.rulemgt.send.RuleAllocator; import org.onap.holmes.rulemgt.send.Ip4AddingRule; import org.onap.holmes.rulemgt.wrapper.RuleMgtWrapper; @@ -30,22 +30,22 @@ import java.util.TimerTask; @Slf4j public class MsbQuery { - private RuleAllocation ruleAllocation; + private RuleAllocator ruleAllocator; private Ip4AddingRule ip4AddingRule; - private EngineIpList engineIpList; + private EngineInsQueryTool engineInsQueryTool; private RuleMgtWrapper ruleMgtWrapper; private List<String> timerIpList; public MsbQuery() { - ruleAllocation = new RuleAllocation(); + ruleAllocator = new RuleAllocator(); ServiceLocator locator = ServiceLocatorHolder.getLocator(); ip4AddingRule = locator.getService(Ip4AddingRule.class); - engineIpList = locator.getService(EngineIpList.class); + engineInsQueryTool = locator.getService(EngineInsQueryTool.class); ruleMgtWrapper = locator.getService(RuleMgtWrapper.class); } @@ -70,11 +70,11 @@ public class MsbQuery { public void run() { try { - timerIpList = engineIpList.getServiceCount(); + timerIpList = engineInsQueryTool.getInstanceList(); log.info(String.format("There are %d engine instance(s) running currently.", timerIpList.size())); ip4AddingRule.setIpList(timerIpList); - ruleAllocation.judgeAndAllocateRule(timerIpList); + ruleAllocator.allocateRules(timerIpList); } catch (Exception e) { log.error("The timing query engine instance failed ", e); } diff --git a/rulemgt/src/main/java/org/onap/holmes/rulemgt/send/RuleAllocation.java b/rulemgt/src/main/java/org/onap/holmes/rulemgt/send/RuleAllocation.java deleted file mode 100644 index 9e7b5b2..0000000 --- a/rulemgt/src/main/java/org/onap/holmes/rulemgt/send/RuleAllocation.java +++ /dev/null @@ -1,230 +0,0 @@ -/** - * Copyright 2017 ZTE Corporation. - * <p> - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.holmes.rulemgt.send; - -import lombok.extern.slf4j.Slf4j; -import org.glassfish.hk2.api.ServiceLocator; -import org.jvnet.hk2.annotations.Service; -import org.onap.holmes.common.api.entity.CorrelationRule; -import org.onap.holmes.common.dropwizard.ioc.utils.ServiceLocatorHolder; -import org.onap.holmes.common.exception.CorrelationException; -import org.onap.holmes.common.utils.DbDaoUtil; -import org.onap.holmes.rulemgt.bolt.enginebolt.EngineWrapper; -import org.onap.holmes.rulemgt.db.CorrelationRuleDao; -import org.onap.holmes.rulemgt.msb.EngineIpList; -import org.onap.holmes.rulemgt.wrapper.RuleQueryWrapper; -import org.onap.holmes.rulemgt.wrapper.RuleMgtWrapper; - -import javax.annotation.PostConstruct; -import javax.inject.Inject; -import java.util.*; - - -@Slf4j -public class RuleAllocation { - - private final static int ENABLE = 1; - private RuleMgtWrapper ruleMgtWrapper; - private RuleQueryWrapper ruleQueryWrapper; - private EngineWrapper engineWrapper; - private EngineIpList engineIpList; - private DbDaoUtil daoUtil; - private CorrelationRuleDao correlationRuleDao; - private int ruleCount; - private int serviceCount; - private List<String> temIpList = new ArrayList<>(); - private List<String> engineService = new ArrayList<>(); - private List<CorrelationRule> allRules = new ArrayList<>(); - - public RuleAllocation() { - ServiceLocator locator = ServiceLocatorHolder.getLocator(); - ruleMgtWrapper = locator.getService(RuleMgtWrapper.class); - ruleQueryWrapper = locator.getService(RuleQueryWrapper.class); - engineWrapper = locator.getService(EngineWrapper.class); - engineIpList = locator.getService(EngineIpList.class); - daoUtil = locator.getService(DbDaoUtil.class); - - initDaoUtilAndEngineIp(); - } - - private void initDaoUtilAndEngineIp() { - correlationRuleDao = daoUtil.getJdbiDaoByOnDemand(CorrelationRuleDao.class); - try { - temIpList = engineIpList.getServiceCount(); - - } catch (Exception e) { - log.warn("Failed to get the number of engine instances.", e); - } - } - - public void judgeAndAllocateRule(List<String> ipList) throws Exception { - if (ipList != null) { - engineService = ipList; - serviceCount = ipList.size(); - } - if (temIpList.size() < serviceCount) { - //extend - List<CorrelationRule> deleteRule = calculateRule(temIpList); - List<CorrelationRule> allocateRule = calculateRule(temIpList); - List<String> extendIp = extendCompareIp(engineService, temIpList); - AllocateService(extendIp, allocateRule); - deleteRuleFromFormerEngine(deleteRule, temIpList); - - } else if (temIpList.size() > serviceCount) { - //destroy - List<String> destroyIp = destroyCompareIp(engineService, temIpList); - AllocateService(restIp(destroyIp), relocateRuleAfterDestroy(destroyIp)); - - } else if (temIpList.size() == serviceCount) { - temIpList = engineService; - return; - } - temIpList = engineService; - - } - - - // When the engine is expanding, the rules that need to be allocated are calculated. - private List<CorrelationRule> calculateRule(List<String> oldIpList) throws Exception { - allRules = ruleQueryWrapper.queryRuleByEnable(ENABLE); - if (allRules != null) { - ruleCount = allRules.size(); - } - int count = ruleCount / serviceCount; - int remainder = ruleCount % serviceCount; - - List<CorrelationRule> subRule = new ArrayList<>(); - for (String ip : oldIpList) { - List<CorrelationRule> rules = ruleQueryWrapper.queryRuleByEngineInstance(ip); - List<CorrelationRule> tem = rules.subList(count + (remainder-- / oldIpList.size()), rules.size()); - subRule.addAll(tem); - } - return subRule; - } - - //Rules that need to be allocated after the engine is destroyed - private List<CorrelationRule> relocateRuleAfterDestroy(List<String> destroyIpList) throws CorrelationException { - List<CorrelationRule> rules = new ArrayList<>(); - try { - if (destroyIpList != null) { - for (String ip : destroyIpList) { - rules.addAll(ruleQueryWrapper.queryRuleByEngineInstance(ip)); - } - } - } catch (CorrelationException e) { - log.error("method relocateRuleAfterDestroy get data from DB failed !", e); - } - return rules; - } - - //Extended IP - private List<String> extendCompareIp(List<String> newList, List<String> oldList) { - List<String> extendIpList = new ArrayList<>(); - - for (String ip : newList) { - if (!oldList.contains(ip)) { - extendIpList.add(ip); - } - } - return extendIpList; - } - - //Destroyed IP - private List<String> destroyCompareIp(List<String> newList, List<String> oldList) { - List<String> destroyIpList = new ArrayList<>(); - for (String ip : oldList) { - if (!newList.contains(ip)) { - destroyIpList.add(ip); - } - } - return destroyIpList; - } - - //Residual IP after destruction - private List<String> restIp(List<String> destroyIp) { - List<String> restIpList = new ArrayList<>(); - for (String ip : engineService) { - if (!destroyIp.contains(ip)) { - restIpList.add(ip); - } - } - return restIpList; - } - - public void AllocateService(List<String> extendIpList, List<CorrelationRule> subList) throws Exception { - List<String> needIpList = getSortIp(extendIpList); - - for (int i = 0, j = 0; j < subList.size(); i++, j++) { - int index = i % needIpList.size(); - String deployIp = needIpList.get(index); - CorrelationRule rule = subList.get(j); - rule.setEngineInstance(deployIp); - allocateDeployRule(rule, deployIp); - } - } - - //The IP to be allocated is in ascending order, and the least is circulate. - private List<String> getSortIp(List<String> ipList) { - List<CorrelationRule> ipRuleList = new ArrayList<>(); - HashMap<String, String> hashMap = new HashMap(); - - try { - for (String ip : ipList) { - ipRuleList = ruleQueryWrapper.queryRuleByEngineInstance(ip); - if (ipRuleList != null) { - hashMap.put(ip, String.valueOf(ipRuleList.size())); - } - } - } catch (Exception e) { - log.error("getEngineIp4AddRule failed !", e); - } - - List<Map.Entry<String, String>> list_Data = new ArrayList<>(hashMap.entrySet()); - Collections.sort(list_Data,(o1,o2) -> o1.getValue().compareTo(o2.getValue())); - List<String> needList = new ArrayList<>(); - for (Map.Entry<String, String> map : list_Data) { - String key = map.getKey(); - needList.add(key); - } - return needList; - } - - private void allocateDeployRule(CorrelationRule rule, String ip) throws CorrelationException { - try { - ruleMgtWrapper.deployRule2Engine(rule, ip); - correlationRuleDao.updateRule(rule); - } catch (CorrelationException e) { - throw new CorrelationException("allocate Deploy Rule failed", e); - } - } - - private void deleteRuleFromFormerEngine(List<CorrelationRule> subRule, List<String> oldList) { - try { - for (String ip : oldList) { - for (CorrelationRule rule : subRule) { - if (ip.equals(rule.getEngineInstance())) { - engineWrapper.deleteRuleFromEngine(rule.getPackageName(), ip); - } - } - } - } catch (CorrelationException e) { - log.error("When the engine is extended, deleting rule failed", e); - } - - } - -} diff --git a/rulemgt/src/main/java/org/onap/holmes/rulemgt/send/RuleAllocator.java b/rulemgt/src/main/java/org/onap/holmes/rulemgt/send/RuleAllocator.java new file mode 100644 index 0000000..6779bf1 --- /dev/null +++ b/rulemgt/src/main/java/org/onap/holmes/rulemgt/send/RuleAllocator.java @@ -0,0 +1,228 @@ +/** + * Copyright 2017 ZTE Corporation. + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.holmes.rulemgt.send; + +import lombok.extern.slf4j.Slf4j; +import org.glassfish.hk2.api.ServiceLocator; +import org.onap.holmes.common.api.entity.CorrelationRule; +import org.onap.holmes.common.dropwizard.ioc.utils.ServiceLocatorHolder; +import org.onap.holmes.common.exception.CorrelationException; +import org.onap.holmes.common.utils.DbDaoUtil; +import org.onap.holmes.rulemgt.bolt.enginebolt.EngineWrapper; +import org.onap.holmes.rulemgt.db.CorrelationRuleDao; +import org.onap.holmes.rulemgt.msb.EngineInsQueryTool; +import org.onap.holmes.rulemgt.wrapper.RuleMgtWrapper; +import org.onap.holmes.rulemgt.wrapper.RuleQueryWrapper; + +import java.util.*; + + +@Slf4j +public class RuleAllocator { + public final static int ENABLE = 1; + private RuleMgtWrapper ruleMgtWrapper; + private RuleQueryWrapper ruleQueryWrapper; + private EngineWrapper engineWrapper; + private EngineInsQueryTool engineInsQueryTool; + private DbDaoUtil daoUtil; + private CorrelationRuleDao correlationRuleDao; + private int latestEngineInsNum = 0; + private List<String> existingEngineServiceIps = new ArrayList<>(); + private List<String> latestEngineServiceIps = new ArrayList<>(); + + public RuleAllocator() { + ServiceLocator locator = ServiceLocatorHolder.getLocator(); + ruleMgtWrapper = locator.getService(RuleMgtWrapper.class); + ruleQueryWrapper = locator.getService(RuleQueryWrapper.class); + engineWrapper = locator.getService(EngineWrapper.class); + engineInsQueryTool = locator.getService(EngineInsQueryTool.class); + daoUtil = locator.getService(DbDaoUtil.class); + + initDaoUtilAndEngineIp(); + } + + private void initDaoUtilAndEngineIp() { + correlationRuleDao = daoUtil.getJdbiDaoByOnDemand(CorrelationRuleDao.class); + try { + existingEngineServiceIps = engineInsQueryTool.getInstanceList(); + + } catch (Exception e) { + log.warn("Failed to get the number of engine instances.", e); + } + } + + public synchronized void allocateRules(List<String> latestEngineIps) throws Exception { + if (latestEngineIps == null) { + throw new NullPointerException("The parameter of " + this.getClass().getSimpleName() + + ".allocateRules(List<String>) can not be null!"); + } + + latestEngineServiceIps = latestEngineIps; + latestEngineInsNum = latestEngineIps.size(); + if (existingEngineServiceIps.size() < latestEngineInsNum) { + //extend + List<CorrelationRule> rules2Allocate = calculateRule(existingEngineServiceIps); + List<CorrelationRule> rules2Delete = copyList(rules2Allocate); + List<String> newInstanceIds = sortOutNewEngineInstances(latestEngineServiceIps, existingEngineServiceIps); + distributeRules(newInstanceIds, rules2Allocate); + cleanUpRulesFromEngines(rules2Delete, existingEngineServiceIps); + } else if (existingEngineServiceIps.size() > latestEngineInsNum) { + //destroy + List<String> destroyed = getDestroyedEngines(latestEngineServiceIps, existingEngineServiceIps); + distributeRules(getRemainingEngines(destroyed), reallocateRules(destroyed)); + } + + existingEngineServiceIps = latestEngineServiceIps; + } + + private List<CorrelationRule> copyList(List<CorrelationRule> rules) { + List<CorrelationRule> ret = new ArrayList<>(rules.size()); + for (CorrelationRule r : rules) { + ret.add((CorrelationRule) r.clone()); + } + return ret; + } + + // When the engine is expanding, the rules that need to be allocated are calculated. + private List<CorrelationRule> calculateRule(List<String> existingEngineIps) throws CorrelationException { + List<CorrelationRule> enabledRules = ruleQueryWrapper.queryRuleByEnable(ENABLE); + int ruleCount = 0; + if (enabledRules != null) { + ruleCount = enabledRules.size(); + } + int count = ruleCount / latestEngineInsNum; + int remainder = ruleCount % latestEngineInsNum; + + List<CorrelationRule> ret = new ArrayList<>(); + for (String ip : existingEngineIps) { + List<CorrelationRule> rules = ruleQueryWrapper.queryRuleByEngineInstance(ip); + List<CorrelationRule> tmp = rules.subList(count + (remainder-- / existingEngineIps.size()), rules.size()); + ret.addAll(tmp); + } + return ret; + } + + // Rules that need to be allocated after the engine is destroyed + private List<CorrelationRule> reallocateRules(List<String> destroyIpList) throws CorrelationException { + List<CorrelationRule> rules = new ArrayList<>(); + try { + if (destroyIpList != null) { + for (String ip : destroyIpList) { + rules.addAll(ruleQueryWrapper.queryRuleByEngineInstance(ip)); + } + } + } catch (CorrelationException e) { + log.error("method reallocateRules get data from DB failed !", e); + } + return rules; + } + + // Extended IP + private List<String> sortOutNewEngineInstances(List<String> newIps, List<String> oldIps) { + List<String> ret = new ArrayList<>(); + + for (String ip : newIps) { + if (!oldIps.contains(ip)) { + ret.add(ip); + } + } + return ret; + } + + // Destroyed IP + private List<String> getDestroyedEngines(List<String> latest, List<String> existing) { + List<String> ret = new ArrayList<>(); + for (String ip : existing) { + if (!latest.contains(ip)) { + ret.add(ip); + } + } + return ret; + } + + // Residual IP after destruction + private List<String> getRemainingEngines(List<String> destroyed) { + List<String> ret = new ArrayList<>(); + for (String ip : latestEngineServiceIps) { + if (!destroyed.contains(ip)) { + ret.add(ip); + } + } + return ret; + } + + private void distributeRules(List<String> instanceIps, List<CorrelationRule> rules) throws CorrelationException { + List<String> sortedIps = sortIpByRuleNumDesc(instanceIps); + + for (int i = 0, j = 0; j < rules.size(); i++, j++) { + int index = i % sortedIps.size(); + String ip = sortedIps.get(index); + CorrelationRule rule = rules.get(j); + rule.setEngineInstance(ip); + allocateRule(rule, ip); + } + } + + // Sorted by the number of rules each engine contains, in a descending order. + private List<String> sortIpByRuleNumDesc(List<String> ips) { + List<CorrelationRule> rules = null; + Map<String, Integer> ruleNumOfEngines = new HashMap(); + + try { + for (String ip : ips) { + rules = ruleQueryWrapper.queryRuleByEngineInstance(ip); + if (rules != null) { + ruleNumOfEngines.put(ip, rules.size()); + } + } + } catch (Exception e) { + log.error("getEngineIp4AddRule failed !", e); + } + + List<Map.Entry<String, Integer>> sortedEntries = new ArrayList<>(ruleNumOfEngines.entrySet()); + Collections.sort(sortedEntries, (o1, o2) -> o2.getValue() - o1.getValue()); + + List<String> ret = new ArrayList<>(); + for (Map.Entry<String, Integer> entry : sortedEntries) { + ret.add(entry.getKey()); + } + return ret; + } + + private void allocateRule(CorrelationRule rule, String ip) throws CorrelationException { + try { + ruleMgtWrapper.deployRule2Engine(rule, ip); + correlationRuleDao.updateRule(rule); + } catch (CorrelationException e) { + throw new CorrelationException("allocate Deploy Rule failed", e); + } + } + + private void cleanUpRulesFromEngines(List<CorrelationRule> rules, List<String> ipList) { + try { + for (String ip : ipList) { + for (CorrelationRule rule : rules) { + if (ip.equals(rule.getEngineInstance())) { + engineWrapper.deleteRuleFromEngine(rule.getPackageName(), ip); + } + } + } + } catch (CorrelationException e) { + log.error("When the engine is extended, deleting rule failed", e); + } + } +} |