summaryrefslogtreecommitdiffstats
path: root/components/datalake-handler/feeder/src/main/java/org
diff options
context:
space:
mode:
authorGuobiao Mo <guobiaomo@chinamobile.com>2019-06-04 15:19:45 -0700
committerGuobiao Mo <guobiaomo@chinamobile.com>2019-06-04 16:29:42 -0700
commitd1d558f4ec2358a592a2add59bea52ef7c1dced7 (patch)
treebd986ae1a5f9e9458ce00bfb74e4e8cbec3decc1 /components/datalake-handler/feeder/src/main/java/org
parentb2952abd55264de281a85e0ed1b6bd53211b1c91 (diff)
Flatten and Aggregate features in JSON array processing
Issue-ID: DCAEGEN2-1598 Change-Id: I9f563bcfa18285daf7b48878e8427bfdb1aff21f Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
Diffstat (limited to 'components/datalake-handler/feeder/src/main/java/org')
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java11
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java23
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java26
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java2
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java158
5 files changed, 214 insertions, 6 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
index 30737162..acb48aef 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
@@ -100,6 +100,15 @@ public class Topic {
@Column(name = "`message_id_path`")
private String messageIdPath;
+ //paths to the arrays that need aggregation, comma separated, example: "/event/measurementsForVfScalingFields/diskUsageArray,/event/measurementsForVfScalingFields/cpuUsageArray,/event/measurementsForVfScalingFields/vNicPerformanceArray"
+ @Column(name = "`aggregate_array_path`")
+ private String aggregateArrayPath;
+
+ //paths to the field inside the array elements that need flattening; the value of this field is used as the label; comma separated,
+ //example: "/event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray/astriInterface,..."
+ @Column(name = "`flatten_array_path`")
+ private String flattenArrayPath;
+
public Topic() {
}
@@ -149,6 +158,8 @@ public class Topic {
tConfig.setSaveRaw(isSaveRaw());
tConfig.setCorrelateClearedMessage(isCorrelateClearedMessage());
tConfig.setMessageIdPath(getMessageIdPath());
+ tConfig.setAggregateArrayPath(getAggregateArrayPath());
+ tConfig.setFlattenArrayPath(getFlattenArrayPath());
tConfig.setTtl(getTtl());
Set<Db> topicDb = getDbs();
List<String> dbList = new ArrayList<>();
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
index deaa0969..8dfe1b16 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
@@ -51,6 +51,8 @@ public class TopicConfig {
private int ttl;
private boolean correlateClearedMessage;
private String messageIdPath;
+ private String aggregateArrayPath;
+ private String flattenArrayPath;
public DataFormat getDataFormat2() {
if (dataFormat != null) {
@@ -60,7 +62,6 @@ public class TopicConfig {
}
}
-
public boolean supportHdfs() {
return containDb("HDFS");
}
@@ -105,6 +106,26 @@ public class TopicConfig {
return id;
}
+ /**
+  * Splits the comma separated {@code aggregateArrayPath} setting into individual paths.
+  * Whitespace around each entry is removed, so {@code "/a/b, /c/d"} yields
+  * {@code "/a/b"} and {@code "/c/d"}.
+  *
+  * @return the individual paths, or {@code null} when the setting is blank
+  */
+ public String[] getAggregateArrayPath2() {
+     if (StringUtils.isBlank(aggregateArrayPath)) {
+         return null;
+     }
+
+     // Trim first, then split on commas together with any surrounding whitespace:
+     // a path that keeps a leading blank is not a valid JSON Pointer.
+     return aggregateArrayPath.trim().split("\\s*,\\s*");
+ }
+
+ /**
+  * Splits the comma separated {@code flattenArrayPath} setting into individual paths.
+  * Whitespace around each entry is removed, so {@code "/a/b/tag, /c/d/tag"} yields
+  * {@code "/a/b/tag"} and {@code "/c/d/tag"}.
+  *
+  * @return the individual paths, or {@code null} when the setting is blank
+  */
+ public String[] getFlattenArrayPath2() {
+     if (StringUtils.isBlank(flattenArrayPath)) {
+         return null;
+     }
+
+     // Trim first, then split on commas together with any surrounding whitespace:
+     // a path that keeps a leading blank is not a valid JSON Pointer.
+     return flattenArrayPath.trim().split("\\s*,\\s*");
+ }
+
@Override
public String toString() {
return name;
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
index 03faeb81..2a2f997e 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
@@ -27,12 +27,14 @@ import java.util.List;
import javax.annotation.PostConstruct;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.json.JSONObject;
import org.json.XML;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.dto.TopicConfig;
import org.onap.datalake.feeder.enumeration.DataFormat;
+import org.onap.datalake.feeder.util.JsonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -100,7 +102,7 @@ public class StoreService {
saveJsons(topicConfig, docs, messages);
}
- private JSONObject messageToJson(TopicConfig topic, Pair<Long, String> pair) throws IOException {
+ private JSONObject messageToJson(TopicConfig topicConfig, Pair<Long, String> pair) throws IOException {
long timestamp = pair.getLeft();
String text = pair.getRight();
@@ -111,11 +113,11 @@ public class StoreService {
// log.debug("{} ={}", topicStr, text);
//}
- boolean storeRaw = topic.isSaveRaw();
+ boolean storeRaw = topicConfig.isSaveRaw();
JSONObject json = null;
- DataFormat dataFormat = topic.getDataFormat2();
+ DataFormat dataFormat = topicConfig.getDataFormat2();
switch (dataFormat) {
case JSON:
@@ -146,6 +148,20 @@ public class StoreService {
json.put(config.getRawDataLabel(), text);
}
+ if (StringUtils.isNotBlank(topicConfig.getAggregateArrayPath())) {
+ String[] paths = topicConfig.getAggregateArrayPath2();
+ for (String path : paths) {
+ JsonUtil.arrayAggregate(path, json);
+ }
+ }
+
+ if (StringUtils.isNotBlank(topicConfig.getFlattenArrayPath())) {
+ String[] paths = topicConfig.getFlattenArrayPath2();
+ for (String path : paths) {
+ JsonUtil.flattenArray(path, json);
+ }
+ }
+
return json;
}
@@ -168,9 +184,9 @@ public class StoreService {
}
public void flush() { //force flush all buffer
- hdfsService.flush();
+ hdfsService.flush();
}
-
+
public void flushStall() { //flush stall buffer
hdfsService.flushStall();
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
index f0b000bc..64e8b8b1 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
@@ -123,6 +123,8 @@ public class TopicService {
topic.setCorrelateClearedMessage(tConfig.isCorrelateClearedMessage());
topic.setDataFormat(tConfig.getDataFormat());
topic.setMessageIdPath(tConfig.getMessageIdPath());
+ topic.setAggregateArrayPath(tConfig.getAggregateArrayPath());
+ topic.setFlattenArrayPath(tConfig.getFlattenArrayPath());
if(tConfig.getSinkdbs() != null) {
for (String item : tConfig.getSinkdbs()) {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java
new file mode 100644
index 00000000..db4dcfae
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java
@@ -0,0 +1,158 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DCAE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.util;
+
+import java.util.HashMap;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.json.JSONArray;
+import org.json.JSONObject;
+
+import lombok.Getter;
+
+/**
+ * utils for JSON
+ *
+ * @author Guobiao Mo
+ *
+ */
+public class JsonUtil {
+
+ @Getter
+ enum AggregateType {
+ ALL("aggregate"), AVEARGE("average"), SUM("sum"), MAX("max"), MIN("min"), COUNT("count");
+ private final String name;
+
+ AggregateType(String name) {
+ this.name = name;
+ }
+
+ public String getLabel(String path) {
+ return path.substring(path.lastIndexOf('/') + 1) + "_" + name;
+ }
+ }
+
+ public static void flattenArray(String path, JSONObject json) {
+ //path = /event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray/astriInterface
+
+ int index1 = path.lastIndexOf('/');
+
+ String arrayPath = path.substring(0, index1);// /event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray
+
+ Object obj;
+ try {
+ obj = json.query(arrayPath);
+ } catch (org.json.JSONPointerException e) {
+ return;
+ }
+ if (obj == null || !(obj instanceof JSONArray)) {
+ return;
+ }
+ Iterable<JSONObject> subjsonaArray = (Iterable<JSONObject>) obj;
+
+ String tagName = path.substring(index1 + 1);//astriInterface
+
+ int index2 = path.lastIndexOf('/', index1 - 1);
+ String arrayName = path.substring(index2 + 1, index1);//astriDPMeasurementArray
+
+ String parentPath = path.substring(0, index2);// /event/measurementsForVfScalingFields/astriMeasurement
+ JSONObject parent = (JSONObject) json.query(parentPath);
+
+ for (JSONObject element : subjsonaArray) {
+ String tagValue = element.get(tagName).toString();
+ String label = arrayName + "_" + tagName + "_" + tagValue;
+
+ parent.put(label, element);
+ }
+ }
+
+ /**
+ * json got modified.
+ *
+ * @param aggregateType
+ * @param path
+ * @param json
+ */
+ public static void arrayAggregate(String path, JSONObject json) {
+ HashMap<String, Double> sumHashMap = new HashMap<>();
+ HashMap<String, Double> maxHashMap = new HashMap<>();
+ HashMap<String, Double> minHashMap = new HashMap<>();
+
+ Object obj;
+ try {
+ obj = json.query(path);
+ } catch (org.json.JSONPointerException e) {
+ return;
+ }
+ if (obj == null || !(obj instanceof JSONArray)) {
+ return;
+ }
+ Iterable<JSONObject> subjsonaArray = (Iterable<JSONObject>) obj;
+
+ int count = 0;
+ for (JSONObject element : subjsonaArray) {
+ String[] names = JSONObject.getNames(element);
+ for (String name : names) {
+ Number value = element.optNumber(name);
+ if (value != null) {
+ double existing = sumHashMap.computeIfAbsent(name, k -> 0.0);
+ sumHashMap.put(name, existing + value.doubleValue());
+
+ existing = maxHashMap.computeIfAbsent(name, k -> Double.MIN_VALUE);
+ maxHashMap.put(name, Math.max(existing, value.doubleValue()));
+
+ existing = minHashMap.computeIfAbsent(name, k -> Double.MAX_VALUE);
+ minHashMap.put(name, Math.min(existing, value.doubleValue()));
+ }
+ }
+ count++;
+ }
+
+ if (count == 0) {
+ return;
+ }
+
+ JSONObject parentJson = (JSONObject) json.query(path.substring(0, path.lastIndexOf('/')));
+
+ //sum
+ JSONObject aggJson = new JSONObject(sumHashMap);
+ parentJson.put(AggregateType.SUM.getLabel(path), aggJson);
+
+ //AVEARGE
+ int c = count;//need to be Effectively Final
+ sumHashMap.replaceAll((k, v) -> v / c);
+ aggJson = new JSONObject(sumHashMap);
+ parentJson.put(AggregateType.AVEARGE.getLabel(path), aggJson);
+
+ //Count
+ parentJson.put(AggregateType.COUNT.getLabel(path), count);
+
+ //Max
+ aggJson = new JSONObject(maxHashMap);
+ parentJson.put(AggregateType.MAX.getLabel(path), aggJson);
+
+ //Min
+ aggJson = new JSONObject(minHashMap);
+ parentJson.put(AggregateType.MIN.getLabel(path), aggJson);
+
+ }
+
+}