summaryrefslogtreecommitdiffstats
path: root/components/datalake-handler/feeder/src/main/java/org
diff options
context:
space:
mode:
Diffstat (limited to 'components/datalake-handler/feeder/src/main/java/org')
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java2
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java11
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java23
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java12
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java31
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java1
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java2
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java158
8 files changed, 225 insertions, 15 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java
index 4fc9b7b6..6a44c4f2 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java
@@ -86,7 +86,7 @@ public class FeederController {
@ApiOperation(value="Retrieve feeder status.")
public String status() {
String status = "Feeder is running: "+pullService.isRunning();
- log.info("senting feeder status ...");//TODO we can send what topics are monitored, how many messages are sent, etc.
+ log.info("sending feeder status ...");//TODO we can send what topics are monitored, how many messages are sent, etc.
return "{\"version\": \""+config.getDatalakeVersion()+"\", \"running\": "+pullService.isRunning()+"}";
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
index 30737162..acb48aef 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
@@ -100,6 +100,15 @@ public class Topic {
@Column(name = "`message_id_path`")
private String messageIdPath;
+ //paths to the array that need aggregation, comma separated, example: "/event/measurementsForVfScalingFields/diskUsageArray,/event/measurementsForVfScalingFields/cpuUsageArray,/event/measurementsForVfScalingFields/vNicPerformanceArray"
+ @Column(name = "`aggregate_array_path`")
+ private String aggregateArrayPath;
+
+ //paths to the element in array that need flatten, this element is used as label, comma separated,
+ //example: "/event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray/astriInterface,..."
+ @Column(name = "`flatten_array_path`")
+ private String flattenArrayPath;
+
public Topic() {
}
@@ -149,6 +158,8 @@ public class Topic {
tConfig.setSaveRaw(isSaveRaw());
tConfig.setCorrelateClearedMessage(isCorrelateClearedMessage());
tConfig.setMessageIdPath(getMessageIdPath());
+ tConfig.setAggregateArrayPath(getAggregateArrayPath());
+ tConfig.setFlattenArrayPath(getFlattenArrayPath());
tConfig.setTtl(getTtl());
Set<Db> topicDb = getDbs();
List<String> dbList = new ArrayList<>();
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
index deaa0969..8dfe1b16 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
@@ -51,6 +51,8 @@ public class TopicConfig {
private int ttl;
private boolean correlateClearedMessage;
private String messageIdPath;
+ private String aggregateArrayPath;
+ private String flattenArrayPath;
public DataFormat getDataFormat2() {
if (dataFormat != null) {
@@ -60,7 +62,6 @@ public class TopicConfig {
}
}
-
public boolean supportHdfs() {
return containDb("HDFS");
}
@@ -105,6 +106,26 @@ public class TopicConfig {
return id;
}
+ public String[] getAggregateArrayPath2() {
+ String[] ret = null;
+
+ if (StringUtils.isNotBlank(aggregateArrayPath)) {
+ ret = aggregateArrayPath.split(",");
+ }
+
+ return ret;
+ }
+
+ public String[] getFlattenArrayPath2() {
+ String[] ret = null;
+
+ if (StringUtils.isNotBlank(flattenArrayPath)) {
+ ret = flattenArrayPath.split(",");
+ }
+
+ return ret;
+ }
+
@Override
public String toString() {
return name;
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
index 2274ce99..3be5be6e 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
@@ -58,11 +58,13 @@ public class DmaapService {
@Autowired
private TopicService topicService;
- ZooKeeper zk;
+ private ZooKeeper zk;
@PreDestroy
public void cleanUp() throws InterruptedException {
- zk.close();
+ if (zk != null) {
+ zk.close();
+ }
}
@PostConstruct
@@ -71,6 +73,7 @@ public class DmaapService {
}
//get all topic names from Zookeeper
+ //This method returns empty list if nothing found.
public List<String> getTopics() {
try {
if (zk == null) {
@@ -84,7 +87,7 @@ public class DmaapService {
return topics;
} catch (Exception e) {
zk = null;
- log.error("Can not get topic list from Zookeeper, for testing, going to use hard coded topic list.", e);
+ log.error("Can not get topic list from Zookeeper, return empty list.", e);
return Collections.emptyList();
}
}
@@ -119,9 +122,6 @@ public class DmaapService {
public List<TopicConfig> getActiveTopicConfigs() throws IOException {
log.debug("entering getActiveTopicConfigs()...");
List<String> allTopics = getTopics();
- if (allTopics == null) {
- return Collections.emptyList();
- }
List<TopicConfig> ret = new ArrayList<>(allTopics.size());
for (String topicStr : allTopics) {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
index 126e23b2..2a2f997e 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
@@ -27,12 +27,14 @@ import java.util.List;
import javax.annotation.PostConstruct;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.json.JSONObject;
import org.json.XML;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.dto.TopicConfig;
import org.onap.datalake.feeder.enumeration.DataFormat;
+import org.onap.datalake.feeder.util.JsonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -91,15 +93,16 @@ public class StoreService {
for (Pair<Long, String> pair : messages) {
try {
docs.add(messageToJson(topicConfig, pair));
- } catch (IOException e) {
- log.error(pair.getRight(), e);
+ } catch (Exception e) {
+ //may see org.json.JSONException.
+ log.error("Error when converting this message to JSON: " + pair.getRight(), e);
}
}
saveJsons(topicConfig, docs, messages);
}
- private JSONObject messageToJson(TopicConfig topic, Pair<Long, String> pair) throws IOException {
+ private JSONObject messageToJson(TopicConfig topicConfig, Pair<Long, String> pair) throws IOException {
long timestamp = pair.getLeft();
String text = pair.getRight();
@@ -110,11 +113,11 @@ public class StoreService {
// log.debug("{} ={}", topicStr, text);
//}
- boolean storeRaw = topic.isSaveRaw();
+ boolean storeRaw = topicConfig.isSaveRaw();
JSONObject json = null;
- DataFormat dataFormat = topic.getDataFormat2();
+ DataFormat dataFormat = topicConfig.getDataFormat2();
switch (dataFormat) {
case JSON:
@@ -145,6 +148,20 @@ public class StoreService {
json.put(config.getRawDataLabel(), text);
}
+ if (StringUtils.isNotBlank(topicConfig.getAggregateArrayPath())) {
+ String[] paths = topicConfig.getAggregateArrayPath2();
+ for (String path : paths) {
+ JsonUtil.arrayAggregate(path, json);
+ }
+ }
+
+ if (StringUtils.isNotBlank(topicConfig.getFlattenArrayPath())) {
+ String[] paths = topicConfig.getFlattenArrayPath2();
+ for (String path : paths) {
+ JsonUtil.flattenArray(path, json);
+ }
+ }
+
return json;
}
@@ -167,9 +184,9 @@ public class StoreService {
}
public void flush() { //force flush all buffer
- hdfsService.flush();
+ hdfsService.flush();
}
-
+
public void flushStall() { //flush stall buffer
hdfsService.flushStall();
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
index 80da55fd..58b27834 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
@@ -102,6 +102,7 @@ public class TopicConfigPollingService implements Runnable {
Thread.sleep(config.getDmaapCheckNewTopicInterval());
} catch (InterruptedException e) {
log.error("Thread.sleep(config.getDmaapCheckNewTopicInterval())", e);
+ Thread.currentThread().interrupt();
}
try {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
index f0b000bc..64e8b8b1 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
@@ -123,6 +123,8 @@ public class TopicService {
topic.setCorrelateClearedMessage(tConfig.isCorrelateClearedMessage());
topic.setDataFormat(tConfig.getDataFormat());
topic.setMessageIdPath(tConfig.getMessageIdPath());
+ topic.setAggregateArrayPath(tConfig.getAggregateArrayPath());
+ topic.setFlattenArrayPath(tConfig.getFlattenArrayPath());
if(tConfig.getSinkdbs() != null) {
for (String item : tConfig.getSinkdbs()) {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java
new file mode 100644
index 00000000..db4dcfae
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java
@@ -0,0 +1,158 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DCAE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.util;
+
+import java.util.HashMap;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.json.JSONArray;
+import org.json.JSONObject;
+
+import lombok.Getter;
+
+/**
+ * utils for JSON
+ *
+ * @author Guobiao Mo
+ *
+ */
+public class JsonUtil {
+
+ @Getter
+ enum AggregateType {
+ ALL("aggregate"), AVEARGE("average"), SUM("sum"), MAX("max"), MIN("min"), COUNT("count");
+ private final String name;
+
+ AggregateType(String name) {
+ this.name = name;
+ }
+
+ public String getLabel(String path) {
+ return path.substring(path.lastIndexOf('/') + 1) + "_" + name;
+ }
+ }
+
+ public static void flattenArray(String path, JSONObject json) {
+ //path = /event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray/astriInterface
+
+ int index1 = path.lastIndexOf('/');
+
+ String arrayPath = path.substring(0, index1);// /event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray
+
+ Object obj;
+ try {
+ obj = json.query(arrayPath);
+ } catch (org.json.JSONPointerException e) {
+ return;
+ }
+ if (obj == null || !(obj instanceof JSONArray)) {
+ return;
+ }
+ Iterable<JSONObject> subjsonaArray = (Iterable<JSONObject>) obj;
+
+ String tagName = path.substring(index1 + 1);//astriInterface
+
+ int index2 = path.lastIndexOf('/', index1 - 1);
+ String arrayName = path.substring(index2 + 1, index1);//astriDPMeasurementArray
+
+ String parentPath = path.substring(0, index2);// /event/measurementsForVfScalingFields/astriMeasurement
+ JSONObject parent = (JSONObject) json.query(parentPath);
+
+ for (JSONObject element : subjsonaArray) {
+ String tagValue = element.get(tagName).toString();
+ String label = arrayName + "_" + tagName + "_" + tagValue;
+
+ parent.put(label, element);
+ }
+ }
+
+ /**
+ * json got modified.
+ *
+ * @param aggregateType
+ * @param path
+ * @param json
+ */
+ public static void arrayAggregate(String path, JSONObject json) {
+ HashMap<String, Double> sumHashMap = new HashMap<>();
+ HashMap<String, Double> maxHashMap = new HashMap<>();
+ HashMap<String, Double> minHashMap = new HashMap<>();
+
+ Object obj;
+ try {
+ obj = json.query(path);
+ } catch (org.json.JSONPointerException e) {
+ return;
+ }
+ if (obj == null || !(obj instanceof JSONArray)) {
+ return;
+ }
+ Iterable<JSONObject> subjsonaArray = (Iterable<JSONObject>) obj;
+
+ int count = 0;
+ for (JSONObject element : subjsonaArray) {
+ String[] names = JSONObject.getNames(element);
+ for (String name : names) {
+ Number value = element.optNumber(name);
+ if (value != null) {
+ double existing = sumHashMap.computeIfAbsent(name, k -> 0.0);
+ sumHashMap.put(name, existing + value.doubleValue());
+
+ existing = maxHashMap.computeIfAbsent(name, k -> Double.MIN_VALUE);
+ maxHashMap.put(name, Math.max(existing, value.doubleValue()));
+
+ existing = minHashMap.computeIfAbsent(name, k -> Double.MAX_VALUE);
+ minHashMap.put(name, Math.min(existing, value.doubleValue()));
+ }
+ }
+ count++;
+ }
+
+ if (count == 0) {
+ return;
+ }
+
+ JSONObject parentJson = (JSONObject) json.query(path.substring(0, path.lastIndexOf('/')));
+
+ //sum
+ JSONObject aggJson = new JSONObject(sumHashMap);
+ parentJson.put(AggregateType.SUM.getLabel(path), aggJson);
+
+ //AVEARGE
+ int c = count;//need to be Effectively Final
+ sumHashMap.replaceAll((k, v) -> v / c);
+ aggJson = new JSONObject(sumHashMap);
+ parentJson.put(AggregateType.AVEARGE.getLabel(path), aggJson);
+
+ //Count
+ parentJson.put(AggregateType.COUNT.getLabel(path), count);
+
+ //Max
+ aggJson = new JSONObject(maxHashMap);
+ parentJson.put(AggregateType.MAX.getLabel(path), aggJson);
+
+ //Min
+ aggJson = new JSONObject(minHashMap);
+ parentJson.put(AggregateType.MIN.getLabel(path), aggJson);
+
+ }
+
+}