diff options
12 files changed, 461 insertions, 26 deletions
diff --git a/components/datalake-handler/feeder/pom.xml b/components/datalake-handler/feeder/pom.xml index ec395464..f88baf36 100644 --- a/components/datalake-handler/feeder/pom.xml +++ b/components/datalake-handler/feeder/pom.xml @@ -18,6 +18,11 @@ <dependencies> <dependency> + <groupId>org.json</groupId> + <artifactId>json</artifactId> + </dependency> + + <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.6</version> @@ -80,11 +85,6 @@ </dependency> <dependency> - <groupId>org.json</groupId> - <artifactId>json</artifactId> - </dependency> - - <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> </dependency> diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java index 62ac37fb..108eb4e0 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java @@ -20,10 +20,13 @@ package org.onap.datalake.feeder.config; -import java.util.Set; - +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringBootConfiguration; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.couchbase.CouchbaseConfiguration; import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.FilterType; +import org.springframework.context.annotation.ComponentScan; import lombok.Getter; import lombok.Setter; @@ -37,8 +40,12 @@ import lombok.Setter; */ @Getter @Setter -@Configuration +@SpringBootConfiguration @ConfigurationProperties +//@ComponentScan(excludeFilters = @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, value = CouchbaseConfiguration.class)) +//https://stackoverflow.com/questions/29344313/prevent-application-commandlinerunner-classes-from-executing-during-junit-test +@EnableAutoConfiguration +//@Profile("test") public class ApplicationConfiguration { private String couchbaseHost; @@ -49,10 +56,6 @@ public class ApplicationConfiguration { // private int mongodbPort; // private String mongodbDatabase; - private boolean storeJson; - private boolean storeYaml; - private boolean storeXml; - private String dmaapZookeeperHostPort; private String dmaapKafkaHostPort; private String dmaapKafkaGroup; diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java index 99216ad3..ace33dcc 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java @@ -23,10 +23,14 @@ import java.util.function.Predicate; import javax.validation.constraints.NotNull; +import org.apache.commons.lang3.StringUtils; +import org.json.JSONObject; import org.onap.datalake.feeder.enumeration.DataFormat; import org.springframework.data.annotation.Id; import org.springframework.data.annotation.Transient; import org.springframework.data.couchbase.core.mapping.Document; + +import lombok.Setter; /** * Domain class representing topic table in Couchbase @@ -35,6 +39,7 @@ import org.springframework.data.couchbase.core.mapping.Document; * */ @Document +@Setter public class Topic { @NotNull @Id @@ -77,6 +82,9 @@ public class Topic { //if this flag is true, need to correlate alarm cleared message to previous alarm private Boolean correlateClearedMessage; + + //the value in the JSON with this path will be used as DB id + private String messageIdPath; public Topic() { } @@ -123,12 +131,16 @@ public class Topic { //if 'this' Topic does not have the setting, use default Topic's private boolean is(Boolean b, Predicate<Topic> pre) { + return is(b, pre, false); + } + + private boolean is(Boolean b, Predicate<Topic> pre, boolean defaultValue) { if (b != null) { return b; } else if (defaultTopic != null) { return pre.test(defaultTopic); } else { - return false; + return defaultValue; } } @@ -148,18 +160,33 @@ public class Topic { return is(supportDruid, Topic::isSupportDruid); } + //extract DB id from a JSON attribute, TODO support multiple attributes + public String getMessageId(JSONObject json) { + String id = null; + + if(StringUtils.isNotBlank(messageIdPath)) { + id = json.query(messageIdPath).toString(); + } + + return id; + } + @Override public String toString() { return id; } - // for testing - public static void main(String[] args) { - Topic defaultTopic=new Topic("def"); - Topic test = new Topic("test"); - test.setDefaultTopic(defaultTopic); - defaultTopic.supportElasticsearch=true; - boolean b = test.isSupportElasticsearch(); - System.out.println(b); + /** + * @return the messageIdPath + */ + public String getMessageIdPath() { + return messageIdPath; + } + + /** + * @param messageIdPath the messageIdPath to set + */ + public void setMessageIdPath(String messageIdPath) { + this.messageIdPath = messageIdPath; } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DataFormat.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DataFormat.java index 83ffac18..fdcbdfc1 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DataFormat.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DataFormat.java @@ -27,4 +27,20 @@ package org.onap.datalake.feeder.enumeration; */ public enum DataFormat { JSON, XML, YAML, TEXT; + + public static DataFormat fromString(String s) { + if ("JSON".equalsIgnoreCase(s)) { + return JSON; + } + if ("XML".equalsIgnoreCase(s)) { + return XML; + } + if ("YAML".equalsIgnoreCase(s)) { + return YAML; + } + if ("TEXT".equalsIgnoreCase(s)) { + return TEXT; + } + throw new IllegalArgumentException("Invalid value for format: " + s); + } } diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/DruidSupervisorGenerator.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/DruidSupervisorGenerator.java index 819590b7..31f46362 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/DruidSupervisorGenerator.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/DruidSupervisorGenerator.java @@ -33,11 +33,15 @@ import org.apache.velocity.VelocityContext; import org.apache.velocity.app.Velocity;
import org.apache.velocity.runtime.RuntimeConstants;
import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
+import org.onap.datalake.feeder.enumeration.DataFormat;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeType;
+import lombok.Getter;
+import lombok.Setter;
+
/*
* read sample json and output supervisor to resources\druid\generated
@@ -56,6 +60,7 @@ import com.fasterxml.jackson.databind.node.JsonNodeType; * dimension type default is string, in msgrtr.apinode.metrics.dmaap , many are long/double, so need to generate dimensionsSpec, this is done at the end of printFlattenSpec()
*/
+@Getter
public class DruidSupervisorGenerator {
Template template = null;
@@ -73,12 +78,12 @@ public class DruidSupervisorGenerator { context = new VelocityContext();
- context.put("host", "dl_dmaap_kf");
+ context.put("host", "message-router-kafka:9092");//TODO get from config
template = Velocity.getTemplate("druid/kafka-supervisor-template.vm");
}
- public void printNode(String prefix, JsonNode node) {
+ private void printNode(String prefix, JsonNode node) {
// lets see what type the node is
// System.out.println("NodeType=" + node.getNodeType() + ", isContainerNode=" + node.isContainerNode() + ", " + node); // prints OBJECT
@@ -109,7 +114,7 @@ public class DruidSupervisorGenerator { }
- public void printFlattenSpec(JsonNodeType type, String path) {
+ private void printFlattenSpec(JsonNodeType type, String path) {
String name = path.substring(2).replace('.', ':');
// lets see what type the node is
System.out.println("{");
@@ -151,7 +156,6 @@ public class DruidSupervisorGenerator { context.put("topic", topic);
context.put("timestamp", "event-header:timestamp");//FIXME hard coded, should be topic based
context.put("timestampFormat", "yyyyMMdd-HH:mm:ss:SSS");//FIXME hard coded, should be topic based
-
context.put("dimensions", dimensions);
BufferedWriter out = new BufferedWriter(new FileWriter(outputFileName));
diff --git a/components/datalake-handler/feeder/src/main/resources/druid/kafka-supervisor-template.vm b/components/datalake-handler/feeder/src/main/resources/druid/kafka-supervisor-template.vm new file mode 100644 index 00000000..01ebaf2e --- /dev/null +++ b/components/datalake-handler/feeder/src/main/resources/druid/kafka-supervisor-template.vm @@ -0,0 +1,57 @@ +{ + "type": "kafka", + "dataSchema": { + "dataSource": "$topic", + "parser": { + "type": "string", + "parseSpec": { + "format": "json", + "flattenSpec": { + "useFieldDiscovery": false, + "fields": [ + #foreach($flatten in $dimensions) + + { +"type": "path", +"name": "$flatten[0]", +"expr": "$flatten[1]" +}, + #end + ] + }, + "timestampSpec": { + "column": "$timestamp", + "format": "$timestampFormat" + }, + "dimensionsSpec": { + "dimensions": [ + ], + "dimensionsExclusions": [ + ] + } + } + }, + "metricsSpec": [], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "HOUR", + "queryGranularity": "MINUTE", + "rollup": false + } + }, + "tuningConfig": { + "type": "kafka", + "reportParseExceptions": true + }, + "ioConfig": { + "topic": "$topic", + "replicas": 1, + "startDelay": "PT1S", + "taskDuration": "PT1H", + "completionTimeout": "PT30M", + "consumerProperties": { + "bootstrap.servers": "$host" + }, + "useEarliestOffset": true + } +} diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java new file mode 100644 index 00000000..934451fe --- /dev/null +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java @@ -0,0 +1,73 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DATALAKE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.config; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.onap.datalake.feeder.Application; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.ConfigFileApplicationContextInitializer; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * test ApplicationConfiguration + * + * @author Guobiao Mo + * + */ +//@RunWith(SpringRunner.class) +//@SpringBootTest +/* +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes = Application.class, + initializers = ConfigFileApplicationContextInitializer.class) +*/ +@RunWith(SpringJUnit4ClassRunner.class) +@SpringBootTest(classes = ApplicationConfiguration.class) +//@ActiveProfiles("test") +public class ApplicationConfigurationTest { + + @Autowired + private ApplicationConfiguration config; + + @Test + public void readConfig() { + assertNotNull(config.getCouchbaseHost()); + assertNotNull(config.getCouchbaseUser()); + assertNotNull(config.getCouchbasePass()); + assertNotNull(config.getCouchbaseBucket()); + + assertNotNull(config.getDmaapZookeeperHostPort()); + assertNotNull(config.getDmaapKafkaHostPort()); + assertNotNull(config.getDmaapKafkaGroup()); + assertTrue(config.getDmaapKafkaTimeout() > 0L); + assertTrue(config.getDmaapCheckNewTopicIntervalInSec() > 0); + + assertTrue(config.getKafkaConsumerCount() > 0); + } + +} diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java new file mode 100644 index 00000000..23ec3b10 --- /dev/null +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/domain/TopicTest.java @@ -0,0 +1,66 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DataLake +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.domain; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.json.JSONObject; +import org.junit.Test; + +/** + * Test Topic + * + * @author Guobiao Mo + * + */ + +public class TopicTest { + + @Test + public void getMessageId() { + String text = "{ data: { data2 : { value : 'hello'}}}"; + + JSONObject json = new JSONObject(text); + + Topic topic = new Topic("test getMessageId"); + topic.setMessageIdPath("/data/data2/value"); + + String value = topic.getMessageId(json); + + assertEquals(value, "hello"); + } + + @Test + public void testIs() { + Topic defaultTopic=new Topic("default"); + Topic testTopic = new Topic("test"); + testTopic.setDefaultTopic(defaultTopic); + + defaultTopic.setSupportElasticsearch(true); + boolean b = testTopic.isSupportElasticsearch(); + assertTrue(b); + + defaultTopic.setSupportElasticsearch(false); + b = testTopic.isSupportElasticsearch(); + assertFalse(b); + } +} diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/enumeration/DataFormatTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/enumeration/DataFormatTest.java new file mode 100644 index 00000000..f1c0d853 --- /dev/null +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/enumeration/DataFormatTest.java @@ -0,0 +1,45 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DCAE +* ================================================================================ +* Copyright 2018 TechMahindra +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.enumeration; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +/** + * Test Data format of DMaaP messages + * + * @author Guobiao Mo + * + */ +public class DataFormatTest { + @Test + public void fromString() { + assertEquals(DataFormat.JSON, DataFormat.fromString("json")); + assertEquals(DataFormat.XML, DataFormat.fromString("xml")); + assertEquals(DataFormat.YAML, DataFormat.fromString("YAML")); + assertEquals(DataFormat.TEXT, DataFormat.fromString("Text")); + } + + @Test(expected = IllegalArgumentException.class) + public void fromStringWithException() { + DataFormat.fromString("test"); + } +} diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/util/DruidSupervisorGeneratorTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/util/DruidSupervisorGeneratorTest.java new file mode 100644 index 00000000..f5a42bbb --- /dev/null +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/util/DruidSupervisorGeneratorTest.java @@ -0,0 +1,59 @@ +/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.velocity.VelocityContext;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+/**
+ * Test DruidSupervisorGenerator
+ * @author Guobiao Mo
+ *
+ */
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@SpringBootTest(classes = ApplicationConfiguration.class)
+
+public class DruidSupervisorGeneratorTest {
+
+ @Autowired
+ private ApplicationConfiguration config;
+
+ @Test
+ public void testConstructor() {
+ DruidSupervisorGenerator gen = new DruidSupervisorGenerator();
+ VelocityContext context= gen.getContext();
+
+ assertNotNull(context);
+ assertNotNull(gen.getDimensions() );
+ assertNotNull(gen.getTemplate() );
+
+ String host = (String) context.get("host");
+ assertEquals(host, config.getDmaapKafkaHostPort());
+ }
+}
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/util/UtilTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/util/UtilTest.java new file mode 100644 index 00000000..b0f5492d --- /dev/null +++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/util/UtilTest.java @@ -0,0 +1,43 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DCAE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.util; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +/** + * test utils + * + * @author Guobiao Mo + * + */ +public class UtilTest { + + @Test + //only dot(.) in key got replaced + public void replaceDotInKey() { + String a = "\"u-y.t.y-t\":\"u.gfh\",\\\"jg.h\\\":\"j_9889\""; + String b = "\"u-y_t_y-t\":\"u.gfh\",\\\"jg_h\\\":\"j_9889\""; + + assertEquals(Util.replaceDotInKey(a), b); + } +} diff --git a/components/datalake-handler/feeder/src/test/resources/application.properties b/components/datalake-handler/feeder/src/test/resources/application.properties new file mode 100644 index 00000000..ede5999b --- /dev/null +++ b/components/datalake-handler/feeder/src/test/resources/application.properties @@ -0,0 +1,42 @@ + +server.port = 1680 + + + +#For Beijing lab +#dmaapZookeeperHostPort=zookeeper.mr01.onap.vip:80 +#dmaapKafkaHostPort=kafka.mr01.onap.vip:80 +#spring.couchbase.bootstrap-hosts=172.30.1.74 +#couchbaseHost=172.30.1.74 + + +#DMaaP +#dmaapZookeeperHostPort=127.0.0.1:2181 +#dmaapKafkaHostPort=127.0.0.1:9092 +dmaapZookeeperHostPort=message-router-zookeeper:2181 +dmaapKafkaHostPort=message-router-kafka:9092 +dmaapKafkaGroup=dlgroup10 +dmaapKafkaTimeout=60 +#check for new topics +dmaapCheckNewTopicIntervalInSec=3000 + +kafkaConsumerCount=1 + +#tolerate inconsistency when system crash, see PullThread.run() +async=true + +#Logging +logging.level.org.springframework.web=ERROR +logging.level.com.att.nsa.apiClient.http=ERROR +logging.level.org.onap.datalake=DEBUG + + +#DL Feeder DB: Couchbase +couchbaseHost=dl_couchbase +#couchbaseHost=172.30.1.74 +couchbaseUser=dmaap +couchbasePass=dmaap1234 +couchbaseBucket=dmaap + +#DL Feeder DB: Elasticsearch +elasticsearchHost=dl_es |