summaryrefslogtreecommitdiffstats
path: root/rulemgt/src/main/java
diff options
context:
space:
mode:
authorGuangrongFu <fu.guangrong@zte.com.cn>2018-03-29 17:20:07 +0800
committerGuangrongFu <fu.guangrong@zte.com.cn>2018-03-29 18:30:01 +0800
commit82d2318187ea739a0b14442cbd82a3a7f6a419a7 (patch)
tree65f8503abef34470483bb89a585053b7f18d4d25 /rulemgt/src/main/java
parentadf983c3fa5c0be8fe05d349190b149da1effb2e (diff)
Fixed the Start-up Logic
Change-Id: I5646e771e05f5f774162373a2f69a0815acb53e5 Issue-ID: HOLMES-106 Signed-off-by: GuangrongFu <fu.guangrong@zte.com.cn>
Diffstat (limited to 'rulemgt/src/main/java')
-rw-r--r--rulemgt/src/main/java/org/onap/holmes/rulemgt/RuleActiveApp.java3
-rw-r--r--rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/MsbQuery.java54
-rw-r--r--rulemgt/src/main/java/org/onap/holmes/rulemgt/send/Ip4AddingRule.java2
-rw-r--r--rulemgt/src/main/java/org/onap/holmes/rulemgt/send/RuleAllocation.java135
4 files changed, 106 insertions, 88 deletions
diff --git a/rulemgt/src/main/java/org/onap/holmes/rulemgt/RuleActiveApp.java b/rulemgt/src/main/java/org/onap/holmes/rulemgt/RuleActiveApp.java
index 5f96912..daebff3 100644
--- a/rulemgt/src/main/java/org/onap/holmes/rulemgt/RuleActiveApp.java
+++ b/rulemgt/src/main/java/org/onap/holmes/rulemgt/RuleActiveApp.java
@@ -31,6 +31,7 @@ import org.onap.holmes.common.exception.CorrelationException;
import org.onap.holmes.common.utils.MSBRegisterUtil;
import org.onap.holmes.common.utils.transactionid.TransactionIdFilter;
import org.onap.holmes.rulemgt.dcae.DcaeConfigurationPolling;
+import org.onap.holmes.rulemgt.msb.MsbQuery;
import org.onap.holmes.rulemgt.resources.RuleMgtResources;
import org.onap.msb.sdk.discovery.entity.MicroServiceInfo;
import org.onap.msb.sdk.discovery.entity.Node;
@@ -64,6 +65,8 @@ public class RuleActiveApp extends IOCApplication<RuleAppConfig> {
DcaeConfigurationPolling.POLLING_PERIOD, TimeUnit.MILLISECONDS);
environment.servlets().addFilter("customFilter",new TransactionIdFilter()).addMappingForUrlPatterns(EnumSet
.allOf(DispatcherType.class),true,"/*");
+
+ new MsbQuery().startTimer();
}
private MicroServiceInfo createMicroServiceInfo() {
diff --git a/rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/MsbQuery.java b/rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/MsbQuery.java
index ed9b9af..2f440b6 100644
--- a/rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/MsbQuery.java
+++ b/rulemgt/src/main/java/org/onap/holmes/rulemgt/msb/MsbQuery.java
@@ -1,12 +1,12 @@
/**
* Copyright 2017 ZTE Corporation.
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,63 +16,71 @@
package org.onap.holmes.rulemgt.msb;
import lombok.extern.slf4j.Slf4j;
-import org.jvnet.hk2.annotations.Service;
+import org.glassfish.hk2.api.ServiceLocator;
+import org.onap.holmes.common.dropwizard.ioc.utils.ServiceLocatorHolder;
import org.onap.holmes.rulemgt.send.RuleAllocation;
import org.onap.holmes.rulemgt.send.Ip4AddingRule;
import org.onap.holmes.rulemgt.wrapper.RuleMgtWrapper;
-import javax.annotation.PostConstruct;
-import javax.inject.Inject;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
-@Service
@Slf4j
public class MsbQuery {
- @Inject
private RuleAllocation ruleAllocation;
- @Inject
private Ip4AddingRule ip4AddingRule;
- @Inject
private EngineIpList engineIpList;
- @Inject
private RuleMgtWrapper ruleMgtWrapper;
- private List<String> timerIpList;
+ private List<String> timerIpList;
- @PostConstruct
- public void init() {
+ public MsbQuery() {
+ ruleAllocation = new RuleAllocation();
- try{
+ ServiceLocator locator = ServiceLocatorHolder.getLocator();
+ ip4AddingRule = locator.getService(Ip4AddingRule.class);
+ engineIpList = locator.getService(EngineIpList.class);
+ ruleMgtWrapper = locator.getService(RuleMgtWrapper.class);
+ }
+
+ public void startTimer() {
+ try {
timer();
- }catch(Exception e){
- log.error("MSBQuery init timer task failed !" + e.getMessage());
+ } catch (Exception e) {
+ log.error("MSBQuery startTimer timer task failed !" + e.getMessage(), e);
+ try {
+ Thread.sleep(30000);
+ } catch (InterruptedException e1) {
+ Thread.currentThread().interrupt();
+ }
}
+
}
- public void timer() throws Exception{
+ public void timer() throws Exception {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
public void run() {
try {
timerIpList = engineIpList.getServiceCount();
- ip4AddingRule.getIpList(timerIpList);
- ruleAllocation.judgeAndAllocateRule(timerIpList);
+ log.info(String.format("There are %d engine instance(s) running currently.", timerIpList.size()));
+ ip4AddingRule.setIpList(timerIpList);
+ ruleAllocation.judgeAndAllocateRule(timerIpList);
} catch (Exception e) {
- log.error("The timing query engine instance failed " ,e);
+ log.error("The timing query engine instance failed ", e);
}
}
- }, 5000, 30000);
+ }, 10000, 30000);
}
diff --git a/rulemgt/src/main/java/org/onap/holmes/rulemgt/send/Ip4AddingRule.java b/rulemgt/src/main/java/org/onap/holmes/rulemgt/send/Ip4AddingRule.java
index f82d3a4..e224bb5 100644
--- a/rulemgt/src/main/java/org/onap/holmes/rulemgt/send/Ip4AddingRule.java
+++ b/rulemgt/src/main/java/org/onap/holmes/rulemgt/send/Ip4AddingRule.java
@@ -32,7 +32,7 @@ public class Ip4AddingRule {
private RuleQueryWrapper ruleQueryWrapper;
private List<String> engineService;
- public void getIpList(List<String> ipList){
+ public void setIpList(List<String> ipList){
engineService = ipList;
}
diff --git a/rulemgt/src/main/java/org/onap/holmes/rulemgt/send/RuleAllocation.java b/rulemgt/src/main/java/org/onap/holmes/rulemgt/send/RuleAllocation.java
index e69be51..75f0a08 100644
--- a/rulemgt/src/main/java/org/onap/holmes/rulemgt/send/RuleAllocation.java
+++ b/rulemgt/src/main/java/org/onap/holmes/rulemgt/send/RuleAllocation.java
@@ -1,12 +1,12 @@
/**
* Copyright 2017 ZTE Corporation.
- *
+ * <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,8 +17,10 @@
package org.onap.holmes.rulemgt.send;
import lombok.extern.slf4j.Slf4j;
+import org.glassfish.hk2.api.ServiceLocator;
import org.jvnet.hk2.annotations.Service;
import org.onap.holmes.common.api.entity.CorrelationRule;
+import org.onap.holmes.common.dropwizard.ioc.utils.ServiceLocatorHolder;
import org.onap.holmes.common.exception.CorrelationException;
import org.onap.holmes.common.utils.DbDaoUtil;
import org.onap.holmes.rulemgt.bolt.enginebolt.EngineWrapper;
@@ -32,56 +34,62 @@ import javax.inject.Inject;
import java.util.*;
-@Service
@Slf4j
public class RuleAllocation {
private final static int ENABLE = 1;
-
- @Inject
private RuleMgtWrapper ruleMgtWrapper;
- @Inject
private RuleQueryWrapper ruleQueryWrapper;
- @Inject
private EngineWrapper engineWrapper;
- @Inject
private EngineIpList engineIpList;
- @Inject
private DbDaoUtil daoUtil;
-
private CorrelationRuleDao correlationRuleDao;
-
private int ruleCount;
private int serviceCount;
private List<String> temIpList = new ArrayList<>();
private List<String> engineService = new ArrayList<>();
private List<CorrelationRule> allRules = new ArrayList<>();
- @PostConstruct
- public void initDaoUtilAndEngineIp() throws Exception{
+ public RuleAllocation() {
+ ServiceLocator locator = ServiceLocatorHolder.getLocator();
+ ruleMgtWrapper = locator.getService(RuleMgtWrapper.class);
+ ruleQueryWrapper = locator.getService(RuleQueryWrapper.class);
+ engineWrapper = locator.getService(EngineWrapper.class);
+ engineIpList = locator.getService(EngineIpList.class);
+ daoUtil = locator.getService(DbDaoUtil.class);
+
+ initDaoUtilAndEngineIp();
+ }
+
+ private void initDaoUtilAndEngineIp() {
correlationRuleDao = daoUtil.getJdbiDaoByOnDemand(CorrelationRuleDao.class);
- temIpList = engineIpList.getServiceCount();
+ try {
+ temIpList = engineIpList.getServiceCount();
+
+ } catch (Exception e) {
+ log.warn("Failed to get the number of engine instances.", e);
+ }
}
- public void judgeAndAllocateRule(List<String> ipList)throws Exception{
- if(ipList != null) {
+ public void judgeAndAllocateRule(List<String> ipList) throws Exception {
+ if (ipList != null) {
engineService = ipList;
serviceCount = ipList.size();
}
- if(temIpList.size() < serviceCount){
+ if (temIpList.size() < serviceCount) {
//extend
List<CorrelationRule> deleteRule = calculateRule(temIpList);
- List<CorrelationRule> allocateRule = calculateRule(temIpList);
- List<String> extendIp = extendCompareIp(engineService,temIpList);
- AllocateService(extendIp,allocateRule);
- deleteRuleFromFormerEngine(deleteRule,temIpList);
+ List<CorrelationRule> allocateRule = calculateRule(temIpList);
+ List<String> extendIp = extendCompareIp(engineService, temIpList);
+ AllocateService(extendIp, allocateRule);
+ deleteRuleFromFormerEngine(deleteRule, temIpList);
} else if (temIpList.size() > serviceCount) {
//destroy
List<String> destroyIp = destroyCompareIp(engineService, temIpList);
AllocateService(restIp(destroyIp), relocateRuleAfterDestroy(destroyIp));
- } else if(temIpList.size() == serviceCount) {
+ } else if (temIpList.size() == serviceCount) {
temIpList = engineService;
return;
}
@@ -91,18 +99,18 @@ public class RuleAllocation {
// When the engine is expanding, the rules that need to be allocated are calculated.
- private List<CorrelationRule> calculateRule(List<String> oldIpList) throws Exception{
+ private List<CorrelationRule> calculateRule(List<String> oldIpList) throws Exception {
allRules = ruleQueryWrapper.queryRuleByEnable(ENABLE);
- if(allRules != null) {
+ if (allRules != null) {
ruleCount = allRules.size();
}
int count = ruleCount / serviceCount;
int remainder = ruleCount % serviceCount;
List<CorrelationRule> subRule = new ArrayList<>();
- for(String ip : oldIpList) {
+ for (String ip : oldIpList) {
List<CorrelationRule> rules = ruleQueryWrapper.queryRuleByEngineInstance(ip);
- List<CorrelationRule> tem = rules.subList(count + (remainder-- / oldIpList.size()),rules.size());
+ List<CorrelationRule> tem = rules.subList(count + (remainder-- / oldIpList.size()), rules.size());
subRule.addAll(tem);
}
return subRule;
@@ -111,24 +119,24 @@ public class RuleAllocation {
//Rules that need to be allocated after the engine is destroyed
private List<CorrelationRule> relocateRuleAfterDestroy(List<String> destroyIpList) throws CorrelationException {
List<CorrelationRule> rules = new ArrayList<>();
- try{
- if(destroyIpList != null){
- for(String ip : destroyIpList) {
+ try {
+ if (destroyIpList != null) {
+ for (String ip : destroyIpList) {
rules.addAll(ruleQueryWrapper.queryRuleByEngineInstance(ip));
}
}
- }catch(CorrelationException e) {
- log.error("method relocateRuleAfterDestroy get data from DB failed !" +e.getMessage());
+ } catch (CorrelationException e) {
+ log.error("method relocateRuleAfterDestroy get data from DB failed !", e);
}
return rules;
}
//Extended IP
- private List<String> extendCompareIp(List<String> newList, List<String> oldList){
+ private List<String> extendCompareIp(List<String> newList, List<String> oldList) {
List<String> extendIpList = new ArrayList<>();
- for( String ip :newList) {
- if(! oldList.contains(ip)) {
+ for (String ip : newList) {
+ if (!oldList.contains(ip)) {
extendIpList.add(ip);
}
}
@@ -138,8 +146,8 @@ public class RuleAllocation {
//Destroyed IP
private List<String> destroyCompareIp(List<String> newList, List<String> oldList) {
List<String> destroyIpList = new ArrayList<>();
- for(String ip : oldList) {
- if(!newList.contains(ip)) {
+ for (String ip : oldList) {
+ if (!newList.contains(ip)) {
destroyIpList.add(ip);
}
}
@@ -149,52 +157,51 @@ public class RuleAllocation {
//Residual IP after destruction
private List<String> restIp(List<String> destroyIp) {
List<String> restIpList = new ArrayList<>();
- for(String ip : engineService) {
- if(!destroyIp.contains(ip)) {
+ for (String ip : engineService) {
+ if (!destroyIp.contains(ip)) {
restIpList.add(ip);
}
}
return restIpList;
}
- public void AllocateService(List<String> extendIpList, List<CorrelationRule> subList) throws Exception{
+ public void AllocateService(List<String> extendIpList, List<CorrelationRule> subList) throws Exception {
List<String> needIpList = getSortIp(extendIpList);
- for(int i=0,j=0;j < subList.size();i++,j++ ){
+ for (int i = 0, j = 0; j < subList.size(); i++, j++) {
int index = i % needIpList.size();
String deployIp = needIpList.get(index);
CorrelationRule rule = subList.get(j);
rule.setEngineInstance(deployIp);
- allocateDeployRule(rule,deployIp);
+ allocateDeployRule(rule, deployIp);
}
}
//The IP to be allocated is in ascending order, and the least is circulate.
- private List<String > getSortIp(List<String> ipList){
- List<CorrelationRule> ipRuleList = new ArrayList<>();
- HashMap<String,String> hashMap = new HashMap();
+ private List<String> getSortIp(List<String> ipList) {
+ List<CorrelationRule> ipRuleList = new ArrayList<>();
+ HashMap<String, String> hashMap = new HashMap();
- try{
- for(String ip : ipList){
+ try {
+ for (String ip : ipList) {
ipRuleList = ruleQueryWrapper.queryRuleByEngineInstance(ip);
- if(ipRuleList != null) {
+ if (ipRuleList != null) {
hashMap.put(ip, String.valueOf(ipRuleList.size()));
}
}
- }catch (Exception e){
- log.error("getEngineIp4AddRule failed !" + e.getMessage());
+ } catch (Exception e) {
+ log.error("getEngineIp4AddRule failed !", e);
}
List<Map.Entry<String, String>> list_Data = new ArrayList<Map.Entry<String, String>>(hashMap.entrySet());
Collections.sort(list_Data, new Comparator<Map.Entry<String, String>>() {
- public int compare(Map.Entry<String, String> o1, Map.Entry<String, String> o2)
- {
+ public int compare(Map.Entry<String, String> o1, Map.Entry<String, String> o2) {
return o1.getValue().compareTo(o2.getValue());
}
});
List<String> needList = new ArrayList<>();
- for(Map.Entry<String, String> map: list_Data) {
+ for (Map.Entry<String, String> map : list_Data) {
String key = map.getKey();
needList.add(key);
}
@@ -202,25 +209,25 @@ public class RuleAllocation {
}
private void allocateDeployRule(CorrelationRule rule, String ip) throws CorrelationException {
- try{
- ruleMgtWrapper.deployRule2Engine(rule,ip);
+ try {
+ ruleMgtWrapper.deployRule2Engine(rule, ip);
correlationRuleDao.updateRule(rule);
- }catch (CorrelationException e){
+ } catch (CorrelationException e) {
throw new CorrelationException("allocate Deploy Rule failed", e);
}
}
private void deleteRuleFromFormerEngine(List<CorrelationRule> subRule, List<String> oldList) {
- try{
- for(String ip : oldList){
- for(CorrelationRule rule: subRule) {
- if(ip.equals(rule.getEngineInstance())) {
- engineWrapper.deleteRuleFromEngine(rule.getPackageName(),ip);
+ try {
+ for (String ip : oldList) {
+ for (CorrelationRule rule : subRule) {
+ if (ip.equals(rule.getEngineInstance())) {
+ engineWrapper.deleteRuleFromEngine(rule.getPackageName(), ip);
}
}
}
- }catch (CorrelationException e) {
- log.error("When the engine is extended, deleting rule failed" +e.getMessage());
+ } catch (CorrelationException e) {
+ log.error("When the engine is extended, deleting rule failed", e);
}
}