diff options
7 files changed, 454 insertions, 442 deletions
diff --git a/ems/boco/data/License.txt b/ems/boco/data/License.txt index a98c7aa..ba66b35 100644 --- a/ems/boco/data/License.txt +++ b/ems/boco/data/License.txt @@ -1,14 +1,14 @@ - Copyright 2017 BOCO Corporation. CMCC?Technologies?Co.,?Ltd +Copyright 2017 BOCO Corporation. CMCC Technologies Co., Ltd - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and limitations under the License. diff --git a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/CollectMsgReceiverThread.java b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/CollectMsgReceiverThread.java index 89cf34b..6c41b65 100644 --- a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/CollectMsgReceiverThread.java +++ b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/CollectMsgReceiverThread.java @@ -23,74 +23,69 @@ import org.onap.vfc.nfvo.emsdriver.messagemgr.MessageChannelFactory; public class CollectMsgReceiverThread extends DriverThread { - private long timeStamp = System.currentTimeMillis(); - - private MessageChannel collectChannel; - - private TaskThreadService taskService; - - private int threadMaxNum = 100; - - - @Override - public void dispose() { - collectChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_CHANNEL_KEY); - taskService = TaskThreadService.getInstance(threadMaxNum); - taskService.start(); - - while (isRun()) { - - try { - if (System.currentTimeMillis() - timeStamp > Constant.ONEMINUTE) { - timeStamp = System.currentTimeMillis(); - - log.debug("COLLECT_CHANNEL Msg size :" + collectChannel.size()); - } - - Object obj = collectChannel.poll(); - if (obj == null) { - Thread.sleep(10); - continue; - } - if (obj instanceof CollectMsg) { - CollectMsg collectMsg = (CollectMsg) obj; - taskService.add(collectMsg); - log.debug("receive a CollectMsg id = " + collectMsg.getId()); - } else { - log.error("receive Objcet not CollectMsg " + obj); - } - - } catch (Exception e) { - log.error("dispatch alarm exception", e); - - } - } - - } - - - /** - * @return the threadMaxNum - */ - public int getThreadMaxNum() { - return threadMaxNum; - } - - - /** - * @param threadMaxNum the threadMaxNum to set - */ - public void setThreadMaxNum(int threadMaxNum) { - this.threadMaxNum = threadMaxNum; - } - - - /** - * @return the taskService - */ - public TaskThreadService getTaskService() { - return taskService; - } - + private long timeStamp = System.currentTimeMillis(); + + private MessageChannel collectChannel; + + private TaskThreadService taskService; + + private int threadMaxNum = 100; + + @Override + public void dispose() { + collectChannel = MessageChannelFactory + .getMessageChannel(Constant.COLLECT_CHANNEL_KEY); + taskService = TaskThreadService.getInstance(threadMaxNum); + taskService.start(); + + while (isRun()) { + + try { + if (System.currentTimeMillis() - timeStamp > Constant.ONEMINUTE) { + timeStamp = System.currentTimeMillis(); + log.debug("COLLECT_CHANNEL Msg size :" + + collectChannel.size()); + } + Object obj = collectChannel.poll(); + if (obj == null) { + Thread.sleep(10); + continue; + } + if (obj instanceof CollectMsg) { + CollectMsg collectMsg = (CollectMsg) obj; + taskService.add(collectMsg); + log.debug("receive a CollectMsg id = " + collectMsg.getId()); + } else { + log.error("receive Objcet not CollectMsg " + obj); + } + + } catch (Exception e) { + log.error("dispatch alarm exception", e); + + } + } + + } + + /** + * @return the threadMaxNum + */ + public int getThreadMaxNum() { + return threadMaxNum; + } + + /** + * @param threadMaxNum the threadMaxNum to set + */ + public void setThreadMaxNum(int threadMaxNum) { + this.threadMaxNum = threadMaxNum; + } + + /** + * @return the taskService + */ + public TaskThreadService getTaskService() { + return taskService; + } } diff --git a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThread.java b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThread.java index 30104f6..1ac304e 100644 --- a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThread.java +++ b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThread.java @@ -43,17 +43,13 @@ import java.util.regex.Pattern; public class TaskThread implements Runnable { private static final Log log = LogFactory.getLog(TaskThread.class); - private MessageChannel pmResultChannel; + private MessageChannel pmResultChannel; private MessageChannel cmResultChannel; private CollectMsg data; - private ConfigurationInterface configurationInterface = new ConfigurationImp(); - private String localPath = Constant.SYS_DATA_TEMP; private String resultPath = Constant.SYS_DATA_RESULT; - private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss"); - private SimpleDateFormat dateFormat2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public TaskThread(CollectMsg data) { @@ -66,7 +62,6 @@ public class TaskThread implements Runnable { @Override public void run() { - cmResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_RESULT_CHANNEL_KEY); pmResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_RESULT_PM_CHANNEL_KEY); try { @@ -80,7 +75,6 @@ public class TaskThread implements Runnable { String emsName = collectMsg.getEmsName(); String type = collectMsg.getType(); CollectVo collectVo = configurationInterface.getCollectVoByEmsNameAndType(emsName, type); - // ftp download List<String> downloadfiles = this.ftpDownload(collectVo); // paser ftp update message send @@ -90,29 +84,25 @@ public class TaskThread implements Runnable { } public void parseFtpAndSendMessage(String fileName, CollectVo collectVo) { - // List<File> filelist = decompressed(fileName); - for (File tempfile : filelist) { - String unfileName = tempfile.getName(); - Pattern pa = Pattern.compile(".*-(.*)-\\w{2}-"); Matcher ma = pa.matcher(unfileName); if (!ma.find()) continue; - String nename = ma.group(1); + //String nename = ma.group(1); boolean parseResult = false; if (Constant.COLLECT_TYPE_CM.equalsIgnoreCase(collectVo.getType())) { - //parseResult = processCMXml(tempfile, nename, "CM");//The logic is not exist now, but need to add in the future + // parseResult = processCMXml(tempfile, nename, "CM");//The + // logic is not exist now, but need to add in the future } else { - if (unfileName.indexOf(".csv") > -1) {//changed to -1 for coding practice as having ".csv" must have some some legal name + if (unfileName.indexOf(".csv") > -1) {// changed to -1 for coding practice as having ".csv" must have some some legal name parseResult = processPMCsv(tempfile); } else { parseResult = processPMXml(tempfile); } } - if (parseResult) { log.info("parser " + tempfile + " sucess"); tempfile.delete(); @@ -124,11 +114,10 @@ public class TaskThread implements Runnable { } public boolean processPMXml(File file) { - try ( FileInputStream fis = new FileInputStream(file); - InputStreamReader isr = new InputStreamReader(fis, Constant.ENCODING_UTF8)){ - + try (FileInputStream fis = new FileInputStream(file); + InputStreamReader isr = new InputStreamReader(fis,Constant.ENCODING_UTF8)) { XMLInputFactory fac = XMLInputFactory.newInstance(); - XMLStreamReader reader = fac.createXMLStreamReader(isr); + XMLStreamReader reader = fac.createXMLStreamReader(isr); boolean fileHeaderStart = false; boolean measurementStart = false; @@ -152,14 +141,13 @@ public class TaskThread implements Runnable { while (reader.hasNext()) { try { event = reader.next(); - switch (event) { case XMLStreamConstants.START_ELEMENT: localName = reader.getLocalName(); if ("FileHeader".equalsIgnoreCase(localName)) { fileHeaderStart = true; } - if (fileHeaderStart && !"FileHeader".equalsIgnoreCase(localName)) { + if (fileHeaderStart&& !"FileHeader".equalsIgnoreCase(localName)) { commonNameAndValue.put(localName, reader.getElementText().trim()); } if ("Measurements".equalsIgnoreCase(localName)) { @@ -195,7 +183,7 @@ public class TaskThread implements Runnable { int n = reader.getAttributeCount(); for (int i = 0; i < n; i++) { String name = reader.getAttributeLocalName(i); - commonNameAndValue.put(name, reader.getAttributeValue(i)); + commonNameAndValue.put(name,reader.getAttributeValue(i)); } } if (objectFlag) { @@ -209,7 +197,7 @@ public class TaskThread implements Runnable { index = Integer.parseInt(indexStr); String name = pmNames.get(index); if (name == null) { - log.error("illegal data: valueIndex=" + index); + log.error("illegal data: valueIndex="+ index); continue; } @@ -227,7 +215,7 @@ public class TaskThread implements Runnable { currentMea = pmNames.get(index); if (currentMea == null) { - log.error("illegal data: valueIndex=" + index); + log.error("illegal data: valueIndex="+ index); continue; } } @@ -258,8 +246,8 @@ public class TaskThread implements Runnable { pmResultChannel.put(pmDatas); } catch (Exception e) { - pmResultChannel.clear(); - log.error("collectResultChannel.put(resultMap) error ", e); + pmResultChannel.clear(); + log.error("collectResultChannel.put(resultMap) error ",e); } } if ("PmData".equalsIgnoreCase(endLocalName)) { @@ -284,11 +272,11 @@ public class TaskThread implements Runnable { event = reader.next(); } } - reader.close(); + reader.close(); } catch (Exception e) { log.error("processPMXml is Exception ", e); return false; - } + } return true; } @@ -312,7 +300,7 @@ public class TaskThread implements Runnable { List<String> columnNames = new ArrayList<>(); List<String> commonValues = new ArrayList<>(); try (FileInputStream brs = new FileInputStream(tempfile); - InputStreamReader isr = new InputStreamReader(brs, Constant.ENCODING_UTF8); + InputStreamReader isr = new InputStreamReader(brs,Constant.ENCODING_UTF8); BufferedReader br = new BufferedReader(isr)) { // common field @@ -343,11 +331,11 @@ public class TaskThread implements Runnable { for (String value : values) { valuelist.add(value); } - HashMap<String, String> resultMap = this.resultMap(columnNames, valuelist); + HashMap<String, String> resultMap = this.resultMap(columnNames,valuelist); try { pmResultChannel.put(resultMap); } catch (Exception e) { - pmResultChannel.clear(); + pmResultChannel.clear(); log.error("collectResultChannel.put(resultMap) error ", e); } valuelist.clear(); @@ -360,7 +348,8 @@ public class TaskThread implements Runnable { } - private HashMap<String, String> resultMap(List<String> columnNames, List<String> valuelist) { + private HashMap<String, String> resultMap(List<String> columnNames, + List<String> valuelist) { HashMap<String, String> resultMap = new HashMap<>(); if (columnNames.size() == valuelist.size()) { @@ -505,16 +494,18 @@ public class TaskThread implements Runnable { try { cmResultChannel.put(message); } catch (Exception e) { - log.error("collectResultChannel.put(message) is error " + StringUtil.getStackTrace(e)); + log.error("collectResultChannel.put(message) is error " + + StringUtil.getStackTrace(e)); } } - public String createMessage(String zipName, String user, String pwd, String ip, String port, int countNum, - String nename) { + public String createMessage(String zipName, String user, String pwd, + String ip, String port, int countNum, String nename) { StringBuilder strBuffer = new StringBuilder(); - strBuffer.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>" - + "<FILE_DATA_READY_UL xmlns:xsi=\" http://www.w3.org/2001/XMLSchema-instance\">" - + "<Header SessionID=\""); + strBuffer + .append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + + "<FILE_DATA_READY_UL xmlns:xsi=\" http://www.w3.org/2001/XMLSchema-instance\">" + + "<Header SessionID=\""); strBuffer.append(""); strBuffer.append("\" LicenceID=\""); strBuffer.append(""); @@ -555,25 +546,28 @@ public class TaskThread implements Runnable { strBuffer.append("</DataCount>"); strBuffer.append("<FileSize>").append("").append("</FileSize>"); - strBuffer.append("<DataGranularity>").append("").append("</DataGranularity>"); + strBuffer.append("<DataGranularity>").append("") + .append("</DataGranularity>"); strBuffer.append("</Body></FILE_DATA_READY_UL>"); return strBuffer.toString(); } - private void ftpStore(String[] fileKeys, String ip, String port, String ftp_user, String ftp_password, - String ftp_passive, String ftp_type, String remoteFile) { + private void ftpStore(String[] fileKeys, String ip, String port, + String ftp_user, String ftp_password, String ftp_passive, + String ftp_type, String remoteFile) { String zipFilePath = fileKeys[0]; FTPInterface ftpClient; ftpClient = new FTPSrv(); // login try { - ftpClient.login(ip, Integer.parseInt(port), ftp_user, ftp_password, "GBK", - Boolean.parseBoolean(ftp_passive), 5 * 60 * 1000); + ftpClient.login(ip, Integer.parseInt(port), ftp_user, ftp_password, + "GBK", Boolean.parseBoolean(ftp_passive), 5 * 60 * 1000); } catch (Exception e) { - log.error("login fail,ip=[" + ip + "] port=[" + port + "] user=[" + ftp_user + /*"]pwd=[" + ftp_password + */"]" + log.error("login fail,ip=[" + ip + "] port=[" + port + "] user=[" + + ftp_user + /* "]pwd=[" + ftp_password + */"]" + StringUtil.getStackTrace(e)); return; } @@ -583,17 +577,19 @@ public class TaskThread implements Runnable { } - private String[] createZipFile(String csvpathAndFileName, String xmlPathAndFileName, String nename) - throws IOException { - String zipPath = resultPath + nename + dateFormat.format(new Date()) + "_" + System.nanoTime(); + private String[] createZipFile(String csvpathAndFileName, String xmlPathAndFileName, String nename) throws IOException { + String zipPath = resultPath + nename + dateFormat.format(new Date()) + + "_" + System.nanoTime(); File destDir = new File(zipPath); destDir.mkdirs(); try { - FileUtils.copyFileToDirectory(new File(csvpathAndFileName), destDir); - FileUtils.copyFileToDirectory(new File(xmlPathAndFileName), destDir); + FileUtils + .copyFileToDirectory(new File(csvpathAndFileName), destDir); + FileUtils + .copyFileToDirectory(new File(xmlPathAndFileName), destDir); } catch (IOException e) { - throw new IOException("createZipFile",e); + throw new IOException("createZipFile", e); } String destFilePath = zipPath + ".zip"; @@ -605,7 +601,7 @@ public class TaskThread implements Runnable { FileUtils.deleteDirectory(destDir); } catch (IOException e) { log.error("zip.compress() is fail " + StringUtil.getStackTrace(e)); - throw new IOException("createZipFile",e); + throw new IOException("createZipFile", e); } return new String[] { destFilePath, zipPath + ".zip" }; } @@ -620,45 +616,54 @@ public class TaskThread implements Runnable { String xmlFileName = nename + dateFormat.format(new Date()) + System.nanoTime(); String fieldLine = ""; for (int i = 0; i < names.size(); i++) { - String field = "\t<Field>\r\n" + "\t\t<FieldNo>" + i + "</FieldNo>\r\n" + "\t\t<FieldName>" + names.get(i) - + "</FieldName>\r\n" + "\t\t<FieldType>" + names.get(i) + "</FieldType>\r\n" - + "\t\t<FieldNameOther>" + names.get(i) + "</FieldNameOther>\r\n" + "\t</Field>\r\n"; + String field = "\t<Field>\r\n" + "\t\t<FieldNo>" + i + + "</FieldNo>\r\n" + "\t\t<FieldName>" + names.get(i) + + "</FieldName>\r\n" + "\t\t<FieldType>" + names.get(i) + + "</FieldType>\r\n" + "\t\t<FieldNameOther>" + + names.get(i) + "</FieldNameOther>\r\n" + "\t</Field>\r\n"; fieldLine = fieldLine + field; } - String str = "<?xml version=\"1.0\" encoding=\"gbk\"?>\r\n" + "<xml>\r\n" + "<FILE_STRUCTURE>\r\n" + fieldLine + String str = "<?xml version=\"1.0\" encoding=\"gbk\"?>\r\n" + + "<xml>\r\n" + "<FILE_STRUCTURE>\r\n" + fieldLine + "</FILE_STRUCTURE>\r\n" + "</xml>\r\n"; String xmlPathAndFileName = xmlpath + xmlFileName + ".xml"; try { this.writeDetail(xmlPathAndFileName, str); } catch (Exception e) { - log.error("writeDetail is fail ,xmlFileName=" + xmlFileName + StringUtil.getStackTrace(e)); + log.error("writeDetail is fail ,xmlFileName=" + xmlFileName + + StringUtil.getStackTrace(e)); } return xmlPathAndFileName; } - private void writeDetail(String detailFileName, String str) throws IOException { - try (OutputStream readOut = new FileOutputStream(new File(detailFileName), false); + private void writeDetail(String detailFileName, String str) + throws IOException { + try (OutputStream readOut = new FileOutputStream(new File( + detailFileName), false); OutputStreamWriter writer = new OutputStreamWriter(readOut)) { writer.write(str); writer.flush(); } catch (IOException e) { - throw new IOException("writeDetail",e); + throw new IOException("writeDetail", e); } } - private void appendLine(LinkedHashMap<String, String> nameAndValue, BufferedOutputStream bos) { - try{ + private void appendLine(LinkedHashMap<String, String> nameAndValue, + BufferedOutputStream bos) { + try { StringBuilder lineDatas = new StringBuilder(); - + for (String key : nameAndValue.keySet()) { lineDatas.append(nameAndValue.get(key)).append("|"); } - /*for (HashMap.Entry<String, String> entry : nameAndValue.entrySet()) { - lineDatas.append(entry.getValue()).append("|"); - }*/ + /* + * for (HashMap.Entry<String, String> entry : + * nameAndValue.entrySet()) { + * lineDatas.append(entry.getValue()).append("|"); } + */ bos.write(lineDatas.toString().getBytes()); bos.write("\n".getBytes()); } catch (IOException e) { @@ -734,11 +739,13 @@ public class TaskThread implements Runnable { // login try { - log.info("ftp login ,ip=[" + ip + "] port=[" + port + "] user=[" + user + /*"]password=[" + password +*/ "]"); - ftpClient.login(ip, Integer.parseInt(port), user, password, "GBK", Boolean.parseBoolean(passivemode), - 5 * 60 * 1000); + log.info("ftp login ,ip=[" + ip + "] port=[" + port + "] user=[" + + user + /* "]password=[" + password + */"]"); + ftpClient.login(ip, Integer.parseInt(port), user, password, "GBK", + Boolean.parseBoolean(passivemode), 5 * 60 * 1000); } catch (Exception e) { - log.error("login fail,ip=[" + ip + "] port=[" + port + "] user=[" + user + /*"]password=[" + password +*/ "]" + log.error("login fail,ip=[" + ip + "] port=[" + port + "] user=[" + + user + /* "]password=[" + password + */"]" + StringUtil.getStackTrace(e)); return fileList; } @@ -756,13 +763,14 @@ public class TaskThread implements Runnable { collectPeriod = Long.parseLong(collectVo.getGranularity()) * 60; log.info("collectPeriod =[" + collectPeriod + "]"); } catch (NumberFormatException e) { - log.error("NumberFormatException" ,e); + log.error("NumberFormatException", e); } long[] d = DateUtil.getScanScope(new Date(), collectPeriod); searchExprList.add(VarExprParser.replaceVar(conpath, d[0], d[1])); varMap.clear(); - log.info("[" + conpath + "] result[" + (searchExprList.size() - oldSize) + "] path"); + log.info("[" + conpath + "] result[" + + (searchExprList.size() - oldSize) + "] path"); } String nowdir = null; try { @@ -782,7 +790,8 @@ public class TaskThread implements Runnable { boolean cdsucess = ftpClient.chdir(ftpDir); if (cdsucess) { AFtpRemoteFile[] arf = (AFtpRemoteFile[]) ftpClient.list(); - log.info(" list [" + ftpDir + "] result[" + (arf == null ? "null" : arf.length) + "] files"); + log.info(" list [" + ftpDir + "] result[" + + (arf == null ? "null" : arf.length) + "] files"); // filter rfileFilter(remoteFiles, arf, ftpRegular); @@ -792,7 +801,8 @@ public class TaskThread implements Runnable { try { new File(localPath).mkdir(); } catch (Exception e) { - log.error("create localPath is fail localPath=" + localPath + " " + log.error("create localPath is fail localPath=" + + localPath + " " + StringUtil.getStackTrace(e)); } } @@ -801,17 +811,20 @@ public class TaskThread implements Runnable { new File(localPath).mkdirs(); } - String localFileName = localPath + ftpRemoteFile.getFileName(); - if(new File(localFileName).exists()){ + String localFileName = localPath + + ftpRemoteFile.getFileName(); + if (new File(localFileName).exists()) { new File(localFileName).delete(); } - boolean flag = ftpClient.downloadFile(ftpRemoteFile.getAbsFileName(), localFileName); + boolean flag = ftpClient.downloadFile( + ftpRemoteFile.getAbsFileName(), localFileName); if (flag) { fileList.add(localFileName); } else { - log.error("download file fail fileName=" + ftpRemoteFile.getAbsFileName()); + log.error("download file fail fileName=" + + ftpRemoteFile.getAbsFileName()); } } @@ -823,13 +836,14 @@ public class TaskThread implements Runnable { return fileList; } - private void rfileFilter(List<AFtpRemoteFile> fileContainer, AFtpRemoteFile[] arfs, String ftpRegular) { + private void rfileFilter(List<AFtpRemoteFile> fileContainer, + AFtpRemoteFile[] arfs, String ftpRegular) { if (ftpRegular != null && ftpRegular.length() > 0) { Pattern pattern = null; try { pattern = Pattern.compile(ftpRegular, Pattern.CASE_INSENSITIVE); } catch (Exception e) { - log.info("[" + ftpRegular + "]Pattern.compile exception:",e); + log.info("[" + ftpRegular + "]Pattern.compile exception:", e); // should rethrow exception or return from here } int hisSize = fileContainer.size(); @@ -842,10 +856,11 @@ public class TaskThread implements Runnable { // define the flow when pattern is null } - if (null != matcher && matcher.find()) + if (null != matcher && matcher.find()) fileContainer.add(arfs[j]); } - log.info("[" + ftpRegular + "]filter[" + (fileContainer.size() - hisSize) + "]filse"); + log.info("[" + ftpRegular + "]filter[" + + (fileContainer.size() - hisSize) + "]filse"); } else { for (int j = 0; arfs != null && j < arfs.length; j++) fileContainer.add(arfs[j]); @@ -875,7 +890,8 @@ public class TaskThread implements Runnable { return dirkeys; } - public List<String> getPathNoRegular(List<String> searchExprList, FTPInterface ftpCache) throws IOException { + public List<String> getPathNoRegular(List<String> searchExprList, + FTPInterface ftpCache) throws IOException { boolean isregular = false; List<String> regularList = new ArrayList<>(); for (String regular : searchExprList) { @@ -905,12 +921,14 @@ public class TaskThread implements Runnable { } } catch (Exception e) { log.error(" cd dir [" + parpath + "]fail", e); - throw new IOException("ftpCache.chdir",e); + throw new IOException("ftpCache.chdir", e); } RemoteFile[] remotef = ftpCache.list(); for (RemoteFile aremote : remotef) { - if (aremote.isDirectory() && aremote.getFileName().matches(matcher.group(2))) { - regularList.add(matcher.group(1) + aremote.getFileName() + matcher.group(3)); + if (aremote.isDirectory() + && aremote.getFileName().matches(matcher.group(2))) { + regularList.add(matcher.group(1) + + aremote.getFileName() + matcher.group(3)); } } } else { @@ -922,7 +940,7 @@ public class TaskThread implements Runnable { } return regularList; } - + public MessageChannel getPmResultChannel() { return pmResultChannel; } diff --git a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThreadService.java b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThreadService.java index 2a7bac4..566da0f 100644 --- a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThreadService.java +++ b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThreadService.java @@ -43,13 +43,11 @@ public class TaskThreadService extends Thread { public void run() { // run the service try { while (startFlag) { - try { if (System.currentTimeMillis() - timeStamp > Constant.ONEMINUTE) { timeStamp = System.currentTimeMillis(); log.debug("task queue size " + queue.size()); } - CollectMsg data = receive(); if (data == null) { continue; @@ -90,7 +88,6 @@ public class TaskThreadService extends Thread { } } - public int size() { return queue.size(); } diff --git a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/alarm/AlarmManager.java b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/alarm/AlarmManager.java index 22224e7..6de72be 100644 --- a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/alarm/AlarmManager.java +++ b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/alarm/AlarmManager.java @@ -27,59 +27,61 @@ import java.util.List; public class AlarmManager extends DriverThread { - private ConfigurationInterface configurationInterface; + private ConfigurationInterface configurationInterface; - @Override - public void dispose() { - log.debug("AlarmManager is start"); - //get alarm CONFIG_PROPERTIES_LOCATION - List<EMSInfo> emsInfos = configurationInterface.getAllEMSInfo(); - while (isRun() && emsInfos.isEmpty()) { - emsInfos = configurationInterface.getAllEMSInfo(); - if (emsInfos.isEmpty()) { - try { - Thread.sleep(1000); - log.debug("The configuration properties from " + ConfigurationManager.CONFIG_PROPERTIES_LOCATION + " is not load"); - } catch (Exception e) { - log.error("Exception",e); - } - } - } - List<CollectVo> collectVos = new ArrayList<>(); - for (EMSInfo emsInfo : emsInfos) { - //alarm - CollectVo collectVo = emsInfo.getCollectVoByType(Constant.COLLECT_TYPE_ALARM); - if (collectVo != null) { - collectVo.setEmsName(emsInfo.getName()); - collectVos.add(collectVo); - } else { - log.error("emsInfo.getCollectVoByType(EMS_RESOUCE) result CollectVo = null emsInfo =" + emsInfo); - } - } + @Override + public void dispose() { + log.debug("AlarmManager is start"); + // get alarm CONFIG_PROPERTIES_LOCATION + List<EMSInfo> emsInfos = configurationInterface.getAllEMSInfo(); + while (isRun() && emsInfos.isEmpty()) { + emsInfos = configurationInterface.getAllEMSInfo(); + if (emsInfos.isEmpty()) { + try { + Thread.sleep(1000); + log.debug("The configuration properties from " + + ConfigurationManager.CONFIG_PROPERTIES_LOCATION + + " is not load"); + } catch (Exception e) { + log.error("Exception", e); + } + } + } + List<CollectVo> collectVos = new ArrayList<>(); + for (EMSInfo emsInfo : emsInfos) { + // alarm + CollectVo collectVo = emsInfo.getCollectVoByType(Constant.COLLECT_TYPE_ALARM); + if (collectVo != null) { + collectVo.setEmsName(emsInfo.getName()); + collectVos.add(collectVo); + } else { + log.error("emsInfo.getCollectVoByType(EMS_RESOUCE) result CollectVo = null emsInfo =" + + emsInfo); + } + } - for (CollectVo collectVo : collectVos) { - AlarmTaskThread alarm = new AlarmTaskThread(collectVo); - alarm.setName(collectVo.getIP() + collectVo.getPort()); - alarm.start(); - log.info("AlarmTaskThread is start"); - } + for (CollectVo collectVo : collectVos) { + AlarmTaskThread alarm = new AlarmTaskThread(collectVo); + alarm.setName(collectVo.getIP() + collectVo.getPort()); + alarm.start(); + log.info("AlarmTaskThread is start"); + } - } + } - /** - * @return the configurationInterface - */ - public ConfigurationInterface getConfigurationInterface() { - return configurationInterface; - } - - /** - * @param configurationInterface the configurationInterface to set - */ - public void setConfigurationInterface( - ConfigurationInterface configurationInterface) { - this.configurationInterface = configurationInterface; - } + /** + * @return the configurationInterface + */ + public ConfigurationInterface getConfigurationInterface() { + return configurationInterface; + } + /** + * @param configurationInterface the configurationInterface to set + */ + public void setConfigurationInterface( + ConfigurationInterface configurationInterface) { + this.configurationInterface = configurationInterface; + } } diff --git a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/alarm/AlarmTaskThread.java b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/alarm/AlarmTaskThread.java index 20dd4fd..f90de5f 100644 --- a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/alarm/AlarmTaskThread.java +++ b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/alarm/AlarmTaskThread.java @@ -30,259 +30,259 @@ import java.net.Socket; import java.net.SocketException; import java.net.UnknownHostException; - public class AlarmTaskThread extends Thread { - private static final Log log = LogFactory.getLog(AlarmTaskThread.class); + private static final Log log = LogFactory.getLog(AlarmTaskThread.class); - private HeartBeat heartBeat = null; + private HeartBeat heartBeat = null; + private boolean isStop = false; + private CollectVo collectVo = null; + private int readTimeout = Constant.READ_TIMEOUT_MILLISECOND; + private int reqId; + private Socket socket = null; + private BufferedInputStream is = null; + private BufferedOutputStream dos = null; - private boolean isStop = false; - private CollectVo collectVo = null; - private int readTimeout = Constant.READ_TIMEOUT_MILLISECOND; - private int reqId; + private MessageChannel alarmChannel; - private Socket socket = null; - private BufferedInputStream is = null; - private BufferedOutputStream dos = null; + public AlarmTaskThread() { + super(); + } - private MessageChannel alarmChannel; + public AlarmTaskThread(CollectVo collectVo) { + this.collectVo = collectVo; + } - public AlarmTaskThread() { - super(); - } + @Override + public void run() { + try { + alarmChannel = MessageChannelFactory.getMessageChannel(Constant.RESULT_CHANNEL_KEY); + this.init(); + while (!this.isStop) { + String body; + try { + body = this.receive(); + alarmChannel.put(body); + } catch (Exception e) { + log.error("alarmChannel.put Exception: ", e); + reinit(); + } + } + } catch (Exception e) { + log.error("run Exception:", e); + } + } - public AlarmTaskThread(CollectVo collectVo) { + public String receive() throws IOException { + try { + Msg msg = null; + String retString = null; + while (retString == null && !this.isStop) { + msg = MessageUtil.readOneMsg(is); + log.debug("msg = " + msg.toString(true)); + log.info("msg.getMsgType().name = " + msg.getMsgType().name); + if ("ackLoginAlarm".equalsIgnoreCase(msg.getMsgType().name)) { + log.debug("receive login ack"); + boolean suc = this.ackLoginAlarm(msg); + if (suc) { + if (reqId == Integer.MAX_VALUE) + reqId = 0; + reqId++; + Msg msgheart = MessageUtil.putHeartBeatMsg(reqId); + heartBeat = new HeartBeat(socket, msgheart); + heartBeat.setName("CMCC_JT_HeartBeat"); + // start heartBeat + heartBeat.start(); + } + retString = null; + } - this.collectVo = collectVo; - } + if ("ackHeartBeat".equalsIgnoreCase(msg.getMsgType().name)) { + log.debug("received heartBeat message:" + msg.getBody()); + retString = null; + } + + if ("realTimeAlarm".equalsIgnoreCase(msg.getMsgType().name)) { + log.debug("received alarm message"); + retString = msg.getBody(); + } + if (retString == null) { + Thread.sleep(100); + } + + }// while + return retString; - @Override - public void run() { - try { - alarmChannel = MessageChannelFactory.getMessageChannel(Constant.RESULT_CHANNEL_KEY); - this.init(); - while (!this.isStop) { - String body; - try { - body = this.receive(); - alarmChannel.put(body); } catch (Exception e) { - log.error("alarmChannel.put Exception: ",e); - reinit(); - } - } - } catch (Exception e) { - log.error("run Exception:",e); - } - } - - - public String receive() throws IOException { - try{ - Msg msg = null; - String retString = null; - while (retString == null && !this.isStop) { - msg = MessageUtil.readOneMsg(is); - log.debug("msg = " + msg.toString(true)); - log.info("msg.getMsgType().name = " + msg.getMsgType().name); - if ("ackLoginAlarm".equalsIgnoreCase(msg.getMsgType().name)) { - log.debug("receive login ack"); - boolean suc = this.ackLoginAlarm(msg); - if (suc) { - if (reqId == Integer.MAX_VALUE) - reqId=0; - reqId++; - Msg msgheart = MessageUtil.putHeartBeatMsg(reqId); - heartBeat = new HeartBeat(socket, msgheart); - heartBeat.setName("CMCC_JT_HeartBeat"); - // start heartBeat - heartBeat.start(); - } - retString = null; - } - - if ("ackHeartBeat".equalsIgnoreCase(msg.getMsgType().name)) { - log.debug("received heartBeat message:" + msg.getBody()); - retString = null; + log.error("receive Error: ", e); + throw new IOException("receive Error: ", e); } + } + + public void init() throws IOException { + isStop = false; + // host + String host = collectVo.getIP(); + // port + String port = collectVo.getPort(); + // user + String user = collectVo.getUser(); + // password + String password = collectVo.getPassword(); + try { + if ((collectVo.getReadTimeout()).trim().length() > 0) + this.readTimeout = Integer.parseInt(collectVo.getReadTimeout()); - if ("realTimeAlarm".equalsIgnoreCase(msg.getMsgType().name)) { - log.debug("received alarm message"); - retString = msg.getBody(); + } catch (NumberFormatException e) { + log.error("Unable to parse read_timout: ", e); + throw new NumberFormatException("Unable to parse read_timout: " + e); } - if (retString == null) { - Thread.sleep(100); + + log.info("socket connect host=" + host + ", port=" + port); + try { + int portInt = Integer.parseInt(port); + socket = new Socket(host, portInt); + + } catch (UnknownHostException e) { + log.error("remote host [" + host + "]connect fail" + + StringUtil.getStackTrace(e)); + throw new UnknownHostException("remote host [" + host + + "]connect fail" + e); + } catch (IOException e1) { + log.error("create socket IOException ", e1); + throw new SocketException("create socket IOException " + e1); } - - }//while - return retString; + try { + socket.setSoTimeout(this.readTimeout); + socket.setTcpNoDelay(true); + socket.setKeepAlive(true); + } catch (SocketException e) { + log.error(" SocketException " + StringUtil.getStackTrace(e)); + throw new SocketException(" SocketException " + + StringUtil.getStackTrace(e)); + } + try { + dos = new BufferedOutputStream(socket.getOutputStream()); - }catch(Exception e){ - log.error("receive Error: ",e); - throw new IOException("receive Error: ",e); - } -} + Msg msg = MessageUtil.putLoginMsg(user, password); + + try { + log.debug("send login message " + msg.toString(false)); + MessageUtil.writeMsg(msg, dos); + + } catch (Exception e) { + log.error("send login message is fail " + + StringUtil.getStackTrace(e)); + } - public void init() throws IOException{ - isStop = false; - //host - String host = collectVo.getIP(); - //port - String port = collectVo.getPort(); - //user - String user = collectVo.getUser(); - //password - String password = collectVo.getPassword(); - - try{ - if((collectVo.getReadTimeout()).trim().length()>0) - this.readTimeout = Integer.parseInt(collectVo.getReadTimeout()); - - } catch (NumberFormatException e) { - log.error("Unable to parse read_timout: ",e); - throw new NumberFormatException("Unable to parse read_timout: " + e); + is = new BufferedInputStream(socket.getInputStream()); + + } catch (SocketException e) { + log.error("SocketException ", e); + throw new SocketException("SocketException " + e); + } } - log.info("socket connect host=" + host + ", port=" + port); - try { - int portInt = Integer.parseInt(port); - socket = new Socket(host, portInt); - - } catch (UnknownHostException e) { - log.error("remote host [" + host + "]connect fail" + StringUtil.getStackTrace(e)); - throw new UnknownHostException("remote host [" + host + "]connect fail" + e); - } catch (IOException e1) { - log.error("create socket IOException ", e1); - throw new SocketException("create socket IOException " + e1); - } - try { - socket.setSoTimeout(this.readTimeout); - socket.setTcpNoDelay(true); - socket.setKeepAlive(true); - } catch (SocketException e) { - log.error(" SocketException " + StringUtil.getStackTrace(e)); - throw new SocketException(" SocketException " + StringUtil.getStackTrace(e)); - } - try { - dos = new BufferedOutputStream(socket.getOutputStream()); - - Msg msg = MessageUtil.putLoginMsg(user, password); - - try { - log.debug("send login message " + msg.toString(false)); - MessageUtil.writeMsg(msg, dos); - - } catch (Exception e) { - log.error("send login message is fail " + StringUtil.getStackTrace(e)); - } - - is = new BufferedInputStream(socket.getInputStream()); - - } catch (SocketException e) { - log.error("SocketException ",e); - throw new SocketException("SocketException " + e); - } - } - - private boolean ackLoginAlarm(Msg msg) throws IOException { - boolean ret = false; - try { - String loginres = msg.getBody(); - String[] loginbody = loginres.split(";"); - if (loginbody.length > 1) { - for (String str : loginbody) { - if (str.contains("=")) { - String[] paras1 = str.split("=", -1); - if ("result".equalsIgnoreCase(paras1[0].trim())) { - if("succ".equalsIgnoreCase(paras1[1].trim())) - ret = true; - else ret = false; + private boolean ackLoginAlarm(Msg msg) throws IOException { + boolean ret = false; + try { + String loginres = msg.getBody(); + String[] loginbody = loginres.split(";"); + if (loginbody.length > 1) { + for (String str : loginbody) { + if (str.contains("=")) { + String[] paras1 = str.split("=", -1); + if ("result".equalsIgnoreCase(paras1[0].trim())) { + if ("succ".equalsIgnoreCase(paras1[1].trim())) + ret = true; + else + ret = false; + } } } + } else { + log.error("login ack body Incorrect formatbody=" + loginres); } + + } catch (Exception e) { + log.error("pocess login ack fail" + StringUtil.getStackTrace(e)); + } + if (ret) { + log.info("login sucess receive login ack " + msg.getBody()); } else { - log.error("login ack body Incorrect formatbody=" + loginres); + log.error("login fail receive login ack " + msg.getBody()); + this.close(); + this.isStop = true; + throw new IOException("pocess login ack fail"); } + return ret; + } - } catch (Exception e) { - log.error("pocess login ack fail" + StringUtil.getStackTrace(e)); - } - if (ret) { - log.info("login sucess receive login ack " + msg.getBody()); - } else { - log.error("login fail receive login ack " + msg.getBody()); - this.close(); - this.isStop = true; - throw new IOException("pocess login ack fail"); - } - return ret; - } - - public void close() { - if (heartBeat != null) { - heartBeat.setStop(true); - } - if (is != null) { - try { - is.close(); - } catch (IOException e) { - log.error("Unable to close BufferedInput Stream",e); - } finally { - is = null; - } - } - if (dos != null) { - try { - dos.close(); - } catch (IOException e) { - log.error("Unable to close BufferedOutput Stream",e); - } finally { - dos = null; - } - } - if (socket != null) { - try { - socket.close(); - } catch (IOException e) { - log.error("Unable to close Socket",e); - } finally { - socket = null; - } - - } - } - - public void reinit() { - int time = 0; - close(); - while (!this.isStop) { - close(); - time++; - try { - Thread.sleep(1000L * 30); - init(); - return; - } catch (Exception e) { - log.error("Number [" + time + "]reconnect [" + collectVo.getIP() + "]fail" + e); - } - } - } - - /** - * @param isStop the isStop to set - */ - public void setStop(boolean isStop) { - this.isStop = isStop; - } - - /** - * @return the heartBeat - */ - public HeartBeat getHeartBeat() { - return heartBeat; - } + public void close() { + if (heartBeat != null) { + heartBeat.setStop(true); + } + if (is != null) { + try { + is.close(); + } catch (IOException e) { + log.error("Unable to close BufferedInput Stream", e); + } finally { + is = null; + } + } + if (dos != null) { + try { + dos.close(); + } catch (IOException e) { + log.error("Unable to close BufferedOutput Stream", e); + } finally { + dos = null; + } + } + if (socket != null) { + try { + socket.close(); + } catch (IOException e) { + log.error("Unable to close Socket", e); + } finally { + socket = null; + } + } + } + + public void reinit() { + int time = 0; + close(); + while (!this.isStop) { + close(); + time++; + try { + Thread.sleep(1000L * 30); + init(); + return; + } catch (Exception e) { + log.error("Number [" + time + "]reconnect [" + + collectVo.getIP() + "]fail" + e); + } + } + } + + /** + * @param isStop + * the isStop to set + */ + public void setStop(boolean isStop) { + this.isStop = isStop; + } + + /** + * @return the heartBeat + */ + public HeartBeat getHeartBeat() { + return heartBeat; + } } diff --git a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/taskscheduler/CollectManager.java b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/taskscheduler/CollectManager.java index 9e8aa26..db1a676 100644 --- a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/taskscheduler/CollectManager.java +++ b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/taskscheduler/CollectManager.java @@ -1,4 +1,4 @@ -/** +/* * Copyright 2017 BOCO Corporation. CMCC Technologies Co., Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); |