diff options
-rw-r--r-- | ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThread.java | 1991 |
1 files changed, 987 insertions, 1004 deletions
diff --git a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThread.java b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThread.java index 9f2824b..1b97a6e 100644 --- a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThread.java +++ b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThread.java @@ -40,1011 +40,994 @@ import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; - public class TaskThread implements Runnable { - public Log log = LogFactory.getLog(TaskThread.class); - public MessageChannel pmResultChannel; - private MessageChannel cmResultChannel; - private CollectMsg data; - - private ConfigurationInterface configurationInterface = new ConfigurationImp(); - - private String localPath = Constant.SYS_DATA_TEMP; - private String resultPath = Constant.SYS_DATA_RESULT; - - private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss"); - - private SimpleDateFormat dateFormat2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - -// private String csvpathAndFileName; -// private String xmlPathAndFileName; -// private int countNum = 0 ; - - public TaskThread(CollectMsg data) { - this.data = data; - } - - public TaskThread() { - super(); - } - - @Override - public void run() { - - cmResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_RESULT_CHANNEL_KEY); - pmResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_RESULT_PM_CHANNEL_KEY); - try { - collectMsgHandle(data); - } catch (Exception e) { - log.error("", e); - } - } - - private void collectMsgHandle(CollectMsg collectMsg) { - String emsName = collectMsg.getEmsName(); - String type = collectMsg.getType(); - CollectVo collectVo = configurationInterface.getCollectVoByEmsNameAndType(emsName, type); - - //ftp download - List<String> downloadfiles = this.ftpDownload(collectVo); - //paser ftp update message send - for (String fileName : downloadfiles) { - this.parseFtpAndSendMessage(fileName, collectVo); - } - } - - public void parseFtpAndSendMessage(String fileName, CollectVo collectVo) { - // - List<File> filelist = decompressed(fileName); - - for (File tempfile : filelist) { - - String unfileName = tempfile.getName(); - - Pattern pa = Pattern.compile(".*-(.*)-\\w{2}-"); - Matcher ma = pa.matcher(unfileName); - if (!ma.find()) - continue; - String nename = ma.group(1); - boolean parseResult = false; - if (Constant.COLLECT_TYPE_CM.equalsIgnoreCase(collectVo.getType())) { - parseResult = processCMXml(tempfile, nename, "CM"); - } else { - if (unfileName.indexOf(".csv") > 0) { - parseResult = processPMCsv(tempfile); - } else { - parseResult = processPMXml(tempfile); - } - } - - if (parseResult) { - log.info("parser " + tempfile + " sucess"); - tempfile.delete(); - } else { - log.info("parser " + tempfile + " fail"); - } - - } - } - - public boolean processPMXml(File file) { - - FileInputStream fis = null; - InputStreamReader isr = null; - XMLStreamReader reader = null; - try { - fis = new FileInputStream(file); - isr = new InputStreamReader(fis, Constant.ENCODING_UTF8); - - XMLInputFactory fac = XMLInputFactory.newInstance(); - reader = fac.createXMLStreamReader(isr); - - boolean fileHeaderStart = false; - boolean measurementStart = false; - boolean pmNameFlag = false; - boolean pmDataFlag = false; - boolean objectFlag = true; - - int index = -1; - int nameIndex = -1; - String currentMea = null; - String subName = null; - String localName = null; - String endLocalName = null; - String objectType = null; - - - LinkedHashMap<String, String> commonNameAndValue = new LinkedHashMap<String, String>(); - LinkedHashMap<String, String> pmDatas = null; - LinkedHashMap<Integer, String> pmNames = null; - - - int event = -1; - while (reader.hasNext()) { - try { - event = reader.next(); - - switch (event) { - case XMLStreamConstants.START_ELEMENT: - localName = reader.getLocalName(); - if ("FileHeader".equalsIgnoreCase(localName)) { - fileHeaderStart = true; - } - if (fileHeaderStart) { - if (!"FileHeader".equalsIgnoreCase(localName)) { - commonNameAndValue.put(localName, reader.getElementText().trim()); - } - - } - if ("Measurements".equalsIgnoreCase(localName)) { - // a new Measurement starts - measurementStart = true; - } - if (measurementStart) { - // measurement handler - if ("ObjectType".equalsIgnoreCase(localName)) { - objectType = reader.getElementText().trim(); - commonNameAndValue.put("ObjectType", objectType); - } - if ("PmName".equalsIgnoreCase(localName)) { - pmNameFlag = true; - pmNames = new LinkedHashMap<Integer, String>(); - - } - if (pmNameFlag) { - // pmname handler, add columnNames - if ("N".equalsIgnoreCase(localName)) { - nameIndex = Integer.parseInt(getXMLAttribute(reader, "i")); - String text = reader.getElementText().trim(); - pmNames.put(nameIndex, text); - } - } - if ("PmData".equalsIgnoreCase(localName)) { - pmDataFlag = true; - pmDatas = new LinkedHashMap<String, String>(); - } - - if (pmDataFlag) { - // pmdata handler - if ("Object".equalsIgnoreCase(localName)) { - objectFlag = true; - int n = reader.getAttributeCount(); - for (int i = 0; i < n; i++) { - String name = reader.getAttributeLocalName(i); - commonNameAndValue.put(name, reader.getAttributeValue(i)); - } - } - if (objectFlag) { - - // add columnValues - if ("V".equalsIgnoreCase(localName)) { - String indexStr = getXMLAttribute(reader, "i"); - if (indexStr == null) { - log.error("ERROR: illegal value index"); - continue; - } - index = Integer.parseInt(indexStr); - String name = pmNames.get(index); - if (name == null) { - log.error("illegal data: valueIndex=" + index); - continue; - } - - String value = reader.getElementText().trim(); - pmDatas.put(name, value); - } - if ("CV".equalsIgnoreCase(localName)) { - - String indexStr = getXMLAttribute(reader, "i"); - if (indexStr == null) { - log.error("ERROR: illegal value index"); - continue; - } - index = Integer.parseInt(indexStr); - - currentMea = pmNames.get(index); - if (currentMea == null) { - log.error("illegal data: valueIndex=" + index); - continue; - } - } - - if ("SN".equalsIgnoreCase(localName)) { - subName = reader.getElementText().trim(); - - } - if ("SV".equalsIgnoreCase(localName)) { - String subValue = reader.getElementText().trim(); -// pmDatas.put(currentMea+subName, subValue); - pmDatas.put(subName, subValue); - } - } - } - - } - - break; - case XMLStreamConstants.CHARACTERS: - // ... - break; - case XMLStreamConstants.END_ELEMENT: - // ... - endLocalName = reader.getLocalName(); - if ("Object".equalsIgnoreCase(endLocalName)) { - objectFlag = false; - pmDatas.putAll(commonNameAndValue); - try { - pmResultChannel.put(pmDatas); - - } catch (InterruptedException e) { - log.error("collectResultChannel.put(resultMap) error ", e); - } -// System.out.println(pmDatas); -// pmDatas.clear(); - } - if (endLocalName.equalsIgnoreCase("PmData")) { - pmDataFlag = false; - } - - if (endLocalName.equalsIgnoreCase("PmName")) { - pmNameFlag = false; - } - if (endLocalName.equalsIgnoreCase("Measurements")) { - // a measurement over - measurementStart = false; - } - - if ("FileHeader".equalsIgnoreCase(endLocalName)) { - fileHeaderStart = false; - } - break; - } - } catch (Exception e) { - log.error("", e); - event = reader.next(); - } - } - - } catch (Exception e) { - log.error("processPMXml is Exception ", e); - return false; - } finally { - try { - if (reader != null) reader.close(); - if (isr != null) isr.close(); - if (fis != null) fis.close(); - } catch (Exception e) { - e.printStackTrace(); - } - } - return true; - } - - private String getXMLAttribute(XMLStreamReader reader, String obj) { - String res = null; - if (obj == null || reader == null) { - return res; - } - int n = reader.getAttributeCount(); - for (int i = 0; i < n; i++) { - String name = reader.getAttributeLocalName(i); - if (obj.equalsIgnoreCase(name)) { - res = reader.getAttributeValue(i); - } - } - return res; - } - - public boolean processPMCsv(File tempfile) { - - FileInputStream brs = null; - InputStreamReader isr = null; - BufferedReader br = null; - - List<String> columnNames = new ArrayList<String>(); - List<String> commonValues = new ArrayList<String>(); - try { - - brs = new FileInputStream(tempfile); - isr = new InputStreamReader(brs, Constant.ENCODING_UTF8); - br = new BufferedReader(isr); - //common field - String commonField = br.readLine(); - String[] fields = commonField.split("\\|", -1); - for (String com : fields) { - String[] comNameAndValue = com.split("=", 2); - columnNames.add(comNameAndValue[0].trim()); - commonValues.add(comNameAndValue[1]); - } - //column names - String columnName = br.readLine(); - String[] names = columnName.split("\\|", -1); - for (String name : names) { - columnNames.add(name); - } - - String valueLine = ""; - List<String> valuelist = new ArrayList<String>(); - - while ((valueLine = br.readLine()) != null) { - if (valueLine.trim().equals("")) { - continue; - } -// countNum ++; - String[] values = valueLine.split("\\|", -1); - - valuelist.addAll(commonValues); - for (String value : values) { - valuelist.add(value); - } -// this.appendLine(valuelist, bos); - //resultMap - HashMap<String, String> resultMap = this.resultMap(columnNames, valuelist); - try { - pmResultChannel.put(resultMap); - } catch (InterruptedException e) { - log.error("collectResultChannel.put(resultMap) error ", e); - } - valuelist.clear(); - } - } catch (IOException e) { - log.error("processPMCsv is fail ", e); - return false; - } finally { - try { - if (br != null) - br.close(); - if (isr != null) - isr.close(); - if (brs != null) - brs.close(); - - } catch (Exception e) { - log.error(e); - } - } - return true; - - } - - private HashMap<String, String> resultMap(List<String> columnNames, List<String> valuelist) { - - HashMap<String, String> resultMap = new HashMap<String, String>(); - if (columnNames.size() == valuelist.size()) { - for (int i = 0; i < columnNames.size(); i++) { - resultMap.put(columnNames.get(i), valuelist.get(i)); - } - } - - return resultMap; - - } - - private boolean processCMXml(File tempfile, String nename, String type) { - - String csvpath = localPath + nename + "/" + type + "/"; - File csvpathfile = new File(csvpath); - if (!csvpathfile.exists()) { - csvpathfile.mkdirs(); - } - String csvFileName = nename + dateFormat.format(new Date()) + System.nanoTime(); - String csvpathAndFileName = csvpath + csvFileName + ".csv"; - BufferedOutputStream bos = null; - FileOutputStream fos = null; - try { - fos = new FileOutputStream(csvpathAndFileName, false); - bos = new BufferedOutputStream(fos, 10240); - } catch (FileNotFoundException e1) { - log.error("FileNotFoundException " + StringUtil.getStackTrace(e1)); - } - - boolean FieldNameFlag = false; - boolean FieldValueFlag = false; - //line num - int countNum = 0; - String xmlPathAndFileName = null; - String localName = null; - String endLocalName = null; - String rmUID = null; - int index = -1; - ArrayList<String> names = new ArrayList<String>();// colname - LinkedHashMap<String, String> nameAndValue = new LinkedHashMap<String, String>(); - - - FileInputStream fis = null; - InputStreamReader isr = null; - XMLStreamReader reader = null; - try { - fis = new FileInputStream(tempfile); - isr = new InputStreamReader(fis, Constant.ENCODING_UTF8); - XMLInputFactory fac = XMLInputFactory.newInstance(); - reader = fac.createXMLStreamReader(isr); - int event = -1; - boolean setcolum = true; - while (reader.hasNext()) { - try { - event = reader.next(); - switch (event) { - case XMLStreamConstants.START_ELEMENT: - localName = reader.getLocalName(); - if ("FieldName".equalsIgnoreCase(localName)) { - FieldNameFlag = true; - } - if (FieldNameFlag) { - if ("N".equalsIgnoreCase(localName)) { - String colName = reader.getElementText().trim(); - names.add(colName); - } - } - if ("FieldValue".equalsIgnoreCase(localName)) { - FieldValueFlag = true; - - } - if (FieldValueFlag) { - if (setcolum) { - xmlPathAndFileName = this.setColumnNames(nename, names, type); - setcolum = false; - } - - if ("Object".equalsIgnoreCase(localName)) { - int ac = reader.getAttributeCount(); - for (int i = 0; i < ac; i++) { - if ("rmUID".equalsIgnoreCase(reader.getAttributeLocalName(i))) { - rmUID = reader.getAttributeValue(i).trim(); - } - } - nameAndValue.put("rmUID", rmUID); - } - if ("V".equalsIgnoreCase(localName)) { - index = Integer.parseInt(reader - .getAttributeValue(0)) - 1; - String currentName = names.get(index); - String v = reader.getElementText().trim(); - nameAndValue.put(currentName, v); - } - } - break; - case XMLStreamConstants.CHARACTERS: - break; - case XMLStreamConstants.END_ELEMENT: - endLocalName = reader.getLocalName(); - - if ("FieldName".equalsIgnoreCase(endLocalName)) { - FieldNameFlag = false; - } - if ("FieldValue".equalsIgnoreCase(endLocalName)) { - FieldValueFlag = false; - } - if ("Object".equalsIgnoreCase(endLocalName)) { - countNum++; - this.appendLine(nameAndValue, bos); - nameAndValue.clear(); - } - break; - } - } catch (Exception e) { - log.error("" + StringUtil.getStackTrace(e)); - event = reader.next(); - } - } - - - if (bos != null) { - bos.close(); - bos = null; - } - if (fos != null) { - fos.close(); - fos = null; - } - - String[] fileKeys = this.createZipFile(csvpathAndFileName, xmlPathAndFileName, nename); - //ftp store - Properties ftpPro = configurationInterface.getProperties(); - String ip = ftpPro.getProperty("ftp_ip"); - String port = ftpPro.getProperty("ftp_port"); - String ftp_user = ftpPro.getProperty("ftp_user"); - String ftp_password = ftpPro.getProperty("ftp_password"); - - String ftp_passive = ftpPro.getProperty("ftp_passive"); - String ftp_type = ftpPro.getProperty("ftp_type"); - String remoteFile = ftpPro.getProperty("ftp_remote_path"); - this.ftpStore(fileKeys, ip, port, ftp_user, ftp_password, ftp_passive, ftp_type, remoteFile); - //create Message - String message = this.createMessage(fileKeys[1], ftp_user, ftp_password, ip, port, countNum, nename); - - //set message - this.setMessage(message); - } catch (Exception e) { - log.error("" + StringUtil.getStackTrace(e)); - return false; - } finally { - try { - if (reader != null) { - reader.close(); - } - if (isr != null) { - isr.close(); - } - if (fis != null) { - fis.close(); - } - if (bos != null) { - bos.close(); - } - - if (fos != null) { - fos.close(); - } - } catch (Exception e) { - log.error(e); - } - } - return true; - } - - private void setMessage(String message) { - - try { - cmResultChannel.put(message); - } catch (Exception e) { - log.error("collectResultChannel.put(message) is error " + StringUtil.getStackTrace(e)); - } - } - - public String createMessage(String zipName, String user, String pwd, String ip, String port, int countNum, String nename) { - - StringBuffer strBuffer = new StringBuffer(); - strBuffer - .append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>" - + "<FILE_DATA_READY_UL xmlns:xsi=\" http://www.w3.org/2001/XMLSchema-instance\">" - + "<Header SessionID=\""); - strBuffer.append(""); - strBuffer.append("\" LicenceID=\""); - strBuffer.append(""); - strBuffer.append("\" SystemID=\""); - strBuffer.append(""); - strBuffer.append("\" Time=\""); - strBuffer.append(dateFormat2.format(new Date())); - strBuffer.append("\" PolicyID=\""); - strBuffer.append(""); - strBuffer.append("\"/><Body>"); - strBuffer.append("<DataCatalog>"); - strBuffer.append(""); - strBuffer.append("</DataCatalog><GroupID>"); - strBuffer.append(nename); - strBuffer.append("</GroupID><DataSourceName>"); - strBuffer.append(""); - strBuffer.append("</DataSourceName><InstanceID>"); - strBuffer.append(""); - strBuffer.append("</InstanceID><FileFormat>"); - strBuffer.append("csv"); - strBuffer.append("</FileFormat><CharSet>"); - strBuffer.append("gbk"); - strBuffer.append("</CharSet><FieldSeparator>"); - strBuffer.append("|"); - strBuffer.append("</FieldSeparator><IsCompressed>"); - strBuffer.append("true"); - strBuffer.append("</IsCompressed><StartTime>"); - strBuffer.append(dateFormat2.format(new Date())); - strBuffer.append("</StartTime><EndTime>"); - strBuffer.append(""); - strBuffer.append("</EndTime><FileList>"); - strBuffer.append(zipName); - strBuffer.append("</FileList><ConnectionString>"); - strBuffer.append("ftp://" + user + ":" + pwd + "@" + ip + ":" + port); - strBuffer.append("</ConnectionString>"); - strBuffer.append("<DataCount>"); - strBuffer.append(countNum); - strBuffer.append("</DataCount>"); - - strBuffer.append("<FileSize>").append("").append("</FileSize>"); - strBuffer.append("<DataGranularity>").append("").append("</DataGranularity>"); - - - strBuffer.append("</Body></FILE_DATA_READY_UL>"); - return strBuffer.toString(); - - } - - private void ftpStore(String[] fileKeys, String ip, String port, String ftp_user, String ftp_password, - String ftp_passive, String ftp_type, String remoteFile) { - String zipFilePath = fileKeys[0]; - - - FTPInterface ftpClient; - ftpClient = new FTPSrv(); - //login - try { - ftpClient.login(ip, Integer.parseInt(port), ftp_user, ftp_password, "GBK", Boolean.parseBoolean(ftp_passive), 5 * 60 * 1000); - } catch (Exception e) { - log.error("login fail,ip=[" + ip + "] port=[" + port + "] user=[" + ftp_user + "]pwd=[" + ftp_password + "]" + StringUtil.getStackTrace(e)); - return; - } -// ftpClient.store(zipFilePath, remoteFile); - log.debug("store [" + zipFilePath + "]to[" + remoteFile + "]"); - - FileUtils.deleteQuietly(new File(zipFilePath)); - - - } - - private String[] createZipFile(String csvpathAndFileName, String xmlPathAndFileName, String nename) { - - String zipPath = resultPath + nename + dateFormat.format(new Date()) + "_" + System.nanoTime(); - - File destDir = new File(zipPath); - destDir.mkdirs(); - - try { - FileUtils.copyFileToDirectory(new File(csvpathAndFileName), destDir); - FileUtils.copyFileToDirectory(new File(xmlPathAndFileName), destDir); - } catch (IOException e) { - - } - - String destFilePath = zipPath + ".zip"; - try { - Zip zip = new Zip(destDir.getAbsolutePath(), destFilePath); - zip.setCompressLevel(9); - zip.compress(); - - FileUtils.deleteDirectory(destDir); - } catch (IOException e) { - log.error("zip.compress() is fail " + StringUtil.getStackTrace(e)); - } - return new String[]{destFilePath, zipPath + ".zip"}; - } - - - private String setColumnNames(String nename, List<String> names, String type) { - //write xml - String xmlpath = localPath + nename + "/" + type + "/"; - File xmlpathfile = new File(xmlpath); - if (!xmlpathfile.exists()) { - xmlpathfile.mkdirs(); - } - String xmlFileName = nename + dateFormat.format(new Date()) + System.nanoTime(); - String fieldLine = ""; - for (int i = 0; i < names.size(); i++) { - String field = "\t<Field>\r\n" + "\t\t<FieldNo>" + i - + "</FieldNo>\r\n" + "\t\t<FieldName>" - + names.get(i) + "</FieldName>\r\n" - + "\t\t<FieldType>" + names.get(i) - + "</FieldType>\r\n" - + "\t\t<FieldNameOther>" + names.get(i) - + "</FieldNameOther>\r\n" + - "\t</Field>\r\n"; - fieldLine = fieldLine + field; - } - - String str = "<?xml version=\"1.0\" encoding=\"gbk\"?>\r\n" - + "<xml>\r\n" + "<FILE_STRUCTURE>\r\n" + fieldLine - + "</FILE_STRUCTURE>\r\n" + "</xml>\r\n"; - String xmlPathAndFileName = xmlpath + xmlFileName + ".xml"; - try { - this.writeDetail(xmlPathAndFileName, str); - } catch (Exception e) { - log.error("writeDetail is fail ,xmlFileName=" + xmlFileName + StringUtil.getStackTrace(e)); - } - - return xmlPathAndFileName; - } - - private void writeDetail(String detailFileName, String str) throws Exception { - OutputStreamWriter writer = null; - OutputStream readOut = null; - try { - readOut = new FileOutputStream(new File(detailFileName), false); - writer = new OutputStreamWriter(readOut); - writer.write(str); - writer.flush(); - } finally { - - if (null != writer) { - writer.close(); - } - if (readOut != null) { - readOut.close(); - } - - } - - } - - - private void appendLine(LinkedHashMap<String, String> nameAndValue, BufferedOutputStream bos) { - StringBuilder lineDatas = new StringBuilder(); - - for (String key : nameAndValue.keySet()) { - lineDatas.append(nameAndValue.get(key)).append("|"); - } - try { - bos.write(lineDatas.toString().getBytes()); - bos.write("\n".getBytes()); - } catch (IOException e) { - log.error("appendLine error " + StringUtil.getStackTrace(e)); - } - } - -// private void appendLine(List<String> values,BufferedOutputStream bos) { -// StringBuilder lineDatas = new StringBuilder(); -// -// for (String value : values) { -// lineDatas.append(value).append("|"); -// } -// try { -// bos.write(lineDatas.toString().getBytes()); -// bos.write("\n".getBytes()); -// } catch (IOException e) { -// log.error("appendLine error "+StringUtil.getStackTrace(e)); -// } -// } - - public List<File> decompressed(String fileName) { - List<File> filelist = new ArrayList<File>(); - - if (fileName.indexOf(".gz") > 1) { - try { - File decompressFile = deGz(fileName); - filelist.add(decompressFile); - new File(fileName).delete(); - } catch (IOException e) { - log.error("decompressed is fail " + StringUtil.getStackTrace(e)); - } - } else if (fileName.indexOf(".zip") > 1) { - try { - File[] files = deZip(new File(fileName)); - new File(fileName).delete(); - for (File temp : files) { - filelist.add(temp); - } - } catch (Exception e) { - log.error("decompressed is fail " + StringUtil.getStackTrace(e)); - } - } else { - filelist.add(new File(fileName)); - } - - return filelist; - } - - private File deGz(String gzFileName) throws IOException { - Gunzip gunzip = new Gunzip(); - String orgFile = gzFileName.replace(".gz", ""); - gunzip.unCompress(gzFileName, orgFile); - return new File(orgFile); - } - - public File[] deZip(File file) throws Exception { - - String regx = "(.*).zip"; - Pattern p = Pattern.compile(regx); - Matcher m = p.matcher(file.getName()); - if (m.find()) { - String orgFile = localPath + m.group(1) + "/"; - UnZip unzip = new UnZip(file.getAbsolutePath(), orgFile); - unzip.deCompress(); - file = new File(orgFile); - } - File[] files = file.listFiles(); - - return files; - - } - - private List<String> ftpDownload(CollectVo collectVo) { - - List<String> fileList = new ArrayList<String>(); - //IP - String ip = collectVo.getIP(); - //port - String port = collectVo.getPort(); - //user - String user = collectVo.getUser(); - //password - String password = collectVo.getPassword(); - //isPassiveMode - String passivemode = collectVo.getPassive(); - - FTPInterface ftpClient = new FTPSrv(); - - //login - try { - log.info("ftp login ,ip=[" + ip + "] port=[" + port + "] user=[" + user + "]password=[" + password + "]"); - ftpClient.login(ip, Integer.parseInt(port), user, password, "GBK", Boolean.parseBoolean(passivemode), 5 * 60 * 1000); - } catch (Exception e) { - log.error("login fail,ip=[" + ip + "] port=[" + port + "] user=[" + user + "]password=[" + password + "]" + StringUtil.getStackTrace(e)); - return fileList; - } - - //download - String dir = collectVo.getRemotepath(); - List<String> searchExprList = new ArrayList<String>(); - String[] FPath = dir.split(";"); - for (int i = 0; i < FPath.length; i++) { - int oldSize = searchExprList.size(); - String conpath = FPath[i] + collectVo.getMatch(); - Hashtable<String, String> varMap = new Hashtable<String, String>(); - long collectPeriod = 900; - try { - collectPeriod = Long.parseLong(collectVo.getGranularity()) * 60; - log.info("collectPeriod =[" + collectPeriod + "]"); - } catch (NumberFormatException e) { - e.printStackTrace(); - } - long[] d = DateUtil.getScanScope(new Date(), collectPeriod); - searchExprList.add(VarExprParser.replaceVar(conpath, d[0], d[1])); - - varMap.clear(); - varMap = null; - log.info("[" + conpath + "] result[" + (searchExprList.size() - oldSize) + "] path"); - conpath = null; - } - String nowdir = null; - try { - nowdir = ftpClient.pwd(); - searchExprList = getPathNoRegular(searchExprList, ftpClient); - } catch (Exception e1) { - log.error(" collect fail ", e1); - return fileList; - } - List<AFtpRemoteFile> remoteFiles = new ArrayList<AFtpRemoteFile>(); - for (String expr : searchExprList) { - ftpClient.chdir(nowdir); - String keys[] = parseExprKeys(expr); - String ftpRegular = keys[1]; - String ftpDir = keys[0]; - - boolean cdsucess = ftpClient.chdir(ftpDir); - if (cdsucess) { - AFtpRemoteFile[] arf = (AFtpRemoteFile[]) ftpClient.list(); - log.info(" list [" + ftpDir + "] result[" + (arf == null ? "null" : arf.length) + "] files"); - //filter - - rfileFilter(remoteFiles, arf, ftpRegular); - - keys = null; - ftpRegular = ftpDir = null; - - for (AFtpRemoteFile ftpRemoteFile : remoteFiles) { - if (!new File(localPath).exists()) { - try { - new File(localPath).mkdir(); - } catch (Exception e) { - log.error("create localPath is fail localPath=" + localPath + " " + StringUtil.getStackTrace(e)); - } - } - - if (!new File(localPath).exists()) { - new File(localPath).mkdirs(); - } - - String localFileName = localPath + ftpRemoteFile.getFileName(); - File loaclFile = new File(localFileName); - if (loaclFile.exists()) { - loaclFile.delete(); - } - - boolean flag = ftpClient.downloadFile(ftpRemoteFile.getAbsFileName(), localFileName); - - if (flag) { - fileList.add(localFileName); - } else { - log.error("download file fail fileName=" + ftpRemoteFile.getAbsFileName()); - } - } - - } else { - log.error("cd dir is faill dir =[" + ftpDir + "]"); - } - } - - - return fileList; - } - - private void rfileFilter(List<AFtpRemoteFile> fileContainer, AFtpRemoteFile[] arfs, String ftpRegular) { - if (ftpRegular != null && ftpRegular.length() > 0) { - Pattern pattern = null; - try { - pattern = Pattern.compile(ftpRegular, Pattern.CASE_INSENSITIVE); - } catch (Exception e) { - log.info("[" + ftpRegular + "]Pattern.compile exception:" + e.getMessage()); - //should rethrow exception or return from here - } - int hisSize = fileContainer.size(); - for (int j = 0; arfs != null && j < arfs.length; j++) { - String fileName = parseFileName(arfs[j].getFileName()); - Matcher matcher = null; - if (pattern != null) - matcher = pattern.matcher(fileName); - else { - //define the flow when pattern is null - } - - if (matcher.find()) - fileContainer.add(arfs[j]); - } - log.info("[" + ftpRegular + "]filter[" + (fileContainer.size() - hisSize) + "]filse"); - pattern = null; - } else { - for (int j = 0; arfs != null && j < arfs.length; j++) - fileContainer.add(arfs[j]); - } - - } - - private String parseFileName(String fileName) { - int idx = fileName.lastIndexOf("/"); - if (idx == -1) - return fileName; - return fileName.substring(idx + 1, fileName.length()); - } - - private String[] parseExprKeys(String source) { - - if (source.indexOf(";") > -1) { - source = source.substring(0, source.indexOf(";")); - } - if (source.endsWith("/")) - return new String[]{source, ""}; - - int idx = source.lastIndexOf("/"); - String[] dirkeys = new String[2]; - dirkeys[0] = source.substring(0, idx + 1); - dirkeys[1] = source.substring(idx + 1, source.length()); - return dirkeys; - } - - - public List<String> getPathNoRegular(List<String> searchExprList, FTPInterface ftpCache) throws Exception { - boolean isregular = false; - List<String> regularList = new ArrayList<String>(); - for (String regular : searchExprList) { - Pattern lpattern = null; - try { - lpattern = Pattern.compile("(.*/)<([^/]+)>(/.*)"); - } catch (Exception e) { - log.error("[" + regular + "]compile fails:" + e.getMessage()); - e.printStackTrace(); - } - Matcher matcher=null; - if(lpattern!=null) - matcher = lpattern.matcher(regular); - else{ - //define flow in case lpattern is null - } - - if (matcher.find()) { - isregular = true; - String parpath = matcher.group(1); - try { - boolean isin = ftpCache.chdir(parpath); - if (isin) { - log.info("cd dir [" + parpath + "] sucess"); - } else { - log.error("cd dir [" + parpath + "] fail"); - } - } catch (Exception e) { - log.error(" cd dir [" + parpath + "]fail", e); - throw e; - } - RemoteFile[] remotef = ftpCache.list(); - for (RemoteFile aremote : remotef) { - if (aremote.isDirectory() && aremote.getFileName().matches(matcher.group(2))) { - regularList.add(matcher.group(1) + aremote.getFileName() + matcher.group(3)); - } - } - } else { - regularList.add(regular); - } - } - if (isregular == true) { - getPathNoRegular(regularList, ftpCache); - } - return regularList; - } + public Log log = LogFactory.getLog(TaskThread.class); + public MessageChannel pmResultChannel; + private MessageChannel cmResultChannel; + private CollectMsg data; + + private ConfigurationInterface configurationInterface = new ConfigurationImp(); + + private String localPath = Constant.SYS_DATA_TEMP; + private String resultPath = Constant.SYS_DATA_RESULT; + + private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss"); + + private SimpleDateFormat dateFormat2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + // private String csvpathAndFileName; + // private String xmlPathAndFileName; + // private int countNum = 0 ; + + public TaskThread(CollectMsg data) { + this.data = data; + } + + public TaskThread() { + super(); + } + + @Override + public void run() { + + cmResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_RESULT_CHANNEL_KEY); + pmResultChannel = MessageChannelFactory.getMessageChannel(Constant.COLLECT_RESULT_PM_CHANNEL_KEY); + try { + collectMsgHandle(data); + } catch (Exception e) { + log.error("", e); + } + } + + private void collectMsgHandle(CollectMsg collectMsg) { + String emsName = collectMsg.getEmsName(); + String type = collectMsg.getType(); + CollectVo collectVo = configurationInterface.getCollectVoByEmsNameAndType(emsName, type); + + // ftp download + List<String> downloadfiles = this.ftpDownload(collectVo); + // paser ftp update message send + for (String fileName : downloadfiles) { + this.parseFtpAndSendMessage(fileName, collectVo); + } + } + + public void parseFtpAndSendMessage(String fileName, CollectVo collectVo) { + // + List<File> filelist = decompressed(fileName); + + for (File tempfile : filelist) { + + String unfileName = tempfile.getName(); + + Pattern pa = Pattern.compile(".*-(.*)-\\w{2}-"); + Matcher ma = pa.matcher(unfileName); + if (!ma.find()) + continue; + String nename = ma.group(1); + boolean parseResult = false; + if (Constant.COLLECT_TYPE_CM.equalsIgnoreCase(collectVo.getType())) { + parseResult = processCMXml(tempfile, nename, "CM"); + } else { + if (unfileName.indexOf(".csv") > 0) { + parseResult = processPMCsv(tempfile); + } else { + parseResult = processPMXml(tempfile); + } + } + + if (parseResult) { + log.info("parser " + tempfile + " sucess"); + tempfile.delete(); + } else { + log.info("parser " + tempfile + " fail"); + } + + } + } + + public boolean processPMXml(File file) { + + FileInputStream fis = null; + InputStreamReader isr = null; + XMLStreamReader reader = null; + try { + fis = new FileInputStream(file); + isr = new InputStreamReader(fis, Constant.ENCODING_UTF8); + + XMLInputFactory fac = XMLInputFactory.newInstance(); + reader = fac.createXMLStreamReader(isr); + + boolean fileHeaderStart = false; + boolean measurementStart = false; + boolean pmNameFlag = false; + boolean pmDataFlag = false; + boolean objectFlag = true; + + int index = -1; + int nameIndex = -1; + String currentMea = null; + String subName = null; + String localName = null; + String endLocalName = null; + String objectType = null; + + LinkedHashMap<String, String> commonNameAndValue = new LinkedHashMap<String, String>(); + LinkedHashMap<String, String> pmDatas = null; + LinkedHashMap<Integer, String> pmNames = null; + + int event = -1; + while (reader.hasNext()) { + try { + event = reader.next(); + + switch (event) { + case XMLStreamConstants.START_ELEMENT: + localName = reader.getLocalName(); + if ("FileHeader".equalsIgnoreCase(localName)) { + fileHeaderStart = true; + } + if (fileHeaderStart) { + if (!"FileHeader".equalsIgnoreCase(localName)) { + commonNameAndValue.put(localName, reader.getElementText().trim()); + } + + } + if ("Measurements".equalsIgnoreCase(localName)) { + // a new Measurement starts + measurementStart = true; + } + if (measurementStart) { + // measurement handler + if ("ObjectType".equalsIgnoreCase(localName)) { + objectType = reader.getElementText().trim(); + commonNameAndValue.put("ObjectType", objectType); + } + if ("PmName".equalsIgnoreCase(localName)) { + pmNameFlag = true; + pmNames = new LinkedHashMap<Integer, String>(); + + } + if (pmNameFlag) { + // pmname handler, add columnNames + if ("N".equalsIgnoreCase(localName)) { + nameIndex = Integer.parseInt(getXMLAttribute(reader, "i")); + String text = reader.getElementText().trim(); + pmNames.put(nameIndex, text); + } + } + if ("PmData".equalsIgnoreCase(localName)) { + pmDataFlag = true; + pmDatas = new LinkedHashMap<String, String>(); + } + + if (pmDataFlag) { + // pmdata handler + if ("Object".equalsIgnoreCase(localName)) { + objectFlag = true; + int n = reader.getAttributeCount(); + for (int i = 0; i < n; i++) { + String name = reader.getAttributeLocalName(i); + commonNameAndValue.put(name, reader.getAttributeValue(i)); + } + } + if (objectFlag) { + + // add columnValues + if ("V".equalsIgnoreCase(localName)) { + String indexStr = getXMLAttribute(reader, "i"); + if (indexStr == null) { + log.error("ERROR: illegal value index"); + continue; + } + index = Integer.parseInt(indexStr); + String name = pmNames.get(index); + if (name == null) { + log.error("illegal data: valueIndex=" + index); + continue; + } + + String value = reader.getElementText().trim(); + pmDatas.put(name, value); + } + if ("CV".equalsIgnoreCase(localName)) { + + String indexStr = getXMLAttribute(reader, "i"); + if (indexStr == null) { + log.error("ERROR: illegal value index"); + continue; + } + index = Integer.parseInt(indexStr); + + currentMea = pmNames.get(index); + if (currentMea == null) { + log.error("illegal data: valueIndex=" + index); + continue; + } + } + + if ("SN".equalsIgnoreCase(localName)) { + subName = reader.getElementText().trim(); + + } + if ("SV".equalsIgnoreCase(localName)) { + String subValue = reader.getElementText().trim(); + // pmDatas.put(currentMea+subName, + // subValue); + pmDatas.put(subName, subValue); + } + } + } + + } + + break; + case XMLStreamConstants.CHARACTERS: + // ... + break; + case XMLStreamConstants.END_ELEMENT: + // ... + endLocalName = reader.getLocalName(); + if ("Object".equalsIgnoreCase(endLocalName)) { + objectFlag = false; + pmDatas.putAll(commonNameAndValue); + try { + pmResultChannel.put(pmDatas); + + } catch (InterruptedException e) { + log.error("collectResultChannel.put(resultMap) error ", e); + } + // System.out.println(pmDatas); + // pmDatas.clear(); + } + if (endLocalName.equalsIgnoreCase("PmData")) { + pmDataFlag = false; + } + + if (endLocalName.equalsIgnoreCase("PmName")) { + pmNameFlag = false; + } + if (endLocalName.equalsIgnoreCase("Measurements")) { + // a measurement over + measurementStart = false; + } + + if ("FileHeader".equalsIgnoreCase(endLocalName)) { + fileHeaderStart = false; + } + break; + } + } catch (Exception e) { + log.error("", e); + event = reader.next(); + } + } + + } catch (Exception e) { + log.error("processPMXml is Exception ", e); + return false; + } finally { + try { + if (reader != null) + reader.close(); + if (isr != null) + isr.close(); + if (fis != null) + fis.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + return true; + } + + private String getXMLAttribute(XMLStreamReader reader, String obj) { + String res = null; + if (obj == null || reader == null) { + return res; + } + int n = reader.getAttributeCount(); + for (int i = 0; i < n; i++) { + String name = reader.getAttributeLocalName(i); + if (obj.equalsIgnoreCase(name)) { + res = reader.getAttributeValue(i); + } + } + return res; + } + + public boolean processPMCsv(File tempfile) { + + FileInputStream brs = null; + InputStreamReader isr = null; + BufferedReader br = null; + + List<String> columnNames = new ArrayList<String>(); + List<String> commonValues = new ArrayList<String>(); + try { + + brs = new FileInputStream(tempfile); + isr = new InputStreamReader(brs, Constant.ENCODING_UTF8); + br = new BufferedReader(isr); + // common field + String commonField = br.readLine(); + String[] fields = commonField.split("\\|", -1); + for (String com : fields) { + String[] comNameAndValue = com.split("=", 2); + columnNames.add(comNameAndValue[0].trim()); + commonValues.add(comNameAndValue[1]); + } + // column names + String columnName = br.readLine(); + String[] names = columnName.split("\\|", -1); + for (String name : names) { + columnNames.add(name); + } + + String valueLine = ""; + List<String> valuelist = new ArrayList<String>(); + + while ((valueLine = br.readLine()) != null) { + if (valueLine.trim().equals("")) { + continue; + } + // countNum ++; + String[] values = valueLine.split("\\|", -1); + + valuelist.addAll(commonValues); + for (String value : values) { + valuelist.add(value); + } + // this.appendLine(valuelist, bos); + // resultMap + HashMap<String, String> resultMap = this.resultMap(columnNames, valuelist); + try { + pmResultChannel.put(resultMap); + } catch (InterruptedException e) { + log.error("collectResultChannel.put(resultMap) error ", e); + throw new RuntimeException(e); + } + valuelist.clear(); + } + } catch (IOException e) { + log.error("processPMCsv is fail ", e); + return false; + } finally { + try { + if (br != null) + br.close(); + if (isr != null) + isr.close(); + if (brs != null) + brs.close(); + + } catch (Exception e) { + log.error(e); + } + } + return true; + + } + + private HashMap<String, String> resultMap(List<String> columnNames, List<String> valuelist) { + + HashMap<String, String> resultMap = new HashMap<String, String>(); + if (columnNames.size() == valuelist.size()) { + for (int i = 0; i < columnNames.size(); i++) { + resultMap.put(columnNames.get(i), valuelist.get(i)); + } + } + + return resultMap; + + } + + private boolean processCMXml(File tempfile, String nename, String type) { + + String csvpath = localPath + nename + "/" + type + "/"; + File csvpathfile = new File(csvpath); + if (!csvpathfile.exists()) { + csvpathfile.mkdirs(); + } + String csvFileName = nename + dateFormat.format(new Date()) + System.nanoTime(); + String csvpathAndFileName = csvpath + csvFileName + ".csv"; + BufferedOutputStream bos = null; + FileOutputStream fos = null; + try { + fos = new FileOutputStream(csvpathAndFileName, false); + bos = new BufferedOutputStream(fos, 10240); + } catch (FileNotFoundException e1) { + log.error("FileNotFoundException " + StringUtil.getStackTrace(e1)); + } + + boolean FieldNameFlag = false; + boolean FieldValueFlag = false; + // line num + int countNum = 0; + String xmlPathAndFileName = null; + String localName = null; + String endLocalName = null; + String rmUID = null; + int index = -1; + ArrayList<String> names = new ArrayList<String>();// colname + LinkedHashMap<String, String> nameAndValue = new LinkedHashMap<String, String>(); + + FileInputStream fis = null; + InputStreamReader isr = null; + XMLStreamReader reader = null; + try { + fis = new FileInputStream(tempfile); + isr = new InputStreamReader(fis, Constant.ENCODING_UTF8); + XMLInputFactory fac = XMLInputFactory.newInstance(); + reader = fac.createXMLStreamReader(isr); + int event = -1; + boolean setcolum = true; + while (reader.hasNext()) { + try { + event = reader.next(); + switch (event) { + case XMLStreamConstants.START_ELEMENT: + localName = reader.getLocalName(); + if ("FieldName".equalsIgnoreCase(localName)) { + FieldNameFlag = true; + } + if (FieldNameFlag) { + if ("N".equalsIgnoreCase(localName)) { + String colName = reader.getElementText().trim(); + names.add(colName); + } + } + if ("FieldValue".equalsIgnoreCase(localName)) { + FieldValueFlag = true; + + } + if (FieldValueFlag) { + if (setcolum) { + xmlPathAndFileName = this.setColumnNames(nename, names, type); + setcolum = false; + } + + if ("Object".equalsIgnoreCase(localName)) { + int ac = reader.getAttributeCount(); + for (int i = 0; i < ac; i++) { + if ("rmUID".equalsIgnoreCase(reader.getAttributeLocalName(i))) { + rmUID = reader.getAttributeValue(i).trim(); + } + } + nameAndValue.put("rmUID", rmUID); + } + if ("V".equalsIgnoreCase(localName)) { + index = Integer.parseInt(reader.getAttributeValue(0)) - 1; + String currentName = names.get(index); + String v = reader.getElementText().trim(); + nameAndValue.put(currentName, v); + } + } + break; + case XMLStreamConstants.CHARACTERS: + break; + case XMLStreamConstants.END_ELEMENT: + endLocalName = reader.getLocalName(); + + if ("FieldName".equalsIgnoreCase(endLocalName)) { + FieldNameFlag = false; + } + if ("FieldValue".equalsIgnoreCase(endLocalName)) { + FieldValueFlag = false; + } + if ("Object".equalsIgnoreCase(endLocalName)) { + countNum++; + this.appendLine(nameAndValue, bos); + nameAndValue.clear(); + } + break; + } + } catch (Exception e) { + log.error("" + StringUtil.getStackTrace(e)); + event = reader.next(); + } + } + + if (bos != null) { + bos.close(); + bos = null; + } + if (fos != null) { + fos.close(); + fos = null; + } + + String[] fileKeys = this.createZipFile(csvpathAndFileName, xmlPathAndFileName, nename); + // ftp store + Properties ftpPro = configurationInterface.getProperties(); + String ip = ftpPro.getProperty("ftp_ip"); + String port = ftpPro.getProperty("ftp_port"); + String ftp_user = ftpPro.getProperty("ftp_user"); + String ftp_password = ftpPro.getProperty("ftp_password"); + + String ftp_passive = ftpPro.getProperty("ftp_passive"); + String ftp_type = ftpPro.getProperty("ftp_type"); + String remoteFile = ftpPro.getProperty("ftp_remote_path"); + this.ftpStore(fileKeys, ip, port, ftp_user, ftp_password, ftp_passive, ftp_type, remoteFile); + // create Message + String message = this.createMessage(fileKeys[1], ftp_user, ftp_password, ip, port, countNum, nename); + + // set message + this.setMessage(message); + } catch (Exception e) { + log.error("" + StringUtil.getStackTrace(e)); + return false; + } finally { + try { + if (reader != null) { + reader.close(); + } + if (isr != null) { + isr.close(); + } + if (fis != null) { + fis.close(); + } + if (bos != null) { + bos.close(); + } + + if (fos != null) { + fos.close(); + } + } catch (Exception e) { + log.error(e); + } + } + return true; + } + + private void setMessage(String message) { + + try { + cmResultChannel.put(message); + } catch (Exception e) { + log.error("collectResultChannel.put(message) is error " + StringUtil.getStackTrace(e)); + } + } + + public String createMessage(String zipName, String user, String pwd, String ip, String port, int countNum, + String nename) { + + StringBuffer strBuffer = new StringBuffer(); + strBuffer.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + + "<FILE_DATA_READY_UL xmlns:xsi=\" http://www.w3.org/2001/XMLSchema-instance\">" + + "<Header SessionID=\""); + strBuffer.append(""); + strBuffer.append("\" LicenceID=\""); + strBuffer.append(""); + strBuffer.append("\" SystemID=\""); + strBuffer.append(""); + strBuffer.append("\" Time=\""); + strBuffer.append(dateFormat2.format(new Date())); + strBuffer.append("\" PolicyID=\""); + strBuffer.append(""); + strBuffer.append("\"/><Body>"); + strBuffer.append("<DataCatalog>"); + strBuffer.append(""); + strBuffer.append("</DataCatalog><GroupID>"); + strBuffer.append(nename); + strBuffer.append("</GroupID><DataSourceName>"); + strBuffer.append(""); + strBuffer.append("</DataSourceName><InstanceID>"); + strBuffer.append(""); + strBuffer.append("</InstanceID><FileFormat>"); + strBuffer.append("csv"); + strBuffer.append("</FileFormat><CharSet>"); + strBuffer.append("gbk"); + strBuffer.append("</CharSet><FieldSeparator>"); + strBuffer.append("|"); + strBuffer.append("</FieldSeparator><IsCompressed>"); + strBuffer.append("true"); + strBuffer.append("</IsCompressed><StartTime>"); + strBuffer.append(dateFormat2.format(new Date())); + strBuffer.append("</StartTime><EndTime>"); + strBuffer.append(""); + strBuffer.append("</EndTime><FileList>"); + strBuffer.append(zipName); + strBuffer.append("</FileList><ConnectionString>"); + strBuffer.append("ftp://" + user + ":" + pwd + "@" + ip + ":" + port); + strBuffer.append("</ConnectionString>"); + strBuffer.append("<DataCount>"); + strBuffer.append(countNum); + strBuffer.append("</DataCount>"); + + strBuffer.append("<FileSize>").append("").append("</FileSize>"); + strBuffer.append("<DataGranularity>").append("").append("</DataGranularity>"); + + strBuffer.append("</Body></FILE_DATA_READY_UL>"); + return strBuffer.toString(); + + } + + private void ftpStore(String[] fileKeys, String ip, String port, String ftp_user, String ftp_password, + String ftp_passive, String ftp_type, String remoteFile) { + String zipFilePath = fileKeys[0]; + + FTPInterface ftpClient; + ftpClient = new FTPSrv(); + // login + try { + ftpClient.login(ip, Integer.parseInt(port), ftp_user, ftp_password, "GBK", + Boolean.parseBoolean(ftp_passive), 5 * 60 * 1000); + } catch (Exception e) { + log.error("login fail,ip=[" + ip + "] port=[" + port + "] user=[" + ftp_user + "]pwd=[" + ftp_password + "]" + + StringUtil.getStackTrace(e)); + return; + } + // ftpClient.store(zipFilePath, remoteFile); + log.debug("store [" + zipFilePath + "]to[" + remoteFile + "]"); + + FileUtils.deleteQuietly(new File(zipFilePath)); + + } + + private String[] createZipFile(String csvpathAndFileName, String xmlPathAndFileName, String nename) + throws IOException { + + String zipPath = resultPath + nename + dateFormat.format(new Date()) + "_" + System.nanoTime(); + + File destDir = new File(zipPath); + destDir.mkdirs(); + + try { + FileUtils.copyFileToDirectory(new File(csvpathAndFileName), destDir); + FileUtils.copyFileToDirectory(new File(xmlPathAndFileName), destDir); + } catch (IOException e) { + throw e; + // flow should end here in case of exception + } + + String destFilePath = zipPath + ".zip"; + try { + Zip zip = new Zip(destDir.getAbsolutePath(), destFilePath); + zip.setCompressLevel(9); + zip.compress(); + + FileUtils.deleteDirectory(destDir); + } catch (IOException e) { + log.error("zip.compress() is fail " + StringUtil.getStackTrace(e)); + } + return new String[] { destFilePath, zipPath + ".zip" }; + } + + private String setColumnNames(String nename, List<String> names, String type) { + // write xml + String xmlpath = localPath + nename + "/" + type + "/"; + File xmlpathfile = new File(xmlpath); + if (!xmlpathfile.exists()) { + xmlpathfile.mkdirs(); + } + String xmlFileName = nename + dateFormat.format(new Date()) + System.nanoTime(); + String fieldLine = ""; + for (int i = 0; i < names.size(); i++) { + String field = "\t<Field>\r\n" + "\t\t<FieldNo>" + i + "</FieldNo>\r\n" + "\t\t<FieldName>" + names.get(i) + + "</FieldName>\r\n" + "\t\t<FieldType>" + names.get(i) + "</FieldType>\r\n" + + "\t\t<FieldNameOther>" + names.get(i) + "</FieldNameOther>\r\n" + "\t</Field>\r\n"; + fieldLine = fieldLine + field; + } + + String str = "<?xml version=\"1.0\" encoding=\"gbk\"?>\r\n" + "<xml>\r\n" + "<FILE_STRUCTURE>\r\n" + fieldLine + + "</FILE_STRUCTURE>\r\n" + "</xml>\r\n"; + String xmlPathAndFileName = xmlpath + xmlFileName + ".xml"; + try { + this.writeDetail(xmlPathAndFileName, str); + } catch (Exception e) { + log.error("writeDetail is fail ,xmlFileName=" + xmlFileName + StringUtil.getStackTrace(e)); + } + + return xmlPathAndFileName; + } + + private void writeDetail(String detailFileName, String str) throws Exception { + + try (OutputStream readOut = new FileOutputStream(new File(detailFileName), false); + OutputStreamWriter writer = new OutputStreamWriter(readOut)) { + writer.write(str); + writer.flush(); + } catch (IOException e) { + throw e; + } + } + + private void appendLine(LinkedHashMap<String, String> nameAndValue, BufferedOutputStream bos) { + StringBuilder lineDatas = new StringBuilder(); + + for (String key : nameAndValue.keySet()) { + lineDatas.append(nameAndValue.get(key)).append("|"); + } + try { + bos.write(lineDatas.toString().getBytes()); + bos.write("\n".getBytes()); + } catch (IOException e) { + log.error("appendLine error " + StringUtil.getStackTrace(e)); + } + } + + // private void appendLine(List<String> values,BufferedOutputStream bos) { + // StringBuilder lineDatas = new StringBuilder(); + // + // for (String value : values) { + // lineDatas.append(value).append("|"); + // } + // try { + // bos.write(lineDatas.toString().getBytes()); + // bos.write("\n".getBytes()); + // } catch (IOException e) { + // log.error("appendLine error "+StringUtil.getStackTrace(e)); + // } + // } + + public List<File> decompressed(String fileName) { + List<File> filelist = new ArrayList<File>(); + + if (fileName.indexOf(".gz") > 1) { + try { + File decompressFile = deGz(fileName); + filelist.add(decompressFile); + new File(fileName).delete(); + } catch (IOException e) { + log.error("decompressed is fail " + StringUtil.getStackTrace(e)); + } + } else if (fileName.indexOf(".zip") > 1) { + try { + File[] files = deZip(new File(fileName)); + new File(fileName).delete(); + for (File temp : files) { + filelist.add(temp); + } + } catch (Exception e) { + log.error("decompressed is fail " + StringUtil.getStackTrace(e)); + } + } else { + filelist.add(new File(fileName)); + } + + return filelist; + } + + private File deGz(String gzFileName) throws IOException { + Gunzip gunzip = new Gunzip(); + String orgFile = gzFileName.replace(".gz", ""); + gunzip.unCompress(gzFileName, orgFile); + return new File(orgFile); + } + + public File[] deZip(File file) throws Exception { + + String regx = "(.*).zip"; + Pattern p = Pattern.compile(regx); + Matcher m = p.matcher(file.getName()); + if (m.find()) { + String orgFile = localPath + m.group(1) + "/"; + UnZip unzip = new UnZip(file.getAbsolutePath(), orgFile); + unzip.deCompress(); + file = new File(orgFile); + } + File[] files = file.listFiles(); + + return files; + + } + + private List<String> ftpDownload(CollectVo collectVo) { + + List<String> fileList = new ArrayList<String>(); + // IP + String ip = collectVo.getIP(); + // port + String port = collectVo.getPort(); + // user + String user = collectVo.getUser(); + // password + String password = collectVo.getPassword(); + // isPassiveMode + String passivemode = collectVo.getPassive(); + + FTPInterface ftpClient = new FTPSrv(); + + // login + try { + log.info("ftp login ,ip=[" + ip + "] port=[" + port + "] user=[" + user + "]password=[" + password + "]"); + ftpClient.login(ip, Integer.parseInt(port), user, password, "GBK", Boolean.parseBoolean(passivemode), + 5 * 60 * 1000); + } catch (Exception e) { + log.error("login fail,ip=[" + ip + "] port=[" + port + "] user=[" + user + "]password=[" + password + "]" + + StringUtil.getStackTrace(e)); + return fileList; + } + + // download + String dir = collectVo.getRemotepath(); + List<String> searchExprList = new ArrayList<String>(); + String[] FPath = dir.split(";"); + for (int i = 0; i < FPath.length; i++) { + int oldSize = searchExprList.size(); + String conpath = FPath[i] + collectVo.getMatch(); + Hashtable<String, String> varMap = new Hashtable<String, String>(); + long collectPeriod = 900; + try { + collectPeriod = Long.parseLong(collectVo.getGranularity()) * 60; + log.info("collectPeriod =[" + collectPeriod + "]"); + } catch (NumberFormatException e) { + e.printStackTrace(); + } + long[] d = DateUtil.getScanScope(new Date(), collectPeriod); + searchExprList.add(VarExprParser.replaceVar(conpath, d[0], d[1])); + + varMap.clear(); + varMap = null; + log.info("[" + conpath + "] result[" + (searchExprList.size() - oldSize) + "] path"); + conpath = null; + } + String nowdir = null; + try { + nowdir = ftpClient.pwd(); + searchExprList = getPathNoRegular(searchExprList, ftpClient); + } catch (Exception e1) { + log.error(" collect fail ", e1); + return fileList; + } + List<AFtpRemoteFile> remoteFiles = new ArrayList<AFtpRemoteFile>(); + for (String expr : searchExprList) { + ftpClient.chdir(nowdir); + String keys[] = parseExprKeys(expr); + String ftpRegular = keys[1]; + String ftpDir = keys[0]; + + boolean cdsucess = ftpClient.chdir(ftpDir); + if (cdsucess) { + AFtpRemoteFile[] arf = (AFtpRemoteFile[]) ftpClient.list(); + log.info(" list [" + ftpDir + "] result[" + (arf == null ? "null" : arf.length) + "] files"); + // filter + + rfileFilter(remoteFiles, arf, ftpRegular); + + keys = null; + ftpRegular = ftpDir = null; + + for (AFtpRemoteFile ftpRemoteFile : remoteFiles) { + if (!new File(localPath).exists()) { + try { + new File(localPath).mkdir(); + } catch (Exception e) { + log.error("create localPath is fail localPath=" + localPath + " " + + StringUtil.getStackTrace(e)); + } + } + + if (!new File(localPath).exists()) { + new File(localPath).mkdirs(); + } + + String localFileName = localPath + ftpRemoteFile.getFileName(); + File loaclFile = new File(localFileName); + if (loaclFile.exists()) { + loaclFile.delete(); + } + + boolean flag = ftpClient.downloadFile(ftpRemoteFile.getAbsFileName(), localFileName); + + if (flag) { + fileList.add(localFileName); + } else { + log.error("download file fail fileName=" + ftpRemoteFile.getAbsFileName()); + } + } + + } else { + log.error("cd dir is faill dir =[" + ftpDir + "]"); + } + } + + return fileList; + } + + private void rfileFilter(List<AFtpRemoteFile> fileContainer, AFtpRemoteFile[] arfs, String ftpRegular) { + if (ftpRegular != null && ftpRegular.length() > 0) { + Pattern pattern = null; + try { + pattern = Pattern.compile(ftpRegular, Pattern.CASE_INSENSITIVE); + } catch (Exception e) { + log.info("[" + ftpRegular + "]Pattern.compile exception:" + e.getMessage()); + // should rethrow exception or return from here + } + int hisSize = fileContainer.size(); + for (int j = 0; arfs != null && j < arfs.length; j++) { + String fileName = parseFileName(arfs[j].getFileName()); + Matcher matcher = null; + if (pattern != null) + matcher = pattern.matcher(fileName); + else { + // define the flow when pattern is null + } + + if (matcher.find()) + fileContainer.add(arfs[j]); + } + log.info("[" + ftpRegular + "]filter[" + (fileContainer.size() - hisSize) + "]filse"); + pattern = null; + } else { + for (int j = 0; arfs != null && j < arfs.length; j++) + fileContainer.add(arfs[j]); + } + + } + + private String parseFileName(String fileName) { + int idx = fileName.lastIndexOf("/"); + if (idx == -1) + return fileName; + return fileName.substring(idx + 1, fileName.length()); + } + + private String[] parseExprKeys(String source) { + + if (source.indexOf(";") > -1) { + source = source.substring(0, source.indexOf(";")); + } + if (source.endsWith("/")) + return new String[] { source, "" }; + + int idx = source.lastIndexOf("/"); + String[] dirkeys = new String[2]; + dirkeys[0] = source.substring(0, idx + 1); + dirkeys[1] = source.substring(idx + 1, source.length()); + return dirkeys; + } + + public List<String> getPathNoRegular(List<String> searchExprList, FTPInterface ftpCache) throws Exception { + boolean isregular = false; + List<String> regularList = new ArrayList<String>(); + for (String regular : searchExprList) { + Pattern lpattern = null; + try { + lpattern = Pattern.compile("(.*/)<([^/]+)>(/.*)"); + } catch (Exception e) { + log.error("[" + regular + "]compile fails:" + e.getMessage()); + e.printStackTrace(); + } + Matcher matcher = null; + if (lpattern != null) + matcher = lpattern.matcher(regular); + else { + // define flow in case lpattern is null + } + + if (matcher.find()) { + isregular = true; + String parpath = matcher.group(1); + try { + boolean isin = ftpCache.chdir(parpath); + if (isin) { + log.info("cd dir [" + parpath + "] sucess"); + } else { + log.error("cd dir [" + parpath + "] fail"); + } + } catch (Exception e) { + log.error(" cd dir [" + parpath + "]fail", e); + throw e; + } + RemoteFile[] remotef = ftpCache.list(); + for (RemoteFile aremote : remotef) { + if (aremote.isDirectory() && aremote.getFileName().matches(matcher.group(2))) { + regularList.add(matcher.group(1) + aremote.getFileName() + matcher.group(3)); + } + } + } else { + regularList.add(regular); + } + } + if (isregular == true) { + getPathNoRegular(regularList, ftpCache); + } + return regularList; + } } |