summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoryangyan <yangyanyj@chinamobile.com>2018-03-28 13:28:12 +0800
committeryangyan <yangyanyj@chinamobile.com>2018-03-28 13:28:33 +0800
commitb1ba84e8491b52445bf4b05aba8153bc869cca2d (patch)
tree7034eaaff0c5b9bcbf3b3a804bb75b9a67a8179e
parent52e6e1e8e82dcd58b9bdb4461518383f8995a54b (diff)
Improve code format
Issue-ID: VFC-855 Change-Id: If3b50f97d35339246b5fb9adf178779d66ca2dee Signed-off-by: yangyan <yangyanyj@chinamobile.com>
-rw-r--r--ems/boco/data/License.txt14
-rw-r--r--ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/CollectMsgReceiverThread.java133
-rw-r--r--ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThread.java190
-rw-r--r--ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThreadService.java3
-rw-r--r--ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/alarm/AlarmManager.java100
-rw-r--r--ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/alarm/AlarmTaskThread.java454
-rw-r--r--ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/taskscheduler/CollectManager.java2
7 files changed, 454 insertions, 442 deletions
diff --git a/ems/boco/data/License.txt b/ems/boco/data/License.txt
index a98c7aa..ba66b35 100644
--- a/ems/boco/data/License.txt
+++ b/ems/boco/data/License.txt
@@ -1,14 +1,14 @@
- Copyright 2017 BOCO Corporation. CMCC?Technologies?Co.,?Ltd
+Copyright 2017 BOCO Corporation. CMCC Technologies Co., Ltd
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
limitations under the License.
diff --git a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/CollectMsgReceiverThread.java b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/CollectMsgReceiverThread.java
index 89cf34b..6c41b65 100644
--- a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/CollectMsgReceiverThread.java
+++ b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/CollectMsgReceiverThread.java
@@ -23,74 +23,69 @@ import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannelFactory;
public class CollectMsgReceiverThread extends DriverThread {
- private long timeStamp = System.currentTimeMillis();
-
- private MessageChannel collectChannel;
-
- private TaskThreadService taskService;
-
- private int threadMaxNum = 100;
-
-
- @Override
- public void dispose() {
- collectChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_CHANNEL_KEY);
- taskService = TaskThreadService.getInstance(threadMaxNum);
- taskService.start();
-
- while (isRun()) {
-
- try {
- if (System.currentTimeMillis() - timeStamp > Constant.ONEMINUTE) {
- timeStamp = System.currentTimeMillis();
-
- log.debug("COLLECT_CHANNEL Msg size :" + collectChannel.size());
- }
-
- Object obj = collectChannel.poll();
- if (obj == null) {
- Thread.sleep(10);
- continue;
- }
- if (obj instanceof CollectMsg) {
- CollectMsg collectMsg = (CollectMsg) obj;
- taskService.add(collectMsg);
- log.debug("receive a CollectMsg id = " + collectMsg.getId());
- } else {
- log.error("receive Objcet not CollectMsg " + obj);
- }
-
- } catch (Exception e) {
- log.error("dispatch alarm exception", e);
-
- }
- }
-
- }
-
-
- /**
- * @return the threadMaxNum
- */
- public int getThreadMaxNum() {
- return threadMaxNum;
- }
-
-
- /**
- * @param threadMaxNum the threadMaxNum to set
- */
- public void setThreadMaxNum(int threadMaxNum) {
- this.threadMaxNum = threadMaxNum;
- }
-
-
- /**
- * @return the taskService
- */
- public TaskThreadService getTaskService() {
- return taskService;
- }
-
+ private long timeStamp = System.currentTimeMillis();
+
+ private MessageChannel collectChannel;
+
+ private TaskThreadService taskService;
+
+ private int threadMaxNum = 100;
+
+ @Override
+ public void dispose() {
+ collectChannel = MessageChannelFactory
+ .getMessageChannel(Constant.COLLECT_CHANNEL_KEY);
+ taskService = TaskThreadService.getInstance(threadMaxNum);
+ taskService.start();
+
+ while (isRun()) {
+
+ try {
+ if (System.currentTimeMillis() - timeStamp > Constant.ONEMINUTE) {
+ timeStamp = System.currentTimeMillis();
+ log.debug("COLLECT_CHANNEL Msg size :"
+ + collectChannel.size());
+ }
+ Object obj = collectChannel.poll();
+ if (obj == null) {
+ Thread.sleep(10);
+ continue;
+ }
+ if (obj instanceof CollectMsg) {
+ CollectMsg collectMsg = (CollectMsg) obj;
+ taskService.add(collectMsg);
+ log.debug("receive a CollectMsg id = " + collectMsg.getId());
+ } else {
+ log.error("receive Objcet not CollectMsg " + obj);
+ }
+
+ } catch (Exception e) {
+ log.error("dispatch alarm exception", e);
+
+ }
+ }
+
+ }
+
+ /**
+ * @return the threadMaxNum
+ */
+ public int getThreadMaxNum() {
+ return threadMaxNum;
+ }
+
+ /**
+ * @param threadMaxNum the threadMaxNum to set
+ */
+ public void setThreadMaxNum(int threadMaxNum) {
+ this.threadMaxNum = threadMaxNum;
+ }
+
+ /**
+ * @return the taskService
+ */
+ public TaskThreadService getTaskService() {
+ return taskService;
+ }
}
diff --git a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThread.java b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThread.java
index 30104f6..1ac304e 100644
--- a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThread.java
+++ b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThread.java
@@ -43,17 +43,13 @@ import java.util.regex.Pattern;
public class TaskThread implements Runnable {
private static final Log log = LogFactory.getLog(TaskThread.class);
- private MessageChannel pmResultChannel;
+ private MessageChannel pmResultChannel;
private MessageChannel cmResultChannel;
private CollectMsg data;
-
private ConfigurationInterface configurationInterface = new ConfigurationImp();
-
private String localPath = Constant.SYS_DATA_TEMP;
private String resultPath = Constant.SYS_DATA_RESULT;
-
private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
-
private SimpleDateFormat dateFormat2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public TaskThread(CollectMsg data) {
@@ -66,7 +62,6 @@ public class TaskThread implements Runnable {
@Override
public void run() {
-
cmResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_RESULT_CHANNEL_KEY);
pmResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_RESULT_PM_CHANNEL_KEY);
try {
@@ -80,7 +75,6 @@ public class TaskThread implements Runnable {
String emsName = collectMsg.getEmsName();
String type = collectMsg.getType();
CollectVo collectVo = configurationInterface.getCollectVoByEmsNameAndType(emsName, type);
-
// ftp download
List<String> downloadfiles = this.ftpDownload(collectVo);
// paser ftp update message send
@@ -90,29 +84,25 @@ public class TaskThread implements Runnable {
}
public void parseFtpAndSendMessage(String fileName, CollectVo collectVo) {
- //
List<File> filelist = decompressed(fileName);
-
for (File tempfile : filelist) {
-
String unfileName = tempfile.getName();
-
Pattern pa = Pattern.compile(".*-(.*)-\\w{2}-");
Matcher ma = pa.matcher(unfileName);
if (!ma.find())
continue;
- String nename = ma.group(1);
+ //String nename = ma.group(1);
boolean parseResult = false;
if (Constant.COLLECT_TYPE_CM.equalsIgnoreCase(collectVo.getType())) {
- //parseResult = processCMXml(tempfile, nename, "CM");//The logic is not exist now, but need to add in the future
+ // parseResult = processCMXml(tempfile, nename, "CM");//The
+ // logic is not exist now, but need to add in the future
} else {
- if (unfileName.indexOf(".csv") > -1) {//changed to -1 for coding practice as having ".csv" must have some some legal name
+ if (unfileName.indexOf(".csv") > -1) {// changed to -1 for coding practice as having ".csv" must have some some legal name
parseResult = processPMCsv(tempfile);
} else {
parseResult = processPMXml(tempfile);
}
}
-
if (parseResult) {
log.info("parser " + tempfile + " sucess");
tempfile.delete();
@@ -124,11 +114,10 @@ public class TaskThread implements Runnable {
}
public boolean processPMXml(File file) {
- try ( FileInputStream fis = new FileInputStream(file);
- InputStreamReader isr = new InputStreamReader(fis, Constant.ENCODING_UTF8)){
-
+ try (FileInputStream fis = new FileInputStream(file);
+ InputStreamReader isr = new InputStreamReader(fis,Constant.ENCODING_UTF8)) {
XMLInputFactory fac = XMLInputFactory.newInstance();
- XMLStreamReader reader = fac.createXMLStreamReader(isr);
+ XMLStreamReader reader = fac.createXMLStreamReader(isr);
boolean fileHeaderStart = false;
boolean measurementStart = false;
@@ -152,14 +141,13 @@ public class TaskThread implements Runnable {
while (reader.hasNext()) {
try {
event = reader.next();
-
switch (event) {
case XMLStreamConstants.START_ELEMENT:
localName = reader.getLocalName();
if ("FileHeader".equalsIgnoreCase(localName)) {
fileHeaderStart = true;
}
- if (fileHeaderStart && !"FileHeader".equalsIgnoreCase(localName)) {
+ if (fileHeaderStart&& !"FileHeader".equalsIgnoreCase(localName)) {
commonNameAndValue.put(localName, reader.getElementText().trim());
}
if ("Measurements".equalsIgnoreCase(localName)) {
@@ -195,7 +183,7 @@ public class TaskThread implements Runnable {
int n = reader.getAttributeCount();
for (int i = 0; i < n; i++) {
String name = reader.getAttributeLocalName(i);
- commonNameAndValue.put(name, reader.getAttributeValue(i));
+ commonNameAndValue.put(name,reader.getAttributeValue(i));
}
}
if (objectFlag) {
@@ -209,7 +197,7 @@ public class TaskThread implements Runnable {
index = Integer.parseInt(indexStr);
String name = pmNames.get(index);
if (name == null) {
- log.error("illegal data: valueIndex=" + index);
+ log.error("illegal data: valueIndex="+ index);
continue;
}
@@ -227,7 +215,7 @@ public class TaskThread implements Runnable {
currentMea = pmNames.get(index);
if (currentMea == null) {
- log.error("illegal data: valueIndex=" + index);
+ log.error("illegal data: valueIndex="+ index);
continue;
}
}
@@ -258,8 +246,8 @@ public class TaskThread implements Runnable {
pmResultChannel.put(pmDatas);
} catch (Exception e) {
- pmResultChannel.clear();
- log.error("collectResultChannel.put(resultMap) error ", e);
+ pmResultChannel.clear();
+ log.error("collectResultChannel.put(resultMap) error ",e);
}
}
if ("PmData".equalsIgnoreCase(endLocalName)) {
@@ -284,11 +272,11 @@ public class TaskThread implements Runnable {
event = reader.next();
}
}
- reader.close();
+ reader.close();
} catch (Exception e) {
log.error("processPMXml is Exception ", e);
return false;
- }
+ }
return true;
}
@@ -312,7 +300,7 @@ public class TaskThread implements Runnable {
List<String> columnNames = new ArrayList<>();
List<String> commonValues = new ArrayList<>();
try (FileInputStream brs = new FileInputStream(tempfile);
- InputStreamReader isr = new InputStreamReader(brs, Constant.ENCODING_UTF8);
+ InputStreamReader isr = new InputStreamReader(brs,Constant.ENCODING_UTF8);
BufferedReader br = new BufferedReader(isr)) {
// common field
@@ -343,11 +331,11 @@ public class TaskThread implements Runnable {
for (String value : values) {
valuelist.add(value);
}
- HashMap<String, String> resultMap = this.resultMap(columnNames, valuelist);
+ HashMap<String, String> resultMap = this.resultMap(columnNames,valuelist);
try {
pmResultChannel.put(resultMap);
} catch (Exception e) {
- pmResultChannel.clear();
+ pmResultChannel.clear();
log.error("collectResultChannel.put(resultMap) error ", e);
}
valuelist.clear();
@@ -360,7 +348,8 @@ public class TaskThread implements Runnable {
}
- private HashMap<String, String> resultMap(List<String> columnNames, List<String> valuelist) {
+ private HashMap<String, String> resultMap(List<String> columnNames,
+ List<String> valuelist) {
HashMap<String, String> resultMap = new HashMap<>();
if (columnNames.size() == valuelist.size()) {
@@ -505,16 +494,18 @@ public class TaskThread implements Runnable {
try {
cmResultChannel.put(message);
} catch (Exception e) {
- log.error("collectResultChannel.put(message) is error " + StringUtil.getStackTrace(e));
+ log.error("collectResultChannel.put(message) is error "
+ + StringUtil.getStackTrace(e));
}
}
- public String createMessage(String zipName, String user, String pwd, String ip, String port, int countNum,
- String nename) {
+ public String createMessage(String zipName, String user, String pwd,
+ String ip, String port, int countNum, String nename) {
StringBuilder strBuffer = new StringBuilder();
- strBuffer.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
- + "<FILE_DATA_READY_UL xmlns:xsi=\" http://www.w3.org/2001/XMLSchema-instance\">"
- + "<Header SessionID=\"");
+ strBuffer
+ .append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
+ + "<FILE_DATA_READY_UL xmlns:xsi=\" http://www.w3.org/2001/XMLSchema-instance\">"
+ + "<Header SessionID=\"");
strBuffer.append("");
strBuffer.append("\" LicenceID=\"");
strBuffer.append("");
@@ -555,25 +546,28 @@ public class TaskThread implements Runnable {
strBuffer.append("</DataCount>");
strBuffer.append("<FileSize>").append("").append("</FileSize>");
- strBuffer.append("<DataGranularity>").append("").append("</DataGranularity>");
+ strBuffer.append("<DataGranularity>").append("")
+ .append("</DataGranularity>");
strBuffer.append("</Body></FILE_DATA_READY_UL>");
return strBuffer.toString();
}
- private void ftpStore(String[] fileKeys, String ip, String port, String ftp_user, String ftp_password,
- String ftp_passive, String ftp_type, String remoteFile) {
+ private void ftpStore(String[] fileKeys, String ip, String port,
+ String ftp_user, String ftp_password, String ftp_passive,
+ String ftp_type, String remoteFile) {
String zipFilePath = fileKeys[0];
FTPInterface ftpClient;
ftpClient = new FTPSrv();
// login
try {
- ftpClient.login(ip, Integer.parseInt(port), ftp_user, ftp_password, "GBK",
- Boolean.parseBoolean(ftp_passive), 5 * 60 * 1000);
+ ftpClient.login(ip, Integer.parseInt(port), ftp_user, ftp_password,
+ "GBK", Boolean.parseBoolean(ftp_passive), 5 * 60 * 1000);
} catch (Exception e) {
- log.error("login fail,ip=[" + ip + "] port=[" + port + "] user=[" + ftp_user + /*"]pwd=[" + ftp_password + */"]"
+ log.error("login fail,ip=[" + ip + "] port=[" + port + "] user=["
+ + ftp_user + /* "]pwd=[" + ftp_password + */"]"
+ StringUtil.getStackTrace(e));
return;
}
@@ -583,17 +577,19 @@ public class TaskThread implements Runnable {
}
- private String[] createZipFile(String csvpathAndFileName, String xmlPathAndFileName, String nename)
- throws IOException {
- String zipPath = resultPath + nename + dateFormat.format(new Date()) + "_" + System.nanoTime();
+ private String[] createZipFile(String csvpathAndFileName, String xmlPathAndFileName, String nename) throws IOException {
+ String zipPath = resultPath + nename + dateFormat.format(new Date())
+ + "_" + System.nanoTime();
File destDir = new File(zipPath);
destDir.mkdirs();
try {
- FileUtils.copyFileToDirectory(new File(csvpathAndFileName), destDir);
- FileUtils.copyFileToDirectory(new File(xmlPathAndFileName), destDir);
+ FileUtils
+ .copyFileToDirectory(new File(csvpathAndFileName), destDir);
+ FileUtils
+ .copyFileToDirectory(new File(xmlPathAndFileName), destDir);
} catch (IOException e) {
- throw new IOException("createZipFile",e);
+ throw new IOException("createZipFile", e);
}
String destFilePath = zipPath + ".zip";
@@ -605,7 +601,7 @@ public class TaskThread implements Runnable {
FileUtils.deleteDirectory(destDir);
} catch (IOException e) {
log.error("zip.compress() is fail " + StringUtil.getStackTrace(e));
- throw new IOException("createZipFile",e);
+ throw new IOException("createZipFile", e);
}
return new String[] { destFilePath, zipPath + ".zip" };
}
@@ -620,45 +616,54 @@ public class TaskThread implements Runnable {
String xmlFileName = nename + dateFormat.format(new Date()) + System.nanoTime();
String fieldLine = "";
for (int i = 0; i < names.size(); i++) {
- String field = "\t<Field>\r\n" + "\t\t<FieldNo>" + i + "</FieldNo>\r\n" + "\t\t<FieldName>" + names.get(i)
- + "</FieldName>\r\n" + "\t\t<FieldType>" + names.get(i) + "</FieldType>\r\n"
- + "\t\t<FieldNameOther>" + names.get(i) + "</FieldNameOther>\r\n" + "\t</Field>\r\n";
+ String field = "\t<Field>\r\n" + "\t\t<FieldNo>" + i
+ + "</FieldNo>\r\n" + "\t\t<FieldName>" + names.get(i)
+ + "</FieldName>\r\n" + "\t\t<FieldType>" + names.get(i)
+ + "</FieldType>\r\n" + "\t\t<FieldNameOther>"
+ + names.get(i) + "</FieldNameOther>\r\n" + "\t</Field>\r\n";
fieldLine = fieldLine + field;
}
- String str = "<?xml version=\"1.0\" encoding=\"gbk\"?>\r\n" + "<xml>\r\n" + "<FILE_STRUCTURE>\r\n" + fieldLine
+ String str = "<?xml version=\"1.0\" encoding=\"gbk\"?>\r\n"
+ + "<xml>\r\n" + "<FILE_STRUCTURE>\r\n" + fieldLine
+ "</FILE_STRUCTURE>\r\n" + "</xml>\r\n";
String xmlPathAndFileName = xmlpath + xmlFileName + ".xml";
try {
this.writeDetail(xmlPathAndFileName, str);
} catch (Exception e) {
- log.error("writeDetail is fail ,xmlFileName=" + xmlFileName + StringUtil.getStackTrace(e));
+ log.error("writeDetail is fail ,xmlFileName=" + xmlFileName
+ + StringUtil.getStackTrace(e));
}
return xmlPathAndFileName;
}
- private void writeDetail(String detailFileName, String str) throws IOException {
- try (OutputStream readOut = new FileOutputStream(new File(detailFileName), false);
+ private void writeDetail(String detailFileName, String str)
+ throws IOException {
+ try (OutputStream readOut = new FileOutputStream(new File(
+ detailFileName), false);
OutputStreamWriter writer = new OutputStreamWriter(readOut)) {
writer.write(str);
writer.flush();
} catch (IOException e) {
- throw new IOException("writeDetail",e);
+ throw new IOException("writeDetail", e);
}
}
- private void appendLine(LinkedHashMap<String, String> nameAndValue, BufferedOutputStream bos) {
- try{
+ private void appendLine(LinkedHashMap<String, String> nameAndValue,
+ BufferedOutputStream bos) {
+ try {
StringBuilder lineDatas = new StringBuilder();
-
+
for (String key : nameAndValue.keySet()) {
lineDatas.append(nameAndValue.get(key)).append("|");
}
- /*for (HashMap.Entry<String, String> entry : nameAndValue.entrySet()) {
- lineDatas.append(entry.getValue()).append("|");
- }*/
+ /*
+ * for (HashMap.Entry<String, String> entry :
+ * nameAndValue.entrySet()) {
+ * lineDatas.append(entry.getValue()).append("|"); }
+ */
bos.write(lineDatas.toString().getBytes());
bos.write("\n".getBytes());
} catch (IOException e) {
@@ -734,11 +739,13 @@ public class TaskThread implements Runnable {
// login
try {
- log.info("ftp login ,ip=[" + ip + "] port=[" + port + "] user=[" + user + /*"]password=[" + password +*/ "]");
- ftpClient.login(ip, Integer.parseInt(port), user, password, "GBK", Boolean.parseBoolean(passivemode),
- 5 * 60 * 1000);
+ log.info("ftp login ,ip=[" + ip + "] port=[" + port + "] user=["
+ + user + /* "]password=[" + password + */"]");
+ ftpClient.login(ip, Integer.parseInt(port), user, password, "GBK",
+ Boolean.parseBoolean(passivemode), 5 * 60 * 1000);
} catch (Exception e) {
- log.error("login fail,ip=[" + ip + "] port=[" + port + "] user=[" + user + /*"]password=[" + password +*/ "]"
+ log.error("login fail,ip=[" + ip + "] port=[" + port + "] user=["
+ + user + /* "]password=[" + password + */"]"
+ StringUtil.getStackTrace(e));
return fileList;
}
@@ -756,13 +763,14 @@ public class TaskThread implements Runnable {
collectPeriod = Long.parseLong(collectVo.getGranularity()) * 60;
log.info("collectPeriod =[" + collectPeriod + "]");
} catch (NumberFormatException e) {
- log.error("NumberFormatException" ,e);
+ log.error("NumberFormatException", e);
}
long[] d = DateUtil.getScanScope(new Date(), collectPeriod);
searchExprList.add(VarExprParser.replaceVar(conpath, d[0], d[1]));
varMap.clear();
- log.info("[" + conpath + "] result[" + (searchExprList.size() - oldSize) + "] path");
+ log.info("[" + conpath + "] result["
+ + (searchExprList.size() - oldSize) + "] path");
}
String nowdir = null;
try {
@@ -782,7 +790,8 @@ public class TaskThread implements Runnable {
boolean cdsucess = ftpClient.chdir(ftpDir);
if (cdsucess) {
AFtpRemoteFile[] arf = (AFtpRemoteFile[]) ftpClient.list();
- log.info(" list [" + ftpDir + "] result[" + (arf == null ? "null" : arf.length) + "] files");
+ log.info(" list [" + ftpDir + "] result["
+ + (arf == null ? "null" : arf.length) + "] files");
// filter
rfileFilter(remoteFiles, arf, ftpRegular);
@@ -792,7 +801,8 @@ public class TaskThread implements Runnable {
try {
new File(localPath).mkdir();
} catch (Exception e) {
- log.error("create localPath is fail localPath=" + localPath + " "
+ log.error("create localPath is fail localPath="
+ + localPath + " "
+ StringUtil.getStackTrace(e));
}
}
@@ -801,17 +811,20 @@ public class TaskThread implements Runnable {
new File(localPath).mkdirs();
}
- String localFileName = localPath + ftpRemoteFile.getFileName();
- if(new File(localFileName).exists()){
+ String localFileName = localPath
+ + ftpRemoteFile.getFileName();
+ if (new File(localFileName).exists()) {
new File(localFileName).delete();
}
- boolean flag = ftpClient.downloadFile(ftpRemoteFile.getAbsFileName(), localFileName);
+ boolean flag = ftpClient.downloadFile(
+ ftpRemoteFile.getAbsFileName(), localFileName);
if (flag) {
fileList.add(localFileName);
} else {
- log.error("download file fail fileName=" + ftpRemoteFile.getAbsFileName());
+ log.error("download file fail fileName="
+ + ftpRemoteFile.getAbsFileName());
}
}
@@ -823,13 +836,14 @@ public class TaskThread implements Runnable {
return fileList;
}
- private void rfileFilter(List<AFtpRemoteFile> fileContainer, AFtpRemoteFile[] arfs, String ftpRegular) {
+ private void rfileFilter(List<AFtpRemoteFile> fileContainer,
+ AFtpRemoteFile[] arfs, String ftpRegular) {
if (ftpRegular != null && ftpRegular.length() > 0) {
Pattern pattern = null;
try {
pattern = Pattern.compile(ftpRegular, Pattern.CASE_INSENSITIVE);
} catch (Exception e) {
- log.info("[" + ftpRegular + "]Pattern.compile exception:",e);
+ log.info("[" + ftpRegular + "]Pattern.compile exception:", e);
// should rethrow exception or return from here
}
int hisSize = fileContainer.size();
@@ -842,10 +856,11 @@ public class TaskThread implements Runnable {
// define the flow when pattern is null
}
- if (null != matcher && matcher.find())
+ if (null != matcher && matcher.find())
fileContainer.add(arfs[j]);
}
- log.info("[" + ftpRegular + "]filter[" + (fileContainer.size() - hisSize) + "]filse");
+ log.info("[" + ftpRegular + "]filter["
+ + (fileContainer.size() - hisSize) + "]filse");
} else {
for (int j = 0; arfs != null && j < arfs.length; j++)
fileContainer.add(arfs[j]);
@@ -875,7 +890,8 @@ public class TaskThread implements Runnable {
return dirkeys;
}
- public List<String> getPathNoRegular(List<String> searchExprList, FTPInterface ftpCache) throws IOException {
+ public List<String> getPathNoRegular(List<String> searchExprList,
+ FTPInterface ftpCache) throws IOException {
boolean isregular = false;
List<String> regularList = new ArrayList<>();
for (String regular : searchExprList) {
@@ -905,12 +921,14 @@ public class TaskThread implements Runnable {
}
} catch (Exception e) {
log.error(" cd dir [" + parpath + "]fail", e);
- throw new IOException("ftpCache.chdir",e);
+ throw new IOException("ftpCache.chdir", e);
}
RemoteFile[] remotef = ftpCache.list();
for (RemoteFile aremote : remotef) {
- if (aremote.isDirectory() && aremote.getFileName().matches(matcher.group(2))) {
- regularList.add(matcher.group(1) + aremote.getFileName() + matcher.group(3));
+ if (aremote.isDirectory()
+ && aremote.getFileName().matches(matcher.group(2))) {
+ regularList.add(matcher.group(1)
+ + aremote.getFileName() + matcher.group(3));
}
}
} else {
@@ -922,7 +940,7 @@ public class TaskThread implements Runnable {
}
return regularList;
}
-
+
public MessageChannel getPmResultChannel() {
return pmResultChannel;
}
diff --git a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThreadService.java b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThreadService.java
index 2a7bac4..566da0f 100644
--- a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThreadService.java
+++ b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThreadService.java
@@ -43,13 +43,11 @@ public class TaskThreadService extends Thread {
public void run() { // run the service
try {
while (startFlag) {
-
try {
if (System.currentTimeMillis() - timeStamp > Constant.ONEMINUTE) {
timeStamp = System.currentTimeMillis();
log.debug("task queue size " + queue.size());
}
-
CollectMsg data = receive();
if (data == null) {
continue;
@@ -90,7 +88,6 @@ public class TaskThreadService extends Thread {
}
}
-
public int size() {
return queue.size();
}
diff --git a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/alarm/AlarmManager.java b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/alarm/AlarmManager.java
index 22224e7..6de72be 100644
--- a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/alarm/AlarmManager.java
+++ b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/alarm/AlarmManager.java
@@ -27,59 +27,61 @@ import java.util.List;
public class AlarmManager extends DriverThread {
- private ConfigurationInterface configurationInterface;
+ private ConfigurationInterface configurationInterface;
- @Override
- public void dispose() {
- log.debug("AlarmManager is start");
- //get alarm CONFIG_PROPERTIES_LOCATION
- List<EMSInfo> emsInfos = configurationInterface.getAllEMSInfo();
- while (isRun() && emsInfos.isEmpty()) {
- emsInfos = configurationInterface.getAllEMSInfo();
- if (emsInfos.isEmpty()) {
- try {
- Thread.sleep(1000);
- log.debug("The configuration properties from " + ConfigurationManager.CONFIG_PROPERTIES_LOCATION + " is not load");
- } catch (Exception e) {
- log.error("Exception",e);
- }
- }
- }
- List<CollectVo> collectVos = new ArrayList<>();
- for (EMSInfo emsInfo : emsInfos) {
- //alarm
- CollectVo collectVo = emsInfo.getCollectVoByType(Constant.COLLECT_TYPE_ALARM);
- if (collectVo != null) {
- collectVo.setEmsName(emsInfo.getName());
- collectVos.add(collectVo);
- } else {
- log.error("emsInfo.getCollectVoByType(EMS_RESOUCE) result CollectVo = null emsInfo =" + emsInfo);
- }
- }
+ @Override
+ public void dispose() {
+ log.debug("AlarmManager is start");
+ // get alarm CONFIG_PROPERTIES_LOCATION
+ List<EMSInfo> emsInfos = configurationInterface.getAllEMSInfo();
+ while (isRun() && emsInfos.isEmpty()) {
+ emsInfos = configurationInterface.getAllEMSInfo();
+ if (emsInfos.isEmpty()) {
+ try {
+ Thread.sleep(1000);
+ log.debug("The configuration properties from "
+ + ConfigurationManager.CONFIG_PROPERTIES_LOCATION
+ + " is not load");
+ } catch (Exception e) {
+ log.error("Exception", e);
+ }
+ }
+ }
+ List<CollectVo> collectVos = new ArrayList<>();
+ for (EMSInfo emsInfo : emsInfos) {
+ // alarm
+ CollectVo collectVo = emsInfo.getCollectVoByType(Constant.COLLECT_TYPE_ALARM);
+ if (collectVo != null) {
+ collectVo.setEmsName(emsInfo.getName());
+ collectVos.add(collectVo);
+ } else {
+ log.error("emsInfo.getCollectVoByType(EMS_RESOUCE) result CollectVo = null emsInfo ="
+ + emsInfo);
+ }
+ }
- for (CollectVo collectVo : collectVos) {
- AlarmTaskThread alarm = new AlarmTaskThread(collectVo);
- alarm.setName(collectVo.getIP() + collectVo.getPort());
- alarm.start();
- log.info("AlarmTaskThread is start");
- }
+ for (CollectVo collectVo : collectVos) {
+ AlarmTaskThread alarm = new AlarmTaskThread(collectVo);
+ alarm.setName(collectVo.getIP() + collectVo.getPort());
+ alarm.start();
+ log.info("AlarmTaskThread is start");
+ }
- }
+ }
- /**
- * @return the configurationInterface
- */
- public ConfigurationInterface getConfigurationInterface() {
- return configurationInterface;
- }
-
- /**
- * @param configurationInterface the configurationInterface to set
- */
- public void setConfigurationInterface(
- ConfigurationInterface configurationInterface) {
- this.configurationInterface = configurationInterface;
- }
+ /**
+ * @return the configurationInterface
+ */
+ public ConfigurationInterface getConfigurationInterface() {
+ return configurationInterface;
+ }
+ /**
+ * @param configurationInterface the configurationInterface to set
+ */
+ public void setConfigurationInterface(
+ ConfigurationInterface configurationInterface) {
+ this.configurationInterface = configurationInterface;
+ }
}
diff --git a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/alarm/AlarmTaskThread.java b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/alarm/AlarmTaskThread.java
index 20dd4fd..f90de5f 100644
--- a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/alarm/AlarmTaskThread.java
+++ b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/alarm/AlarmTaskThread.java
@@ -30,259 +30,259 @@ import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
-
public class AlarmTaskThread extends Thread {
- private static final Log log = LogFactory.getLog(AlarmTaskThread.class);
+ private static final Log log = LogFactory.getLog(AlarmTaskThread.class);
- private HeartBeat heartBeat = null;
+ private HeartBeat heartBeat = null;
+ private boolean isStop = false;
+ private CollectVo collectVo = null;
+ private int readTimeout = Constant.READ_TIMEOUT_MILLISECOND;
+ private int reqId;
+ private Socket socket = null;
+ private BufferedInputStream is = null;
+ private BufferedOutputStream dos = null;
- private boolean isStop = false;
- private CollectVo collectVo = null;
- private int readTimeout = Constant.READ_TIMEOUT_MILLISECOND;
- private int reqId;
+ private MessageChannel alarmChannel;
- private Socket socket = null;
- private BufferedInputStream is = null;
- private BufferedOutputStream dos = null;
+ public AlarmTaskThread() {
+ super();
+ }
- private MessageChannel alarmChannel;
+ public AlarmTaskThread(CollectVo collectVo) {
+ this.collectVo = collectVo;
+ }
- public AlarmTaskThread() {
- super();
- }
+ @Override
+ public void run() {
+ try {
+ alarmChannel = MessageChannelFactory.getMessageChannel(Constant.RESULT_CHANNEL_KEY);
+ this.init();
+ while (!this.isStop) {
+ String body;
+ try {
+ body = this.receive();
+ alarmChannel.put(body);
+ } catch (Exception e) {
+ log.error("alarmChannel.put Exception: ", e);
+ reinit();
+ }
+ }
+ } catch (Exception e) {
+ log.error("run Exception:", e);
+ }
+ }
- public AlarmTaskThread(CollectVo collectVo) {
+ public String receive() throws IOException {
+ try {
+ Msg msg = null;
+ String retString = null;
+ while (retString == null && !this.isStop) {
+ msg = MessageUtil.readOneMsg(is);
+ log.debug("msg = " + msg.toString(true));
+ log.info("msg.getMsgType().name = " + msg.getMsgType().name);
+ if ("ackLoginAlarm".equalsIgnoreCase(msg.getMsgType().name)) {
+ log.debug("receive login ack");
+ boolean suc = this.ackLoginAlarm(msg);
+ if (suc) {
+ if (reqId == Integer.MAX_VALUE)
+ reqId = 0;
+ reqId++;
+ Msg msgheart = MessageUtil.putHeartBeatMsg(reqId);
+ heartBeat = new HeartBeat(socket, msgheart);
+ heartBeat.setName("CMCC_JT_HeartBeat");
+ // start heartBeat
+ heartBeat.start();
+ }
+ retString = null;
+ }
- this.collectVo = collectVo;
- }
+ if ("ackHeartBeat".equalsIgnoreCase(msg.getMsgType().name)) {
+ log.debug("received heartBeat message:" + msg.getBody());
+ retString = null;
+ }
+
+ if ("realTimeAlarm".equalsIgnoreCase(msg.getMsgType().name)) {
+ log.debug("received alarm message");
+ retString = msg.getBody();
+ }
+ if (retString == null) {
+ Thread.sleep(100);
+ }
+
+ }// while
+ return retString;
- @Override
- public void run() {
- try {
- alarmChannel = MessageChannelFactory.getMessageChannel(Constant.RESULT_CHANNEL_KEY);
- this.init();
- while (!this.isStop) {
- String body;
- try {
- body = this.receive();
- alarmChannel.put(body);
} catch (Exception e) {
- log.error("alarmChannel.put Exception: ",e);
- reinit();
- }
- }
- } catch (Exception e) {
- log.error("run Exception:",e);
- }
- }
-
-
- public String receive() throws IOException {
- try{
- Msg msg = null;
- String retString = null;
- while (retString == null && !this.isStop) {
- msg = MessageUtil.readOneMsg(is);
- log.debug("msg = " + msg.toString(true));
- log.info("msg.getMsgType().name = " + msg.getMsgType().name);
- if ("ackLoginAlarm".equalsIgnoreCase(msg.getMsgType().name)) {
- log.debug("receive login ack");
- boolean suc = this.ackLoginAlarm(msg);
- if (suc) {
- if (reqId == Integer.MAX_VALUE)
- reqId=0;
- reqId++;
- Msg msgheart = MessageUtil.putHeartBeatMsg(reqId);
- heartBeat = new HeartBeat(socket, msgheart);
- heartBeat.setName("CMCC_JT_HeartBeat");
- // start heartBeat
- heartBeat.start();
- }
- retString = null;
- }
-
- if ("ackHeartBeat".equalsIgnoreCase(msg.getMsgType().name)) {
- log.debug("received heartBeat message:" + msg.getBody());
- retString = null;
+ log.error("receive Error: ", e);
+ throw new IOException("receive Error: ", e);
}
+ }
+
+ public void init() throws IOException {
+ isStop = false;
+ // host
+ String host = collectVo.getIP();
+ // port
+ String port = collectVo.getPort();
+ // user
+ String user = collectVo.getUser();
+ // password
+ String password = collectVo.getPassword();
+ try {
+ if ((collectVo.getReadTimeout()).trim().length() > 0)
+ this.readTimeout = Integer.parseInt(collectVo.getReadTimeout());
- if ("realTimeAlarm".equalsIgnoreCase(msg.getMsgType().name)) {
- log.debug("received alarm message");
- retString = msg.getBody();
+ } catch (NumberFormatException e) {
+ log.error("Unable to parse read_timout: ", e);
+ throw new NumberFormatException("Unable to parse read_timout: " + e);
}
- if (retString == null) {
- Thread.sleep(100);
+
+ log.info("socket connect host=" + host + ", port=" + port);
+ try {
+ int portInt = Integer.parseInt(port);
+ socket = new Socket(host, portInt);
+
+ } catch (UnknownHostException e) {
+ log.error("remote host [" + host + "]connect fail"
+ + StringUtil.getStackTrace(e));
+ throw new UnknownHostException("remote host [" + host
+ + "]connect fail" + e);
+ } catch (IOException e1) {
+ log.error("create socket IOException ", e1);
+ throw new SocketException("create socket IOException " + e1);
}
-
- }//while
- return retString;
+ try {
+ socket.setSoTimeout(this.readTimeout);
+ socket.setTcpNoDelay(true);
+ socket.setKeepAlive(true);
+ } catch (SocketException e) {
+ log.error(" SocketException " + StringUtil.getStackTrace(e));
+ throw new SocketException(" SocketException "
+ + StringUtil.getStackTrace(e));
+ }
+ try {
+ dos = new BufferedOutputStream(socket.getOutputStream());
- }catch(Exception e){
- log.error("receive Error: ",e);
- throw new IOException("receive Error: ",e);
- }
-}
+ Msg msg = MessageUtil.putLoginMsg(user, password);
+
+ try {
+ log.debug("send login message " + msg.toString(false));
+ MessageUtil.writeMsg(msg, dos);
+
+ } catch (Exception e) {
+ log.error("send login message is fail "
+ + StringUtil.getStackTrace(e));
+ }
- public void init() throws IOException{
- isStop = false;
- //host
- String host = collectVo.getIP();
- //port
- String port = collectVo.getPort();
- //user
- String user = collectVo.getUser();
- //password
- String password = collectVo.getPassword();
-
- try{
- if((collectVo.getReadTimeout()).trim().length()>0)
- this.readTimeout = Integer.parseInt(collectVo.getReadTimeout());
-
- } catch (NumberFormatException e) {
- log.error("Unable to parse read_timout: ",e);
- throw new NumberFormatException("Unable to parse read_timout: " + e);
+ is = new BufferedInputStream(socket.getInputStream());
+
+ } catch (SocketException e) {
+ log.error("SocketException ", e);
+ throw new SocketException("SocketException " + e);
+ }
}
- log.info("socket connect host=" + host + ", port=" + port);
- try {
- int portInt = Integer.parseInt(port);
- socket = new Socket(host, portInt);
-
- } catch (UnknownHostException e) {
- log.error("remote host [" + host + "]connect fail" + StringUtil.getStackTrace(e));
- throw new UnknownHostException("remote host [" + host + "]connect fail" + e);
- } catch (IOException e1) {
- log.error("create socket IOException ", e1);
- throw new SocketException("create socket IOException " + e1);
- }
- try {
- socket.setSoTimeout(this.readTimeout);
- socket.setTcpNoDelay(true);
- socket.setKeepAlive(true);
- } catch (SocketException e) {
- log.error(" SocketException " + StringUtil.getStackTrace(e));
- throw new SocketException(" SocketException " + StringUtil.getStackTrace(e));
- }
- try {
- dos = new BufferedOutputStream(socket.getOutputStream());
-
- Msg msg = MessageUtil.putLoginMsg(user, password);
-
- try {
- log.debug("send login message " + msg.toString(false));
- MessageUtil.writeMsg(msg, dos);
-
- } catch (Exception e) {
- log.error("send login message is fail " + StringUtil.getStackTrace(e));
- }
-
- is = new BufferedInputStream(socket.getInputStream());
-
- } catch (SocketException e) {
- log.error("SocketException ",e);
- throw new SocketException("SocketException " + e);
- }
- }
-
- private boolean ackLoginAlarm(Msg msg) throws IOException {
- boolean ret = false;
- try {
- String loginres = msg.getBody();
- String[] loginbody = loginres.split(";");
- if (loginbody.length > 1) {
- for (String str : loginbody) {
- if (str.contains("=")) {
- String[] paras1 = str.split("=", -1);
- if ("result".equalsIgnoreCase(paras1[0].trim())) {
- if("succ".equalsIgnoreCase(paras1[1].trim()))
- ret = true;
- else ret = false;
+ private boolean ackLoginAlarm(Msg msg) throws IOException {
+ boolean ret = false;
+ try {
+ String loginres = msg.getBody();
+ String[] loginbody = loginres.split(";");
+ if (loginbody.length > 1) {
+ for (String str : loginbody) {
+ if (str.contains("=")) {
+ String[] paras1 = str.split("=", -1);
+ if ("result".equalsIgnoreCase(paras1[0].trim())) {
+ if ("succ".equalsIgnoreCase(paras1[1].trim()))
+ ret = true;
+ else
+ ret = false;
+ }
}
}
+ } else {
+ log.error("login ack body Incorrect formatbody=" + loginres);
}
+
+ } catch (Exception e) {
+ log.error("pocess login ack fail" + StringUtil.getStackTrace(e));
+ }
+ if (ret) {
+ log.info("login sucess receive login ack " + msg.getBody());
} else {
- log.error("login ack body Incorrect formatbody=" + loginres);
+ log.error("login fail receive login ack " + msg.getBody());
+ this.close();
+ this.isStop = true;
+ throw new IOException("pocess login ack fail");
}
+ return ret;
+ }
- } catch (Exception e) {
- log.error("pocess login ack fail" + StringUtil.getStackTrace(e));
- }
- if (ret) {
- log.info("login sucess receive login ack " + msg.getBody());
- } else {
- log.error("login fail receive login ack " + msg.getBody());
- this.close();
- this.isStop = true;
- throw new IOException("pocess login ack fail");
- }
- return ret;
- }
-
- public void close() {
- if (heartBeat != null) {
- heartBeat.setStop(true);
- }
- if (is != null) {
- try {
- is.close();
- } catch (IOException e) {
- log.error("Unable to close BufferedInput Stream",e);
- } finally {
- is = null;
- }
- }
- if (dos != null) {
- try {
- dos.close();
- } catch (IOException e) {
- log.error("Unable to close BufferedOutput Stream",e);
- } finally {
- dos = null;
- }
- }
- if (socket != null) {
- try {
- socket.close();
- } catch (IOException e) {
- log.error("Unable to close Socket",e);
- } finally {
- socket = null;
- }
-
- }
- }
-
- public void reinit() {
- int time = 0;
- close();
- while (!this.isStop) {
- close();
- time++;
- try {
- Thread.sleep(1000L * 30);
- init();
- return;
- } catch (Exception e) {
- log.error("Number [" + time + "]reconnect [" + collectVo.getIP() + "]fail" + e);
- }
- }
- }
-
- /**
- * @param isStop the isStop to set
- */
- public void setStop(boolean isStop) {
- this.isStop = isStop;
- }
-
- /**
- * @return the heartBeat
- */
- public HeartBeat getHeartBeat() {
- return heartBeat;
- }
+ public void close() {
+ if (heartBeat != null) {
+ heartBeat.setStop(true);
+ }
+ if (is != null) {
+ try {
+ is.close();
+ } catch (IOException e) {
+ log.error("Unable to close BufferedInput Stream", e);
+ } finally {
+ is = null;
+ }
+ }
+ if (dos != null) {
+ try {
+ dos.close();
+ } catch (IOException e) {
+ log.error("Unable to close BufferedOutput Stream", e);
+ } finally {
+ dos = null;
+ }
+ }
+ if (socket != null) {
+ try {
+ socket.close();
+ } catch (IOException e) {
+ log.error("Unable to close Socket", e);
+ } finally {
+ socket = null;
+ }
+ }
+ }
+
+ public void reinit() {
+ int time = 0;
+ close();
+ while (!this.isStop) {
+ close();
+ time++;
+ try {
+ Thread.sleep(1000L * 30);
+ init();
+ return;
+ } catch (Exception e) {
+ log.error("Number [" + time + "]reconnect ["
+ + collectVo.getIP() + "]fail" + e);
+ }
+ }
+ }
+
+ /**
+ * @param isStop
+ * the isStop to set
+ */
+ public void setStop(boolean isStop) {
+ this.isStop = isStop;
+ }
+
+ /**
+ * @return the heartBeat
+ */
+ public HeartBeat getHeartBeat() {
+ return heartBeat;
+ }
}
diff --git a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/taskscheduler/CollectManager.java b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/taskscheduler/CollectManager.java
index 9e8aa26..db1a676 100644
--- a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/taskscheduler/CollectManager.java
+++ b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/taskscheduler/CollectManager.java
@@ -1,4 +1,4 @@
-/**
+/*
* Copyright 2017 BOCO Corporation. CMCC Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");