diff options
author | tang peng <tang.peng5@zte.com.cn> | 2020-10-10 11:19:22 +0800 |
---|---|---|
committer | tang peng <tang.peng5@zte.com.cn> | 2020-10-10 11:19:22 +0800 |
commit | 348ce6e112876f552a939e58d74376704537344e (patch) | |
tree | e8bed6f7be5a87dd1eb770bb9f57965e7a8a117c /rulemgt/src/main/java/org | |
parent | 7dbc431fa39b0c0094033780c78cb6f871221f85 (diff) |
Fixed MSB Invocation Issues
Issue-ID: HOLMES-365
Signed-off-by: tang peng <tang.peng5@zte.com.cn>
Change-Id: Ibae0924268e25f0af5f13ded1e2e1be51e8106d8
Diffstat (limited to 'rulemgt/src/main/java/org')
11 files changed, 233 insertions, 291 deletions
diff --git a/rulemgt/src/main/java/org/onap/holmes/rulemgt/RuleActiveApp.java b/rulemgt/src/main/java/org/onap/holmes/rulemgt/RuleActiveApp.java index ba61a28..5833a94 100644 --- a/rulemgt/src/main/java/org/onap/holmes/rulemgt/RuleActiveApp.java +++ b/rulemgt/src/main/java/org/onap/holmes/rulemgt/RuleActiveApp.java @@ -1,5 +1,5 @@ /** - * Copyright 2017-2018 ZTE Corporation. + * Copyright 2017-2020 ZTE Corporation. * <p> * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,8 +24,6 @@ import org.onap.holmes.common.utils.HttpsUtils; import org.onap.holmes.common.utils.MSBRegisterUtil; import org.onap.holmes.common.utils.transactionid.TransactionIdFilter; import org.onap.holmes.rulemgt.dcae.DcaeConfigurationPolling; -import org.onap.holmes.rulemgt.msb.MsbQuery; -import org.onap.holmes.rulemgt.resources.RuleMgtResources; import org.onap.msb.sdk.discovery.entity.MicroServiceInfo; import org.onap.msb.sdk.discovery.entity.Node; import org.slf4j.Logger; @@ -66,8 +64,6 @@ public class RuleActiveApp extends IOCApplication<RuleAppConfig> { environment.servlets().addFilter("customFilter", new TransactionIdFilter()).addMappingForUrlPatterns(EnumSet .allOf(DispatcherType.class), true, "/*"); - - new MsbQuery().startTimer(); } private MicroServiceInfo createMicroServiceInfo() { diff --git a/rulemgt/src/main/java/org/onap/holmes/rulemgt/send/RuleAllocator.java b/rulemgt/src/main/java/org/onap/holmes/rulemgt/RuleAllocator.java index 6779bf1..2dc05ee 100644 --- a/rulemgt/src/main/java/org/onap/holmes/rulemgt/send/RuleAllocator.java +++ b/rulemgt/src/main/java/org/onap/holmes/rulemgt/RuleAllocator.java @@ -1,5 +1,5 @@ /** - * Copyright 2017 ZTE Corporation. + * Copyright 2017-2020 ZTE Corporation. * <p> * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,79 +14,91 @@ * limitations under the License. */ -package org.onap.holmes.rulemgt.send; +package org.onap.holmes.rulemgt; import lombok.extern.slf4j.Slf4j; -import org.glassfish.hk2.api.ServiceLocator; +import org.jvnet.hk2.annotations.Service; import org.onap.holmes.common.api.entity.CorrelationRule; -import org.onap.holmes.common.dropwizard.ioc.utils.ServiceLocatorHolder; import org.onap.holmes.common.exception.CorrelationException; import org.onap.holmes.common.utils.DbDaoUtil; import org.onap.holmes.rulemgt.bolt.enginebolt.EngineWrapper; import org.onap.holmes.rulemgt.db.CorrelationRuleDao; -import org.onap.holmes.rulemgt.msb.EngineInsQueryTool; +import org.onap.holmes.rulemgt.tools.EngineTools; import org.onap.holmes.rulemgt.wrapper.RuleMgtWrapper; import org.onap.holmes.rulemgt.wrapper.RuleQueryWrapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import javax.annotation.PostConstruct; +import javax.inject.Inject; import java.util.*; +import static java.util.concurrent.TimeUnit.SECONDS; @Slf4j +@Service public class RuleAllocator { + private static final Logger LOGGER = LoggerFactory.getLogger(RuleAllocator.class); + public final static int ENABLE = 1; private RuleMgtWrapper ruleMgtWrapper; private RuleQueryWrapper ruleQueryWrapper; private EngineWrapper engineWrapper; - private EngineInsQueryTool engineInsQueryTool; - private DbDaoUtil daoUtil; + private EngineTools engineTools; private CorrelationRuleDao correlationRuleDao; - private int latestEngineInsNum = 0; - private List<String> existingEngineServiceIps = new ArrayList<>(); - private List<String> latestEngineServiceIps = new ArrayList<>(); - - public RuleAllocator() { - ServiceLocator locator = ServiceLocatorHolder.getLocator(); - ruleMgtWrapper = locator.getService(RuleMgtWrapper.class); - ruleQueryWrapper = locator.getService(RuleQueryWrapper.class); - engineWrapper = locator.getService(EngineWrapper.class); - engineInsQueryTool = locator.getService(EngineInsQueryTool.class); - daoUtil = locator.getService(DbDaoUtil.class); - - initDaoUtilAndEngineIp(); - } - private void initDaoUtilAndEngineIp() { + @Inject + public RuleAllocator(RuleMgtWrapper ruleMgtWrapper, RuleQueryWrapper ruleQueryWrapper, + EngineWrapper engineWrapper, EngineTools engineTools, DbDaoUtil daoUtil) { + this.ruleMgtWrapper = ruleMgtWrapper; + this.ruleQueryWrapper = ruleQueryWrapper; + this.engineWrapper = engineWrapper; + this.engineTools = engineTools; correlationRuleDao = daoUtil.getJdbiDaoByOnDemand(CorrelationRuleDao.class); - try { - existingEngineServiceIps = engineInsQueryTool.getInstanceList(); + } - } catch (Exception e) { - log.warn("Failed to get the number of engine instances.", e); - } + @PostConstruct + private void initialize() { + new Timer("RuleAllocatorTimer").schedule(new TimerTask() { + + public void run() { + try { + allocateRules(); + } catch (Exception e) { + LOGGER.error("Failed to reallocate rules.", e); + } + } + + }, SECONDS.toMillis(10), SECONDS.toMillis(30)); } - public synchronized void allocateRules(List<String> latestEngineIps) throws Exception { - if (latestEngineIps == null) { - throw new NullPointerException("The parameter of " + this.getClass().getSimpleName() - + ".allocateRules(List<String>) can not be null!"); + public synchronized void allocateRules() throws Exception { + List<String> engines = engineTools.getInstanceList(); + + if (engines == null) { + return; } - latestEngineServiceIps = latestEngineIps; - latestEngineInsNum = latestEngineIps.size(); - if (existingEngineServiceIps.size() < latestEngineInsNum) { + int numOfEngines = engines.size(); + LOGGER.info(String.format("There are %d engine instance(s) running currently.", numOfEngines)); + + List<String> legacyEngineInstances = engineTools.getLegacyEngineInstances(); + if (legacyEngineInstances == null) { + return; + } + + if (legacyEngineInstances.size() < numOfEngines) { //extend - List<CorrelationRule> rules2Allocate = calculateRule(existingEngineServiceIps); + List<CorrelationRule> rules2Allocate = calculateRule(legacyEngineInstances, numOfEngines); List<CorrelationRule> rules2Delete = copyList(rules2Allocate); - List<String> newInstanceIds = sortOutNewEngineInstances(latestEngineServiceIps, existingEngineServiceIps); + List<String> newInstanceIds = sortOutNewEngineInstances(engines, legacyEngineInstances); distributeRules(newInstanceIds, rules2Allocate); - cleanUpRulesFromEngines(rules2Delete, existingEngineServiceIps); - } else if (existingEngineServiceIps.size() > latestEngineInsNum) { + cleanUpRulesFromEngines(rules2Delete, legacyEngineInstances); + } else { //destroy - List<String> destroyed = getDestroyedEngines(latestEngineServiceIps, existingEngineServiceIps); - distributeRules(getRemainingEngines(destroyed), reallocateRules(destroyed)); + List<String> destroyed = getDestroyedEngines(engines, legacyEngineInstances); + distributeRules(getRemainingEngines(engines, destroyed), getRules(destroyed)); } - - existingEngineServiceIps = latestEngineServiceIps; } private List<CorrelationRule> copyList(List<CorrelationRule> rules) { @@ -98,13 +110,16 @@ public class RuleAllocator { } // When the engine is expanding, the rules that need to be allocated are calculated. - private List<CorrelationRule> calculateRule(List<String> existingEngineIps) throws CorrelationException { + private List<CorrelationRule> calculateRule(List<String> existingEngineIps, + int latestEngineInsNum) throws CorrelationException { List<CorrelationRule> enabledRules = ruleQueryWrapper.queryRuleByEnable(ENABLE); int ruleCount = 0; if (enabledRules != null) { ruleCount = enabledRules.size(); } + // Average number of rule that's to be allocate into each instance int count = ruleCount / latestEngineInsNum; + // The number of remaining rules (to be allocated) after each instance has been allocated with the average number of rules. int remainder = ruleCount % latestEngineInsNum; List<CorrelationRule> ret = new ArrayList<>(); @@ -117,7 +132,7 @@ public class RuleAllocator { } // Rules that need to be allocated after the engine is destroyed - private List<CorrelationRule> reallocateRules(List<String> destroyIpList) throws CorrelationException { + private List<CorrelationRule> getRules(List<String> destroyIpList) throws CorrelationException { List<CorrelationRule> rules = new ArrayList<>(); try { if (destroyIpList != null) { @@ -126,7 +141,7 @@ public class RuleAllocator { } } } catch (CorrelationException e) { - log.error("method reallocateRules get data from DB failed !", e); + LOGGER.error("method getRules get data from DB failed !", e); } return rules; } @@ -155,9 +170,9 @@ public class RuleAllocator { } // Residual IP after destruction - private List<String> getRemainingEngines(List<String> destroyed) { + private List<String> getRemainingEngines(List<String> all, List<String> destroyed) { List<String> ret = new ArrayList<>(); - for (String ip : latestEngineServiceIps) { + for (String ip : all) { if (!destroyed.contains(ip)) { ret.add(ip); } @@ -190,7 +205,7 @@ public class RuleAllocator { } } } catch (Exception e) { - log.error("getEngineIp4AddRule failed !", e); + LOGGER.error("getEngineWithLeastRules failed !", e); } List<Map.Entry<String, Integer>> sortedEntries = new ArrayList<>(ruleNumOfEngines.entrySet()); @@ -208,7 +223,8 @@ public class RuleAllocator { ruleMgtWrapper.deployRule2Engine(rule, ip); correlationRuleDao.updateRule(rule); } catch (CorrelationException e) { - throw new CorrelationException("allocate Deploy Rule failed", e); + throw new CorrelationException(String.format("Failed to allocate rule <%s> to <%s>", + rule.getName(), ip), e); } } @@ -222,7 +238,7 @@ public class RuleAllocator { } } } catch (CorrelationException e) { - log.error("When the engine is extended, deleting rule failed", e); + LOGGER.error("When the engine is extended, deleting rule failed", e); } } } diff --git a/rulemgt/src/main/java/org/onap/holmes/rulemgt/bolt/enginebolt/EngineService.java b/rulemgt/src/main/java/org/onap/holmes/rulemgt/bolt/enginebolt/EngineService.java index cc20cab..8782d7a 100644 --- a/rulemgt/src/main/java/org/onap/holmes/rulemgt/bolt/enginebolt/EngineService.java +++ b/rulemgt/src/main/java/org/onap/holmes/rulemgt/bolt/enginebolt/EngineService.java @@ -1,12 +1,12 @@ /**
- * Copyright 2017 ZTE Corporation.
- *
+ * Copyright 2017-2020 ZTE Corporation.
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,9 +15,6 @@ */
package org.onap.holmes.rulemgt.bolt.enginebolt;
-import java.io.IOException;
-import java.util.HashMap;
-import javax.ws.rs.core.MediaType;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpDelete;
@@ -32,6 +29,10 @@ import org.onap.holmes.rulemgt.bean.request.CorrelationCheckRule4Engine; import org.onap.holmes.rulemgt.bean.request.CorrelationDeployRule4Engine;
import org.onap.holmes.rulemgt.constant.RuleMgtConstant;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.util.HashMap;
+
@Slf4j
@Service
public class EngineService {
@@ -78,7 +79,7 @@ public class EngineService { HttpPut httpPut = new HttpPut(url);
try {
httpClient = HttpsUtils.getConditionalHttpsClient(HttpsUtils.DEFUALT_TIMEOUT);
- return HttpsUtils.put(httpPut, headers, new HashMap<>(), new StringEntity(content),httpClient);
+ return HttpsUtils.put(httpPut, headers, new HashMap<>(), new StringEntity(content), httpClient);
} finally {
closeHttpClient(httpClient);
}
@@ -101,7 +102,7 @@ public class EngineService { return headers;
}
- private String getRequestPref(){
+ private String getRequestPref() {
return HttpsUtils.isHttpsEnabled() ? HTTPS : HTTP;
}
}
diff --git a/rulemgt/src/main/java/org/onap/holmes/rulemgt/bolt/enginebolt/EngineWrapper.java b/rulemgt/src/main/java/org/onap/holmes/rulemgt/bolt/enginebolt/EngineWrapper.java index 479437e..4fe5896 100644 --- a/rulemgt/src/main/java/org/onap/holmes/rulemgt/bolt/enginebolt/EngineWrapper.java +++ b/rulemgt/src/main/java/org/onap/holmes/rulemgt/bolt/enginebolt/EngineWrapper.java @@ -1,12 +1,12 @@ /**
* Copyright 2017-2020 ZTE Corporation.
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -35,7 +35,7 @@ public class EngineWrapper { @Inject
private EngineService engineService;
- public String deployEngine(CorrelationDeployRule4Engine correlationRule,String ip) throws CorrelationException {
+ public String deployEngine(CorrelationDeployRule4Engine correlationRule, String ip) throws CorrelationException {
HttpResponse response;
try {
response = engineService.deploy(correlationRule, ip);
@@ -55,7 +55,7 @@ public class EngineWrapper { }
}
- public boolean deleteRuleFromEngine(String packageName,String ip) throws CorrelationException {
+ public boolean deleteRuleFromEngine(String packageName, String ip) throws CorrelationException {
HttpResponse response;
try {
response = engineService.delete(packageName, ip);
@@ -70,7 +70,7 @@ public class EngineWrapper { }
}
- public boolean checkRuleFromEngine(CorrelationCheckRule4Engine correlationCheckRule4Engine,String ip)
+ public boolean checkRuleFromEngine(CorrelationCheckRule4Engine correlationCheckRule4Engine, String ip)
throws CorrelationException {
log.info("Rule Contents: " + correlationCheckRule4Engine.getContent());
HttpResponse response;
diff --git a/rulemgt/src/main/java/org/onap/holmes/rulemgt/controller/EngineInstanceController.java b/rulemgt/src/main/java/org/onap/holmes/rulemgt/controller/EngineInstanceController.java new file mode 100644 index 0000000..2850708 --- /dev/null +++ b/rulemgt/src/main/java/org/onap/holmes/rulemgt/controller/EngineInstanceController.java @@ -0,0 +1,56 @@ +/** + * Copyright 2020 ZTE Corporation. + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.holmes.rulemgt.controller; + +import org.jvnet.hk2.annotations.Service; +import org.onap.holmes.common.engine.entity.EngineEntity; +import org.onap.holmes.common.engine.service.EngineEntityService; + +import javax.annotation.PostConstruct; +import javax.inject.Inject; +import javax.inject.Named; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; + +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; + +@Service +public class EngineInstanceController extends TimerTask { + private static final long INTERVAL = SECONDS.toMillis(15); + private static final long THRESHOLD = 3 * INTERVAL; + private Timer timer = new Timer("EngineInstanceController", true); + + @Inject + private EngineEntityService engineEntityService; + + @PostConstruct + public void initialize() { + timer.schedule(this, MINUTES.toMillis(1), INTERVAL); + } + + @Override + public void run() { + List<EngineEntity> entityList = engineEntityService.getAllEntities(); + for (EngineEntity entity : entityList) { + if (System.currentTimeMillis() - entity.getLastModified() > THRESHOLD) { + engineEntityService.deleteEntity(entity.getId()); + } + } + } +} diff --git a/rulemgt/src/main/java/org/onap/holmes/rulemgt/db/CorrelationRuleDao.java b/rulemgt/src/main/java/org/onap/holmes/rulemgt/db/CorrelationRuleDao.java index 64dcea2..a9be49f 100644 --- a/rulemgt/src/main/java/org/onap/holmes/rulemgt/db/CorrelationRuleDao.java +++ b/rulemgt/src/main/java/org/onap/holmes/rulemgt/db/CorrelationRuleDao.java @@ -16,6 +16,8 @@ package org.onap.holmes.rulemgt.db;
import java.util.List;
+
+import org.jvnet.hk2.annotations.Service;
import org.onap.holmes.common.api.entity.CorrelationRule;
import org.onap.holmes.common.exception.CorrelationException;
import org.onap.holmes.common.utils.CorrelationRuleMapper;
@@ -26,6 +28,7 @@ import org.skife.jdbi.v2.sqlobject.SqlQuery; import org.skife.jdbi.v2.sqlobject.SqlUpdate;
import org.skife.jdbi.v2.sqlobject.customizers.RegisterMapper;
+@Service
@RegisterMapper(CorrelationRuleMapper.class)
public abstract class CorrelationRuleDao {
diff --git a/rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/EngineInsQueryTool.java b/rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/EngineInsQueryTool.java deleted file mode 100644 index 36b5b6f..0000000 --- a/rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/EngineInsQueryTool.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Copyright 2017 ZTE Corporation. - * <p> - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onap.holmes.rulemgt.msb; - - -import lombok.extern.slf4j.Slf4j; -import org.apache.http.HttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.CloseableHttpClient; -import org.jvnet.hk2.annotations.Service; -import org.onap.holmes.common.api.entity.ServiceEntity; -import org.onap.holmes.common.api.entity.ServiceNode4Query; -import org.onap.holmes.common.config.MicroServiceConfig; -import org.onap.holmes.common.utils.GsonUtil; -import org.onap.holmes.common.utils.HttpsUtils; - -import javax.annotation.PostConstruct; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - -@Service -@Slf4j -public class EngineInsQueryTool { - - private String url; - - @PostConstruct - public void init() { - String[] msbAddrInfo = MicroServiceConfig.getMsbIpAndPort(); - url = String.format("http://%s:%s/api/microservices/v1/services/holmes-engine-mgmt/version/v1", - msbAddrInfo[0], msbAddrInfo[1]); - } - - public List<String> getInstanceList() throws Exception { - String response; - HttpGet httpGet = new HttpGet(url); - try (CloseableHttpClient httpClient = HttpsUtils.getConditionalHttpsClient(HttpsUtils.DEFUALT_TIMEOUT)) { - HttpResponse httpResponse = HttpsUtils.get(httpGet, new HashMap<>(), httpClient); - response = HttpsUtils.extractResponseEntity(httpResponse); - } catch (Exception e) { - throw e; - } finally { - httpGet.releaseConnection(); - - } - ServiceEntity service = GsonUtil.jsonToBean(response, ServiceEntity.class); - List<ServiceNode4Query> nodesList = service.getNodes(); - List<String> ipList = new ArrayList<>(); - for (ServiceNode4Query node : nodesList) { - ipList.add(node.getIp()); - } - return ipList; - - } - -} diff --git a/rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/MsbQuery.java b/rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/MsbQuery.java deleted file mode 100644 index df0783d..0000000 --- a/rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/MsbQuery.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Copyright 2017-2020 ZTE Corporation. - * <p> - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onap.holmes.rulemgt.msb; - -import org.onap.holmes.common.dropwizard.ioc.utils.ServiceLocatorHolder; -import org.onap.holmes.rulemgt.send.Ip4AddingRule; -import org.onap.holmes.rulemgt.send.RuleAllocator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.Timer; -import java.util.TimerTask; - -import static java.util.concurrent.TimeUnit.SECONDS; - -public class MsbQuery { - - static final private Logger log = LoggerFactory.getLogger(MsbQuery.class); - final private RuleAllocator ruleAllocator; - private Ip4AddingRule ip4AddingRule; - private EngineInsQueryTool engineInsQueryTool; - - public MsbQuery() { - ruleAllocator = new RuleAllocator(); - ip4AddingRule = ServiceLocatorHolder.getLocator().getService(Ip4AddingRule.class); - engineInsQueryTool = ServiceLocatorHolder.getLocator().getService(EngineInsQueryTool.class); - } - - public void startTimer() { - try { - new Timer().schedule(new TimerTask() { - - public void run() { - try { - List<String> timerIpList = engineInsQueryTool.getInstanceList(); - log.info(String.format("There are %d engine instance(s) running currently.", timerIpList.size())); - - ip4AddingRule.setIpList(timerIpList); - ruleAllocator.allocateRules(timerIpList); - } catch (Exception e) { - log.error("The timing query engine instance failed ", e); - } - } - - }, SECONDS.toMillis(10), SECONDS.toMillis(30)); - } catch (Exception e) { - log.error("MSBQuery startTimer timer task failed !" + e.getMessage(), e); - try { - SECONDS.sleep(30); - } catch (InterruptedException e1) { - Thread.currentThread().interrupt(); - } - } - } -} diff --git a/rulemgt/src/main/java/org/onap/holmes/rulemgt/send/Ip4AddingRule.java b/rulemgt/src/main/java/org/onap/holmes/rulemgt/send/Ip4AddingRule.java deleted file mode 100644 index 4a2dba3..0000000 --- a/rulemgt/src/main/java/org/onap/holmes/rulemgt/send/Ip4AddingRule.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Copyright 2017 ZTE Corporation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.onap.holmes.rulemgt.send; - -import lombok.extern.slf4j.Slf4j; -import org.jvnet.hk2.annotations.Service; -import org.onap.holmes.common.api.entity.CorrelationRule; -import org.onap.holmes.rulemgt.wrapper.RuleQueryWrapper; - -import javax.inject.Inject; -import java.util.*; - -@Service -@Slf4j -public class Ip4AddingRule { - - @Inject - private RuleQueryWrapper ruleQueryWrapper; - private List<String> engineService; - - public void setIpList(List<String> ipList){ - engineService = ipList; - } - - public String getEngineIp4AddRule() { - List<CorrelationRule> ipRuleList = new ArrayList<>(); - LinkedHashMap<String,Integer> linkedHashMap = new LinkedHashMap<>(); - - try{ - for(String ip : engineService){ - ipRuleList = ruleQueryWrapper.queryRuleByEngineInstance(ip); - if(ipRuleList != null) { - linkedHashMap.put(ip, ipRuleList.size()); - } - } - }catch (Exception e){ - log.error("getEngineIp4AddRule failed !" + e.getMessage()); - } - - //min - Collection<Integer> c = linkedHashMap.values(); - Object[] obj = c.toArray(); - Arrays.sort(obj); - String ip = null; - for(String getKey: linkedHashMap.keySet()){ - if(linkedHashMap.get(getKey).equals(obj[0])){ - ip = getKey; - } - } - return ip; - } -} diff --git a/rulemgt/src/main/java/org/onap/holmes/rulemgt/tools/EngineTools.java b/rulemgt/src/main/java/org/onap/holmes/rulemgt/tools/EngineTools.java new file mode 100644 index 0000000..4896bde --- /dev/null +++ b/rulemgt/src/main/java/org/onap/holmes/rulemgt/tools/EngineTools.java @@ -0,0 +1,74 @@ +/** + * Copyright 2017-2020 ZTE Corporation. + * <p> + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.holmes.rulemgt.tools; + +import lombok.extern.slf4j.Slf4j; +import org.jvnet.hk2.annotations.Service; +import org.onap.holmes.common.api.entity.CorrelationRule; +import org.onap.holmes.common.engine.entity.EngineEntity; +import org.onap.holmes.common.engine.service.EngineEntityService; +import org.onap.holmes.rulemgt.wrapper.RuleQueryWrapper; + +import javax.inject.Inject; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.stream.Collectors; + +@Service +@Slf4j +public class EngineTools { + + @Inject + private RuleQueryWrapper ruleQueryWrapper; + @Inject + private EngineEntityService engineEntityService; + + public List<String> getInstanceList() { + List<EngineEntity> entities = engineEntityService.getAllEntities(); + return entities.stream().map(EngineEntity::getIp).collect(Collectors.toList()); + } + + public List<String> getLegacyEngineInstances() { + return engineEntityService.getLegacyEngines(); + } + + public String getEngineWithLeastRules() { + LinkedHashMap<String, Integer> ruleNumInEachEngine = new LinkedHashMap<>(); + + try { + for (String ip : getInstanceList()) { + List<CorrelationRule> rules = ruleQueryWrapper.queryRuleByEngineInstance(ip); + if (rules != null) { + ruleNumInEachEngine.put(ip, rules.size()); + } + } + } catch (Exception e) { + log.error("getEngineWithLeastRules failed!" + e.getMessage()); + } + + Integer[] numOfRules = ruleNumInEachEngine.values().toArray(new Integer[0]); + Arrays.sort(numOfRules); + + for (String ip : ruleNumInEachEngine.keySet()) { + if (ruleNumInEachEngine.get(ip) == numOfRules[0]) { + return ip; + } + } + return null; + } +} diff --git a/rulemgt/src/main/java/org/onap/holmes/rulemgt/wrapper/RuleMgtWrapper.java b/rulemgt/src/main/java/org/onap/holmes/rulemgt/wrapper/RuleMgtWrapper.java index 196b21a..22982da 100644 --- a/rulemgt/src/main/java/org/onap/holmes/rulemgt/wrapper/RuleMgtWrapper.java +++ b/rulemgt/src/main/java/org/onap/holmes/rulemgt/wrapper/RuleMgtWrapper.java @@ -1,12 +1,12 @@ /**
* Copyright 2017 ZTE Corporation.
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,6 +21,7 @@ import java.util.List; import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.inject.Singleton;
+
import lombok.extern.slf4j.Slf4j;
import org.jvnet.hk2.annotations.Service;
import org.onap.holmes.rulemgt.bean.request.CorrelationCheckRule4Engine;
@@ -39,7 +40,7 @@ import org.onap.holmes.rulemgt.bean.response.RuleAddAndUpdateResponse; import org.onap.holmes.rulemgt.bean.response.RuleQueryListResponse;
import org.onap.holmes.rulemgt.bolt.enginebolt.EngineWrapper;
import org.onap.holmes.rulemgt.db.CorrelationRuleQueryDao;
-import org.onap.holmes.rulemgt.send.Ip4AddingRule;
+import org.onap.holmes.rulemgt.tools.EngineTools;
@Service
@@ -48,7 +49,7 @@ import org.onap.holmes.rulemgt.send.Ip4AddingRule; public class RuleMgtWrapper {
@Inject
- private Ip4AddingRule ip4AddingRule;
+ private EngineTools engineTools;
@Inject
private RuleQueryWrapper ruleQueryWrapper;
@@ -79,10 +80,10 @@ public class RuleMgtWrapper { if (ruleTemp != null) {
throw new CorrelationException("A rule with the same name already exists.");
}
- String ip ="";
- try{
- ip = ip4AddingRule.getEngineIp4AddRule();
- }catch(Exception e){
+ String ip = "";
+ try {
+ ip = engineTools.getEngineWithLeastRules();
+ } catch (Exception e) {
log.error("When adding rules, can not get engine instance ip");
}
String packageName = deployRule2Engine(correlationRule, ip);
@@ -180,7 +181,7 @@ public class RuleMgtWrapper { }
private CorrelationRule convertCreateRequest2Rule(String userName,
- RuleCreateRequest ruleCreateRequest) throws CorrelationException {
+ RuleCreateRequest ruleCreateRequest) throws CorrelationException {
String tempContent = ruleCreateRequest.getContent();
CorrelationRule correlationRule = new CorrelationRule();
String ruleId = "rule_" + System.currentTimeMillis();
@@ -205,7 +206,7 @@ public class RuleMgtWrapper { }
private CorrelationRule convertRuleUpdateRequest2CorrelationRule(String modifier,
- RuleUpdateRequest ruleUpdateRequest, String ruleName) throws CorrelationException {
+ RuleUpdateRequest ruleUpdateRequest, String ruleName) throws CorrelationException {
CorrelationRule correlationRule = new CorrelationRule();
String description = ruleUpdateRequest.getDescription() == null ? "" : ruleUpdateRequest.getDescription();
correlationRule.setRid(ruleUpdateRequest.getRuleId());
@@ -221,7 +222,7 @@ public class RuleMgtWrapper { public String deployRule2Engine(CorrelationRule correlationRule, String ip)
throws CorrelationException {
- if (engineWarpper.checkRuleFromEngine(correlationRules2CheckRule(correlationRule), ip) && (
+ if (engineWarpper.checkRuleFromEngine(toCorrelationCheckRule(correlationRule), ip) && (
correlationRule.getEnabled() == RuleMgtConstant.STATUS_RULE_OPEN)) {
return engineWarpper.deployEngine(correlationRules2DeployRule(correlationRule), ip);
}
@@ -269,7 +270,7 @@ public class RuleMgtWrapper { return correlationDeployRule4Engine;
}
- private CorrelationCheckRule4Engine correlationRules2CheckRule(
+ private CorrelationCheckRule4Engine toCorrelationCheckRule(
CorrelationRule correlationRule) {
CorrelationCheckRule4Engine correlationCheckRule4Engine = new CorrelationCheckRule4Engine();
correlationCheckRule4Engine.setContent(correlationRule.getContent());
|