summaryrefslogtreecommitdiffstats
path: root/components/datalake-handler/feeder/src/main/java
diff options
context:
space:
mode:
authorGuobiao Mo <guobiaomo@chinamobile.com>2019-02-22 15:36:27 -0800
committerGuobiao Mo <guobiaomo@chinamobile.com>2019-03-26 15:06:46 -0700
commitda7037659eb13e018c61ec06b3824f13ce1c5e53 (patch)
treeb95aaa030c18ce04c37a1b559e4c73c82b5f99d8 /components/datalake-handler/feeder/src/main/java
parent82fe4e29ff6c0b48fe15d88b1fca882292e6af43 (diff)
DataLake seed code
Issue-ID: DCAEGEN2-1189 Change-Id: Ib25b70197e3b162efadbce0f1b5235e3ba6635e9 Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
Diffstat (limited to 'components/datalake-handler/feeder/src/main/java')
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/Application.java46
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java72
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/PullController.java77
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java131
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java165
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DataFormat.java30
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java67
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java28
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java67
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java129
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java94
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java142
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java117
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java168
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java164
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java94
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/DruidSupervisorGenerator.java172
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/Util.java65
18 files changed, 1828 insertions, 0 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/Application.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/Application.java
new file mode 100644
index 00000000..8f17937b
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/Application.java
@@ -0,0 +1,46 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder;
+
+import org.onap.datalake.feeder.service.PullService;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+
+/**
+ * Entry point of the DataLake feeder application
+ *
+ * @author Guobiao Mo
+ *
+ */
+
+@SpringBootApplication
+public class Application {
+ public static void main(String[] args) {
+ SpringApplication.run(Application.class, args);
+ }
+
+ @Bean
+ public CommandLineRunner commandLineRunner(PullService pullService) {
+ return args -> pullService.start();
+ }
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
new file mode 100644
index 00000000..62ac37fb
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
@@ -0,0 +1,72 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.config;
+
+import java.util.Set;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * Mapping from src/main/resources/application.properties to Java configuration
+ * object
+ *
+ * @author Guobiao Mo
+ *
+ */
+@Getter
+@Setter
+@Configuration
+@ConfigurationProperties
+public class ApplicationConfiguration {
+
+ private String couchbaseHost;
+ private String couchbaseUser;
+ private String couchbasePass;
+ private String couchbaseBucket;
+
+ // private int mongodbPort;
+ // private String mongodbDatabase;
+
+ private boolean storeJson;
+ private boolean storeYaml;
+ private boolean storeXml;
+
+ private String dmaapZookeeperHostPort;
+ private String dmaapKafkaHostPort;
+ private String dmaapKafkaGroup;
+ private long dmaapKafkaTimeout;
+
+// private boolean dmaapMonitorAllTopics;
+ private int dmaapCheckNewTopicIntervalInSec;
+ //private String dmaapHostPort;
+ //private Set<String> dmaapExcludeTopics;
+ //private Set<String> dmaapIncludeTopics;
+
+ private int kafkaConsumerCount;
+
+ private boolean async;
+
+ private String elasticsearchHost;
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/PullController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/PullController.java
new file mode 100644
index 00000000..2b176370
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/PullController.java
@@ -0,0 +1,77 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.controller;
+
+import java.io.IOException;
+
+import org.onap.datalake.feeder.service.PullService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * This controller controls DL data feeder.
+ *
+ * @author Guobiao Mo
+ *
+ */
+
+@RestController
+@RequestMapping(value = "/pull", produces = { MediaType.TEXT_PLAIN_VALUE })
+public class PullController {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Autowired
+ private PullService pullService;
+
+ /**
+ * @return message that application is started
+ * @throws IOException
+ */
+ @RequestMapping("/start")
+ public String start() throws IOException {
+ log.info("DataLake feeder starting to pull data from DMaaP...");
+ pullService.start();
+ return "DataLake feeder is running.";
+ }
+
+ /**
+ * @return message that application stop process is triggered
+ */
+ @RequestMapping("/stop")
+ public String stop() {
+ pullService.shutdown();
+ log.info("DataLake feeder is stopped.");
+ return "DataLake feeder is stopped.";
+ }
+ /**
+ * @return feeder status
+ */
+ @RequestMapping("/status")
+ public String status() {
+ String status = "to be impletemented";
+ log.info("senting feeder status ...");//TODO we can send what topics are monitored, how many messages are sent, etc.
+ return status;
+ }
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
new file mode 100644
index 00000000..25028d58
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java
@@ -0,0 +1,131 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.controller;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.repository.TopicRepository;
+import org.onap.datalake.feeder.service.DmaapService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.MediaType;
+import org.springframework.validation.BindingResult;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * This controller manages all the topic settings. Topic "_DL_DEFAULT_" acts as
+ * the default. For example, if a topic's enabled=null, _DL_DEFAULT_.enabled is
+ * used for that topic. All the settings are saved in Couchbase. topic
+ * "_DL_DEFAULT_" is populated at setup by a DB script.
+ *
+ * @author Guobiao Mo
+ *
+ */
+
+@RestController
+@RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE })
+public class TopicController {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Autowired
+ private DmaapService dmaapService;
+
+ @Autowired
+ private TopicRepository topicRepository;
+
+ //list all topics in DMaaP
+ @GetMapping("/dmaap/")
+ @ResponseBody
+ public List<String> listDmaapTopics() throws IOException {
+ return dmaapService.getTopics();
+ }
+
+ //list all topics
+ @GetMapping("/")
+ @ResponseBody
+ public Iterable<Topic> list() throws IOException {
+ Iterable<Topic> ret = topicRepository.findAll();
+ return ret;
+ }
+
+ //Read a topic
+ @GetMapping("/{name}")
+ @ResponseBody
+ public Topic getTopic(@PathVariable("name") String topicName) throws IOException {
+ //Topic topic = topicRepository.findFirstById(topicName);
+ Optional<Topic> topic = topicRepository.findById(topicName);
+ if (topic.isPresent()) {
+ return topic.get();
+ } else {
+ return null;
+ }
+ }
+
+ //Update Topic
+ @PutMapping("/")
+ @ResponseBody
+ public Topic updateTopic(Topic topic, BindingResult result) throws IOException {
+
+ if (result.hasErrors()) {
+ log.error(result.toString());
+
+ return null;//TODO return binding error
+ }
+
+ Topic oldTopic = getTopic(topic.getId());
+ if (oldTopic == null) {
+ return null;//TODO return not found error
+ } else {
+ topicRepository.save(topic);
+ return topic;
+ }
+ }
+
+ //create a new Topic
+ @PostMapping("/")
+ @ResponseBody
+ public Topic createTopic(Topic topic, BindingResult result) throws IOException {
+
+ if (result.hasErrors()) {
+ log.error(result.toString());
+ return null;
+ }
+
+ Topic oldTopic = getTopic(topic.getId());
+ if (oldTopic != null) {
+ return null;//TODO return 'already exists' error
+ } else {
+ topicRepository.save(topic);
+ return topic;
+ }
+ }
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
new file mode 100644
index 00000000..99216ad3
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
@@ -0,0 +1,165 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.domain;
+
+import java.util.function.Predicate;
+
+import javax.validation.constraints.NotNull;
+
+import org.onap.datalake.feeder.enumeration.DataFormat;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.annotation.Transient;
+import org.springframework.data.couchbase.core.mapping.Document;
+
+/**
+ * Domain class representing topic table in Couchbase
+ *
+ * @author Guobiao Mo
+ *
+ */
+@Document
+public class Topic {
+ @NotNull
+ @Id
+ private String id;//topic name
+
+ @Transient
+ private Topic defaultTopic;
+
+ //for protected Kafka topics
+ private String login;
+ private String pass;
+
+ /**
+ * indicate if we should monitor this topic
+ */
+ private Boolean enabled;
+
+ /**
+ * save raw message text
+ */
+ private Boolean saveRaw;
+
+ /**
+ * true: save it to Elasticsearch false: don't save null: use default
+ */
+ private Boolean supportElasticsearch;
+ private Boolean supportCouchbase;
+ private Boolean supportDruid;
+
+ /**
+ * need to explicitly tell feeder the data format of the message
+ * support JSON, XML, YAML, TEXT
+ */
+ private DataFormat dataFormat;
+
+ /**
+ * TTL in day
+ */
+ private Integer ttl;
+
+ //if this flag is true, need to correlate alarm cleared message to previous alarm
+ private Boolean correlateClearedMessage;
+
+ public Topic() {
+ }
+
+ public Topic(String id) {
+ this.id = id;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setDefaultTopic(Topic defaultTopic) {
+ this.defaultTopic = defaultTopic;
+ }
+
+ public boolean isEnabled() {
+ return is(enabled, Topic::isEnabled);
+ }
+
+ public boolean isCorrelateClearedMessage() {
+ return is(correlateClearedMessage, Topic::isCorrelateClearedMessage);
+ }
+
+ public int getTtl() {
+ if (ttl != null) {
+ return ttl;
+ } else if (defaultTopic != null) {
+ return defaultTopic.getTtl();
+ } else {
+ return 3650;//default to 10 years for safe
+ }
+ }
+
+ public DataFormat getDataFormat() {
+ if (dataFormat != null) {
+ return dataFormat;
+ } else if (defaultTopic != null) {
+ return defaultTopic.getDataFormat();
+ } else {
+ return null;
+ }
+ }
+
+ //if 'this' Topic does not have the setting, use default Topic's
+ private boolean is(Boolean b, Predicate<Topic> pre) {
+ if (b != null) {
+ return b;
+ } else if (defaultTopic != null) {
+ return pre.test(defaultTopic);
+ } else {
+ return false;
+ }
+ }
+
+ public boolean isSaveRaw() {
+ return is(saveRaw, Topic::isSaveRaw);
+ }
+
+ public boolean isSupportElasticsearch() {
+ return is(supportElasticsearch, Topic::isSupportElasticsearch);
+ }
+
+ public boolean isSupportCouchbase() {
+ return is(supportCouchbase, Topic::isSupportCouchbase);
+ }
+
+ public boolean isSupportDruid() {
+ return is(supportDruid, Topic::isSupportDruid);
+ }
+
+ @Override
+ public String toString() {
+ return id;
+ }
+
+ // for testing
+ public static void main(String[] args) {
+ Topic defaultTopic=new Topic("def");
+ Topic test = new Topic("test");
+ test.setDefaultTopic(defaultTopic);
+ defaultTopic.supportElasticsearch=true;
+ boolean b = test.isSupportElasticsearch();
+ System.out.println(b);
+ }
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DataFormat.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DataFormat.java
new file mode 100644
index 00000000..83ffac18
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DataFormat.java
@@ -0,0 +1,30 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DCAE
+* ================================================================================
+* Copyright 2018 TechMahindra
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.enumeration;
+
+/**
+ * Data format of DMaaP messages
+ *
+ * @author Guobiao Mo
+ *
+ */
+public enum DataFormat {
+ JSON, XML, YAML, TEXT;
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java
new file mode 100644
index 00000000..37d1a669
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java
@@ -0,0 +1,67 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.repository;
+
+import org.onap.datalake.feeder.domain.Topic;
+import org.springframework.data.couchbase.core.query.N1qlPrimaryIndexed;
+import org.springframework.data.couchbase.core.query.Query;
+import org.springframework.data.couchbase.core.query.ViewIndexed;
+import org.springframework.data.couchbase.repository.CouchbasePagingAndSortingRepository;
+import org.springframework.data.domain.Page;
+import org.springframework.data.domain.Pageable;
+
+
+import java.util.List;
+
+/**
+ *
+ * Topic Repository interface, implementation is taken care by Spring framework.
+ * Customization is done through TopicRepositoryCustom and its implementation TopicRepositoryImpl.
+ *
+ * @author Guobiao Mo
+ *
+ */
+@ViewIndexed(designDoc = "topic", viewName = "all")
+public interface TopicRepository extends CouchbasePagingAndSortingRepository<Topic, String>, TopicRepositoryCustom {
+/*
+ Topic findFirstById(String topic);
+
+ Topic findByIdAndState(String topic, boolean state);
+
+ //Supports native JSON query string
+ @Query("{topic:'?0'}")
+ Topic findTopicById(String topic);
+
+ @Query("{topic: { $regex: ?0 } })")
+ List<Topic> findTopicByRegExId(String topic);
+
+
+ //Page<Topic> findByCompanyIdAndNameLikeOrderByName(String companyId, String name, Pageable pageable);
+
+ @Query("#{#n1ql.selectEntity} where #{#n1ql.filter} and companyId = $1 and $2 within #{#n1ql.bucket}")
+ Topic findByCompanyAndAreaId(String companyId, String areaId);
+
+ @Query("#{#n1ql.selectEntity} where #{#n1ql.filter} AND ANY phone IN phoneNumbers SATISFIES phone = $1 END")
+ List<Topic> findByPhoneNumber(String telephoneNumber);
+
+ @Query("SELECT COUNT(*) AS count FROM #{#n1ql.bucket} WHERE #{#n1ql.filter} and companyId = $1")
+ Long countBuildings(String companyId);
+ */
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java
new file mode 100644
index 00000000..220a8f76
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java
@@ -0,0 +1,28 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.repository;
+
+/**
+ * @author Guobiao Mo
+ *
+ */
+public interface TopicRepositoryCustom {
+ long updateTopic(String topic, Boolean state);
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java
new file mode 100644
index 00000000..018d5b95
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java
@@ -0,0 +1,67 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.repository;
+
+import org.onap.datalake.feeder.domain.Topic;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.couchbase.core.CouchbaseTemplate;
+/*
+import org.springframework.data.mongodb.MongoDbFactory;
+import org.springframework.data.mongodb.core.MongoTemplate;
+import org.springframework.data.mongodb.core.convert.DefaultDbRefResolver;
+import org.springframework.data.mongodb.core.convert.DefaultMongoTypeMapper;
+import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
+import org.springframework.data.mongodb.core.mapping.MongoMappingContext;
+import org.springframework.data.mongodb.core.query.Criteria;
+import org.springframework.data.mongodb.core.query.Query;
+import org.springframework.data.mongodb.core.query.Update;
+
+import com.mongodb.WriteResult;
+import com.mongodb.client.result.UpdateResult;
+*/
+import java.util.List;
+
+/**
+ * @author Guobiao Mo
+ *
+ */
+public class TopicRepositoryImpl implements TopicRepositoryCustom {
+
+ @Autowired
+ CouchbaseTemplate template;
+
+ @Override
+ public long updateTopic(String topic, Boolean state) {
+/*
+ Query query = new Query(Criteria.where("id").is(topic));
+ Update update = new Update();
+ update.set("state", state);
+
+ UpdateResult result = mongoTemplate.updateFirst(query, update, Topic.class);
+
+ if(result!=null)
+ return result.getModifiedCount();
+ else
+ */ return 0L;
+
+
+
+ }
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
new file mode 100644
index 00000000..35432587
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java
@@ -0,0 +1,129 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.service;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import org.json.JSONObject;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Topic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import com.couchbase.client.java.Bucket;
+import com.couchbase.client.java.Cluster;
+import com.couchbase.client.java.CouchbaseCluster;
+import com.couchbase.client.java.document.JsonDocument;
+import com.couchbase.client.java.document.JsonLongDocument;
+import com.couchbase.client.java.document.json.JsonObject;
+
+import rx.Observable;
+import rx.functions.Func1;
+
+/**
+ * Service to use Couchbase
+ *
+ * @author Guobiao Mo
+ *
+ */
+@Service
+public class CouchbaseService {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Autowired
+ private ApplicationConfiguration config;
+
+ Bucket bucket;
+
+ @PostConstruct
+ private void init() {
+ // Initialize Couchbase Connection
+ Cluster cluster = CouchbaseCluster.create(config.getCouchbaseHost());
+ cluster.authenticate(config.getCouchbaseUser(), config.getCouchbasePass());
+ bucket = cluster.openBucket(config.getCouchbaseBucket());
+
+ log.info("Connect to Couchbase " + config.getCouchbaseHost());
+
+ // Create a N1QL Primary Index (but ignore if it exists)
+ bucket.bucketManager().createN1qlPrimaryIndex(true, false);
+ }
+
+ @PreDestroy
+ public void cleanUp() {
+ bucket.close();
+ }
+
+ public void saveJsons(Topic topic, List<JSONObject> jsons) {
+ List<JsonDocument> documents= new ArrayList<>(jsons.size());
+ for(JSONObject json : jsons) {
+ //convert to Couchbase JsonObject from org.json JSONObject
+ JsonObject jsonObject = JsonObject.fromJson(json.toString());
+
+ long timestamp = jsonObject.getLong("_ts");//this is Kafka time stamp, which is added in StoreService.messageToJson()
+
+ //setup TTL
+ int expiry = (int) (timestamp/1000L) + topic.getTtl()*3600*24; //in second
+
+ String id = getId(topic.getId());
+ JsonDocument doc = JsonDocument.create(id, expiry, jsonObject);
+ documents.add(doc);
+ }
+ saveDocuments(documents);
+ }
+
+
+ private String getId(String topicStr) {
+ //String id = topicStr+":"+timestamp+":"+UUID.randomUUID();
+
+ //https://forums.couchbase.com/t/how-to-set-an-auto-increment-id/4892/2
+ //atomically get the next sequence number:
+ // increment by 1, initialize at 0 if counter doc not found
+ //TODO how slow is this compared with above UUID approach?
+ JsonLongDocument nextIdNumber = bucket.counter(topicStr, 1, 0); //like 12345
+ String id = topicStr +":"+ nextIdNumber.content();
+
+ return id;
+ }
+
+ //https://docs.couchbase.com/java-sdk/2.7/document-operations.html
+ private void saveDocuments(List<JsonDocument> documents) {
+ Observable
+ .from(documents)
+ .flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
+ @Override
+ public Observable<JsonDocument> call(final JsonDocument docToInsert) {
+ return bucket.async().insert(docToInsert);
+ }
+ })
+ .last()
+ .toBlocking()
+ .single();
+ }
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
new file mode 100644
index 00000000..96ad81bf
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java
@@ -0,0 +1,94 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.annotation.PostConstruct;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Topic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * This service will handle all the communication with Kafka
+ *
+ * @author Guobiao Mo
+ *
+ */
+@Service
+public class DmaapService {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Autowired
+ private ApplicationConfiguration config;
+
+ @Autowired
+ private TopicService topicService;
+
+
+ @PostConstruct
+ private void init() {
+
+ }
+
+ //get all topic names from Zookeeper
+ public List<String> getTopics() {
+ try {
+ Watcher watcher = new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ // TODO monitor new topics
+
+ }
+ };
+ ZooKeeper zk = new ZooKeeper(config.getDmaapZookeeperHostPort(), 10000, watcher);
+ List<String> topics = zk.getChildren("/brokers/topics", false);
+ return topics;
+ } catch (Exception e) {
+ log.error("Can not get topic list from Zookeeper, for testing, going to use hard coded topic list.", e);
+ return null;
+ }
+ }
+
+ public List<String> getActiveTopics() throws IOException {
+ List<String> allTopics = new ArrayList<>(getTopics());
+
+ List<String> ret = new ArrayList<>();
+ for (String topicStr : allTopics) {
+ Topic topic = topicService.getEffectiveTopic(topicStr, true);
+ if (topic.isEnabled()) {
+ ret.add(topicStr);
+ }
+ }
+ return ret;
+ }
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
new file mode 100644
index 00000000..cbcc5f86
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
@@ -0,0 +1,142 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.service;
+
+import java.io.IOException;
+import java.util.List;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.json.JSONObject;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.onap.datalake.feeder.domain.Topic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Service to use Elasticsearch
+ *
+ * @author Guobiao Mo
+ *
+ */
+@Service
+public class ElasticsearchService {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Autowired
+ private ApplicationConfiguration config;
+
+ private RestHighLevelClient client;
+ ActionListener<BulkResponse> listener;
+
+ @PostConstruct
+ private void init() {
+ String elasticsearchHost = config.getElasticsearchHost();
+
+ // Initialize the Connection
+ client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http")));
+
+ log.info("Connect to Elasticsearch Host " + elasticsearchHost);
+
+ listener = new ActionListener<BulkResponse>() {
+ @Override
+ public void onResponse(BulkResponse bulkResponse) {
+
+ }
+
+ @Override
+ public void onFailure(Exception e) {
+ log.error(e.getMessage());
+ }
+ };
+ }
+
+ @PreDestroy
+ public void cleanUp() throws IOException {
+ client.close();
+ }
+
+ public void ensureTableExist(String topic) throws IOException {
+ String topicLower = topic.toLowerCase();
+
+ CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower);
+ try {
+ CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
+ log.info(createIndexResponse.index()+" : created "+createIndexResponse.isAcknowledged());
+ }catch(ElasticsearchStatusException e) {
+ log.info("{} create ES topic status: {}", topic, e.getDetailedMessage());
+ }
+ }
+
+ //TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME
+ public void saveJsons(Topic topic, List<JSONObject> jsons) {
+ BulkRequest request = new BulkRequest();
+
+ for (JSONObject json : jsons) {
+ if(topic.isCorrelateClearedMessage()) {
+ boolean found = correlateClearedMessage(json);
+ if(found) {
+ continue;
+ }
+ }
+ request.add(new IndexRequest(topic.getId().toLowerCase(), "doc").source(json.toString(), XContentType.JSON));
+ }
+ if(config.isAsync()) {
+ client.bulkAsync(request, RequestOptions.DEFAULT, listener);
+ }else {
+ try {
+ client.bulk(request, RequestOptions.DEFAULT);
+ } catch (IOException e) {
+ log.error( topic.getId() , e);
+ }
+ }
+ }
+
+ private boolean correlateClearedMessage(JSONObject json) {
+ boolean found = false;
+
+ /*TODO
+ * 1. check if this is a alarm cleared message
+ * 2. search previous alarm message
+ * 3. update previous message, if success, set found=true
+ */
+
+ return found;
+ }
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
new file mode 100644
index 00000000..3dcbd8ee
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java
@@ -0,0 +1,117 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.PostConstruct;
+
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.stereotype.Service;
+
+/**
+ * Service that pulls messages from DMaaP and save them to Big Data DBs
+ *
+ * @author Guobiao Mo
+ *
+ */
+
+@Service
+public class PullService {
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private boolean isRunning = false;
+ private ExecutorService executorService;
+ private List<PullThread> consumers;
+
+ @Autowired
+ private ApplicationContext context;
+
+ @Autowired
+ private ApplicationConfiguration config;
+
+ @PostConstruct
+ private void init() {
+ }
+
+ /**
+ * start pulling.
+ *
+ * @throws IOException
+ */
+ public synchronized void start() throws IOException {
+ if (isRunning) {
+ return;
+ }
+
+ logger.info("start pulling ...");
+
+ int numConsumers = config.getKafkaConsumerCount();
+
+ executorService = Executors.newFixedThreadPool(numConsumers);
+ consumers = new ArrayList<>(numConsumers);
+
+ for (int i = 0; i < numConsumers; i++) {
+ PullThread puller = context.getBean(PullThread.class, i);
+ consumers.add(puller);
+ executorService.submit(puller);
+ }
+
+ isRunning = true;
+
+ Runtime.getRuntime().addShutdownHook(new Thread(()->shutdown())) ;
+ }
+
+ /**
+ * stop pulling
+ */
+ public synchronized void shutdown() {
+ if (!isRunning) {
+ return;
+ }
+
+ logger.info("stop pulling ...");
+ for (PullThread puller : consumers) {
+ puller.shutdown();
+ }
+
+ executorService.shutdown();
+
+ try {
+ executorService.awaitTermination(10L, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ logger.error("executor.awaitTermination", e);
+ }
+
+ isRunning = false;
+ }
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
new file mode 100644
index 00000000..3c75eb68
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java
@@ -0,0 +1,168 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.service;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.annotation.PostConstruct;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.onap.datalake.feeder.config.ApplicationConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Service;
+
+/**
+ * Thread that pulls messages from DMaaP and save them to Big Data DBs
+ *
+ * @author Guobiao Mo
+ *
+ */
+
+@Service
+@Scope(value=ConfigurableBeanFactory.SCOPE_PROTOTYPE)
+public class PullThread implements Runnable {
+
+ @Autowired
+ private DmaapService dmaapService;
+
+ @Autowired
+ private StoreService storeService;
+
+ @Autowired
+ private ApplicationConfiguration config;
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ private KafkaConsumer<String, String> consumer; //<String, String> is key-value type, in our case key is empty, value is JSON text
+ private int id;
+
+ private final AtomicBoolean active = new AtomicBoolean(false);
+ private boolean async;
+
+ public PullThread(int id) {
+ this.id = id;
+ }
+
+ @PostConstruct
+ private void init() {
+ async = config.isAsync();
+ Properties consumerConfig = getConsumerConfig();
+ consumer = new KafkaConsumer<>(consumerConfig);
+ }
+
+ private Properties getConsumerConfig() {
+ Properties consumerConfig = new Properties();
+
+ consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getDmaapKafkaHostPort());
+ consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, config.getDmaapKafkaGroup());
+ consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+ consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+ consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
+ consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+ return consumerConfig;
+ }
+
+ /**
+ * start pulling.
+ */
+ @Override
+ public void run() {
+ active.set(true);
+
+ DummyRebalanceListener rebalanceListener = new DummyRebalanceListener();
+
+ try {
+ List<String> topics = dmaapService.getActiveTopics(); //TODO get updated topic list within loop
+
+ log.info("Thread {} going to subscribe to topics: {}", id, topics);
+
+ consumer.subscribe(topics, rebalanceListener);
+
+ while (active.get()) {
+
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(config.getDmaapKafkaTimeout()));
+
+ List<Pair<Long, String>> messages = new ArrayList<>(records.count());
+ for (TopicPartition partition : records.partitions()) {
+ messages.clear();
+ List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
+ for (ConsumerRecord<String, String> record : partitionRecords) {
+ messages.add(Pair.of(record.timestamp(), record.value()));
+ log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value());
+ }
+ storeService.saveMessages(partition.topic(), messages);
+ log.info("topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB
+
+ if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit
+ long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
+ consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
+ }
+ }
+
+ if (async) {//for high Throughput, async commit offset in batch to Kafka
+ consumer.commitAsync();
+ }
+ }
+ } catch (Exception e) {
+ log.error("Puller {} run(): exception={}", id, e.getMessage());
+ log.error("", e);
+ } finally {
+ consumer.close();
+ }
+ }
+
+ public void shutdown() {
+ active.set(false);
+ consumer.wakeup();
+ }
+
+ private class DummyRebalanceListener implements ConsumerRebalanceListener {
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ log.info("Called onPartitionsRevoked with partitions: {}", partitions);
+ }
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+ log.info("Called onPartitionsAssigned with partitions: {}", partitions);
+ }
+ }
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
new file mode 100644
index 00000000..1cd3a8a1
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
@@ -0,0 +1,164 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.XML;
+import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.enumeration.DataFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+
+/**
+ * Service to store messages to varieties of DBs
+ *
+ * comment out YAML support, since AML is for config and don't see this data type in DMaaP. Do we need to support XML?
+ *
+ * @author Guobiao Mo
+ *
+ */
+@Service
+public class StoreService {
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Autowired
+ private TopicService topicService;
+
+ @Autowired
+ private CouchbaseService couchbaseService;
+
+ @Autowired
+ private ElasticsearchService elasticsearchService;
+
+ private Map<String, Topic> topicMap = new HashMap<>();
+
+ private ObjectMapper yamlReader;
+
+ @PostConstruct
+ private void init() {
+ yamlReader = new ObjectMapper(new YAMLFactory());
+ }
+
+ @PreDestroy
+ public void cleanUp() {
+ }
+
+ public void saveMessages(String topicStr, List<Pair<Long, String>> messages) {//pair=ts+text
+ if (messages == null || messages.isEmpty()) {
+ return;
+ }
+
+ Topic topic = topicMap.computeIfAbsent(topicStr, k -> { //TODO get topic updated settings from DB periodically
+ return topicService.getEffectiveTopic(topicStr);
+ });
+
+ List<JSONObject> docs = new ArrayList<>();
+
+ for (Pair<Long, String> pair : messages) {
+ try {
+ docs.add(messageToJson(topic, pair));
+ } catch (Exception e) {
+ log.error(pair.getRight(), e);
+ }
+ }
+
+ saveJsons(topic, docs);
+ }
+
+ private JSONObject messageToJson(Topic topic, Pair<Long, String> pair) throws JSONException, JsonParseException, JsonMappingException, IOException {
+
+ long timestamp = pair.getLeft();
+ String text = pair.getRight();
+
+ //for debug, to be remove
+// String topicStr = topic.getId();
+// if (!"TestTopic1".equals(topicStr) && !"msgrtr.apinode.metrics.dmaap".equals(topicStr) && !"AAI-EVENT".equals(topicStr) && !"unauthenticated.DCAE_CL_OUTPUT".equals(topicStr) && !"unauthenticated.SEC_FAULT_OUTPUT".equals(topicStr)) {
+ // log.debug("{} ={}", topicStr, text);
+ //}
+
+ boolean storeRaw = topic.isSaveRaw();
+
+ JSONObject json = null;
+
+ DataFormat dataFormat = topic.getDataFormat();
+
+ switch (dataFormat) {
+ case JSON:
+ json = new JSONObject(text);
+ break;
+ case XML://XML and YAML can be directly inserted into ES, we may not need to convert it to JSON
+ json = XML.toJSONObject(text);
+ break;
+ case YAML:// Do we need to support YAML?
+ Object obj = yamlReader.readValue(text, Object.class);
+ ObjectMapper jsonWriter = new ObjectMapper();
+ String jsonString = jsonWriter.writeValueAsString(obj);
+ json = new JSONObject(jsonString);
+ break;
+ default:
+ json = new JSONObject();
+ storeRaw = true;
+ break;
+ }
+
+ //FIXME for debug, to be remove
+ json.remove("_id");
+ json.remove("_dl_text_");
+
+ json.put("_ts", timestamp);
+ if (storeRaw) {
+ json.put("_text", text);
+ }
+
+ return json;
+ }
+
+ private void saveJsons(Topic topic, List<JSONObject> jsons) {
+ if (topic.isSupportCouchbase()) {
+ couchbaseService.saveJsons(topic, jsons);
+ }
+
+ if (topic.isSupportElasticsearch()) {
+ elasticsearchService.saveJsons(topic, jsons);
+ }
+ }
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
new file mode 100644
index 00000000..9b8fabc1
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
@@ -0,0 +1,94 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DATALAKE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.service;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import org.onap.datalake.feeder.domain.Topic;
+import org.onap.datalake.feeder.repository.TopicRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Service for topics topic setting is stored in Couchbase, bucket 'dl', see
+ * application.properties for Spring setup
+ *
+ * @author Guobiao Mo
+ *
+ */
+@Service
+public class TopicService {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Autowired
+ private TopicRepository topicRepository;
+
+ @Autowired
+ private ElasticsearchService elasticsearchService;
+
+ @PostConstruct
+ private void init() {
+ }
+
+ @PreDestroy
+ public void cleanUp() {
+ }
+
+ public Topic getEffectiveTopic(String topicStr) {
+ try {
+ return getEffectiveTopic(topicStr, false);
+ } catch (IOException e) {
+ log.error(topicStr, e);
+ }
+ return null;
+ }
+
+ public Topic getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException {
+ Topic topic = getTopic(topicStr);
+ if (topic == null) {
+ topic = new Topic(topicStr);
+ }
+
+ topic.setDefaultTopic(getDefaultTopic());
+
+ if(ensureTableExist && topic.isEnabled() && topic.isSupportElasticsearch()) {
+ elasticsearchService.ensureTableExist(topicStr);
+ }
+ return topic;
+ }
+
+ public Topic getTopic(String topicStr) {
+ Optional<Topic> ret = topicRepository.findById(topicStr);
+ return ret.isPresent() ? ret.get() : null;
+ }
+
+ public Topic getDefaultTopic() {
+ return getTopic("_DL_DEFAULT_");
+ }
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/DruidSupervisorGenerator.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/DruidSupervisorGenerator.java
new file mode 100644
index 00000000..819590b7
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/DruidSupervisorGenerator.java
@@ -0,0 +1,172 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.util;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.velocity.Template;
+import org.apache.velocity.VelocityContext;
+import org.apache.velocity.app.Velocity;
+import org.apache.velocity.runtime.RuntimeConstants;
+import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.JsonNodeType;
+
+
+/*
+ * read sample json and output supervisor to resources\druid\generated
+ * need manual edit to be production ready, final versions are in resources\druid
+ *
+ * http://druid.io/docs/latest/tutorials/tutorial-ingestion-spec.html
+ * http://druid.io/docs/latest/ingestion/flatten-json
+ *
+ *
+ * todo:
+ * reduce the manual editing
+ * path hard coded
+ * auto get topics,
+ * auto get sample, and for each topic, get multiple samples.
+ * make supervisor file names consistent
+ * dimension type default is string, in msgrtr.apinode.metrics.dmaap , many are long/double, so need to generate dimensionsSpec, this is done at the end of printFlattenSpec()
+ */
+
+public class DruidSupervisorGenerator {
+
+ Template template = null;
+ VelocityContext context;
+
+ List<String[]> dimensions;
+
+ public DruidSupervisorGenerator() {
+ dimensions = new ArrayList<>();
+
+ Velocity.setProperty(RuntimeConstants.RESOURCE_LOADER, "classpath");
+ Velocity.setProperty("classpath.resource.loader.class", ClasspathResourceLoader.class.getName());
+
+ Velocity.init();
+
+ context = new VelocityContext();
+
+ context.put("host", "dl_dmaap_kf");
+
+ template = Velocity.getTemplate("druid/kafka-supervisor-template.vm");
+ }
+
+ public void printNode(String prefix, JsonNode node) {
+
+ // lets see what type the node is
+ // System.out.println("NodeType=" + node.getNodeType() + ", isContainerNode=" + node.isContainerNode() + ", " + node); // prints OBJECT
+
+ if (node.isContainerNode()) {
+
+ Iterator<Entry<String, JsonNode>> fields = node.fields();
+
+ while (fields.hasNext()) {
+ Entry<String, JsonNode> field = fields.next();
+ // System.out.println("--------"+field.getKey()+"--------");
+ printNode(prefix + "." + field.getKey(), field.getValue());
+ }
+
+ if (node.isArray()) {
+ Iterator<JsonNode> elements = node.elements();
+ int i = 0;
+ while (elements.hasNext()) {
+ JsonNode element = elements.next();
+ printNode(prefix + "[" + i + "]", element);
+ i++;
+ }
+ }
+
+ } else {
+ printFlattenSpec(node.getNodeType(), prefix);
+ }
+
+ }
+
+ public void printFlattenSpec(JsonNodeType type, String path) {
+ String name = path.substring(2).replace('.', ':');
+ // lets see what type the node is
+ System.out.println("{");
+ System.out.println("\"type\": \"path\",");
+ System.out.println("\"name\": \"" + name + "\",");
+ System.out.println("\"expr\": \"" + path + "\"");
+ System.out.println("},");
+
+ dimensions.add(new String[] { name, path });
+ /*
+ //for dimensionsSpec
+ if (JsonNodeType.NUMBER.equals(type)) {
+ System.out.println("{");
+ System.out.println("\"type\": \"long\",");
+ System.out.println("\"name\": \"" + name + "\",");
+ System.out.println("},");
+ } else {
+ System.out.println("\"" + name + "\",");
+
+ }
+ */
+ }
+
+ public void doTopic(String topic) throws IOException {
+ dimensions.clear();
+
+ String sampleFileName = "C:\\git\\onap\\datalake\\olap\\src\\main\\resources\\druid\\" + topic + "-sample-format.json";//FIXME hard coded path
+ String outputFileName = "C:\\git\\onap\\datalake\\olap\\src\\main\\resources\\druid\\generated\\" + topic + "-kafka-supervisor.json";
+
+ // Get the contents of json as a string using commons IO IOUTils class.
+ String sampleJson = Util.getTextFromFile(sampleFileName);
+
+ // create an ObjectMapper instance.
+ ObjectMapper mapper = new ObjectMapper();
+ // use the ObjectMapper to read the json string and create a tree
+ JsonNode root = mapper.readTree(sampleJson);
+ printNode("$", root);
+
+ context.put("topic", topic);
+ context.put("timestamp", "event-header:timestamp");//FIXME hard coded, should be topic based
+ context.put("timestampFormat", "yyyyMMdd-HH:mm:ss:SSS");//FIXME hard coded, should be topic based
+
+ context.put("dimensions", dimensions);
+
+ BufferedWriter out = new BufferedWriter(new FileWriter(outputFileName));
+
+ template.merge(context, out);
+ out.close();
+ }
+
+ public static void main(String[] args) throws MalformedURLException, IOException {
+ String[] topics = new String[] { "AAI-EVENT", "msgrtr.apinode.metrics.dmaap", "unauthenticated.DCAE_CL_OUTPUT", "unauthenticated.SEC_FAULT_OUTPUT" };//FIXME hard coded
+
+ DruidSupervisorGenerator p = new DruidSupervisorGenerator();
+
+ for (String topic : topics) {
+ p.doTopic(topic);
+ }
+ }
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/Util.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/Util.java
new file mode 100644
index 00000000..89e81af1
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/Util.java
@@ -0,0 +1,65 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DCAE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.util;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+
+/**
+ * utils
+ *
+ * @author Guobiao Mo
+ *
+ */
+public class Util {
+
+ // https://commons.apache.org/proper/commons-io/description.html
+ public static String getTextFromFile(String fileName) throws IOException {
+ File file = new File(fileName);
+ String string = FileUtils.readFileToString(file, "UTF-8");
+ return string;
+ }
+
+ /**
+ * given a json file, remove dot(.) in all keys
+ * http://www.vogella.com/tutorials/JavaRegularExpressions/article.html
+ */
+ public static String replaceDotInKey(String json) {
+ String regex = "(\"[\\-\\w]+)(\\.)([\\.\\-\\w]+\\\\?\"\\:)";
+
+ String newJson = json.replaceAll(regex, "$1_$3");
+
+ if (json.equals(newJson)) {
+ return json;
+ } else {
+ return replaceDotInKey(newJson);// there maybe more to replace
+ }
+ }
+
+ public static void main(String[] args) {
+ String a = "\"u-y.t.y-t\":\"u.gfh\",\\\"jg.h\\\":\"j_9889\"";
+ String b = replaceDotInKey(a);
+ System.out.println(a);
+ System.out.println(b);
+ }
+}