diff options
author | Guobiao Mo <guobiaomo@chinamobile.com> | 2019-06-04 15:19:45 -0700 |
---|---|---|
committer | Guobiao Mo <guobiaomo@chinamobile.com> | 2019-06-04 16:29:42 -0700 |
commit | d1d558f4ec2358a592a2add59bea52ef7c1dced7 (patch) | |
tree | bd986ae1a5f9e9458ce00bfb74e4e8cbec3decc1 /components/datalake-handler/feeder/src | |
parent | b2952abd55264de281a85e0ed1b6bd53211b1c91 (diff) |
Flatten and Aggregate features in JSON array processing
Issue-ID: DCAEGEN2-1598
Change-Id: I9f563bcfa18285daf7b48878e8427bfdb1aff21f
Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
Diffstat (limited to 'components/datalake-handler/feeder/src')
14 files changed, 296 insertions, 18 deletions
diff --git a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql index fd9b3dc3..ad142dcf 100644 --- a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql +++ b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql @@ -7,6 +7,8 @@ CREATE TABLE `topic` ( `enabled` bit(1) DEFAULT 0,
`login` varchar(255) DEFAULT NULL,
`message_id_path` varchar(255) DEFAULT NULL,
+ `aggregate_array_path` varchar(2000) DEFAULT NULL,
+ `flatten_array_path` varchar(2000) DEFAULT NULL,
`pass` varchar(255) DEFAULT NULL,
`save_raw` bit(1) DEFAULT NULL,
`ttl` int(11) DEFAULT NULL,
@@ -86,6 +88,12 @@ insert into db (`name`,`host`,`login`) values ('HDFS','dlhdfs','dl'); -- in production, default enabled should be off
insert into `topic`(`name`,`enabled`,`save_raw`,`ttl`,`data_format`) values ('_DL_DEFAULT_',1,0,3650,'JSON');
+insert into `topic`(`name`,correlate_cleared_message,`enabled`, message_id_path,`data_format`) values ('unauthenticated.SEC_FAULT_OUTPUT',1,1,'/event/commonEventHeader/eventName,/event/commonEventHeader/reportingEntityName,/event/faultFields/specificProblem,/event/commonEventHeader/eventId','JSON');
+insert into `topic`(`name`,`enabled`, aggregate_array_path,flatten_array_path,`data_format`)
+values ('unauthenticated.VES_MEASUREMENT_OUTPUT',1,
+'/event/measurementsForVfScalingFields/diskUsageArray,/event/measurementsForVfScalingFields/cpuUsageArray,/event/measurementsForVfScalingFields/vNicPerformanceArray',
+'/event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray/astriInterface',
+'JSON');
insert into `map_db_topic`(`db_name`,`topic_name`) values ('Couchbase','_DL_DEFAULT_');
insert into `map_db_topic`(`db_name`,`topic_name`) values ('Elasticsearch','_DL_DEFAULT_');
@@ -93,14 +101,17 @@ insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','_DL_DEFAUL insert into `map_db_topic`(`db_name`,`topic_name`) values ('Druid','_DL_DEFAULT_');
insert into `map_db_topic`(`db_name`,`topic_name`) values ('HDFS','_DL_DEFAULT_');
-insert into `topic`(`name`,correlate_cleared_message,`enabled`, message_id_path,`data_format`) values ('unauthenticated.SEC_FAULT_OUTPUT',1,1,'/event/commonEventHeader/eventName,/event/commonEventHeader/reportingEntityName,/event/faultFields/specificProblem,/event/commonEventHeader/eventId','JSON');
-
insert into `map_db_topic`(`db_name`,`topic_name`) values ('Couchbase','unauthenticated.SEC_FAULT_OUTPUT');
insert into `map_db_topic`(`db_name`,`topic_name`) values ('Elasticsearch','unauthenticated.SEC_FAULT_OUTPUT');
insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','unauthenticated.SEC_FAULT_OUTPUT');
insert into `map_db_topic`(`db_name`,`topic_name`) values ('Druid','unauthenticated.SEC_FAULT_OUTPUT');
insert into `map_db_topic`(`db_name`,`topic_name`) values ('HDFS','unauthenticated.SEC_FAULT_OUTPUT');
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('Couchbase','unauthenticated.VES_MEASUREMENT_OUTPUT');
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('Elasticsearch','unauthenticated.VES_MEASUREMENT_OUTPUT');
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','unauthenticated.VES_MEASUREMENT_OUTPUT');
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('Druid','unauthenticated.VES_MEASUREMENT_OUTPUT');
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('HDFS','unauthenticated.VES_MEASUREMENT_OUTPUT');
insert into portal (`name`,`related_db`, host) values ('Kibana', 'Elasticsearch', 'dl_es');
insert into portal (`name`,`related_db`) values ('Elasticsearch', 'Elasticsearch');
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java index 30737162..acb48aef 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java @@ -100,6 +100,15 @@ public class Topic { @Column(name = "`message_id_path`") private String messageIdPath; + //paths to the array that need aggregation, comma separated, example: "/event/measurementsForVfScalingFields/diskUsageArray,/event/measurementsForVfScalingFields/cpuUsageArray,/event/measurementsForVfScalingFields/vNicPerformanceArray" + @Column(name = "`aggregate_array_path`") + private String aggregateArrayPath; + + //paths to the element in array that need flatten, this element is used as label, comma separated, + //example: "/event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray/astriInterface,..." + @Column(name = "`flatten_array_path`") + private String flattenArrayPath; + public Topic() { } @@ -149,6 +158,8 @@ public class Topic { tConfig.setSaveRaw(isSaveRaw()); tConfig.setCorrelateClearedMessage(isCorrelateClearedMessage()); tConfig.setMessageIdPath(getMessageIdPath()); + tConfig.setAggregateArrayPath(getAggregateArrayPath()); + tConfig.setFlattenArrayPath(getFlattenArrayPath()); tConfig.setTtl(getTtl()); Set<Db> topicDb = getDbs(); List<String> dbList = new ArrayList<>(); diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java index deaa0969..8dfe1b16 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java @@ -51,6 +51,8 @@ public class TopicConfig { private int ttl; private boolean correlateClearedMessage; private String messageIdPath; + private String aggregateArrayPath; + private String flattenArrayPath; public DataFormat getDataFormat2() { if (dataFormat != null) { @@ -60,7 +62,6 @@ public class TopicConfig { } } - public boolean supportHdfs() { return containDb("HDFS"); } @@ -105,6 +106,26 @@ public class TopicConfig { return id; } + public String[] getAggregateArrayPath2() { + String[] ret = null; + + if (StringUtils.isNotBlank(aggregateArrayPath)) { + ret = aggregateArrayPath.split(","); + } + + return ret; + } + + public String[] getFlattenArrayPath2() { + String[] ret = null; + + if (StringUtils.isNotBlank(flattenArrayPath)) { + ret = flattenArrayPath.split(","); + } + + return ret; + } + @Override public String toString() { return name; diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java index 03faeb81..2a2f997e 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java @@ -27,12 +27,14 @@ import java.util.List; import javax.annotation.PostConstruct; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.json.JSONObject; import org.json.XML; import org.onap.datalake.feeder.config.ApplicationConfiguration; import org.onap.datalake.feeder.dto.TopicConfig; import org.onap.datalake.feeder.enumeration.DataFormat; +import org.onap.datalake.feeder.util.JsonUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -100,7 +102,7 @@ public class StoreService { saveJsons(topicConfig, docs, messages); } - private JSONObject messageToJson(TopicConfig topic, Pair<Long, String> pair) throws IOException { + private JSONObject messageToJson(TopicConfig topicConfig, Pair<Long, String> pair) throws IOException { long timestamp = pair.getLeft(); String text = pair.getRight(); @@ -111,11 +113,11 @@ public class StoreService { // log.debug("{} ={}", topicStr, text); //} - boolean storeRaw = topic.isSaveRaw(); + boolean storeRaw = topicConfig.isSaveRaw(); JSONObject json = null; - DataFormat dataFormat = topic.getDataFormat2(); + DataFormat dataFormat = topicConfig.getDataFormat2(); switch (dataFormat) { case JSON: @@ -146,6 +148,20 @@ public class StoreService { json.put(config.getRawDataLabel(), text); } + if (StringUtils.isNotBlank(topicConfig.getAggregateArrayPath())) { + String[] paths = topicConfig.getAggregateArrayPath2(); + for (String path : paths) { + JsonUtil.arrayAggregate(path, json); + } + } + + if (StringUtils.isNotBlank(topicConfig.getFlattenArrayPath())) { + String[] paths = topicConfig.getFlattenArrayPath2(); + for (String path : paths) { + JsonUtil.flattenArray(path, json); + } + } + return json; } @@ -168,9 +184,9 @@ public class StoreService { } public void flush() { //force flush all buffer - hdfsService.flush(); + hdfsService.flush(); } - + public void flushStall() { //flush stall buffer hdfsService.flushStall(); } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java index f0b000bc..64e8b8b1 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java @@ -123,6 +123,8 @@ public class TopicService { topic.setCorrelateClearedMessage(tConfig.isCorrelateClearedMessage()); topic.setDataFormat(tConfig.getDataFormat()); topic.setMessageIdPath(tConfig.getMessageIdPath()); + topic.setAggregateArrayPath(tConfig.getAggregateArrayPath()); + topic.setFlattenArrayPath(tConfig.getFlattenArrayPath()); if(tConfig.getSinkdbs() != null) { for (String item : tConfig.getSinkdbs()) { diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java new file mode 100644 index 00000000..db4dcfae --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java @@ -0,0 +1,158 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DCAE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.util; + +import java.util.HashMap; + +import org.apache.commons.collections.CollectionUtils; +import org.json.JSONArray; +import org.json.JSONObject; + +import lombok.Getter; + +/** + * utils for JSON + * + * @author Guobiao Mo + * + */ +public class JsonUtil { + + @Getter + enum AggregateType { + ALL("aggregate"), AVEARGE("average"), SUM("sum"), MAX("max"), MIN("min"), COUNT("count"); + private final String name; + + AggregateType(String name) { + this.name = name; + } + + public String getLabel(String path) { + return path.substring(path.lastIndexOf('/') + 1) + "_" + name; + } + } + + public static void flattenArray(String path, JSONObject json) { + //path = /event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray/astriInterface + + int index1 = path.lastIndexOf('/'); + + String arrayPath = path.substring(0, index1);// /event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray + + Object obj; + try { + obj = json.query(arrayPath); + } catch (org.json.JSONPointerException e) { + return; + } + if (obj == null || !(obj instanceof JSONArray)) { + return; + } + Iterable<JSONObject> subjsonaArray = (Iterable<JSONObject>) obj; + + String tagName = path.substring(index1 + 1);//astriInterface + + int index2 = path.lastIndexOf('/', index1 - 1); + String arrayName = path.substring(index2 + 1, index1);//astriDPMeasurementArray + + String parentPath = path.substring(0, index2);// /event/measurementsForVfScalingFields/astriMeasurement + JSONObject parent = (JSONObject) json.query(parentPath); + + for (JSONObject element : subjsonaArray) { + String tagValue = element.get(tagName).toString(); + String label = arrayName + "_" + tagName + "_" + tagValue; + + parent.put(label, element); + } + } + + /** + * json got modified. + * + * @param aggregateType + * @param path + * @param json + */ + public static void arrayAggregate(String path, JSONObject json) { + HashMap<String, Double> sumHashMap = new HashMap<>(); + HashMap<String, Double> maxHashMap = new HashMap<>(); + HashMap<String, Double> minHashMap = new HashMap<>(); + + Object obj; + try { + obj = json.query(path); + } catch (org.json.JSONPointerException e) { + return; + } + if (obj == null || !(obj instanceof JSONArray)) { + return; + } + Iterable<JSONObject> subjsonaArray = (Iterable<JSONObject>) obj; + + int count = 0; + for (JSONObject element : subjsonaArray) { + String[] names = JSONObject.getNames(element); + for (String name : names) { + Number value = element.optNumber(name); + if (value != null) { + double existing = sumHashMap.computeIfAbsent(name, k -> 0.0); + sumHashMap.put(name, existing + value.doubleValue()); + + existing = maxHashMap.computeIfAbsent(name, k -> Double.MIN_VALUE); + maxHashMap.put(name, Math.max(existing, value.doubleValue())); + + existing = minHashMap.computeIfAbsent(name, k -> Double.MAX_VALUE); + minHashMap.put(name, Math.min(existing, value.doubleValue())); + } + } + count++; + } + + if (count == 0) { + return; + } + + JSONObject parentJson = (JSONObject) json.query(path.substring(0, path.lastIndexOf('/'))); + + //sum + JSONObject aggJson = new JSONObject(sumHashMap); + parentJson.put(AggregateType.SUM.getLabel(path), aggJson); + + //AVEARGE + int c = count;//need to be Effectively Final + sumHashMap.replaceAll((k, v) -> v / c); + aggJson = new JSONObject(sumHashMap); + parentJson.put(AggregateType.AVEARGE.getLabel(path), aggJson); + + //Count + parentJson.put(AggregateType.COUNT.getLabel(path), count); + + //Max + aggJson = new JSONObject(maxHashMap); + parentJson.put(AggregateType.MAX.getLabel(path), aggJson); + + //Min + aggJson = new JSONObject(minHashMap); + parentJson.put(AggregateType.MIN.getLabel(path), aggJson); + + } + +} diff --git a/components/datalake-handler/feeder/src/main/resources/application.properties b/components/datalake-handler/feeder/src/main/resources/application.properties index a1054731..7bbbac05 100644 --- a/components/datalake-handler/feeder/src/main/resources/application.properties +++ b/components/datalake-handler/feeder/src/main/resources/application.properties @@ -42,7 +42,7 @@ dmaapCheckNewTopicInterval=60000 kafkaConsumerCount=3 #####################Elasticsearch -elasticsearchType=doc +elasticsearchType=_doc #####################HDFS hdfsBufferSize=4096 diff --git a/components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/readme.txt b/components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/readme.txt new file mode 100644 index 00000000..88513539 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/readme.txt @@ -0,0 +1,4 @@ +before creating index
+PUT http://dl_es:9200/unauthenticated.ves_measurement_output
+application/json
+body from unauthenticated.ves_measurement_output.json
\ No newline at end of file diff --git a/components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/unauthenticated.ves_measurement_output.json b/components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/unauthenticated.ves_measurement_output.json new file mode 100644 index 00000000..9a53b70c --- /dev/null +++ b/components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/unauthenticated.ves_measurement_output.json @@ -0,0 +1,31 @@ +{
+ "mappings": {
+ "properties": {
+ "datalake_ts_": {
+ "type": "date",
+ "format": "epoch_millis"
+ },
+ "event.commonEventHeader.internalHeaderFields.collectorTimeStamp": {
+ "type": "date",
+ "format":"EEE, MM dd yyyy HH:mm:ss z"
+ },
+ "event.commonEventHeader.startEpochMicrosec": {
+ "type": "date",
+ "format": "epoch_millis"
+ },
+ "event.commonEventHeader.lastEpochMicrosec": {
+ "type": "date",
+ "format": "epoch_millis"
+ },
+ "event.measurementsForVfScalingFields.diskUsageArray": {
+ "type": "nested"
+ },
+ "event.measurementsForVfScalingFields.cpuUsageArray": {
+ "type": "nested"
+ },
+ "event.measurementsForVfScalingFields.vNicPerformanceArray": {
+ "type": "nested"
+ }
+ }
+ }
+}
\ No newline at end of file diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java index 617b50e3..0c56d5af 100644 --- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java @@ -68,6 +68,7 @@ public class ApplicationConfigurationTest { assertNotNull(config.getRawDataLabel()); assertNotNull(config.getTimestampLabel()); assertNotNull(config.getElasticsearchType()); + assertNotNull(config.getDatalakeVersion()); //HDFS assertTrue(config.getHdfsBatchSize()>0); diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java index bb31cd74..f52332a5 100644 --- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java @@ -77,6 +77,23 @@ public class TopicConfigTest { } @Test + public void testArrayPath() { + Topic topic = new Topic("testArrayPath"); + topic.setAggregateArrayPath("/data/data2/value,/data/data3"); + topic.setFlattenArrayPath("/data/data2/value,/data/data3"); + + TopicConfig topicConfig = topic.getTopicConfig(); + + String[] value = topicConfig.getAggregateArrayPath2(); + assertEquals(value[0], "/data/data2/value"); + assertEquals(value[1], "/data/data3"); + + value = topicConfig.getFlattenArrayPath2(); + assertEquals(value[0], "/data/data2/value"); + assertEquals(value[1], "/data/data3"); + } + + @Test public void testIs() { Topic testTopic = new Topic("test"); diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/StoreServiceTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/StoreServiceTest.java index 44e76328..fc05d1d4 100644 --- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/StoreServiceTest.java +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/StoreServiceTest.java @@ -93,6 +93,8 @@ public class StoreServiceTest { testInit(); TopicConfig topicConfig = createTopicConfig("test1", "JSON"); + topicConfig.setAggregateArrayPath("/test"); + topicConfig.setFlattenArrayPath("/test"); topicConfig = createTopicConfig("test2", "XML"); topicConfig.setSaveRaw(false); diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java index 265ec963..774cd229 100644 --- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java @@ -21,6 +21,7 @@ package org.onap.datalake.feeder.service; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -70,6 +71,8 @@ public class TopicServiceTest { String name = "a"; when(topicRepository.findById(name)).thenReturn(Optional.of(new Topic(name))); assertEquals(topicService.getTopic(name), new Topic(name)); + + assertFalse(topicService.istDefaultTopic(new Topic(name))); } @Test diff --git a/components/datalake-handler/feeder/src/test/resources/application.properties b/components/datalake-handler/feeder/src/test/resources/application.properties index 189adece..64ecdee9 100644 --- a/components/datalake-handler/feeder/src/test/resources/application.properties +++ b/components/datalake-handler/feeder/src/test/resources/application.properties @@ -14,36 +14,37 @@ rawDataLabel=datalake_text_ defaultTopicName=_DL_DEFAULT_ -#how often do we check topic setting update, in millisecond -topicCheckInterval=60000 #####################DMaaP #dmaapZookeeperHostPort=127.0.0.1:2181 #dmaapKafkaHostPort=127.0.0.1:9092 dmaapZookeeperHostPort=message-router-zookeeper:2181 dmaapKafkaHostPort=message-router-kafka:9092 -dmaapKafkaGroup=dlgroup19 +dmaapKafkaGroup=dlgroup44 #in second dmaapKafkaTimeout=60 dmaapKafkaExclude[0]=__consumer_offsets dmaapKafkaExclude[1]=__transaction_state -dmaapKafkaExclude[2]=msgrtr.apinode.metrics.dmaap +#dmaapKafkaExclude[2]=msgrtr.apinode.metrics.dmaap #check for new topics , in millisecond -dmaapCheckNewTopicInterval=300000 +dmaapCheckNewTopicInterval=60000 -kafkaConsumerCount=1 +kafkaConsumerCount=3 #####################Elasticsearch -elasticsearchType=doc +elasticsearchType=_doc #####################HDFS hdfsBufferSize=4096 #how often we flush stall updates, in millisecond -hdfsFlushInterval=10000 -hdfsBatchSize=250 +hdfsFlushInterval=30000 +hdfsBatchSize=500 #####################Logging logging.level.org.springframework.web=ERROR logging.level.com.att.nsa.apiClient.http=ERROR logging.level.org.onap.datalake=DEBUG + +#####################Verison +datalakeVersion=0.0.1 |