diff options
-rw-r--r-- | ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThread.java | 133 | ||||
-rw-r--r-- | ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThreadService.java | 7 |
2 files changed, 50 insertions, 90 deletions
diff --git a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThread.java b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThread.java index c4f7b1f..c7b87e9 100644 --- a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThread.java +++ b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThread.java @@ -42,8 +42,8 @@ import java.util.regex.Pattern; public class TaskThread implements Runnable { - public Log log = LogFactory.getLog(TaskThread.class); - public MessageChannel pmResultChannel; + private static final Log log = LogFactory.getLog(TaskThread.class); + public MessageChannel pmResultChannel; //This should also be private. Need to change in TastThreadTest private MessageChannel cmResultChannel; private CollectMsg data; @@ -144,7 +144,7 @@ public class TaskThread implements Runnable { String endLocalName = null; String objectType = null; - LinkedHashMap<String, String> commonNameAndValue = new LinkedHashMap<String, String>(); + LinkedHashMap<String, String> commonNameAndValue = new LinkedHashMap<>(); LinkedHashMap<String, String> pmDatas = null; LinkedHashMap<Integer, String> pmNames = null; @@ -159,11 +159,8 @@ public class TaskThread implements Runnable { if ("FileHeader".equalsIgnoreCase(localName)) { fileHeaderStart = true; } - if (fileHeaderStart) { - if (!"FileHeader".equalsIgnoreCase(localName)) { - commonNameAndValue.put(localName, reader.getElementText().trim()); - } - + if (fileHeaderStart && !"FileHeader".equalsIgnoreCase(localName)) { + commonNameAndValue.put(localName, reader.getElementText().trim()); } if ("Measurements".equalsIgnoreCase(localName)) { // a new Measurement starts @@ -180,13 +177,11 @@ public class TaskThread implements Runnable { pmNames = new LinkedHashMap<Integer, String>(); } - if (pmNameFlag) { + if (pmNameFlag && "N".equalsIgnoreCase(localName)) { // pmname handler, add columnNames - if ("N".equalsIgnoreCase(localName)) { - nameIndex = Integer.parseInt(getXMLAttribute(reader, "i")); - String text = reader.getElementText().trim(); - pmNames.put(nameIndex, text); - } + nameIndex = Integer.parseInt(getXMLAttribute(reader, "i")); + String text = reader.getElementText().trim(); + pmNames.put(nameIndex, text); } if ("PmData".equalsIgnoreCase(localName)) { pmDataFlag = true; @@ -204,7 +199,6 @@ public class TaskThread implements Runnable { } } if (objectFlag) { - // add columnValues if ("V".equalsIgnoreCase(localName)) { String indexStr = getXMLAttribute(reader, "i"); @@ -244,8 +238,6 @@ public class TaskThread implements Runnable { } if ("SV".equalsIgnoreCase(localName)) { String subValue = reader.getElementText().trim(); - // pmDatas.put(currentMea+subName, - // subValue); pmDatas.put(subName, subValue); } } @@ -258,7 +250,6 @@ public class TaskThread implements Runnable { // ... break; case XMLStreamConstants.END_ELEMENT: - // ... endLocalName = reader.getLocalName(); if ("Object".equalsIgnoreCase(endLocalName)) { objectFlag = false; @@ -270,17 +261,15 @@ public class TaskThread implements Runnable { pmResultChannel.clear(); log.error("collectResultChannel.put(resultMap) error ", e); } - // System.out.println(pmDatas); - // pmDatas.clear(); } - if (endLocalName.equalsIgnoreCase("PmData")) { + if ("PmData".equalsIgnoreCase(endLocalName)) { pmDataFlag = false; } - if (endLocalName.equalsIgnoreCase("PmName")) { + if ("PmName".equalsIgnoreCase(endLocalName)) { pmNameFlag = false; } - if (endLocalName.equalsIgnoreCase("Measurements")) { + if ("Measurements".equalsIgnoreCase(endLocalName)) { // a measurement over measurementStart = false; } @@ -291,7 +280,7 @@ public class TaskThread implements Runnable { break; } } catch (Exception e) { - log.error("", e); + log.error("Exception: ", e); event = reader.next(); } } @@ -320,8 +309,8 @@ public class TaskThread implements Runnable { public boolean processPMCsv(File tempfile) { - List<String> columnNames = new ArrayList<String>(); - List<String> commonValues = new ArrayList<String>(); + List<String> columnNames = new ArrayList<>(); + List<String> commonValues = new ArrayList<>(); try (FileInputStream brs = new FileInputStream(tempfile); InputStreamReader isr = new InputStreamReader(brs, Constant.ENCODING_UTF8); BufferedReader br = new BufferedReader(isr)) { @@ -342,21 +331,18 @@ public class TaskThread implements Runnable { } String valueLine = ""; - List<String> valuelist = new ArrayList<String>(); + List<String> valuelist = new ArrayList<>(); while ((valueLine = br.readLine()) != null) { - if (valueLine.trim().equals("")) { + if ("".equals(valueLine.trim())) { continue; } - // countNum ++; String[] values = valueLine.split("\\|", -1); valuelist.addAll(commonValues); for (String value : values) { valuelist.add(value); } - // this.appendLine(valuelist, bos); - // resultMap HashMap<String, String> resultMap = this.resultMap(columnNames, valuelist); try { pmResultChannel.put(resultMap); @@ -376,7 +362,7 @@ public class TaskThread implements Runnable { private HashMap<String, String> resultMap(List<String> columnNames, List<String> valuelist) { - HashMap<String, String> resultMap = new HashMap<String, String>(); + HashMap<String, String> resultMap = new HashMap<>(); if (columnNames.size() == valuelist.size()) { for (int i = 0; i < columnNames.size(); i++) { resultMap.put(columnNames.get(i), valuelist.get(i)); @@ -401,15 +387,14 @@ public class TaskThread implements Runnable { boolean FieldNameFlag = false; boolean FieldValueFlag = false; - // line num int countNum = 0; String xmlPathAndFileName = null; String localName = null; String endLocalName = null; String rmUID = null; int index = -1; - ArrayList<String> names = new ArrayList<String>();// colname - LinkedHashMap<String, String> nameAndValue = new LinkedHashMap<String, String>(); + ArrayList<String> names = new ArrayList<>();// colname + LinkedHashMap<String, String> nameAndValue = new LinkedHashMap<>(); try( FileInputStream fis = new FileInputStream(tempfile); InputStreamReader isr = new InputStreamReader(fis, Constant.ENCODING_UTF8)){ @@ -427,11 +412,9 @@ public class TaskThread implements Runnable { if ("FieldName".equalsIgnoreCase(localName)) { FieldNameFlag = true; } - if (FieldNameFlag) { - if ("N".equalsIgnoreCase(localName)) { - String colName = reader.getElementText().trim(); - names.add(colName); - } + if (FieldNameFlag && "N".equalsIgnoreCase(localName)) { + String colName = reader.getElementText().trim(); + names.add(colName); } if ("FieldValue".equalsIgnoreCase(localName)) { FieldValueFlag = true; @@ -479,7 +462,7 @@ public class TaskThread implements Runnable { break; } } catch (Exception e) { - log.error("" + StringUtil.getStackTrace(e)); + log.error("Exception: ",e); event = reader.next(); } } @@ -519,7 +502,6 @@ public class TaskThread implements Runnable { } private void setMessage(String message) { - try { cmResultChannel.put(message); } catch (Exception e) { @@ -529,8 +511,7 @@ public class TaskThread implements Runnable { public String createMessage(String zipName, String user, String pwd, String ip, String port, int countNum, String nename) { - - StringBuffer strBuffer = new StringBuffer(); + StringBuilder strBuffer = new StringBuilder(); strBuffer.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>" + "<FILE_DATA_READY_UL xmlns:xsi=\" http://www.w3.org/2001/XMLSchema-instance\">" + "<Header SessionID=\""); @@ -596,7 +577,6 @@ public class TaskThread implements Runnable { + StringUtil.getStackTrace(e)); return; } - // ftpClient.store(zipFilePath, remoteFile); log.debug("store [" + zipFilePath + "]to[" + remoteFile + "]"); FileUtils.deleteQuietly(new File(zipFilePath)); @@ -605,18 +585,15 @@ public class TaskThread implements Runnable { private String[] createZipFile(String csvpathAndFileName, String xmlPathAndFileName, String nename) throws IOException { - String zipPath = resultPath + nename + dateFormat.format(new Date()) + "_" + System.nanoTime(); File destDir = new File(zipPath); destDir.mkdirs(); - try { FileUtils.copyFileToDirectory(new File(csvpathAndFileName), destDir); FileUtils.copyFileToDirectory(new File(xmlPathAndFileName), destDir); } catch (IOException e) { - throw e; - // flow should end here in case of exception + throw new IOException("createZipFile",e); } String destFilePath = zipPath + ".zip"; @@ -628,6 +605,7 @@ public class TaskThread implements Runnable { FileUtils.deleteDirectory(destDir); } catch (IOException e) { log.error("zip.compress() is fail " + StringUtil.getStackTrace(e)); + throw new IOException("createZipFile",e); } return new String[] { destFilePath, zipPath + ".zip" }; } @@ -661,23 +639,26 @@ public class TaskThread implements Runnable { } private void writeDetail(String detailFileName, String str) throws IOException { - try (OutputStream readOut = new FileOutputStream(new File(detailFileName), false); OutputStreamWriter writer = new OutputStreamWriter(readOut)) { writer.write(str); writer.flush(); } catch (IOException e) { - throw e; + throw new IOException("writeDetail",e); } } private void appendLine(LinkedHashMap<String, String> nameAndValue, BufferedOutputStream bos) { - StringBuilder lineDatas = new StringBuilder(); + try{ + StringBuilder lineDatas = new StringBuilder(); + + for (String key : nameAndValue.keySet()) { + lineDatas.append(nameAndValue.get(key)).append("|"); + } - for (String key : nameAndValue.keySet()) { - lineDatas.append(nameAndValue.get(key)).append("|"); - } - try { + /*for (HashMap.Entry<String, String> entry : nameAndValue.entrySet()) { + lineDatas.append(entry.getValue()).append("|"); + }*/ bos.write(lineDatas.toString().getBytes()); bos.write("\n".getBytes()); } catch (IOException e) { @@ -685,20 +666,6 @@ public class TaskThread implements Runnable { } } - // private void appendLine(List<String> values,BufferedOutputStream bos) { - // StringBuilder lineDatas = new StringBuilder(); - // - // for (String value : values) { - // lineDatas.append(value).append("|"); - // } - // try { - // bos.write(lineDatas.toString().getBytes()); - // bos.write("\n".getBytes()); - // } catch (IOException e) { - // log.error("appendLine error "+StringUtil.getStackTrace(e)); - // } - // } - public List<File> decompressed(String fileName) { List<File> filelist = new ArrayList<File>(); @@ -785,22 +752,19 @@ public class TaskThread implements Runnable { for (int i = 0; i < FPath.length; i++) { int oldSize = searchExprList.size(); String conpath = FPath[i] + collectVo.getMatch(); - Hashtable<String, String> varMap = new Hashtable<String, String>(); + HashMap<String, String> varMap = new HashMap<>(); long collectPeriod = 900; try { collectPeriod = Long.parseLong(collectVo.getGranularity()) * 60; log.info("collectPeriod =[" + collectPeriod + "]"); } catch (NumberFormatException e) { - //e.printStackTrace(); log.error("NumberFormatException" ,e); } long[] d = DateUtil.getScanScope(new Date(), collectPeriod); searchExprList.add(VarExprParser.replaceVar(conpath, d[0], d[1])); varMap.clear(); - varMap = null; log.info("[" + conpath + "] result[" + (searchExprList.size() - oldSize) + "] path"); - conpath = null; } String nowdir = null; try { @@ -825,9 +789,6 @@ public class TaskThread implements Runnable { rfileFilter(remoteFiles, arf, ftpRegular); - keys = null; - ftpRegular = ftpDir = null; - for (AFtpRemoteFile ftpRemoteFile : remoteFiles) { if (!new File(localPath).exists()) { try { @@ -843,9 +804,8 @@ public class TaskThread implements Runnable { } String localFileName = localPath + ftpRemoteFile.getFileName(); - File loaclFile = new File(localFileName); - if (loaclFile.exists()) { - loaclFile.delete(); + if(new File(localFileName).exists()){ + new File(localFileName).delete(); } boolean flag = ftpClient.downloadFile(ftpRemoteFile.getAbsFileName(), localFileName); @@ -871,7 +831,7 @@ public class TaskThread implements Runnable { try { pattern = Pattern.compile(ftpRegular, Pattern.CASE_INSENSITIVE); } catch (Exception e) { - log.info("[" + ftpRegular + "]Pattern.compile exception:" + e.getMessage()); + log.info("[" + ftpRegular + "]Pattern.compile exception:",e); // should rethrow exception or return from here } int hisSize = fileContainer.size(); @@ -884,11 +844,10 @@ public class TaskThread implements Runnable { // define the flow when pattern is null } - if (matcher.find()) + if (null != matcher && matcher.find()) fileContainer.add(arfs[j]); } log.info("[" + ftpRegular + "]filter[" + (fileContainer.size() - hisSize) + "]filse"); - pattern = null; } else { for (int j = 0; arfs != null && j < arfs.length; j++) fileContainer.add(arfs[j]); @@ -918,7 +877,7 @@ public class TaskThread implements Runnable { return dirkeys; } - public List<String> getPathNoRegular(List<String> searchExprList, FTPInterface ftpCache) throws Exception { + public List<String> getPathNoRegular(List<String> searchExprList, FTPInterface ftpCache) throws IOException { boolean isregular = false; List<String> regularList = new ArrayList<String>(); for (String regular : searchExprList) { @@ -927,7 +886,7 @@ public class TaskThread implements Runnable { lpattern = Pattern.compile("(.*/)<([^/]+)>(/.*)"); } catch (Exception e) { log.error("[" + regular + "]compile fails:" + e.getMessage()); - // e.printStackTrace(); + throw new IOException("getPathNoRegular", e); } Matcher matcher = null; if (lpattern != null) @@ -936,7 +895,7 @@ public class TaskThread implements Runnable { // define flow in case lpattern is null } - if (matcher.find()) { + if (null != matcher && matcher.find()) { isregular = true; String parpath = matcher.group(1); try { @@ -948,7 +907,7 @@ public class TaskThread implements Runnable { } } catch (Exception e) { log.error(" cd dir [" + parpath + "]fail", e); - throw e; + throw new IOException("ftpCache.chdir",e); } RemoteFile[] remotef = ftpCache.list(); for (RemoteFile aremote : remotef) { diff --git a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThreadService.java b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThreadService.java index 1b8963f..b0dca40 100644 --- a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThreadService.java +++ b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThreadService.java @@ -27,8 +27,8 @@ import java.util.concurrent.*; public class TaskThreadService extends Thread { private final ExecutorService pool; - public Log log = LogFactory.getLog(TaskThreadService.class); - private BlockingQueue<CollectMsg> queue = new LinkedBlockingQueue<CollectMsg>(); + private static final Log log = LogFactory.getLog(TaskThreadService.class); + private BlockingQueue<CollectMsg> queue = new LinkedBlockingQueue<>(); private boolean startFlag = true; private long timeStamp = System.currentTimeMillis(); @@ -39,7 +39,8 @@ public class TaskThreadService extends Thread { public static TaskThreadService getInstance(int poolSize) { return new TaskThreadService(poolSize); } - + + @Override public void run() { // run the service try { while (startFlag) { |