From c85baa839785a9bd02bc33e4302ac739dd5567c3 Mon Sep 17 00:00:00 2001 From: GuangrongFu Date: Fri, 17 Aug 2018 16:27:07 +0800 Subject: Added UT Codes Change-Id: I0f01e2ad4dde38e50231e3b5afef4786ccd0c54c Issue-ID: HOLMES-159 Signed-off-by: GuangrongFu --- .../holmes/rulemgt/msb/EngineInsQueryTool.java | 72 +++++++ .../org/onap/holmes/rulemgt/msb/EngineIpList.java | 75 ------- .../java/org/onap/holmes/rulemgt/msb/MsbQuery.java | 14 +- .../onap/holmes/rulemgt/send/RuleAllocation.java | 230 --------------------- .../onap/holmes/rulemgt/send/RuleAllocator.java | 228 ++++++++++++++++++++ 5 files changed, 307 insertions(+), 312 deletions(-) create mode 100644 rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/EngineInsQueryTool.java delete mode 100644 rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/EngineIpList.java delete mode 100644 rulemgt/src/main/java/org/onap/holmes/rulemgt/send/RuleAllocation.java create mode 100644 rulemgt/src/main/java/org/onap/holmes/rulemgt/send/RuleAllocator.java (limited to 'rulemgt/src/main/java/org/onap') diff --git a/rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/EngineInsQueryTool.java b/rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/EngineInsQueryTool.java new file mode 100644 index 0000000..00947bf --- /dev/null +++ b/rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/EngineInsQueryTool.java @@ -0,0 +1,72 @@ +/** + * Copyright 2017 ZTE Corporation. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.holmes.rulemgt.msb; + + +import java.io.IOException; + +import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.jvnet.hk2.annotations.Service; +import org.onap.holmes.common.api.entity.ServiceEntity; +import org.onap.holmes.common.api.entity.ServiceNode4Query; +import org.onap.holmes.common.config.MicroServiceConfig; +import org.onap.holmes.common.utils.GsonUtil; +import org.onap.holmes.common.utils.HttpsUtils; + +import javax.annotation.PostConstruct; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +@Service +@Slf4j +public class EngineInsQueryTool { + + private String url; + + @PostConstruct + public void init() { + String[] msbAddrInfo = MicroServiceConfig.getMsbIpAndPort(); + url = String.format("http://%s:%s/api/microservices/v1/services/holmes-engine-mgmt/version/v1", + msbAddrInfo[0], msbAddrInfo[1]); + } + + public List getInstanceList() throws Exception { + String response; + HttpGet httpGet = new HttpGet(url); + try (CloseableHttpClient httpClient = HttpsUtils.getHttpClient(HttpsUtils.DEFUALT_TIMEOUT)) { + HttpResponse httpResponse = HttpsUtils.get(httpGet, new HashMap<>(), httpClient); + response = HttpsUtils.extractResponseEntity(httpResponse); + } catch (Exception e) { + throw e; + } finally { + httpGet.releaseConnection(); + + } + ServiceEntity service = GsonUtil.jsonToBean(response, ServiceEntity.class); + List nodesList = service.getNodes(); + List ipList = new ArrayList<>(); + for (ServiceNode4Query node : nodesList) { + ipList.add(node.getIp()); + } + return ipList; + + } + +} diff --git a/rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/EngineIpList.java b/rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/EngineIpList.java deleted file mode 100644 index 2e91993..0000000 --- a/rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/EngineIpList.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Copyright 2017 ZTE Corporation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onap.holmes.rulemgt.msb; - - -import java.io.IOException; -import lombok.extern.slf4j.Slf4j; -import org.apache.http.HttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.CloseableHttpClient; -import org.jvnet.hk2.annotations.Service; -import org.onap.holmes.common.api.entity.ServiceEntity; -import org.onap.holmes.common.api.entity.ServiceNode4Query; -import org.onap.holmes.common.config.MicroServiceConfig; -import org.onap.holmes.common.utils.GsonUtil; -import org.onap.holmes.common.utils.HttpsUtils; - -import javax.annotation.PostConstruct; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - -@Service -@Slf4j -public class EngineIpList { - - private String[] msbAddrInfo; - private String ip; - private String port; - private String url; - - @PostConstruct - public void init(){ - msbAddrInfo = MicroServiceConfig.getMsbIpAndPort(); - ip = msbAddrInfo[0]; - port = msbAddrInfo[1]; - url = "http://" + ip + ":" + port + "/api/microservices/v1/services/holmes-engine-mgmt/version/v1" ; - } - - public List getServiceCount()throws Exception{ - String response; - HttpGet httpGet = new HttpGet(url); - try (CloseableHttpClient httpClient = HttpsUtils.getHttpClient(HttpsUtils.DEFUALT_TIMEOUT)) { - HttpResponse httpResponse = HttpsUtils.get(httpGet, new HashMap<>(), httpClient); - response = HttpsUtils.extractResponseEntity(httpResponse); - } catch (Exception e) { - throw e; - } finally { - httpGet.releaseConnection(); - - } - ServiceEntity service = GsonUtil.jsonToBean(response, ServiceEntity.class); - List nodesList = service.getNodes(); - List ipList = new ArrayList<>(); - for (ServiceNode4Query node : nodesList) { - ipList.add(node.getIp()); - } - return ipList; - - } - -} diff --git a/rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/MsbQuery.java b/rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/MsbQuery.java index 2f440b6..51ed0f2 100644 --- a/rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/MsbQuery.java +++ b/rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/MsbQuery.java @@ -18,7 +18,7 @@ package org.onap.holmes.rulemgt.msb; import lombok.extern.slf4j.Slf4j; import org.glassfish.hk2.api.ServiceLocator; import org.onap.holmes.common.dropwizard.ioc.utils.ServiceLocatorHolder; -import org.onap.holmes.rulemgt.send.RuleAllocation; +import org.onap.holmes.rulemgt.send.RuleAllocator; import org.onap.holmes.rulemgt.send.Ip4AddingRule; import org.onap.holmes.rulemgt.wrapper.RuleMgtWrapper; @@ -30,22 +30,22 @@ import java.util.TimerTask; @Slf4j public class MsbQuery { - private RuleAllocation ruleAllocation; + private RuleAllocator ruleAllocator; private Ip4AddingRule ip4AddingRule; - private EngineIpList engineIpList; + private EngineInsQueryTool engineInsQueryTool; private RuleMgtWrapper ruleMgtWrapper; private List timerIpList; public MsbQuery() { - ruleAllocation = new RuleAllocation(); + ruleAllocator = new RuleAllocator(); ServiceLocator locator = ServiceLocatorHolder.getLocator(); ip4AddingRule = locator.getService(Ip4AddingRule.class); - engineIpList = locator.getService(EngineIpList.class); + engineInsQueryTool = locator.getService(EngineInsQueryTool.class); ruleMgtWrapper = locator.getService(RuleMgtWrapper.class); } @@ -70,11 +70,11 @@ public class MsbQuery { public void run() { try { - timerIpList = engineIpList.getServiceCount(); + timerIpList = engineInsQueryTool.getInstanceList(); log.info(String.format("There are %d engine instance(s) running currently.", timerIpList.size())); ip4AddingRule.setIpList(timerIpList); - ruleAllocation.judgeAndAllocateRule(timerIpList); + ruleAllocator.allocateRules(timerIpList); } catch (Exception e) { log.error("The timing query engine instance failed ", e); } diff --git a/rulemgt/src/main/java/org/onap/holmes/rulemgt/send/RuleAllocation.java b/rulemgt/src/main/java/org/onap/holmes/rulemgt/send/RuleAllocation.java deleted file mode 100644 index 9e7b5b2..0000000 --- a/rulemgt/src/main/java/org/onap/holmes/rulemgt/send/RuleAllocation.java +++ /dev/null @@ -1,230 +0,0 @@ -/** - * Copyright 2017 ZTE Corporation. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.holmes.rulemgt.send; - -import lombok.extern.slf4j.Slf4j; -import org.glassfish.hk2.api.ServiceLocator; -import org.jvnet.hk2.annotations.Service; -import org.onap.holmes.common.api.entity.CorrelationRule; -import org.onap.holmes.common.dropwizard.ioc.utils.ServiceLocatorHolder; -import org.onap.holmes.common.exception.CorrelationException; -import org.onap.holmes.common.utils.DbDaoUtil; -import org.onap.holmes.rulemgt.bolt.enginebolt.EngineWrapper; -import org.onap.holmes.rulemgt.db.CorrelationRuleDao; -import org.onap.holmes.rulemgt.msb.EngineIpList; -import org.onap.holmes.rulemgt.wrapper.RuleQueryWrapper; -import org.onap.holmes.rulemgt.wrapper.RuleMgtWrapper; - -import javax.annotation.PostConstruct; -import javax.inject.Inject; -import java.util.*; - - -@Slf4j -public class RuleAllocation { - - private final static int ENABLE = 1; - private RuleMgtWrapper ruleMgtWrapper; - private RuleQueryWrapper ruleQueryWrapper; - private EngineWrapper engineWrapper; - private EngineIpList engineIpList; - private DbDaoUtil daoUtil; - private CorrelationRuleDao correlationRuleDao; - private int ruleCount; - private int serviceCount; - private List temIpList = new ArrayList<>(); - private List engineService = new ArrayList<>(); - private List allRules = new ArrayList<>(); - - public RuleAllocation() { - ServiceLocator locator = ServiceLocatorHolder.getLocator(); - ruleMgtWrapper = locator.getService(RuleMgtWrapper.class); - ruleQueryWrapper = locator.getService(RuleQueryWrapper.class); - engineWrapper = locator.getService(EngineWrapper.class); - engineIpList = locator.getService(EngineIpList.class); - daoUtil = locator.getService(DbDaoUtil.class); - - initDaoUtilAndEngineIp(); - } - - private void initDaoUtilAndEngineIp() { - correlationRuleDao = daoUtil.getJdbiDaoByOnDemand(CorrelationRuleDao.class); - try { - temIpList = engineIpList.getServiceCount(); - - } catch (Exception e) { - log.warn("Failed to get the number of engine instances.", e); - } - } - - public void judgeAndAllocateRule(List ipList) throws Exception { - if (ipList != null) { - engineService = ipList; - serviceCount = ipList.size(); - } - if (temIpList.size() < serviceCount) { - //extend - List deleteRule = calculateRule(temIpList); - List allocateRule = calculateRule(temIpList); - List extendIp = extendCompareIp(engineService, temIpList); - AllocateService(extendIp, allocateRule); - deleteRuleFromFormerEngine(deleteRule, temIpList); - - } else if (temIpList.size() > serviceCount) { - //destroy - List destroyIp = destroyCompareIp(engineService, temIpList); - AllocateService(restIp(destroyIp), relocateRuleAfterDestroy(destroyIp)); - - } else if (temIpList.size() == serviceCount) { - temIpList = engineService; - return; - } - temIpList = engineService; - - } - - - // When the engine is expanding, the rules that need to be allocated are calculated. - private List calculateRule(List oldIpList) throws Exception { - allRules = ruleQueryWrapper.queryRuleByEnable(ENABLE); - if (allRules != null) { - ruleCount = allRules.size(); - } - int count = ruleCount / serviceCount; - int remainder = ruleCount % serviceCount; - - List subRule = new ArrayList<>(); - for (String ip : oldIpList) { - List rules = ruleQueryWrapper.queryRuleByEngineInstance(ip); - List tem = rules.subList(count + (remainder-- / oldIpList.size()), rules.size()); - subRule.addAll(tem); - } - return subRule; - } - - //Rules that need to be allocated after the engine is destroyed - private List relocateRuleAfterDestroy(List destroyIpList) throws CorrelationException { - List rules = new ArrayList<>(); - try { - if (destroyIpList != null) { - for (String ip : destroyIpList) { - rules.addAll(ruleQueryWrapper.queryRuleByEngineInstance(ip)); - } - } - } catch (CorrelationException e) { - log.error("method relocateRuleAfterDestroy get data from DB failed !", e); - } - return rules; - } - - //Extended IP - private List extendCompareIp(List newList, List oldList) { - List extendIpList = new ArrayList<>(); - - for (String ip : newList) { - if (!oldList.contains(ip)) { - extendIpList.add(ip); - } - } - return extendIpList; - } - - //Destroyed IP - private List destroyCompareIp(List newList, List oldList) { - List destroyIpList = new ArrayList<>(); - for (String ip : oldList) { - if (!newList.contains(ip)) { - destroyIpList.add(ip); - } - } - return destroyIpList; - } - - //Residual IP after destruction - private List restIp(List destroyIp) { - List restIpList = new ArrayList<>(); - for (String ip : engineService) { - if (!destroyIp.contains(ip)) { - restIpList.add(ip); - } - } - return restIpList; - } - - public void AllocateService(List extendIpList, List subList) throws Exception { - List needIpList = getSortIp(extendIpList); - - for (int i = 0, j = 0; j < subList.size(); i++, j++) { - int index = i % needIpList.size(); - String deployIp = needIpList.get(index); - CorrelationRule rule = subList.get(j); - rule.setEngineInstance(deployIp); - allocateDeployRule(rule, deployIp); - } - } - - //The IP to be allocated is in ascending order, and the least is circulate. - private List getSortIp(List ipList) { - List ipRuleList = new ArrayList<>(); - HashMap hashMap = new HashMap(); - - try { - for (String ip : ipList) { - ipRuleList = ruleQueryWrapper.queryRuleByEngineInstance(ip); - if (ipRuleList != null) { - hashMap.put(ip, String.valueOf(ipRuleList.size())); - } - } - } catch (Exception e) { - log.error("getEngineIp4AddRule failed !", e); - } - - List> list_Data = new ArrayList<>(hashMap.entrySet()); - Collections.sort(list_Data,(o1,o2) -> o1.getValue().compareTo(o2.getValue())); - List needList = new ArrayList<>(); - for (Map.Entry map : list_Data) { - String key = map.getKey(); - needList.add(key); - } - return needList; - } - - private void allocateDeployRule(CorrelationRule rule, String ip) throws CorrelationException { - try { - ruleMgtWrapper.deployRule2Engine(rule, ip); - correlationRuleDao.updateRule(rule); - } catch (CorrelationException e) { - throw new CorrelationException("allocate Deploy Rule failed", e); - } - } - - private void deleteRuleFromFormerEngine(List subRule, List oldList) { - try { - for (String ip : oldList) { - for (CorrelationRule rule : subRule) { - if (ip.equals(rule.getEngineInstance())) { - engineWrapper.deleteRuleFromEngine(rule.getPackageName(), ip); - } - } - } - } catch (CorrelationException e) { - log.error("When the engine is extended, deleting rule failed", e); - } - - } - -} diff --git a/rulemgt/src/main/java/org/onap/holmes/rulemgt/send/RuleAllocator.java b/rulemgt/src/main/java/org/onap/holmes/rulemgt/send/RuleAllocator.java new file mode 100644 index 0000000..6779bf1 --- /dev/null +++ b/rulemgt/src/main/java/org/onap/holmes/rulemgt/send/RuleAllocator.java @@ -0,0 +1,228 @@ +/** + * Copyright 2017 ZTE Corporation. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.holmes.rulemgt.send; + +import lombok.extern.slf4j.Slf4j; +import org.glassfish.hk2.api.ServiceLocator; +import org.onap.holmes.common.api.entity.CorrelationRule; +import org.onap.holmes.common.dropwizard.ioc.utils.ServiceLocatorHolder; +import org.onap.holmes.common.exception.CorrelationException; +import org.onap.holmes.common.utils.DbDaoUtil; +import org.onap.holmes.rulemgt.bolt.enginebolt.EngineWrapper; +import org.onap.holmes.rulemgt.db.CorrelationRuleDao; +import org.onap.holmes.rulemgt.msb.EngineInsQueryTool; +import org.onap.holmes.rulemgt.wrapper.RuleMgtWrapper; +import org.onap.holmes.rulemgt.wrapper.RuleQueryWrapper; + +import java.util.*; + + +@Slf4j +public class RuleAllocator { + public final static int ENABLE = 1; + private RuleMgtWrapper ruleMgtWrapper; + private RuleQueryWrapper ruleQueryWrapper; + private EngineWrapper engineWrapper; + private EngineInsQueryTool engineInsQueryTool; + private DbDaoUtil daoUtil; + private CorrelationRuleDao correlationRuleDao; + private int latestEngineInsNum = 0; + private List existingEngineServiceIps = new ArrayList<>(); + private List latestEngineServiceIps = new ArrayList<>(); + + public RuleAllocator() { + ServiceLocator locator = ServiceLocatorHolder.getLocator(); + ruleMgtWrapper = locator.getService(RuleMgtWrapper.class); + ruleQueryWrapper = locator.getService(RuleQueryWrapper.class); + engineWrapper = locator.getService(EngineWrapper.class); + engineInsQueryTool = locator.getService(EngineInsQueryTool.class); + daoUtil = locator.getService(DbDaoUtil.class); + + initDaoUtilAndEngineIp(); + } + + private void initDaoUtilAndEngineIp() { + correlationRuleDao = daoUtil.getJdbiDaoByOnDemand(CorrelationRuleDao.class); + try { + existingEngineServiceIps = engineInsQueryTool.getInstanceList(); + + } catch (Exception e) { + log.warn("Failed to get the number of engine instances.", e); + } + } + + public synchronized void allocateRules(List latestEngineIps) throws Exception { + if (latestEngineIps == null) { + throw new NullPointerException("The parameter of " + this.getClass().getSimpleName() + + ".allocateRules(List) can not be null!"); + } + + latestEngineServiceIps = latestEngineIps; + latestEngineInsNum = latestEngineIps.size(); + if (existingEngineServiceIps.size() < latestEngineInsNum) { + //extend + List rules2Allocate = calculateRule(existingEngineServiceIps); + List rules2Delete = copyList(rules2Allocate); + List newInstanceIds = sortOutNewEngineInstances(latestEngineServiceIps, existingEngineServiceIps); + distributeRules(newInstanceIds, rules2Allocate); + cleanUpRulesFromEngines(rules2Delete, existingEngineServiceIps); + } else if (existingEngineServiceIps.size() > latestEngineInsNum) { + //destroy + List destroyed = getDestroyedEngines(latestEngineServiceIps, existingEngineServiceIps); + distributeRules(getRemainingEngines(destroyed), reallocateRules(destroyed)); + } + + existingEngineServiceIps = latestEngineServiceIps; + } + + private List copyList(List rules) { + List ret = new ArrayList<>(rules.size()); + for (CorrelationRule r : rules) { + ret.add((CorrelationRule) r.clone()); + } + return ret; + } + + // When the engine is expanding, the rules that need to be allocated are calculated. + private List calculateRule(List existingEngineIps) throws CorrelationException { + List enabledRules = ruleQueryWrapper.queryRuleByEnable(ENABLE); + int ruleCount = 0; + if (enabledRules != null) { + ruleCount = enabledRules.size(); + } + int count = ruleCount / latestEngineInsNum; + int remainder = ruleCount % latestEngineInsNum; + + List ret = new ArrayList<>(); + for (String ip : existingEngineIps) { + List rules = ruleQueryWrapper.queryRuleByEngineInstance(ip); + List tmp = rules.subList(count + (remainder-- / existingEngineIps.size()), rules.size()); + ret.addAll(tmp); + } + return ret; + } + + // Rules that need to be allocated after the engine is destroyed + private List reallocateRules(List destroyIpList) throws CorrelationException { + List rules = new ArrayList<>(); + try { + if (destroyIpList != null) { + for (String ip : destroyIpList) { + rules.addAll(ruleQueryWrapper.queryRuleByEngineInstance(ip)); + } + } + } catch (CorrelationException e) { + log.error("method reallocateRules get data from DB failed !", e); + } + return rules; + } + + // Extended IP + private List sortOutNewEngineInstances(List newIps, List oldIps) { + List ret = new ArrayList<>(); + + for (String ip : newIps) { + if (!oldIps.contains(ip)) { + ret.add(ip); + } + } + return ret; + } + + // Destroyed IP + private List getDestroyedEngines(List latest, List existing) { + List ret = new ArrayList<>(); + for (String ip : existing) { + if (!latest.contains(ip)) { + ret.add(ip); + } + } + return ret; + } + + // Residual IP after destruction + private List getRemainingEngines(List destroyed) { + List ret = new ArrayList<>(); + for (String ip : latestEngineServiceIps) { + if (!destroyed.contains(ip)) { + ret.add(ip); + } + } + return ret; + } + + private void distributeRules(List instanceIps, List rules) throws CorrelationException { + List sortedIps = sortIpByRuleNumDesc(instanceIps); + + for (int i = 0, j = 0; j < rules.size(); i++, j++) { + int index = i % sortedIps.size(); + String ip = sortedIps.get(index); + CorrelationRule rule = rules.get(j); + rule.setEngineInstance(ip); + allocateRule(rule, ip); + } + } + + // Sorted by the number of rules each engine contains, in a descending order. + private List sortIpByRuleNumDesc(List ips) { + List rules = null; + Map ruleNumOfEngines = new HashMap(); + + try { + for (String ip : ips) { + rules = ruleQueryWrapper.queryRuleByEngineInstance(ip); + if (rules != null) { + ruleNumOfEngines.put(ip, rules.size()); + } + } + } catch (Exception e) { + log.error("getEngineIp4AddRule failed !", e); + } + + List> sortedEntries = new ArrayList<>(ruleNumOfEngines.entrySet()); + Collections.sort(sortedEntries, (o1, o2) -> o2.getValue() - o1.getValue()); + + List ret = new ArrayList<>(); + for (Map.Entry entry : sortedEntries) { + ret.add(entry.getKey()); + } + return ret; + } + + private void allocateRule(CorrelationRule rule, String ip) throws CorrelationException { + try { + ruleMgtWrapper.deployRule2Engine(rule, ip); + correlationRuleDao.updateRule(rule); + } catch (CorrelationException e) { + throw new CorrelationException("allocate Deploy Rule failed", e); + } + } + + private void cleanUpRulesFromEngines(List rules, List ipList) { + try { + for (String ip : ipList) { + for (CorrelationRule rule : rules) { + if (ip.equals(rule.getEngineInstance())) { + engineWrapper.deleteRuleFromEngine(rule.getPackageName(), ip); + } + } + } + } catch (CorrelationException e) { + log.error("When the engine is extended, deleting rule failed", e); + } + } +} -- cgit 1.2.3-korg