From da7037659eb13e018c61ec06b3824f13ce1c5e53 Mon Sep 17 00:00:00 2001 From: Guobiao Mo Date: Fri, 22 Feb 2019 15:36:27 -0800 Subject: DataLake seed code Issue-ID: DCAEGEN2-1189 Change-Id: Ib25b70197e3b162efadbce0f1b5235e3ba6635e9 Signed-off-by: Guobiao Mo --- .../java/org/onap/datalake/feeder/Application.java | 46 ++++++ .../feeder/config/ApplicationConfiguration.java | 72 +++++++++ .../datalake/feeder/controller/PullController.java | 77 +++++++++ .../feeder/controller/TopicController.java | 131 ++++++++++++++++ .../org/onap/datalake/feeder/domain/Topic.java | 165 ++++++++++++++++++++ .../datalake/feeder/enumeration/DataFormat.java | 30 ++++ .../feeder/repository/TopicRepository.java | 67 ++++++++ .../feeder/repository/TopicRepositoryCustom.java | 28 ++++ .../feeder/repository/TopicRepositoryImpl.java | 67 ++++++++ .../datalake/feeder/service/CouchbaseService.java | 129 ++++++++++++++++ .../onap/datalake/feeder/service/DmaapService.java | 94 +++++++++++ .../feeder/service/ElasticsearchService.java | 142 +++++++++++++++++ .../onap/datalake/feeder/service/PullService.java | 117 ++++++++++++++ .../onap/datalake/feeder/service/PullThread.java | 168 ++++++++++++++++++++ .../onap/datalake/feeder/service/StoreService.java | 164 ++++++++++++++++++++ .../onap/datalake/feeder/service/TopicService.java | 94 +++++++++++ .../feeder/util/DruidSupervisorGenerator.java | 172 +++++++++++++++++++++ .../java/org/onap/datalake/feeder/util/Util.java | 65 ++++++++ 18 files changed, 1828 insertions(+) create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/Application.java create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/PullController.java create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DataFormat.java create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/DruidSupervisorGenerator.java create mode 100644 components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/Util.java (limited to 'components/datalake-handler/feeder/src/main/java/org') diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/Application.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/Application.java new file mode 100644 index 00000000..8f17937b --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/Application.java @@ -0,0 +1,46 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DataLake +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder; + +import org.onap.datalake.feeder.service.PullService; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; + +/** + * Entry point of the DataLake feeder application + * + * @author Guobiao Mo + * + */ + +@SpringBootApplication +public class Application { + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } + + @Bean + public CommandLineRunner commandLineRunner(PullService pullService) { + return args -> pullService.start(); + } +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java new file mode 100644 index 00000000..62ac37fb --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java @@ -0,0 +1,72 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DATALAKE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.config; + +import java.util.Set; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +import lombok.Getter; +import lombok.Setter; + +/** + * Mapping from src/main/resources/application.properties to Java configuration + * object + * + * @author Guobiao Mo + * + */ +@Getter +@Setter +@Configuration +@ConfigurationProperties +public class ApplicationConfiguration { + + private String couchbaseHost; + private String couchbaseUser; + private String couchbasePass; + private String couchbaseBucket; + + // private int mongodbPort; + // private String mongodbDatabase; + + private boolean storeJson; + private boolean storeYaml; + private boolean storeXml; + + private String dmaapZookeeperHostPort; + private String dmaapKafkaHostPort; + private String dmaapKafkaGroup; + private long dmaapKafkaTimeout; + +// private boolean dmaapMonitorAllTopics; + private int dmaapCheckNewTopicIntervalInSec; + //private String dmaapHostPort; + //private Set dmaapExcludeTopics; + //private Set dmaapIncludeTopics; + + private int kafkaConsumerCount; + + private boolean async; + + private String elasticsearchHost; +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/PullController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/PullController.java new file mode 100644 index 00000000..2b176370 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/PullController.java @@ -0,0 +1,77 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DataLake +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.controller; + +import java.io.IOException; + +import org.onap.datalake.feeder.service.PullService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * This controller controls DL data feeder. + * + * @author Guobiao Mo + * + */ + +@RestController +@RequestMapping(value = "/pull", produces = { MediaType.TEXT_PLAIN_VALUE }) +public class PullController { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private PullService pullService; + + /** + * @return message that application is started + * @throws IOException + */ + @RequestMapping("/start") + public String start() throws IOException { + log.info("DataLake feeder starting to pull data from DMaaP..."); + pullService.start(); + return "DataLake feeder is running."; + } + + /** + * @return message that application stop process is triggered + */ + @RequestMapping("/stop") + public String stop() { + pullService.shutdown(); + log.info("DataLake feeder is stopped."); + return "DataLake feeder is stopped."; + } + /** + * @return feeder status + */ + @RequestMapping("/status") + public String status() { + String status = "to be impletemented"; + log.info("senting feeder status ...");//TODO we can send what topics are monitored, how many messages are sent, etc. + return status; + } +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java new file mode 100644 index 00000000..25028d58 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java @@ -0,0 +1,131 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DataLake +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.controller; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +import org.onap.datalake.feeder.domain.Topic; +import org.onap.datalake.feeder.repository.TopicRepository; +import org.onap.datalake.feeder.service.DmaapService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.MediaType; +import org.springframework.validation.BindingResult; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.PutMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.bind.annotation.RestController; + +/** + * This controller manages all the topic settings. Topic "_DL_DEFAULT_" acts as + * the default. For example, if a topic's enabled=null, _DL_DEFAULT_.enabled is + * used for that topic. All the settings are saved in Couchbase. topic + * "_DL_DEFAULT_" is populated at setup by a DB script. + * + * @author Guobiao Mo + * + */ + +@RestController +@RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE }) +public class TopicController { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private DmaapService dmaapService; + + @Autowired + private TopicRepository topicRepository; + + //list all topics in DMaaP + @GetMapping("/dmaap/") + @ResponseBody + public List listDmaapTopics() throws IOException { + return dmaapService.getTopics(); + } + + //list all topics + @GetMapping("/") + @ResponseBody + public Iterable list() throws IOException { + Iterable ret = topicRepository.findAll(); + return ret; + } + + //Read a topic + @GetMapping("/{name}") + @ResponseBody + public Topic getTopic(@PathVariable("name") String topicName) throws IOException { + //Topic topic = topicRepository.findFirstById(topicName); + Optional topic = topicRepository.findById(topicName); + if (topic.isPresent()) { + return topic.get(); + } else { + return null; + } + } + + //Update Topic + @PutMapping("/") + @ResponseBody + public Topic updateTopic(Topic topic, BindingResult result) throws IOException { + + if (result.hasErrors()) { + log.error(result.toString()); + + return null;//TODO return binding error + } + + Topic oldTopic = getTopic(topic.getId()); + if (oldTopic == null) { + return null;//TODO return not found error + } else { + topicRepository.save(topic); + return topic; + } + } + + //create a new Topic + @PostMapping("/") + @ResponseBody + public Topic createTopic(Topic topic, BindingResult result) throws IOException { + + if (result.hasErrors()) { + log.error(result.toString()); + return null; + } + + Topic oldTopic = getTopic(topic.getId()); + if (oldTopic != null) { + return null;//TODO return 'already exists' error + } else { + topicRepository.save(topic); + return topic; + } + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java new file mode 100644 index 00000000..99216ad3 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java @@ -0,0 +1,165 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DataLake +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.domain; + +import java.util.function.Predicate; + +import javax.validation.constraints.NotNull; + +import org.onap.datalake.feeder.enumeration.DataFormat; +import org.springframework.data.annotation.Id; +import org.springframework.data.annotation.Transient; +import org.springframework.data.couchbase.core.mapping.Document; + +/** + * Domain class representing topic table in Couchbase + * + * @author Guobiao Mo + * + */ +@Document +public class Topic { + @NotNull + @Id + private String id;//topic name + + @Transient + private Topic defaultTopic; + + //for protected Kafka topics + private String login; + private String pass; + + /** + * indicate if we should monitor this topic + */ + private Boolean enabled; + + /** + * save raw message text + */ + private Boolean saveRaw; + + /** + * true: save it to Elasticsearch false: don't save null: use default + */ + private Boolean supportElasticsearch; + private Boolean supportCouchbase; + private Boolean supportDruid; + + /** + * need to explicitly tell feeder the data format of the message + * support JSON, XML, YAML, TEXT + */ + private DataFormat dataFormat; + + /** + * TTL in day + */ + private Integer ttl; + + //if this flag is true, need to correlate alarm cleared message to previous alarm + private Boolean correlateClearedMessage; + + public Topic() { + } + + public Topic(String id) { + this.id = id; + } + + public String getId() { + return id; + } + + public void setDefaultTopic(Topic defaultTopic) { + this.defaultTopic = defaultTopic; + } + + public boolean isEnabled() { + return is(enabled, Topic::isEnabled); + } + + public boolean isCorrelateClearedMessage() { + return is(correlateClearedMessage, Topic::isCorrelateClearedMessage); + } + + public int getTtl() { + if (ttl != null) { + return ttl; + } else if (defaultTopic != null) { + return defaultTopic.getTtl(); + } else { + return 3650;//default to 10 years for safe + } + } + + public DataFormat getDataFormat() { + if (dataFormat != null) { + return dataFormat; + } else if (defaultTopic != null) { + return defaultTopic.getDataFormat(); + } else { + return null; + } + } + + //if 'this' Topic does not have the setting, use default Topic's + private boolean is(Boolean b, Predicate pre) { + if (b != null) { + return b; + } else if (defaultTopic != null) { + return pre.test(defaultTopic); + } else { + return false; + } + } + + public boolean isSaveRaw() { + return is(saveRaw, Topic::isSaveRaw); + } + + public boolean isSupportElasticsearch() { + return is(supportElasticsearch, Topic::isSupportElasticsearch); + } + + public boolean isSupportCouchbase() { + return is(supportCouchbase, Topic::isSupportCouchbase); + } + + public boolean isSupportDruid() { + return is(supportDruid, Topic::isSupportDruid); + } + + @Override + public String toString() { + return id; + } + + // for testing + public static void main(String[] args) { + Topic defaultTopic=new Topic("def"); + Topic test = new Topic("test"); + test.setDefaultTopic(defaultTopic); + defaultTopic.supportElasticsearch=true; + boolean b = test.isSupportElasticsearch(); + System.out.println(b); + } +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DataFormat.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DataFormat.java new file mode 100644 index 00000000..83ffac18 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DataFormat.java @@ -0,0 +1,30 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DCAE +* ================================================================================ +* Copyright 2018 TechMahindra +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.enumeration; + +/** + * Data format of DMaaP messages + * + * @author Guobiao Mo + * + */ +public enum DataFormat { + JSON, XML, YAML, TEXT; +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java new file mode 100644 index 00000000..37d1a669 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java @@ -0,0 +1,67 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DataLake +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.repository; + +import org.onap.datalake.feeder.domain.Topic; +import org.springframework.data.couchbase.core.query.N1qlPrimaryIndexed; +import org.springframework.data.couchbase.core.query.Query; +import org.springframework.data.couchbase.core.query.ViewIndexed; +import org.springframework.data.couchbase.repository.CouchbasePagingAndSortingRepository; +import org.springframework.data.domain.Page; +import org.springframework.data.domain.Pageable; + + +import java.util.List; + +/** + * + * Topic Repository interface, implementation is taken care by Spring framework. + * Customization is done through TopicRepositoryCustom and its implementation TopicRepositoryImpl. + * + * @author Guobiao Mo + * + */ +@ViewIndexed(designDoc = "topic", viewName = "all") +public interface TopicRepository extends CouchbasePagingAndSortingRepository, TopicRepositoryCustom { +/* + Topic findFirstById(String topic); + + Topic findByIdAndState(String topic, boolean state); + + //Supports native JSON query string + @Query("{topic:'?0'}") + Topic findTopicById(String topic); + + @Query("{topic: { $regex: ?0 } })") + List findTopicByRegExId(String topic); + + + //Page findByCompanyIdAndNameLikeOrderByName(String companyId, String name, Pageable pageable); + + @Query("#{#n1ql.selectEntity} where #{#n1ql.filter} and companyId = $1 and $2 within #{#n1ql.bucket}") + Topic findByCompanyAndAreaId(String companyId, String areaId); + + @Query("#{#n1ql.selectEntity} where #{#n1ql.filter} AND ANY phone IN phoneNumbers SATISFIES phone = $1 END") + List findByPhoneNumber(String telephoneNumber); + + @Query("SELECT COUNT(*) AS count FROM #{#n1ql.bucket} WHERE #{#n1ql.filter} and companyId = $1") + Long countBuildings(String companyId); + */ +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java new file mode 100644 index 00000000..220a8f76 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java @@ -0,0 +1,28 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DataLake +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.repository; + +/** + * @author Guobiao Mo + * + */ +public interface TopicRepositoryCustom { + long updateTopic(String topic, Boolean state); +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java new file mode 100644 index 00000000..018d5b95 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java @@ -0,0 +1,67 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DataLake +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.repository; + +import org.onap.datalake.feeder.domain.Topic; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.couchbase.core.CouchbaseTemplate; +/* +import org.springframework.data.mongodb.MongoDbFactory; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.data.mongodb.core.convert.DefaultDbRefResolver; +import org.springframework.data.mongodb.core.convert.DefaultMongoTypeMapper; +import org.springframework.data.mongodb.core.convert.MappingMongoConverter; +import org.springframework.data.mongodb.core.mapping.MongoMappingContext; +import org.springframework.data.mongodb.core.query.Criteria; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.mongodb.core.query.Update; + +import com.mongodb.WriteResult; +import com.mongodb.client.result.UpdateResult; +*/ +import java.util.List; + +/** + * @author Guobiao Mo + * + */ +public class TopicRepositoryImpl implements TopicRepositoryCustom { + + @Autowired + CouchbaseTemplate template; + + @Override + public long updateTopic(String topic, Boolean state) { +/* + Query query = new Query(Criteria.where("id").is(topic)); + Update update = new Update(); + update.set("state", state); + + UpdateResult result = mongoTemplate.updateFirst(query, update, Topic.class); + + if(result!=null) + return result.getModifiedCount(); + else + */ return 0L; + + + + } +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java new file mode 100644 index 00000000..35432587 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java @@ -0,0 +1,129 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DATALAKE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.service; + +import java.util.ArrayList; +import java.util.List; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + +import org.json.JSONObject; +import org.onap.datalake.feeder.config.ApplicationConfiguration; +import org.onap.datalake.feeder.domain.Topic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.Cluster; +import com.couchbase.client.java.CouchbaseCluster; +import com.couchbase.client.java.document.JsonDocument; +import com.couchbase.client.java.document.JsonLongDocument; +import com.couchbase.client.java.document.json.JsonObject; + +import rx.Observable; +import rx.functions.Func1; + +/** + * Service to use Couchbase + * + * @author Guobiao Mo + * + */ +@Service +public class CouchbaseService { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private ApplicationConfiguration config; + + Bucket bucket; + + @PostConstruct + private void init() { + // Initialize Couchbase Connection + Cluster cluster = CouchbaseCluster.create(config.getCouchbaseHost()); + cluster.authenticate(config.getCouchbaseUser(), config.getCouchbasePass()); + bucket = cluster.openBucket(config.getCouchbaseBucket()); + + log.info("Connect to Couchbase " + config.getCouchbaseHost()); + + // Create a N1QL Primary Index (but ignore if it exists) + bucket.bucketManager().createN1qlPrimaryIndex(true, false); + } + + @PreDestroy + public void cleanUp() { + bucket.close(); + } + + public void saveJsons(Topic topic, List jsons) { + List documents= new ArrayList<>(jsons.size()); + for(JSONObject json : jsons) { + //convert to Couchbase JsonObject from org.json JSONObject + JsonObject jsonObject = JsonObject.fromJson(json.toString()); + + long timestamp = jsonObject.getLong("_ts");//this is Kafka time stamp, which is added in StoreService.messageToJson() + + //setup TTL + int expiry = (int) (timestamp/1000L) + topic.getTtl()*3600*24; //in second + + String id = getId(topic.getId()); + JsonDocument doc = JsonDocument.create(id, expiry, jsonObject); + documents.add(doc); + } + saveDocuments(documents); + } + + + private String getId(String topicStr) { + //String id = topicStr+":"+timestamp+":"+UUID.randomUUID(); + + //https://forums.couchbase.com/t/how-to-set-an-auto-increment-id/4892/2 + //atomically get the next sequence number: + // increment by 1, initialize at 0 if counter doc not found + //TODO how slow is this compared with above UUID approach? + JsonLongDocument nextIdNumber = bucket.counter(topicStr, 1, 0); //like 12345 + String id = topicStr +":"+ nextIdNumber.content(); + + return id; + } + + //https://docs.couchbase.com/java-sdk/2.7/document-operations.html + private void saveDocuments(List documents) { + Observable + .from(documents) + .flatMap(new Func1>() { + @Override + public Observable call(final JsonDocument docToInsert) { + return bucket.async().insert(docToInsert); + } + }) + .last() + .toBlocking() + .single(); + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java new file mode 100644 index 00000000..96ad81bf --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java @@ -0,0 +1,94 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DATALAKE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.service; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import javax.annotation.PostConstruct; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.onap.datalake.feeder.config.ApplicationConfiguration; +import org.onap.datalake.feeder.domain.Topic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * This service will handle all the communication with Kafka + * + * @author Guobiao Mo + * + */ +@Service +public class DmaapService { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private ApplicationConfiguration config; + + @Autowired + private TopicService topicService; + + + @PostConstruct + private void init() { + + } + + //get all topic names from Zookeeper + public List getTopics() { + try { + Watcher watcher = new Watcher() { + @Override + public void process(WatchedEvent event) { + // TODO monitor new topics + + } + }; + ZooKeeper zk = new ZooKeeper(config.getDmaapZookeeperHostPort(), 10000, watcher); + List topics = zk.getChildren("/brokers/topics", false); + return topics; + } catch (Exception e) { + log.error("Can not get topic list from Zookeeper, for testing, going to use hard coded topic list.", e); + return null; + } + } + + public List getActiveTopics() throws IOException { + List allTopics = new ArrayList<>(getTopics()); + + List ret = new ArrayList<>(); + for (String topicStr : allTopics) { + Topic topic = topicService.getEffectiveTopic(topicStr, true); + if (topic.isEnabled()) { + ret.add(topicStr); + } + } + return ret; + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java new file mode 100644 index 00000000..cbcc5f86 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java @@ -0,0 +1,142 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DATALAKE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.service; + +import java.io.IOException; +import java.util.List; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + +import org.apache.http.HttpHost; +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.xcontent.XContentType; +import org.json.JSONObject; +import org.onap.datalake.feeder.config.ApplicationConfiguration; +import org.onap.datalake.feeder.domain.Topic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * Service to use Elasticsearch + * + * @author Guobiao Mo + * + */ +@Service +public class ElasticsearchService { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private ApplicationConfiguration config; + + private RestHighLevelClient client; + ActionListener listener; + + @PostConstruct + private void init() { + String elasticsearchHost = config.getElasticsearchHost(); + + // Initialize the Connection + client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http"))); + + log.info("Connect to Elasticsearch Host " + elasticsearchHost); + + listener = new ActionListener() { + @Override + public void onResponse(BulkResponse bulkResponse) { + + } + + @Override + public void onFailure(Exception e) { + log.error(e.getMessage()); + } + }; + } + + @PreDestroy + public void cleanUp() throws IOException { + client.close(); + } + + public void ensureTableExist(String topic) throws IOException { + String topicLower = topic.toLowerCase(); + + CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower); + try { + CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT); + log.info(createIndexResponse.index()+" : created "+createIndexResponse.isAcknowledged()); + }catch(ElasticsearchStatusException e) { + log.info("{} create ES topic status: {}", topic, e.getDetailedMessage()); + } + } + + //TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME + public void saveJsons(Topic topic, List jsons) { + BulkRequest request = new BulkRequest(); + + for (JSONObject json : jsons) { + if(topic.isCorrelateClearedMessage()) { + boolean found = correlateClearedMessage(json); + if(found) { + continue; + } + } + request.add(new IndexRequest(topic.getId().toLowerCase(), "doc").source(json.toString(), XContentType.JSON)); + } + if(config.isAsync()) { + client.bulkAsync(request, RequestOptions.DEFAULT, listener); + }else { + try { + client.bulk(request, RequestOptions.DEFAULT); + } catch (IOException e) { + log.error( topic.getId() , e); + } + } + } + + private boolean correlateClearedMessage(JSONObject json) { + boolean found = false; + + /*TODO + * 1. check if this is a alarm cleared message + * 2. search previous alarm message + * 3. update previous message, if success, set found=true + */ + + return found; + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java new file mode 100644 index 00000000..3dcbd8ee --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java @@ -0,0 +1,117 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DATALAKE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.service; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import javax.annotation.PostConstruct; + +import org.onap.datalake.feeder.config.ApplicationConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.stereotype.Service; + +/** + * Service that pulls messages from DMaaP and save them to Big Data DBs + * + * @author Guobiao Mo + * + */ + +@Service +public class PullService { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private boolean isRunning = false; + private ExecutorService executorService; + private List consumers; + + @Autowired + private ApplicationContext context; + + @Autowired + private ApplicationConfiguration config; + + @PostConstruct + private void init() { + } + + /** + * start pulling. + * + * @throws IOException + */ + public synchronized void start() throws IOException { + if (isRunning) { + return; + } + + logger.info("start pulling ..."); + + int numConsumers = config.getKafkaConsumerCount(); + + executorService = Executors.newFixedThreadPool(numConsumers); + consumers = new ArrayList<>(numConsumers); + + for (int i = 0; i < numConsumers; i++) { + PullThread puller = context.getBean(PullThread.class, i); + consumers.add(puller); + executorService.submit(puller); + } + + isRunning = true; + + Runtime.getRuntime().addShutdownHook(new Thread(()->shutdown())) ; + } + + /** + * stop pulling + */ + public synchronized void shutdown() { + if (!isRunning) { + return; + } + + logger.info("stop pulling ..."); + for (PullThread puller : consumers) { + puller.shutdown(); + } + + executorService.shutdown(); + + try { + executorService.awaitTermination(10L, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.error("executor.awaitTermination", e); + } + + isRunning = false; + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java new file mode 100644 index 00000000..3c75eb68 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java @@ -0,0 +1,168 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DATALAKE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.service; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.annotation.PostConstruct; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.onap.datalake.feeder.config.ApplicationConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Service; + +/** + * Thread that pulls messages from DMaaP and save them to Big Data DBs + * + * @author Guobiao Mo + * + */ + +@Service +@Scope(value=ConfigurableBeanFactory.SCOPE_PROTOTYPE) +public class PullThread implements Runnable { + + @Autowired + private DmaapService dmaapService; + + @Autowired + private StoreService storeService; + + @Autowired + private ApplicationConfiguration config; + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + private KafkaConsumer consumer; // is key-value type, in our case key is empty, value is JSON text + private int id; + + private final AtomicBoolean active = new AtomicBoolean(false); + private boolean async; + + public PullThread(int id) { + this.id = id; + } + + @PostConstruct + private void init() { + async = config.isAsync(); + Properties consumerConfig = getConsumerConfig(); + consumer = new KafkaConsumer<>(consumerConfig); + } + + private Properties getConsumerConfig() { + Properties consumerConfig = new Properties(); + + consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getDmaapKafkaHostPort()); + consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, config.getDmaapKafkaGroup()); + consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor"); + consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + + return consumerConfig; + } + + /** + * start pulling. + */ + @Override + public void run() { + active.set(true); + + DummyRebalanceListener rebalanceListener = new DummyRebalanceListener(); + + try { + List topics = dmaapService.getActiveTopics(); //TODO get updated topic list within loop + + log.info("Thread {} going to subscribe to topics: {}", id, topics); + + consumer.subscribe(topics, rebalanceListener); + + while (active.get()) { + + ConsumerRecords records = consumer.poll(Duration.ofSeconds(config.getDmaapKafkaTimeout())); + + List> messages = new ArrayList<>(records.count()); + for (TopicPartition partition : records.partitions()) { + messages.clear(); + List> partitionRecords = records.records(partition); + for (ConsumerRecord record : partitionRecords) { + messages.add(Pair.of(record.timestamp(), record.value())); + log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value()); + } + storeService.saveMessages(partition.topic(), messages); + log.info("topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB + + if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit + long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); + consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); + } + } + + if (async) {//for high Throughput, async commit offset in batch to Kafka + consumer.commitAsync(); + } + } + } catch (Exception e) { + log.error("Puller {} run(): exception={}", id, e.getMessage()); + log.error("", e); + } finally { + consumer.close(); + } + } + + public void shutdown() { + active.set(false); + consumer.wakeup(); + } + + private class DummyRebalanceListener implements ConsumerRebalanceListener { + @Override + public void onPartitionsRevoked(Collection partitions) { + log.info("Called onPartitionsRevoked with partitions: {}", partitions); + } + + @Override + public void onPartitionsAssigned(Collection partitions) { + log.info("Called onPartitionsAssigned with partitions: {}", partitions); + } + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java new file mode 100644 index 00000000..1cd3a8a1 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java @@ -0,0 +1,164 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DATALAKE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.service; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + +import org.apache.commons.lang3.tuple.Pair; + +import org.json.JSONException; +import org.json.JSONObject; +import org.json.XML; +import org.onap.datalake.feeder.domain.Topic; +import org.onap.datalake.feeder.enumeration.DataFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; + +/** + * Service to store messages to varieties of DBs + * + * comment out YAML support, since AML is for config and don't see this data type in DMaaP. Do we need to support XML? + * + * @author Guobiao Mo + * + */ +@Service +public class StoreService { + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private TopicService topicService; + + @Autowired + private CouchbaseService couchbaseService; + + @Autowired + private ElasticsearchService elasticsearchService; + + private Map topicMap = new HashMap<>(); + + private ObjectMapper yamlReader; + + @PostConstruct + private void init() { + yamlReader = new ObjectMapper(new YAMLFactory()); + } + + @PreDestroy + public void cleanUp() { + } + + public void saveMessages(String topicStr, List> messages) {//pair=ts+text + if (messages == null || messages.isEmpty()) { + return; + } + + Topic topic = topicMap.computeIfAbsent(topicStr, k -> { //TODO get topic updated settings from DB periodically + return topicService.getEffectiveTopic(topicStr); + }); + + List docs = new ArrayList<>(); + + for (Pair pair : messages) { + try { + docs.add(messageToJson(topic, pair)); + } catch (Exception e) { + log.error(pair.getRight(), e); + } + } + + saveJsons(topic, docs); + } + + private JSONObject messageToJson(Topic topic, Pair pair) throws JSONException, JsonParseException, JsonMappingException, IOException { + + long timestamp = pair.getLeft(); + String text = pair.getRight(); + + //for debug, to be remove +// String topicStr = topic.getId(); +// if (!"TestTopic1".equals(topicStr) && !"msgrtr.apinode.metrics.dmaap".equals(topicStr) && !"AAI-EVENT".equals(topicStr) && !"unauthenticated.DCAE_CL_OUTPUT".equals(topicStr) && !"unauthenticated.SEC_FAULT_OUTPUT".equals(topicStr)) { + // log.debug("{} ={}", topicStr, text); + //} + + boolean storeRaw = topic.isSaveRaw(); + + JSONObject json = null; + + DataFormat dataFormat = topic.getDataFormat(); + + switch (dataFormat) { + case JSON: + json = new JSONObject(text); + break; + case XML://XML and YAML can be directly inserted into ES, we may not need to convert it to JSON + json = XML.toJSONObject(text); + break; + case YAML:// Do we need to support YAML? + Object obj = yamlReader.readValue(text, Object.class); + ObjectMapper jsonWriter = new ObjectMapper(); + String jsonString = jsonWriter.writeValueAsString(obj); + json = new JSONObject(jsonString); + break; + default: + json = new JSONObject(); + storeRaw = true; + break; + } + + //FIXME for debug, to be remove + json.remove("_id"); + json.remove("_dl_text_"); + + json.put("_ts", timestamp); + if (storeRaw) { + json.put("_text", text); + } + + return json; + } + + private void saveJsons(Topic topic, List jsons) { + if (topic.isSupportCouchbase()) { + couchbaseService.saveJsons(topic, jsons); + } + + if (topic.isSupportElasticsearch()) { + elasticsearchService.saveJsons(topic, jsons); + } + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java new file mode 100644 index 00000000..9b8fabc1 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java @@ -0,0 +1,94 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DATALAKE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.service; + +import java.io.IOException; +import java.util.Optional; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + +import org.onap.datalake.feeder.domain.Topic; +import org.onap.datalake.feeder.repository.TopicRepository; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * Service for topics topic setting is stored in Couchbase, bucket 'dl', see + * application.properties for Spring setup + * + * @author Guobiao Mo + * + */ +@Service +public class TopicService { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private TopicRepository topicRepository; + + @Autowired + private ElasticsearchService elasticsearchService; + + @PostConstruct + private void init() { + } + + @PreDestroy + public void cleanUp() { + } + + public Topic getEffectiveTopic(String topicStr) { + try { + return getEffectiveTopic(topicStr, false); + } catch (IOException e) { + log.error(topicStr, e); + } + return null; + } + + public Topic getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException { + Topic topic = getTopic(topicStr); + if (topic == null) { + topic = new Topic(topicStr); + } + + topic.setDefaultTopic(getDefaultTopic()); + + if(ensureTableExist && topic.isEnabled() && topic.isSupportElasticsearch()) { + elasticsearchService.ensureTableExist(topicStr); + } + return topic; + } + + public Topic getTopic(String topicStr) { + Optional ret = topicRepository.findById(topicStr); + return ret.isPresent() ? ret.get() : null; + } + + public Topic getDefaultTopic() { + return getTopic("_DL_DEFAULT_"); + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/DruidSupervisorGenerator.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/DruidSupervisorGenerator.java new file mode 100644 index 00000000..819590b7 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/DruidSupervisorGenerator.java @@ -0,0 +1,172 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DataLake +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.util; + +import java.io.BufferedWriter; +import java.io.FileWriter; +import java.io.IOException; +import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.velocity.Template; +import org.apache.velocity.VelocityContext; +import org.apache.velocity.app.Velocity; +import org.apache.velocity.runtime.RuntimeConstants; +import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.JsonNodeType; + + +/* + * read sample json and output supervisor to resources\druid\generated + * need manual edit to be production ready, final versions are in resources\druid + * + * http://druid.io/docs/latest/tutorials/tutorial-ingestion-spec.html + * http://druid.io/docs/latest/ingestion/flatten-json + * + * + * todo: + * reduce the manual editing + * path hard coded + * auto get topics, + * auto get sample, and for each topic, get multiple samples. + * make supervisor file names consistent + * dimension type default is string, in msgrtr.apinode.metrics.dmaap , many are long/double, so need to generate dimensionsSpec, this is done at the end of printFlattenSpec() + */ + +public class DruidSupervisorGenerator { + + Template template = null; + VelocityContext context; + + List dimensions; + + public DruidSupervisorGenerator() { + dimensions = new ArrayList<>(); + + Velocity.setProperty(RuntimeConstants.RESOURCE_LOADER, "classpath"); + Velocity.setProperty("classpath.resource.loader.class", ClasspathResourceLoader.class.getName()); + + Velocity.init(); + + context = new VelocityContext(); + + context.put("host", "dl_dmaap_kf"); + + template = Velocity.getTemplate("druid/kafka-supervisor-template.vm"); + } + + public void printNode(String prefix, JsonNode node) { + + // lets see what type the node is + // System.out.println("NodeType=" + node.getNodeType() + ", isContainerNode=" + node.isContainerNode() + ", " + node); // prints OBJECT + + if (node.isContainerNode()) { + + Iterator> fields = node.fields(); + + while (fields.hasNext()) { + Entry field = fields.next(); + // System.out.println("--------"+field.getKey()+"--------"); + printNode(prefix + "." + field.getKey(), field.getValue()); + } + + if (node.isArray()) { + Iterator elements = node.elements(); + int i = 0; + while (elements.hasNext()) { + JsonNode element = elements.next(); + printNode(prefix + "[" + i + "]", element); + i++; + } + } + + } else { + printFlattenSpec(node.getNodeType(), prefix); + } + + } + + public void printFlattenSpec(JsonNodeType type, String path) { + String name = path.substring(2).replace('.', ':'); + // lets see what type the node is + System.out.println("{"); + System.out.println("\"type\": \"path\","); + System.out.println("\"name\": \"" + name + "\","); + System.out.println("\"expr\": \"" + path + "\""); + System.out.println("},"); + + dimensions.add(new String[] { name, path }); + /* + //for dimensionsSpec + if (JsonNodeType.NUMBER.equals(type)) { + System.out.println("{"); + System.out.println("\"type\": \"long\","); + System.out.println("\"name\": \"" + name + "\","); + System.out.println("},"); + } else { + System.out.println("\"" + name + "\","); + + } + */ + } + + public void doTopic(String topic) throws IOException { + dimensions.clear(); + + String sampleFileName = "C:\\git\\onap\\datalake\\olap\\src\\main\\resources\\druid\\" + topic + "-sample-format.json";//FIXME hard coded path + String outputFileName = "C:\\git\\onap\\datalake\\olap\\src\\main\\resources\\druid\\generated\\" + topic + "-kafka-supervisor.json"; + + // Get the contents of json as a string using commons IO IOUTils class. + String sampleJson = Util.getTextFromFile(sampleFileName); + + // create an ObjectMapper instance. + ObjectMapper mapper = new ObjectMapper(); + // use the ObjectMapper to read the json string and create a tree + JsonNode root = mapper.readTree(sampleJson); + printNode("$", root); + + context.put("topic", topic); + context.put("timestamp", "event-header:timestamp");//FIXME hard coded, should be topic based + context.put("timestampFormat", "yyyyMMdd-HH:mm:ss:SSS");//FIXME hard coded, should be topic based + + context.put("dimensions", dimensions); + + BufferedWriter out = new BufferedWriter(new FileWriter(outputFileName)); + + template.merge(context, out); + out.close(); + } + + public static void main(String[] args) throws MalformedURLException, IOException { + String[] topics = new String[] { "AAI-EVENT", "msgrtr.apinode.metrics.dmaap", "unauthenticated.DCAE_CL_OUTPUT", "unauthenticated.SEC_FAULT_OUTPUT" };//FIXME hard coded + + DruidSupervisorGenerator p = new DruidSupervisorGenerator(); + + for (String topic : topics) { + p.doTopic(topic); + } + } +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/Util.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/Util.java new file mode 100644 index 00000000..89e81af1 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/Util.java @@ -0,0 +1,65 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DCAE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.util; + +import java.io.File; +import java.io.IOException; + +import org.apache.commons.io.FileUtils; + +/** + * utils + * + * @author Guobiao Mo + * + */ +public class Util { + + // https://commons.apache.org/proper/commons-io/description.html + public static String getTextFromFile(String fileName) throws IOException { + File file = new File(fileName); + String string = FileUtils.readFileToString(file, "UTF-8"); + return string; + } + + /** + * given a json file, remove dot(.) in all keys + * http://www.vogella.com/tutorials/JavaRegularExpressions/article.html + */ + public static String replaceDotInKey(String json) { + String regex = "(\"[\\-\\w]+)(\\.)([\\.\\-\\w]+\\\\?\"\\:)"; + + String newJson = json.replaceAll(regex, "$1_$3"); + + if (json.equals(newJson)) { + return json; + } else { + return replaceDotInKey(newJson);// there maybe more to replace + } + } + + public static void main(String[] args) { + String a = "\"u-y.t.y-t\":\"u.gfh\",\\\"jg.h\\\":\"j_9889\""; + String b = replaceDotInKey(a); + System.out.println(a); + System.out.println(b); + } +} -- cgit 1.2.3-korg