summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThread.java133
-rw-r--r--ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThreadService.java7
2 files changed, 50 insertions, 90 deletions
diff --git a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThread.java b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThread.java
index c4f7b1f..c7b87e9 100644
--- a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThread.java
+++ b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThread.java
@@ -42,8 +42,8 @@ import java.util.regex.Pattern;
public class TaskThread implements Runnable {
- public Log log = LogFactory.getLog(TaskThread.class);
- public MessageChannel pmResultChannel;
+ private static final Log log = LogFactory.getLog(TaskThread.class);
+ public MessageChannel pmResultChannel; //This should also be private. Need to change in TastThreadTest
private MessageChannel cmResultChannel;
private CollectMsg data;
@@ -144,7 +144,7 @@ public class TaskThread implements Runnable {
String endLocalName = null;
String objectType = null;
- LinkedHashMap<String, String> commonNameAndValue = new LinkedHashMap<String, String>();
+ LinkedHashMap<String, String> commonNameAndValue = new LinkedHashMap<>();
LinkedHashMap<String, String> pmDatas = null;
LinkedHashMap<Integer, String> pmNames = null;
@@ -159,11 +159,8 @@ public class TaskThread implements Runnable {
if ("FileHeader".equalsIgnoreCase(localName)) {
fileHeaderStart = true;
}
- if (fileHeaderStart) {
- if (!"FileHeader".equalsIgnoreCase(localName)) {
- commonNameAndValue.put(localName, reader.getElementText().trim());
- }
-
+ if (fileHeaderStart && !"FileHeader".equalsIgnoreCase(localName)) {
+ commonNameAndValue.put(localName, reader.getElementText().trim());
}
if ("Measurements".equalsIgnoreCase(localName)) {
// a new Measurement starts
@@ -180,13 +177,11 @@ public class TaskThread implements Runnable {
pmNames = new LinkedHashMap<Integer, String>();
}
- if (pmNameFlag) {
+ if (pmNameFlag && "N".equalsIgnoreCase(localName)) {
// pmname handler, add columnNames
- if ("N".equalsIgnoreCase(localName)) {
- nameIndex = Integer.parseInt(getXMLAttribute(reader, "i"));
- String text = reader.getElementText().trim();
- pmNames.put(nameIndex, text);
- }
+ nameIndex = Integer.parseInt(getXMLAttribute(reader, "i"));
+ String text = reader.getElementText().trim();
+ pmNames.put(nameIndex, text);
}
if ("PmData".equalsIgnoreCase(localName)) {
pmDataFlag = true;
@@ -204,7 +199,6 @@ public class TaskThread implements Runnable {
}
}
if (objectFlag) {
-
// add columnValues
if ("V".equalsIgnoreCase(localName)) {
String indexStr = getXMLAttribute(reader, "i");
@@ -244,8 +238,6 @@ public class TaskThread implements Runnable {
}
if ("SV".equalsIgnoreCase(localName)) {
String subValue = reader.getElementText().trim();
- // pmDatas.put(currentMea+subName,
- // subValue);
pmDatas.put(subName, subValue);
}
}
@@ -258,7 +250,6 @@ public class TaskThread implements Runnable {
// ...
break;
case XMLStreamConstants.END_ELEMENT:
- // ...
endLocalName = reader.getLocalName();
if ("Object".equalsIgnoreCase(endLocalName)) {
objectFlag = false;
@@ -270,17 +261,15 @@ public class TaskThread implements Runnable {
pmResultChannel.clear();
log.error("collectResultChannel.put(resultMap) error ", e);
}
- // System.out.println(pmDatas);
- // pmDatas.clear();
}
- if (endLocalName.equalsIgnoreCase("PmData")) {
+ if ("PmData".equalsIgnoreCase(endLocalName)) {
pmDataFlag = false;
}
- if (endLocalName.equalsIgnoreCase("PmName")) {
+ if ("PmName".equalsIgnoreCase(endLocalName)) {
pmNameFlag = false;
}
- if (endLocalName.equalsIgnoreCase("Measurements")) {
+ if ("Measurements".equalsIgnoreCase(endLocalName)) {
// a measurement over
measurementStart = false;
}
@@ -291,7 +280,7 @@ public class TaskThread implements Runnable {
break;
}
} catch (Exception e) {
- log.error("", e);
+ log.error("Exception: ", e);
event = reader.next();
}
}
@@ -320,8 +309,8 @@ public class TaskThread implements Runnable {
public boolean processPMCsv(File tempfile) {
- List<String> columnNames = new ArrayList<String>();
- List<String> commonValues = new ArrayList<String>();
+ List<String> columnNames = new ArrayList<>();
+ List<String> commonValues = new ArrayList<>();
try (FileInputStream brs = new FileInputStream(tempfile);
InputStreamReader isr = new InputStreamReader(brs, Constant.ENCODING_UTF8);
BufferedReader br = new BufferedReader(isr)) {
@@ -342,21 +331,18 @@ public class TaskThread implements Runnable {
}
String valueLine = "";
- List<String> valuelist = new ArrayList<String>();
+ List<String> valuelist = new ArrayList<>();
while ((valueLine = br.readLine()) != null) {
- if (valueLine.trim().equals("")) {
+ if ("".equals(valueLine.trim())) {
continue;
}
- // countNum ++;
String[] values = valueLine.split("\\|", -1);
valuelist.addAll(commonValues);
for (String value : values) {
valuelist.add(value);
}
- // this.appendLine(valuelist, bos);
- // resultMap
HashMap<String, String> resultMap = this.resultMap(columnNames, valuelist);
try {
pmResultChannel.put(resultMap);
@@ -376,7 +362,7 @@ public class TaskThread implements Runnable {
private HashMap<String, String> resultMap(List<String> columnNames, List<String> valuelist) {
- HashMap<String, String> resultMap = new HashMap<String, String>();
+ HashMap<String, String> resultMap = new HashMap<>();
if (columnNames.size() == valuelist.size()) {
for (int i = 0; i < columnNames.size(); i++) {
resultMap.put(columnNames.get(i), valuelist.get(i));
@@ -401,15 +387,14 @@ public class TaskThread implements Runnable {
boolean FieldNameFlag = false;
boolean FieldValueFlag = false;
- // line num
int countNum = 0;
String xmlPathAndFileName = null;
String localName = null;
String endLocalName = null;
String rmUID = null;
int index = -1;
- ArrayList<String> names = new ArrayList<String>();// colname
- LinkedHashMap<String, String> nameAndValue = new LinkedHashMap<String, String>();
+ ArrayList<String> names = new ArrayList<>();// colname
+ LinkedHashMap<String, String> nameAndValue = new LinkedHashMap<>();
try( FileInputStream fis = new FileInputStream(tempfile);
InputStreamReader isr = new InputStreamReader(fis, Constant.ENCODING_UTF8)){
@@ -427,11 +412,9 @@ public class TaskThread implements Runnable {
if ("FieldName".equalsIgnoreCase(localName)) {
FieldNameFlag = true;
}
- if (FieldNameFlag) {
- if ("N".equalsIgnoreCase(localName)) {
- String colName = reader.getElementText().trim();
- names.add(colName);
- }
+ if (FieldNameFlag && "N".equalsIgnoreCase(localName)) {
+ String colName = reader.getElementText().trim();
+ names.add(colName);
}
if ("FieldValue".equalsIgnoreCase(localName)) {
FieldValueFlag = true;
@@ -479,7 +462,7 @@ public class TaskThread implements Runnable {
break;
}
} catch (Exception e) {
- log.error("" + StringUtil.getStackTrace(e));
+ log.error("Exception: ",e);
event = reader.next();
}
}
@@ -519,7 +502,6 @@ public class TaskThread implements Runnable {
}
private void setMessage(String message) {
-
try {
cmResultChannel.put(message);
} catch (Exception e) {
@@ -529,8 +511,7 @@ public class TaskThread implements Runnable {
public String createMessage(String zipName, String user, String pwd, String ip, String port, int countNum,
String nename) {
-
- StringBuffer strBuffer = new StringBuffer();
+ StringBuilder strBuffer = new StringBuilder();
strBuffer.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
+ "<FILE_DATA_READY_UL xmlns:xsi=\" http://www.w3.org/2001/XMLSchema-instance\">"
+ "<Header SessionID=\"");
@@ -596,7 +577,6 @@ public class TaskThread implements Runnable {
+ StringUtil.getStackTrace(e));
return;
}
- // ftpClient.store(zipFilePath, remoteFile);
log.debug("store [" + zipFilePath + "]to[" + remoteFile + "]");
FileUtils.deleteQuietly(new File(zipFilePath));
@@ -605,18 +585,15 @@ public class TaskThread implements Runnable {
private String[] createZipFile(String csvpathAndFileName, String xmlPathAndFileName, String nename)
throws IOException {
-
String zipPath = resultPath + nename + dateFormat.format(new Date()) + "_" + System.nanoTime();
File destDir = new File(zipPath);
destDir.mkdirs();
-
try {
FileUtils.copyFileToDirectory(new File(csvpathAndFileName), destDir);
FileUtils.copyFileToDirectory(new File(xmlPathAndFileName), destDir);
} catch (IOException e) {
- throw e;
- // flow should end here in case of exception
+ throw new IOException("createZipFile",e);
}
String destFilePath = zipPath + ".zip";
@@ -628,6 +605,7 @@ public class TaskThread implements Runnable {
FileUtils.deleteDirectory(destDir);
} catch (IOException e) {
log.error("zip.compress() is fail " + StringUtil.getStackTrace(e));
+ throw new IOException("createZipFile",e);
}
return new String[] { destFilePath, zipPath + ".zip" };
}
@@ -661,23 +639,26 @@ public class TaskThread implements Runnable {
}
private void writeDetail(String detailFileName, String str) throws IOException {
-
try (OutputStream readOut = new FileOutputStream(new File(detailFileName), false);
OutputStreamWriter writer = new OutputStreamWriter(readOut)) {
writer.write(str);
writer.flush();
} catch (IOException e) {
- throw e;
+ throw new IOException("writeDetail",e);
}
}
private void appendLine(LinkedHashMap<String, String> nameAndValue, BufferedOutputStream bos) {
- StringBuilder lineDatas = new StringBuilder();
+ try{
+ StringBuilder lineDatas = new StringBuilder();
+
+ for (String key : nameAndValue.keySet()) {
+ lineDatas.append(nameAndValue.get(key)).append("|");
+ }
- for (String key : nameAndValue.keySet()) {
- lineDatas.append(nameAndValue.get(key)).append("|");
- }
- try {
+ /*for (HashMap.Entry<String, String> entry : nameAndValue.entrySet()) {
+ lineDatas.append(entry.getValue()).append("|");
+ }*/
bos.write(lineDatas.toString().getBytes());
bos.write("\n".getBytes());
} catch (IOException e) {
@@ -685,20 +666,6 @@ public class TaskThread implements Runnable {
}
}
- // private void appendLine(List<String> values,BufferedOutputStream bos) {
- // StringBuilder lineDatas = new StringBuilder();
- //
- // for (String value : values) {
- // lineDatas.append(value).append("|");
- // }
- // try {
- // bos.write(lineDatas.toString().getBytes());
- // bos.write("\n".getBytes());
- // } catch (IOException e) {
- // log.error("appendLine error "+StringUtil.getStackTrace(e));
- // }
- // }
-
public List<File> decompressed(String fileName) {
List<File> filelist = new ArrayList<File>();
@@ -785,22 +752,19 @@ public class TaskThread implements Runnable {
for (int i = 0; i < FPath.length; i++) {
int oldSize = searchExprList.size();
String conpath = FPath[i] + collectVo.getMatch();
- Hashtable<String, String> varMap = new Hashtable<String, String>();
+ HashMap<String, String> varMap = new HashMap<>();
long collectPeriod = 900;
try {
collectPeriod = Long.parseLong(collectVo.getGranularity()) * 60;
log.info("collectPeriod =[" + collectPeriod + "]");
} catch (NumberFormatException e) {
- //e.printStackTrace();
log.error("NumberFormatException" ,e);
}
long[] d = DateUtil.getScanScope(new Date(), collectPeriod);
searchExprList.add(VarExprParser.replaceVar(conpath, d[0], d[1]));
varMap.clear();
- varMap = null;
log.info("[" + conpath + "] result[" + (searchExprList.size() - oldSize) + "] path");
- conpath = null;
}
String nowdir = null;
try {
@@ -825,9 +789,6 @@ public class TaskThread implements Runnable {
rfileFilter(remoteFiles, arf, ftpRegular);
- keys = null;
- ftpRegular = ftpDir = null;
-
for (AFtpRemoteFile ftpRemoteFile : remoteFiles) {
if (!new File(localPath).exists()) {
try {
@@ -843,9 +804,8 @@ public class TaskThread implements Runnable {
}
String localFileName = localPath + ftpRemoteFile.getFileName();
- File loaclFile = new File(localFileName);
- if (loaclFile.exists()) {
- loaclFile.delete();
+ if(new File(localFileName).exists()){
+ new File(localFileName).delete();
}
boolean flag = ftpClient.downloadFile(ftpRemoteFile.getAbsFileName(), localFileName);
@@ -871,7 +831,7 @@ public class TaskThread implements Runnable {
try {
pattern = Pattern.compile(ftpRegular, Pattern.CASE_INSENSITIVE);
} catch (Exception e) {
- log.info("[" + ftpRegular + "]Pattern.compile exception:" + e.getMessage());
+ log.info("[" + ftpRegular + "]Pattern.compile exception:",e);
// should rethrow exception or return from here
}
int hisSize = fileContainer.size();
@@ -884,11 +844,10 @@ public class TaskThread implements Runnable {
// define the flow when pattern is null
}
- if (matcher.find())
+ if (null != matcher && matcher.find())
fileContainer.add(arfs[j]);
}
log.info("[" + ftpRegular + "]filter[" + (fileContainer.size() - hisSize) + "]filse");
- pattern = null;
} else {
for (int j = 0; arfs != null && j < arfs.length; j++)
fileContainer.add(arfs[j]);
@@ -918,7 +877,7 @@ public class TaskThread implements Runnable {
return dirkeys;
}
- public List<String> getPathNoRegular(List<String> searchExprList, FTPInterface ftpCache) throws Exception {
+ public List<String> getPathNoRegular(List<String> searchExprList, FTPInterface ftpCache) throws IOException {
boolean isregular = false;
List<String> regularList = new ArrayList<String>();
for (String regular : searchExprList) {
@@ -927,7 +886,7 @@ public class TaskThread implements Runnable {
lpattern = Pattern.compile("(.*/)<([^/]+)>(/.*)");
} catch (Exception e) {
log.error("[" + regular + "]compile fails:" + e.getMessage());
- // e.printStackTrace();
+ throw new IOException("getPathNoRegular", e);
}
Matcher matcher = null;
if (lpattern != null)
@@ -936,7 +895,7 @@ public class TaskThread implements Runnable {
// define flow in case lpattern is null
}
- if (matcher.find()) {
+ if (null != matcher && matcher.find()) {
isregular = true;
String parpath = matcher.group(1);
try {
@@ -948,7 +907,7 @@ public class TaskThread implements Runnable {
}
} catch (Exception e) {
log.error(" cd dir [" + parpath + "]fail", e);
- throw e;
+ throw new IOException("ftpCache.chdir",e);
}
RemoteFile[] remotef = ftpCache.list();
for (RemoteFile aremote : remotef) {
diff --git a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThreadService.java b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThreadService.java
index 1b8963f..b0dca40 100644
--- a/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThreadService.java
+++ b/ems/boco/src/main/java/org/onap/vfc/nfvo/emsdriver/collector/TaskThreadService.java
@@ -27,8 +27,8 @@ import java.util.concurrent.*;
public class TaskThreadService extends Thread {
private final ExecutorService pool;
- public Log log = LogFactory.getLog(TaskThreadService.class);
- private BlockingQueue<CollectMsg> queue = new LinkedBlockingQueue<CollectMsg>();
+ private static final Log log = LogFactory.getLog(TaskThreadService.class);
+ private BlockingQueue<CollectMsg> queue = new LinkedBlockingQueue<>();
private boolean startFlag = true;
private long timeStamp = System.currentTimeMillis();
@@ -39,7 +39,8 @@ public class TaskThreadService extends Thread {
public static TaskThreadService getInstance(int poolSize) {
return new TaskThreadService(poolSize);
}
-
+
+ @Override
public void run() { // run the service
try {
while (startFlag) {