From d1d558f4ec2358a592a2add59bea52ef7c1dced7 Mon Sep 17 00:00:00 2001 From: Guobiao Mo Date: Tue, 4 Jun 2019 15:19:45 -0700 Subject: Flatten and Aggregate features in JSON array processing Issue-ID: DCAEGEN2-1598 Change-Id: I9f563bcfa18285daf7b48878e8427bfdb1aff21f Signed-off-by: Guobiao Mo --- .../org/onap/datalake/feeder/domain/Topic.java | 11 ++ .../org/onap/datalake/feeder/dto/TopicConfig.java | 23 ++- .../onap/datalake/feeder/service/StoreService.java | 26 +++- .../onap/datalake/feeder/service/TopicService.java | 2 + .../org/onap/datalake/feeder/util/JsonUtil.java | 158 +++++++++++++++++++++ 5 files changed, 214 insertions(+), 6 deletions(-) create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java (limited to 'components/datalake-handler/feeder/src/main/java/org') diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java index 30737162..acb48aef 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java @@ -100,6 +100,15 @@ public class Topic { @Column(name = "`message_id_path`") private String messageIdPath; + //paths to the array that need aggregation, comma separated, example: "/event/measurementsForVfScalingFields/diskUsageArray,/event/measurementsForVfScalingFields/cpuUsageArray,/event/measurementsForVfScalingFields/vNicPerformanceArray" + @Column(name = "`aggregate_array_path`") + private String aggregateArrayPath; + + //paths to the element in array that need flatten, this element is used as label, comma separated, + //example: "/event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray/astriInterface,..." 
+ @Column(name = "`flatten_array_path`") + private String flattenArrayPath; + public Topic() { } @@ -149,6 +158,8 @@ public class Topic { tConfig.setSaveRaw(isSaveRaw()); tConfig.setCorrelateClearedMessage(isCorrelateClearedMessage()); tConfig.setMessageIdPath(getMessageIdPath()); + tConfig.setAggregateArrayPath(getAggregateArrayPath()); + tConfig.setFlattenArrayPath(getFlattenArrayPath()); tConfig.setTtl(getTtl()); Set topicDb = getDbs(); List dbList = new ArrayList<>(); diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java index deaa0969..8dfe1b16 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java @@ -51,6 +51,8 @@ public class TopicConfig { private int ttl; private boolean correlateClearedMessage; private String messageIdPath; + private String aggregateArrayPath; + private String flattenArrayPath; public DataFormat getDataFormat2() { if (dataFormat != null) { @@ -60,7 +62,6 @@ public class TopicConfig { } } - public boolean supportHdfs() { return containDb("HDFS"); } @@ -105,6 +106,26 @@ public class TopicConfig { return id; } + public String[] getAggregateArrayPath2() { + String[] ret = null; + + if (StringUtils.isNotBlank(aggregateArrayPath)) { + ret = aggregateArrayPath.split(","); + } + + return ret; + } + + public String[] getFlattenArrayPath2() { + String[] ret = null; + + if (StringUtils.isNotBlank(flattenArrayPath)) { + ret = flattenArrayPath.split(","); + } + + return ret; + } + @Override public String toString() { return name; diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java index 
03faeb81..2a2f997e 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java @@ -27,12 +27,14 @@ import java.util.List; import javax.annotation.PostConstruct; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.json.JSONObject; import org.json.XML; import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.onap.datalake.feeder.dto.TopicConfig; import org.onap.datalake.feeder.enumeration.DataFormat; +import org.onap.datalake.feeder.util.JsonUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -100,7 +102,7 @@ public class StoreService { saveJsons(topicConfig, docs, messages); } - private JSONObject messageToJson(TopicConfig topic, Pair pair) throws IOException { + private JSONObject messageToJson(TopicConfig topicConfig, Pair pair) throws IOException { long timestamp = pair.getLeft(); String text = pair.getRight(); @@ -111,11 +113,11 @@ public class StoreService { // log.debug("{} ={}", topicStr, text); //} - boolean storeRaw = topic.isSaveRaw(); + boolean storeRaw = topicConfig.isSaveRaw(); JSONObject json = null; - DataFormat dataFormat = topic.getDataFormat2(); + DataFormat dataFormat = topicConfig.getDataFormat2(); switch (dataFormat) { case JSON: @@ -146,6 +148,20 @@ public class StoreService { json.put(config.getRawDataLabel(), text); } + if (StringUtils.isNotBlank(topicConfig.getAggregateArrayPath())) { + String[] paths = topicConfig.getAggregateArrayPath2(); + for (String path : paths) { + JsonUtil.arrayAggregate(path, json); + } + } + + if (StringUtils.isNotBlank(topicConfig.getFlattenArrayPath())) { + String[] paths = topicConfig.getFlattenArrayPath2(); + for (String path : paths) { + 
JsonUtil.flattenArray(path, json); + } + } + return json; } @@ -168,9 +184,9 @@ public class StoreService { } public void flush() { //force flush all buffer - hdfsService.flush(); + hdfsService.flush(); } - + public void flushStall() { //flush stall buffer hdfsService.flushStall(); } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java index f0b000bc..64e8b8b1 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java @@ -123,6 +123,8 @@ public class TopicService { topic.setCorrelateClearedMessage(tConfig.isCorrelateClearedMessage()); topic.setDataFormat(tConfig.getDataFormat()); topic.setMessageIdPath(tConfig.getMessageIdPath()); + topic.setAggregateArrayPath(tConfig.getAggregateArrayPath()); + topic.setFlattenArrayPath(tConfig.getFlattenArrayPath()); if(tConfig.getSinkdbs() != null) { for (String item : tConfig.getSinkdbs()) { diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java new file mode 100644 index 00000000..db4dcfae --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java @@ -0,0 +1,158 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DCAE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. 
/*
 * (Continuation of the Apache-2.0 license header that is opened on the preceding line.)
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */

package org.onap.datalake.feeder.util;

import java.util.HashMap;
import java.util.Map;

import org.apache.commons.collections.CollectionUtils;
import org.json.JSONArray;
import org.json.JSONObject;

import lombok.Getter;

/**
 * Utilities for post-processing JSON documents: flattening arrays into labelled
 * sibling objects and computing per-field aggregates over arrays of objects.
 *
 * <p>All methods modify the supplied document in place and are best-effort: when
 * the configured path is malformed, does not exist, or does not point at the
 * expected kind of node, the document is left untouched.
 *
 * @author Guobiao Mo
 *
 */
public class JsonUtil {

    /**
     * Kinds of aggregate written next to an aggregated array. The {@code name}
     * is the suffix used when building the output key.
     */
    @Getter
    enum AggregateType {
        // NOTE: "AVEARGE" is a misspelling of AVERAGE. The constant name is kept
        // so that existing references keep compiling; its label is spelled correctly.
        ALL("aggregate"), AVEARGE("average"), SUM("sum"), MAX("max"), MIN("min"), COUNT("count");

        private final String name;

        AggregateType(String name) {
            this.name = name;
        }

        /**
         * Builds the output key for this aggregate.
         *
         * @param path JSON pointer to the aggregated array, e.g. {@code /event/cpuUsageArray}
         * @return the last path segment followed by {@code _} and this type's name,
         *         e.g. {@code cpuUsageArray_sum}
         */
        public String getLabel(String path) {
            return path.substring(path.lastIndexOf('/') + 1) + "_" + name;
        }
    }

    /**
     * Copies every object of an array into the array's parent object under a
     * key derived from one of the element's own fields.
     *
     * <p>For {@code path = /a/b/arr/tag}, each object element {@code e} of
     * {@code /a/b/arr} that has a {@code tag} field is stored in {@code /a/b}
     * under the key {@code arr_tag_<value of e.tag>}. Elements that are not
     * JSON objects, or that lack the tag field, are skipped.
     *
     * @param path JSON pointer whose last segment is the tag field and whose
     *             second-to-last segment is the array; surrounding whitespace is ignored
     * @param json document to modify in place
     */
    public static void flattenArray(String path, JSONObject json) {
        if (path == null || json == null) {
            return;
        }

        // e.g. /event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray/astriInterface
        String pointer = path.trim();
        int tagSeparator = pointer.lastIndexOf('/');
        if (tagSeparator < 0) {
            return;
        }

        // e.g. /event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray
        String arrayPath = pointer.substring(0, tagSeparator);
        Object target = queryOrNull(json, arrayPath);
        if (!(target instanceof JSONArray)) {
            return;
        }

        int arraySeparator = arrayPath.lastIndexOf('/');
        if (arraySeparator < 0) {
            return;
        }
        String tagName = pointer.substring(tagSeparator + 1); // astriInterface
        String arrayName = arrayPath.substring(arraySeparator + 1); // astriDPMeasurementArray

        // e.g. /event/measurementsForVfScalingFields/astriMeasurement ("" resolves to the root)
        Object parent = queryOrNull(json, arrayPath.substring(0, arraySeparator));
        if (!(parent instanceof JSONObject)) {
            // The array is nested directly inside another array: nowhere to put labelled keys.
            return;
        }
        JSONObject parentJson = (JSONObject) parent;

        for (Object item : (JSONArray) target) {
            if (!(item instanceof JSONObject)) {
                continue;
            }
            JSONObject element = (JSONObject) item;
            Object tagValue = element.opt(tagName);
            if (tagValue == null) {
                // One element without the tag must not abort processing of the whole message.
                continue;
            }
            parentJson.put(arrayName + "_" + tagName + "_" + tagValue, element);
        }
    }

    /**
     * Computes sum, average, count, max and min over an array of objects and
     * stores them in the array's parent object.
     *
     * <p>For every field that {@code optNumber} can read as a number, the values
     * are aggregated across all object elements of the array. The results are
     * written to the parent under {@code <array>_sum}, {@code <array>_average},
     * {@code <array>_count}, {@code <array>_max} and {@code <array>_min}. The
     * count is the number of object elements, and the average divides each sum
     * by that count (not by the number of elements that carry the field).
     * Non-object elements are ignored. Nothing is written when the array holds
     * no objects.
     *
     * @param path JSON pointer to the array; surrounding whitespace is ignored
     * @param json document to modify in place
     */
    public static void arrayAggregate(String path, JSONObject json) {
        if (path == null || json == null) {
            return;
        }

        String pointer = path.trim();
        int separator = pointer.lastIndexOf('/');
        if (separator < 0) {
            return;
        }

        Object target = queryOrNull(json, pointer);
        if (!(target instanceof JSONArray)) {
            return;
        }

        Map<String, Double> sums = new HashMap<>();
        Map<String, Double> maxima = new HashMap<>();
        Map<String, Double> minima = new HashMap<>();

        int count = 0;
        for (Object item : (JSONArray) target) {
            if (!(item instanceof JSONObject)) {
                continue;
            }
            JSONObject element = (JSONObject) item;
            // keySet() is empty for an empty object, whereas getNames() would return null.
            for (String name : element.keySet()) {
                Number number = element.optNumber(name);
                if (number == null) {
                    continue;
                }
                double value = number.doubleValue();
                sums.merge(name, value, Double::sum);
                // Seed max/min with the first value seen. Double.MIN_VALUE is the smallest
                // POSITIVE double and would be wrong for zero or negative inputs.
                maxima.merge(name, value, Math::max);
                minima.merge(name, value, Math::min);
            }
            count++;
        }

        if (count == 0) {
            return;
        }

        Object parent = queryOrNull(json, pointer.substring(0, separator));
        if (!(parent instanceof JSONObject)) {
            return;
        }
        JSONObject parentJson = (JSONObject) parent;

        final int elementCount = count; // lambdas need an effectively final copy
        Map<String, Double> averages = new HashMap<>();
        sums.forEach((name, sum) -> averages.put(name, sum / elementCount));

        parentJson.put(AggregateType.SUM.getLabel(pointer), new JSONObject(sums));
        parentJson.put(AggregateType.AVEARGE.getLabel(pointer), new JSONObject(averages));
        parentJson.put(AggregateType.COUNT.getLabel(pointer), count);
        parentJson.put(AggregateType.MAX.getLabel(pointer), new JSONObject(maxima));
        parentJson.put(AggregateType.MIN.getLabel(pointer), new JSONObject(minima));
    }

    /**
     * Resolves a JSON pointer, returning {@code null} instead of throwing when
     * the pointer is malformed or cannot be resolved.
     */
    private static Object queryOrNull(JSONObject json, String pointer) {
        try {
            return json.query(pointer);
        } catch (org.json.JSONPointerException | IllegalArgumentException ignored) {
            // Unresolvable path (JSONPointerException) or syntactically invalid pointer
            // (IllegalArgumentException): the transformations are optional, so skip them.
            return null;
        }
    }

}
// Patch trailer preserved from the original export: -- cgit 1.2.3-korg