diff options
Diffstat (limited to 'components/datalake-handler/feeder/src/main')
11 files changed, 261 insertions, 16 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java index 4fc9b7b6..6a44c4f2 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/FeederController.java @@ -86,7 +86,7 @@ public class FeederController { @ApiOperation(value="Retrieve feeder status.") public String status() { String status = "Feeder is running: "+pullService.isRunning(); - log.info("senting feeder status ...");//TODO we can send what topics are monitored, how many messages are sent, etc. + log.info("sending feeder status ...");//TODO we can send what topics are monitored, how many messages are sent, etc. return "{\"version\": \""+config.getDatalakeVersion()+"\", \"running\": "+pullService.isRunning()+"}"; } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java index 30737162..acb48aef 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java @@ -100,6 +100,15 @@ public class Topic { @Column(name = "`message_id_path`") private String messageIdPath; + //paths to the array that need aggregation, comma separated, example: "/event/measurementsForVfScalingFields/diskUsageArray,/event/measurementsForVfScalingFields/cpuUsageArray,/event/measurementsForVfScalingFields/vNicPerformanceArray" + @Column(name = "`aggregate_array_path`") + private String aggregateArrayPath; + + //paths to the element in array that need flatten, this element is used as label, comma separated, + //example: 
"/event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray/astriInterface,..." + @Column(name = "`flatten_array_path`") + private String flattenArrayPath; + public Topic() { } @@ -149,6 +158,8 @@ public class Topic { tConfig.setSaveRaw(isSaveRaw()); tConfig.setCorrelateClearedMessage(isCorrelateClearedMessage()); tConfig.setMessageIdPath(getMessageIdPath()); + tConfig.setAggregateArrayPath(getAggregateArrayPath()); + tConfig.setFlattenArrayPath(getFlattenArrayPath()); tConfig.setTtl(getTtl()); Set<Db> topicDb = getDbs(); List<String> dbList = new ArrayList<>(); diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java index deaa0969..8dfe1b16 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java @@ -51,6 +51,8 @@ public class TopicConfig { private int ttl; private boolean correlateClearedMessage; private String messageIdPath; + private String aggregateArrayPath; + private String flattenArrayPath; public DataFormat getDataFormat2() { if (dataFormat != null) { @@ -60,7 +62,6 @@ public class TopicConfig { } } - public boolean supportHdfs() { return containDb("HDFS"); } @@ -105,6 +106,26 @@ public class TopicConfig { return id; } + public String[] getAggregateArrayPath2() { + String[] ret = null; + + if (StringUtils.isNotBlank(aggregateArrayPath)) { + ret = aggregateArrayPath.split(","); + } + + return ret; + } + + public String[] getFlattenArrayPath2() { + String[] ret = null; + + if (StringUtils.isNotBlank(flattenArrayPath)) { + ret = flattenArrayPath.split(","); + } + + return ret; + } + @Override public String toString() { return name; diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java 
b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java index 2274ce99..3be5be6e 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java @@ -58,11 +58,13 @@ public class DmaapService { @Autowired private TopicService topicService; - ZooKeeper zk; + private ZooKeeper zk; @PreDestroy public void cleanUp() throws InterruptedException { - zk.close(); + if (zk != null) { + zk.close(); + } } @PostConstruct @@ -71,6 +73,7 @@ public class DmaapService { } //get all topic names from Zookeeper + //This method returns empty list if nothing found. public List<String> getTopics() { try { if (zk == null) { @@ -84,7 +87,7 @@ public class DmaapService { return topics; } catch (Exception e) { zk = null; - log.error("Can not get topic list from Zookeeper, for testing, going to use hard coded topic list.", e); + log.error("Can not get topic list from Zookeeper, return empty list.", e); return Collections.emptyList(); } } @@ -119,9 +122,6 @@ public class DmaapService { public List<TopicConfig> getActiveTopicConfigs() throws IOException { log.debug("entering getActiveTopicConfigs()..."); List<String> allTopics = getTopics(); - if (allTopics == null) { - return Collections.emptyList(); - } List<TopicConfig> ret = new ArrayList<>(allTopics.size()); for (String topicStr : allTopics) { diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java index 126e23b2..2a2f997e 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java @@ -27,12 +27,14 @@ import java.util.List; import 
javax.annotation.PostConstruct; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.json.JSONObject; import org.json.XML; import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.onap.datalake.feeder.dto.TopicConfig; import org.onap.datalake.feeder.enumeration.DataFormat; +import org.onap.datalake.feeder.util.JsonUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -91,15 +93,16 @@ public class StoreService { for (Pair<Long, String> pair : messages) { try { docs.add(messageToJson(topicConfig, pair)); - } catch (IOException e) { - log.error(pair.getRight(), e); + } catch (Exception e) { + //may see org.json.JSONException. + log.error("Error when converting this message to JSON: " + pair.getRight(), e); } } saveJsons(topicConfig, docs, messages); } - private JSONObject messageToJson(TopicConfig topic, Pair<Long, String> pair) throws IOException { + private JSONObject messageToJson(TopicConfig topicConfig, Pair<Long, String> pair) throws IOException { long timestamp = pair.getLeft(); String text = pair.getRight(); @@ -110,11 +113,11 @@ public class StoreService { // log.debug("{} ={}", topicStr, text); //} - boolean storeRaw = topic.isSaveRaw(); + boolean storeRaw = topicConfig.isSaveRaw(); JSONObject json = null; - DataFormat dataFormat = topic.getDataFormat2(); + DataFormat dataFormat = topicConfig.getDataFormat2(); switch (dataFormat) { case JSON: @@ -145,6 +148,20 @@ public class StoreService { json.put(config.getRawDataLabel(), text); } + if (StringUtils.isNotBlank(topicConfig.getAggregateArrayPath())) { + String[] paths = topicConfig.getAggregateArrayPath2(); + for (String path : paths) { + JsonUtil.arrayAggregate(path, json); + } + } + + if (StringUtils.isNotBlank(topicConfig.getFlattenArrayPath())) { + String[] paths = topicConfig.getFlattenArrayPath2(); + for 
(String path : paths) { + JsonUtil.flattenArray(path, json); + } + } + return json; } @@ -167,9 +184,9 @@ public class StoreService { } public void flush() { //force flush all buffer - hdfsService.flush(); + hdfsService.flush(); } - + public void flushStall() { //flush stall buffer hdfsService.flushStall(); } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java index 80da55fd..58b27834 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java @@ -102,6 +102,7 @@ public class TopicConfigPollingService implements Runnable { Thread.sleep(config.getDmaapCheckNewTopicInterval()); } catch (InterruptedException e) { log.error("Thread.sleep(config.getDmaapCheckNewTopicInterval())", e); + Thread.currentThread().interrupt(); } try { diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java index f0b000bc..64e8b8b1 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java @@ -123,6 +123,8 @@ public class TopicService { topic.setCorrelateClearedMessage(tConfig.isCorrelateClearedMessage()); topic.setDataFormat(tConfig.getDataFormat()); topic.setMessageIdPath(tConfig.getMessageIdPath()); + topic.setAggregateArrayPath(tConfig.getAggregateArrayPath()); + topic.setFlattenArrayPath(tConfig.getFlattenArrayPath()); if(tConfig.getSinkdbs() != null) { for (String item : tConfig.getSinkdbs()) { diff --git 
a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java new file mode 100644 index 00000000..db4dcfae --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java @@ -0,0 +1,158 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DCAE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.util; + +import java.util.HashMap; + +import org.apache.commons.collections.CollectionUtils; +import org.json.JSONArray; +import org.json.JSONObject; + +import lombok.Getter; + +/** + * utils for JSON + * + * @author Guobiao Mo + * + */ +public class JsonUtil { + + @Getter + enum AggregateType { + ALL("aggregate"), AVERAGE("average"), SUM("sum"), MAX("max"), MIN("min"), COUNT("count"); + private final String name; + + AggregateType(String name) { + this.name = name; + } + + public String getLabel(String path) { + return path.substring(path.lastIndexOf('/') + 1) + "_" + name; + } + } + + public static void flattenArray(String path, JSONObject json) { + //path = /event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray/astriInterface + + int index1 = path.lastIndexOf('/'); + + String arrayPath = path.substring(0, index1);// /event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray + + Object obj; + try { + obj = json.query(arrayPath); + } catch (org.json.JSONPointerException e) { + return; + } + if (obj == null || !(obj instanceof JSONArray)) { + return; + } + Iterable<JSONObject> subjsonaArray = (Iterable<JSONObject>) obj; + + String tagName = path.substring(index1 + 1);//astriInterface + + int index2 = path.lastIndexOf('/', index1 - 1); + String arrayName = path.substring(index2 + 1, index1);//astriDPMeasurementArray + + String parentPath = path.substring(0, index2);// /event/measurementsForVfScalingFields/astriMeasurement + JSONObject parent = (JSONObject) json.query(parentPath); + + for (JSONObject element : subjsonaArray) { + String tagValue = element.get(tagName).toString(); + String label = arrayName + "_" + tagName + "_" + tagValue; + + parent.put(label, element); + } + } + + /** + * json got modified.
+ * + * + * @param path + * @param json + */ + public static void arrayAggregate(String path, JSONObject json) { + HashMap<String, Double> sumHashMap = new HashMap<>(); + HashMap<String, Double> maxHashMap = new HashMap<>(); + HashMap<String, Double> minHashMap = new HashMap<>(); + + Object obj; + try { + obj = json.query(path); + } catch (org.json.JSONPointerException e) { + return; + } + if (obj == null || !(obj instanceof JSONArray)) { + return; + } + Iterable<JSONObject> subjsonaArray = (Iterable<JSONObject>) obj; + + int count = 0; + for (JSONObject element : subjsonaArray) { + String[] names = JSONObject.getNames(element); + for (String name : names) { + Number value = element.optNumber(name); + if (value != null) { + double existing = sumHashMap.computeIfAbsent(name, k -> 0.0); + sumHashMap.put(name, existing + value.doubleValue()); + + existing = maxHashMap.computeIfAbsent(name, k -> Double.MIN_VALUE); + maxHashMap.put(name, Math.max(existing, value.doubleValue())); + + existing = minHashMap.computeIfAbsent(name, k -> Double.MAX_VALUE); + minHashMap.put(name, Math.min(existing, value.doubleValue())); + } + } + count++; + } + + if (count == 0) { + return; + } + + JSONObject parentJson = (JSONObject) json.query(path.substring(0, path.lastIndexOf('/'))); + + //sum + JSONObject aggJson = new JSONObject(sumHashMap); + parentJson.put(AggregateType.SUM.getLabel(path), aggJson); + + //AVERAGE + int c = count;//need to be Effectively Final + sumHashMap.replaceAll((k, v) -> v / c); + aggJson = new JSONObject(sumHashMap); + parentJson.put(AggregateType.AVERAGE.getLabel(path), aggJson); + + //Count + parentJson.put(AggregateType.COUNT.getLabel(path), count); + + //Max + aggJson = new JSONObject(maxHashMap); + parentJson.put(AggregateType.MAX.getLabel(path), aggJson); + + //Min + aggJson = new JSONObject(minHashMap); + parentJson.put(AggregateType.MIN.getLabel(path), aggJson); + + } + +} diff --git
a/components/datalake-handler/feeder/src/main/resources/application.properties b/components/datalake-handler/feeder/src/main/resources/application.properties index ed167f33..faf27583 100644 --- a/components/datalake-handler/feeder/src/main/resources/application.properties +++ b/components/datalake-handler/feeder/src/main/resources/application.properties @@ -42,7 +42,7 @@ dmaapCheckNewTopicInterval=60000 kafkaConsumerCount=3 #####################Elasticsearch -elasticsearchType=doc +elasticsearchType=_doc #####################HDFS hdfsBufferSize=4096 diff --git a/components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/readme.txt b/components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/readme.txt new file mode 100644 index 00000000..88513539 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/readme.txt @@ -0,0 +1,4 @@ +before creating index
+PUT http://dl_es:9200/unauthenticated.ves_measurement_output
+application/json
+body from unauthenticated.ves_measurement_output.json
\ No newline at end of file diff --git a/components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/unauthenticated.ves_measurement_output.json b/components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/unauthenticated.ves_measurement_output.json new file mode 100644 index 00000000..9a53b70c --- /dev/null +++ b/components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/unauthenticated.ves_measurement_output.json @@ -0,0 +1,31 @@ +{
+ "mappings": {
+ "properties": {
+ "datalake_ts_": {
+ "type": "date",
+ "format": "epoch_millis"
+ },
+ "event.commonEventHeader.internalHeaderFields.collectorTimeStamp": {
+ "type": "date",
+ "format":"EEE, MM dd yyyy HH:mm:ss z"
+ },
+ "event.commonEventHeader.startEpochMicrosec": {
+ "type": "date",
+ "format": "epoch_millis"
+ },
+ "event.commonEventHeader.lastEpochMicrosec": {
+ "type": "date",
+ "format": "epoch_millis"
+ },
+ "event.measurementsForVfScalingFields.diskUsageArray": {
+ "type": "nested"
+ },
+ "event.measurementsForVfScalingFields.cpuUsageArray": {
+ "type": "nested"
+ },
+ "event.measurementsForVfScalingFields.vNicPerformanceArray": {
+ "type": "nested"
+ }
+ }
+ }
+}
\ No newline at end of file |