diff options
author | Guobiao Mo <guobiaomo@chinamobile.com> | 2019-02-22 15:36:27 -0800 |
---|---|---|
committer | Guobiao Mo <guobiaomo@chinamobile.com> | 2019-03-26 15:06:46 -0700 |
commit | da7037659eb13e018c61ec06b3824f13ce1c5e53 (patch) | |
tree | b95aaa030c18ce04c37a1b559e4c73c82b5f99d8 /components/datalake-handler/feeder/src/main/java/org | |
parent | 82fe4e29ff6c0b48fe15d88b1fca882292e6af43 (diff) |
DataLake seed code
Issue-ID: DCAEGEN2-1189
Change-Id: Ib25b70197e3b162efadbce0f1b5235e3ba6635e9
Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
Diffstat (limited to 'components/datalake-handler/feeder/src/main/java/org')
18 files changed, 1828 insertions, 0 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/Application.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/Application.java new file mode 100644 index 00000000..8f17937b --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/Application.java @@ -0,0 +1,46 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DataLake +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder; + +import org.onap.datalake.feeder.service.PullService; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; + +/** + * Entry point of the DataLake feeder application + * + * @author Guobiao Mo + * + */ + +@SpringBootApplication +public class Application { + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } + + @Bean + public CommandLineRunner commandLineRunner(PullService pullService) { + return args -> pullService.start(); + } +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java new file mode 100644 index 00000000..62ac37fb --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java @@ -0,0 +1,72 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DATALAKE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.config; + +import java.util.Set; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +import lombok.Getter; +import lombok.Setter; + +/** + * Mapping from src/main/resources/application.properties to Java configuration + * object + * + * @author Guobiao Mo + * + */ +@Getter +@Setter +@Configuration +@ConfigurationProperties +public class ApplicationConfiguration { + + private String couchbaseHost; + private String couchbaseUser; + private String couchbasePass; + private String couchbaseBucket; + + // private int mongodbPort; + // private String mongodbDatabase; + + private boolean storeJson; + private boolean storeYaml; + private boolean storeXml; + + private String dmaapZookeeperHostPort; + private String dmaapKafkaHostPort; + private String dmaapKafkaGroup; + private long dmaapKafkaTimeout; + +// private boolean dmaapMonitorAllTopics; + private int dmaapCheckNewTopicIntervalInSec; + //private String dmaapHostPort; + //private Set<String> dmaapExcludeTopics; + //private Set<String> dmaapIncludeTopics; + + private int kafkaConsumerCount; + + private boolean async; + + private String elasticsearchHost; +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/PullController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/PullController.java new file mode 100644 index 00000000..2b176370 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/PullController.java @@ -0,0 +1,77 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DataLake +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.controller; + +import java.io.IOException; + +import org.onap.datalake.feeder.service.PullService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * This controller controls DL data feeder. + * + * @author Guobiao Mo + * + */ + +@RestController +@RequestMapping(value = "/pull", produces = { MediaType.TEXT_PLAIN_VALUE }) +public class PullController { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private PullService pullService; + + /** + * @return message that application is started + * @throws IOException + */ + @RequestMapping("/start") + public String start() throws IOException { + log.info("DataLake feeder starting to pull data from DMaaP..."); + pullService.start(); + return "DataLake feeder is running."; + } + + /** + * @return message that application stop process is triggered + */ + @RequestMapping("/stop") + public String stop() { + pullService.shutdown(); + log.info("DataLake feeder is stopped."); + return "DataLake feeder is stopped."; + } + /** + * @return feeder status + */ + @RequestMapping("/status") + public String status() { + String status = "to be impletemented"; + log.info("senting feeder status ...");//TODO we can send what topics are monitored, how many messages are sent, etc. + return status; + } +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java new file mode 100644 index 00000000..25028d58 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/TopicController.java @@ -0,0 +1,131 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DataLake +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.controller; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +import org.onap.datalake.feeder.domain.Topic; +import org.onap.datalake.feeder.repository.TopicRepository; +import org.onap.datalake.feeder.service.DmaapService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.MediaType; +import org.springframework.validation.BindingResult; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.PutMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.ResponseBody; +import org.springframework.web.bind.annotation.RestController; + +/** + * This controller manages all the topic settings. Topic "_DL_DEFAULT_" acts as + * the default. For example, if a topic's enabled=null, _DL_DEFAULT_.enabled is + * used for that topic. All the settings are saved in Couchbase. topic + * "_DL_DEFAULT_" is populated at setup by a DB script. + * + * @author Guobiao Mo + * + */ + +@RestController +@RequestMapping(value = "/topics", produces = { MediaType.APPLICATION_JSON_VALUE }) +public class TopicController { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private DmaapService dmaapService; + + @Autowired + private TopicRepository topicRepository; + + //list all topics in DMaaP + @GetMapping("/dmaap/") + @ResponseBody + public List<String> listDmaapTopics() throws IOException { + return dmaapService.getTopics(); + } + + //list all topics + @GetMapping("/") + @ResponseBody + public Iterable<Topic> list() throws IOException { + Iterable<Topic> ret = topicRepository.findAll(); + return ret; + } + + //Read a topic + @GetMapping("/{name}") + @ResponseBody + public Topic getTopic(@PathVariable("name") String topicName) throws IOException { + //Topic topic = topicRepository.findFirstById(topicName); + Optional<Topic> topic = topicRepository.findById(topicName); + if (topic.isPresent()) { + return topic.get(); + } else { + return null; + } + } + + //Update Topic + @PutMapping("/") + @ResponseBody + public Topic updateTopic(Topic topic, BindingResult result) throws IOException { + + if (result.hasErrors()) { + log.error(result.toString()); + + return null;//TODO return binding error + } + + Topic oldTopic = getTopic(topic.getId()); + if (oldTopic == null) { + return null;//TODO return not found error + } else { + topicRepository.save(topic); + return topic; + } + } + + //create a new Topic + @PostMapping("/") + @ResponseBody + public Topic createTopic(Topic topic, BindingResult result) throws IOException { + + if (result.hasErrors()) { + log.error(result.toString()); + return null; + } + + Topic oldTopic = getTopic(topic.getId()); + if (oldTopic != null) { + return null;//TODO return 'already exists' error + } else { + topicRepository.save(topic); + return topic; + } + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java new file mode 100644 index 00000000..99216ad3 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java @@ -0,0 +1,165 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DataLake +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.domain; + +import java.util.function.Predicate; + +import javax.validation.constraints.NotNull; + +import org.onap.datalake.feeder.enumeration.DataFormat; +import org.springframework.data.annotation.Id; +import org.springframework.data.annotation.Transient; +import org.springframework.data.couchbase.core.mapping.Document; + +/** + * Domain class representing topic table in Couchbase + * + * @author Guobiao Mo + * + */ +@Document +public class Topic { + @NotNull + @Id + private String id;//topic name + + @Transient + private Topic defaultTopic; + + //for protected Kafka topics + private String login; + private String pass; + + /** + * indicate if we should monitor this topic + */ + private Boolean enabled; + + /** + * save raw message text + */ + private Boolean saveRaw; + + /** + * true: save it to Elasticsearch false: don't save null: use default + */ + private Boolean supportElasticsearch; + private Boolean supportCouchbase; + private Boolean supportDruid; + + /** + * need to explicitly tell feeder the data format of the message + * support JSON, XML, YAML, TEXT + */ + private DataFormat dataFormat; + + /** + * TTL in day + */ + private Integer ttl; + + //if this flag is true, need to correlate alarm cleared message to previous alarm + private Boolean correlateClearedMessage; + + public Topic() { + } + + public Topic(String id) { + this.id = id; + } + + public String getId() { + return id; + } + + public void setDefaultTopic(Topic defaultTopic) { + this.defaultTopic = defaultTopic; + } + + public boolean isEnabled() { + return is(enabled, Topic::isEnabled); + } + + public boolean isCorrelateClearedMessage() { + return is(correlateClearedMessage, Topic::isCorrelateClearedMessage); + } + + public int getTtl() { + if (ttl != null) { + return ttl; + } else if (defaultTopic != null) { + return defaultTopic.getTtl(); + } else { + return 3650;//default to 10 years for safe + } + } + + public DataFormat getDataFormat() { + if (dataFormat != null) { + return dataFormat; + } else if (defaultTopic != null) { + return defaultTopic.getDataFormat(); + } else { + return null; + } + } + + //if 'this' Topic does not have the setting, use default Topic's + private boolean is(Boolean b, Predicate<Topic> pre) { + if (b != null) { + return b; + } else if (defaultTopic != null) { + return pre.test(defaultTopic); + } else { + return false; + } + } + + public boolean isSaveRaw() { + return is(saveRaw, Topic::isSaveRaw); + } + + public boolean isSupportElasticsearch() { + return is(supportElasticsearch, Topic::isSupportElasticsearch); + } + + public boolean isSupportCouchbase() { + return is(supportCouchbase, Topic::isSupportCouchbase); + } + + public boolean isSupportDruid() { + return is(supportDruid, Topic::isSupportDruid); + } + + @Override + public String toString() { + return id; + } + + // for testing + public static void main(String[] args) { + Topic defaultTopic=new Topic("def"); + Topic test = new Topic("test"); + test.setDefaultTopic(defaultTopic); + defaultTopic.supportElasticsearch=true; + boolean b = test.isSupportElasticsearch(); + System.out.println(b); + } +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DataFormat.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DataFormat.java new file mode 100644 index 00000000..83ffac18 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/enumeration/DataFormat.java @@ -0,0 +1,30 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DCAE +* ================================================================================ +* Copyright 2018 TechMahindra +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ +package org.onap.datalake.feeder.enumeration; + +/** + * Data format of DMaaP messages + * + * @author Guobiao Mo + * + */ +public enum DataFormat { + JSON, XML, YAML, TEXT; +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java new file mode 100644 index 00000000..37d1a669 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepository.java @@ -0,0 +1,67 @@ +/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.repository;
+
+import org.onap.datalake.feeder.domain.Topic;
+import org.springframework.data.couchbase.core.query.N1qlPrimaryIndexed;
+import org.springframework.data.couchbase.core.query.Query;
+import org.springframework.data.couchbase.core.query.ViewIndexed;
+import org.springframework.data.couchbase.repository.CouchbasePagingAndSortingRepository;
+import org.springframework.data.domain.Page;
+import org.springframework.data.domain.Pageable;
+
+
+import java.util.List;
+
+/**
+ *
+ * Topic Repository interface, implementation is taken care by Spring framework.
+ * Customization is done through TopicRepositoryCustom and its implementation TopicRepositoryImpl.
+ *
+ * @author Guobiao Mo
+ *
+ */
+@ViewIndexed(designDoc = "topic", viewName = "all")
+public interface TopicRepository extends CouchbasePagingAndSortingRepository<Topic, String>, TopicRepositoryCustom {
+/*
+ Topic findFirstById(String topic);
+
+ Topic findByIdAndState(String topic, boolean state);
+
+ //Supports native JSON query string
+ @Query("{topic:'?0'}")
+ Topic findTopicById(String topic);
+
+ @Query("{topic: { $regex: ?0 } })")
+ List<Topic> findTopicByRegExId(String topic);
+
+
+ //Page<Topic> findByCompanyIdAndNameLikeOrderByName(String companyId, String name, Pageable pageable);
+
+ @Query("#{#n1ql.selectEntity} where #{#n1ql.filter} and companyId = $1 and $2 within #{#n1ql.bucket}")
+ Topic findByCompanyAndAreaId(String companyId, String areaId);
+
+ @Query("#{#n1ql.selectEntity} where #{#n1ql.filter} AND ANY phone IN phoneNumbers SATISFIES phone = $1 END")
+ List<Topic> findByPhoneNumber(String telephoneNumber);
+
+ @Query("SELECT COUNT(*) AS count FROM #{#n1ql.bucket} WHERE #{#n1ql.filter} and companyId = $1")
+ Long countBuildings(String companyId);
+ */
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java new file mode 100644 index 00000000..220a8f76 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryCustom.java @@ -0,0 +1,28 @@ +/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.repository;
+
+/**
+ * @author Guobiao Mo
+ *
+ */
+public interface TopicRepositoryCustom {
+ long updateTopic(String topic, Boolean state);
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java new file mode 100644 index 00000000..018d5b95 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/repository/TopicRepositoryImpl.java @@ -0,0 +1,67 @@ +/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.repository;
+
+import org.onap.datalake.feeder.domain.Topic;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.couchbase.core.CouchbaseTemplate;
+/*
+import org.springframework.data.mongodb.MongoDbFactory;
+import org.springframework.data.mongodb.core.MongoTemplate;
+import org.springframework.data.mongodb.core.convert.DefaultDbRefResolver;
+import org.springframework.data.mongodb.core.convert.DefaultMongoTypeMapper;
+import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
+import org.springframework.data.mongodb.core.mapping.MongoMappingContext;
+import org.springframework.data.mongodb.core.query.Criteria;
+import org.springframework.data.mongodb.core.query.Query;
+import org.springframework.data.mongodb.core.query.Update;
+
+import com.mongodb.WriteResult;
+import com.mongodb.client.result.UpdateResult;
+*/
+import java.util.List;
+
+/**
+ * @author Guobiao Mo
+ *
+ */
+public class TopicRepositoryImpl implements TopicRepositoryCustom {
+
+ @Autowired
+ CouchbaseTemplate template;
+
+ @Override
+ public long updateTopic(String topic, Boolean state) {
+/*
+ Query query = new Query(Criteria.where("id").is(topic));
+ Update update = new Update();
+ update.set("state", state);
+
+ UpdateResult result = mongoTemplate.updateFirst(query, update, Topic.class);
+
+ if(result!=null)
+ return result.getModifiedCount();
+ else
+ */ return 0L;
+
+
+
+ }
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java new file mode 100644 index 00000000..35432587 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/CouchbaseService.java @@ -0,0 +1,129 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DATALAKE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.service; + +import java.util.ArrayList; +import java.util.List; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + +import org.json.JSONObject; +import org.onap.datalake.feeder.config.ApplicationConfiguration; +import org.onap.datalake.feeder.domain.Topic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import com.couchbase.client.java.Bucket; +import com.couchbase.client.java.Cluster; +import com.couchbase.client.java.CouchbaseCluster; +import com.couchbase.client.java.document.JsonDocument; +import com.couchbase.client.java.document.JsonLongDocument; +import com.couchbase.client.java.document.json.JsonObject; + +import rx.Observable; +import rx.functions.Func1; + +/** + * Service to use Couchbase + * + * @author Guobiao Mo + * + */ +@Service +public class CouchbaseService { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private ApplicationConfiguration config; + + Bucket bucket; + + @PostConstruct + private void init() { + // Initialize Couchbase Connection + Cluster cluster = CouchbaseCluster.create(config.getCouchbaseHost()); + cluster.authenticate(config.getCouchbaseUser(), config.getCouchbasePass()); + bucket = cluster.openBucket(config.getCouchbaseBucket()); + + log.info("Connect to Couchbase " + config.getCouchbaseHost()); + + // Create a N1QL Primary Index (but ignore if it exists) + bucket.bucketManager().createN1qlPrimaryIndex(true, false); + } + + @PreDestroy + public void cleanUp() { + bucket.close(); + } + + public void saveJsons(Topic topic, List<JSONObject> jsons) { + List<JsonDocument> documents= new ArrayList<>(jsons.size()); + for(JSONObject json : jsons) { + //convert to Couchbase JsonObject from org.json JSONObject + JsonObject jsonObject = JsonObject.fromJson(json.toString()); + + long timestamp = jsonObject.getLong("_ts");//this is Kafka time stamp, which is added in StoreService.messageToJson() + + //setup TTL + int expiry = (int) (timestamp/1000L) + topic.getTtl()*3600*24; //in second + + String id = getId(topic.getId()); + JsonDocument doc = JsonDocument.create(id, expiry, jsonObject); + documents.add(doc); + } + saveDocuments(documents); + } + + + private String getId(String topicStr) { + //String id = topicStr+":"+timestamp+":"+UUID.randomUUID(); + + //https://forums.couchbase.com/t/how-to-set-an-auto-increment-id/4892/2 + //atomically get the next sequence number: + // increment by 1, initialize at 0 if counter doc not found + //TODO how slow is this compared with above UUID approach? + JsonLongDocument nextIdNumber = bucket.counter(topicStr, 1, 0); //like 12345 + String id = topicStr +":"+ nextIdNumber.content(); + + return id; + } + + //https://docs.couchbase.com/java-sdk/2.7/document-operations.html + private void saveDocuments(List<JsonDocument> documents) { + Observable + .from(documents) + .flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() { + @Override + public Observable<JsonDocument> call(final JsonDocument docToInsert) { + return bucket.async().insert(docToInsert); + } + }) + .last() + .toBlocking() + .single(); + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java new file mode 100644 index 00000000..96ad81bf --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/DmaapService.java @@ -0,0 +1,94 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DATALAKE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.service; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import javax.annotation.PostConstruct; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.onap.datalake.feeder.config.ApplicationConfiguration; +import org.onap.datalake.feeder.domain.Topic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * This service will handle all the communication with Kafka + * + * @author Guobiao Mo + * + */ +@Service +public class DmaapService { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private ApplicationConfiguration config; + + @Autowired + private TopicService topicService; + + + @PostConstruct + private void init() { + + } + + //get all topic names from Zookeeper + public List<String> getTopics() { + try { + Watcher watcher = new Watcher() { + @Override + public void process(WatchedEvent event) { + // TODO monitor new topics + + } + }; + ZooKeeper zk = new ZooKeeper(config.getDmaapZookeeperHostPort(), 10000, watcher); + List<String> topics = zk.getChildren("/brokers/topics", false); + return topics; + } catch (Exception e) { + log.error("Can not get topic list from Zookeeper, for testing, going to use hard coded topic list.", e); + return null; + } + } + + public List<String> getActiveTopics() throws IOException { + List<String> allTopics = new ArrayList<>(getTopics()); + + List<String> ret = new ArrayList<>(); + for (String topicStr : allTopics) { + Topic topic = topicService.getEffectiveTopic(topicStr, true); + if (topic.isEnabled()) { + ret.add(topicStr); + } + } + return ret; + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java new file mode 100644 index 00000000..cbcc5f86 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java @@ -0,0 +1,142 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DATALAKE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.service; + +import java.io.IOException; +import java.util.List; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + +import org.apache.http.HttpHost; +import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.xcontent.XContentType; +import org.json.JSONObject; +import org.onap.datalake.feeder.config.ApplicationConfiguration; +import org.onap.datalake.feeder.domain.Topic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * Service to use Elasticsearch + * + * @author Guobiao Mo + * + */ +@Service +public class ElasticsearchService { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private ApplicationConfiguration config; + + private RestHighLevelClient client; + ActionListener<BulkResponse> listener; + + @PostConstruct + private void init() { + String elasticsearchHost = config.getElasticsearchHost(); + + // Initialize the Connection + client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http"))); + + log.info("Connect to Elasticsearch Host " + elasticsearchHost); + + listener = new ActionListener<BulkResponse>() { + @Override + public void onResponse(BulkResponse bulkResponse) { + + } + + @Override + public void onFailure(Exception e) { + log.error(e.getMessage()); + } + }; + } + + @PreDestroy + public void cleanUp() throws IOException { + client.close(); + } + + public void ensureTableExist(String topic) throws IOException { + String topicLower = topic.toLowerCase(); + + CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower); + try { + CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT); + log.info(createIndexResponse.index()+" : created "+createIndexResponse.isAcknowledged()); + }catch(ElasticsearchStatusException e) { + log.info("{} create ES topic status: {}", topic, e.getDetailedMessage()); + } + } + + //TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME + public void saveJsons(Topic topic, List<JSONObject> jsons) { + BulkRequest request = new BulkRequest(); + + for (JSONObject json : jsons) { + if(topic.isCorrelateClearedMessage()) { + boolean found = correlateClearedMessage(json); + if(found) { + continue; + } + } + request.add(new IndexRequest(topic.getId().toLowerCase(), "doc").source(json.toString(), XContentType.JSON)); + } + if(config.isAsync()) { + client.bulkAsync(request, RequestOptions.DEFAULT, listener); + }else { + try { + client.bulk(request, RequestOptions.DEFAULT); + } catch (IOException e) { + log.error( topic.getId() , e); + } + } + } + + private boolean correlateClearedMessage(JSONObject json) { + boolean found = false; + + /*TODO + * 1. check if this is a alarm cleared message + * 2. search previous alarm message + * 3. update previous message, if success, set found=true + */ + + return found; + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java new file mode 100644 index 00000000..3dcbd8ee --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullService.java @@ -0,0 +1,117 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DATALAKE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.service; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import javax.annotation.PostConstruct; + +import org.onap.datalake.feeder.config.ApplicationConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.stereotype.Service; + +/** + * Service that pulls messages from DMaaP and save them to Big Data DBs + * + * @author Guobiao Mo + * + */ + +@Service +public class PullService { + + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + + private boolean isRunning = false; + private ExecutorService executorService; + private List<PullThread> consumers; + + @Autowired + private ApplicationContext context; + + @Autowired + private ApplicationConfiguration config; + + @PostConstruct + private void init() { + } + + /** + * start pulling. + * + * @throws IOException + */ + public synchronized void start() throws IOException { + if (isRunning) { + return; + } + + logger.info("start pulling ..."); + + int numConsumers = config.getKafkaConsumerCount(); + + executorService = Executors.newFixedThreadPool(numConsumers); + consumers = new ArrayList<>(numConsumers); + + for (int i = 0; i < numConsumers; i++) { + PullThread puller = context.getBean(PullThread.class, i); + consumers.add(puller); + executorService.submit(puller); + } + + isRunning = true; + + Runtime.getRuntime().addShutdownHook(new Thread(()->shutdown())) ; + } + + /** + * stop pulling + */ + public synchronized void shutdown() { + if (!isRunning) { + return; + } + + logger.info("stop pulling ..."); + for (PullThread puller : consumers) { + puller.shutdown(); + } + + executorService.shutdown(); + + try { + executorService.awaitTermination(10L, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.error("executor.awaitTermination", e); + } + + isRunning = false; + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java new file mode 100644 index 00000000..3c75eb68 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/PullThread.java @@ -0,0 +1,168 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DATALAKE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.service; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.annotation.PostConstruct; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.onap.datalake.feeder.config.ApplicationConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.annotation.Scope; +import org.springframework.stereotype.Service; + +/** + * Thread that pulls messages from DMaaP and save them to Big Data DBs + * + * @author Guobiao Mo + * + */ + +@Service +@Scope(value=ConfigurableBeanFactory.SCOPE_PROTOTYPE) +public class PullThread implements Runnable { + + @Autowired + private DmaapService dmaapService; + + @Autowired + private StoreService storeService; + + @Autowired + private ApplicationConfiguration config; + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + private KafkaConsumer<String, String> consumer; //<String, String> is key-value type, in our case key is empty, value is JSON text + private int id; + + private final AtomicBoolean active = new AtomicBoolean(false); + private boolean async; + + public PullThread(int id) { + this.id = id; + } + + @PostConstruct + private void init() { + async = config.isAsync(); + Properties consumerConfig = getConsumerConfig(); + consumer = new KafkaConsumer<>(consumerConfig); + } + + private Properties getConsumerConfig() { + Properties consumerConfig = new Properties(); + + consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getDmaapKafkaHostPort()); + consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, config.getDmaapKafkaGroup()); + consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor"); + consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + + return consumerConfig; + } + + /** + * start pulling. + */ + @Override + public void run() { + active.set(true); + + DummyRebalanceListener rebalanceListener = new DummyRebalanceListener(); + + try { + List<String> topics = dmaapService.getActiveTopics(); //TODO get updated topic list within loop + + log.info("Thread {} going to subscribe to topics: {}", id, topics); + + consumer.subscribe(topics, rebalanceListener); + + while (active.get()) { + + ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(config.getDmaapKafkaTimeout())); + + List<Pair<Long, String>> messages = new ArrayList<>(records.count()); + for (TopicPartition partition : records.partitions()) { + messages.clear(); + List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); + for (ConsumerRecord<String, String> record : partitionRecords) { + messages.add(Pair.of(record.timestamp(), record.value())); + log.debug("threadid={} topic={}, timestamp={} key={}, offset={}, partition={}, value={}", id, record.topic(), record.timestamp(), record.key(), record.offset(), record.partition(), record.value()); + } + storeService.saveMessages(partition.topic(), messages); + log.info("topic={} count={}", partition.topic(), partitionRecords.size());//TODO we may record this number to DB + + if (!async) {//for reliability, sync commit offset to Kafka, this slows down a bit + long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); + consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); + } + } + + if (async) {//for high Throughput, async commit offset in batch to Kafka + consumer.commitAsync(); + } + } + } catch (Exception e) { + log.error("Puller {} run(): exception={}", id, e.getMessage()); + log.error("", e); + } finally { + consumer.close(); + } + } + + public void shutdown() { + active.set(false); + consumer.wakeup(); + } + + private class DummyRebalanceListener implements ConsumerRebalanceListener { + @Override + public void onPartitionsRevoked(Collection<TopicPartition> partitions) { + log.info("Called onPartitionsRevoked with partitions: {}", partitions); + } + + @Override + public void onPartitionsAssigned(Collection<TopicPartition> partitions) { + log.info("Called onPartitionsAssigned with partitions: {}", partitions); + } + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java new file mode 100644 index 00000000..1cd3a8a1 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java @@ -0,0 +1,164 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DATALAKE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.service; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + +import org.apache.commons.lang3.tuple.Pair; + +import org.json.JSONException; +import org.json.JSONObject; +import org.json.XML; +import org.onap.datalake.feeder.domain.Topic; +import org.onap.datalake.feeder.enumeration.DataFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; + +/** + * Service to store messages to varieties of DBs + * + * comment out YAML support, since AML is for config and don't see this data type in DMaaP. Do we need to support XML? + * + * @author Guobiao Mo + * + */ +@Service +public class StoreService { + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private TopicService topicService; + + @Autowired + private CouchbaseService couchbaseService; + + @Autowired + private ElasticsearchService elasticsearchService; + + private Map<String, Topic> topicMap = new HashMap<>(); + + private ObjectMapper yamlReader; + + @PostConstruct + private void init() { + yamlReader = new ObjectMapper(new YAMLFactory()); + } + + @PreDestroy + public void cleanUp() { + } + + public void saveMessages(String topicStr, List<Pair<Long, String>> messages) {//pair=ts+text + if (messages == null || messages.isEmpty()) { + return; + } + + Topic topic = topicMap.computeIfAbsent(topicStr, k -> { //TODO get topic updated settings from DB periodically + return topicService.getEffectiveTopic(topicStr); + }); + + List<JSONObject> docs = new ArrayList<>(); + + for (Pair<Long, String> pair : messages) { + try { + docs.add(messageToJson(topic, pair)); + } catch (Exception e) { + log.error(pair.getRight(), e); + } + } + + saveJsons(topic, docs); + } + + private JSONObject messageToJson(Topic topic, Pair<Long, String> pair) throws JSONException, JsonParseException, JsonMappingException, IOException { + + long timestamp = pair.getLeft(); + String text = pair.getRight(); + + //for debug, to be remove +// String topicStr = topic.getId(); +// if (!"TestTopic1".equals(topicStr) && !"msgrtr.apinode.metrics.dmaap".equals(topicStr) && !"AAI-EVENT".equals(topicStr) && !"unauthenticated.DCAE_CL_OUTPUT".equals(topicStr) && !"unauthenticated.SEC_FAULT_OUTPUT".equals(topicStr)) { + // log.debug("{} ={}", topicStr, text); + //} + + boolean storeRaw = topic.isSaveRaw(); + + JSONObject json = null; + + DataFormat dataFormat = topic.getDataFormat(); + + switch (dataFormat) { + case JSON: + json = new JSONObject(text); + break; + case XML://XML and YAML can be directly inserted into ES, we may not need to convert it to JSON + json = XML.toJSONObject(text); + break; + case YAML:// Do we need to support YAML? + Object obj = yamlReader.readValue(text, Object.class); + ObjectMapper jsonWriter = new ObjectMapper(); + String jsonString = jsonWriter.writeValueAsString(obj); + json = new JSONObject(jsonString); + break; + default: + json = new JSONObject(); + storeRaw = true; + break; + } + + //FIXME for debug, to be remove + json.remove("_id"); + json.remove("_dl_text_"); + + json.put("_ts", timestamp); + if (storeRaw) { + json.put("_text", text); + } + + return json; + } + + private void saveJsons(Topic topic, List<JSONObject> jsons) { + if (topic.isSupportCouchbase()) { + couchbaseService.saveJsons(topic, jsons); + } + + if (topic.isSupportElasticsearch()) { + elasticsearchService.saveJsons(topic, jsons); + } + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java new file mode 100644 index 00000000..9b8fabc1 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java @@ -0,0 +1,94 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DATALAKE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.service; + +import java.io.IOException; +import java.util.Optional; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + +import org.onap.datalake.feeder.domain.Topic; +import org.onap.datalake.feeder.repository.TopicRepository; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * Service for topics topic setting is stored in Couchbase, bucket 'dl', see + * application.properties for Spring setup + * + * @author Guobiao Mo + * + */ +@Service +public class TopicService { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Autowired + private TopicRepository topicRepository; + + @Autowired + private ElasticsearchService elasticsearchService; + + @PostConstruct + private void init() { + } + + @PreDestroy + public void cleanUp() { + } + + public Topic getEffectiveTopic(String topicStr) { + try { + return getEffectiveTopic(topicStr, false); + } catch (IOException e) { + log.error(topicStr, e); + } + return null; + } + + public Topic getEffectiveTopic(String topicStr, boolean ensureTableExist) throws IOException { + Topic topic = getTopic(topicStr); + if (topic == null) { + topic = new Topic(topicStr); + } + + topic.setDefaultTopic(getDefaultTopic()); + + if(ensureTableExist && topic.isEnabled() && topic.isSupportElasticsearch()) { + elasticsearchService.ensureTableExist(topicStr); + } + return topic; + } + + public Topic getTopic(String topicStr) { + Optional<Topic> ret = topicRepository.findById(topicStr); + return ret.isPresent() ? ret.get() : null; + } + + public Topic getDefaultTopic() { + return getTopic("_DL_DEFAULT_"); + } + +} diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/DruidSupervisorGenerator.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/DruidSupervisorGenerator.java new file mode 100644 index 00000000..819590b7 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/DruidSupervisorGenerator.java @@ -0,0 +1,172 @@ +/*
+* ============LICENSE_START=======================================================
+* ONAP : DataLake
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+package org.onap.datalake.feeder.util;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.velocity.Template;
+import org.apache.velocity.VelocityContext;
+import org.apache.velocity.app.Velocity;
+import org.apache.velocity.runtime.RuntimeConstants;
+import org.apache.velocity.runtime.resource.loader.ClasspathResourceLoader;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.JsonNodeType;
+
+
+/*
+ * read sample json and output supervisor to resources\druid\generated
+ * need manual edit to be production ready, final versions are in resources\druid
+ *
+ * http://druid.io/docs/latest/tutorials/tutorial-ingestion-spec.html
+ * http://druid.io/docs/latest/ingestion/flatten-json
+ *
+ *
+ * todo:
+ * reduce the manual editing
+ * path hard coded
+ * auto get topics,
+ * auto get sample, and for each topic, get multiple samples.
+ * make supervisor file names consistent
+ * dimension type default is string, in msgrtr.apinode.metrics.dmaap , many are long/double, so need to generate dimensionsSpec, this is done at the end of printFlattenSpec()
+ */
+
+public class DruidSupervisorGenerator {
+
+ Template template = null;
+ VelocityContext context;
+
+ List<String[]> dimensions;
+
+ public DruidSupervisorGenerator() {
+ dimensions = new ArrayList<>();
+
+ Velocity.setProperty(RuntimeConstants.RESOURCE_LOADER, "classpath");
+ Velocity.setProperty("classpath.resource.loader.class", ClasspathResourceLoader.class.getName());
+
+ Velocity.init();
+
+ context = new VelocityContext();
+
+ context.put("host", "dl_dmaap_kf");
+
+ template = Velocity.getTemplate("druid/kafka-supervisor-template.vm");
+ }
+
+ public void printNode(String prefix, JsonNode node) {
+
+ // lets see what type the node is
+ // System.out.println("NodeType=" + node.getNodeType() + ", isContainerNode=" + node.isContainerNode() + ", " + node); // prints OBJECT
+
+ if (node.isContainerNode()) {
+
+ Iterator<Entry<String, JsonNode>> fields = node.fields();
+
+ while (fields.hasNext()) {
+ Entry<String, JsonNode> field = fields.next();
+ // System.out.println("--------"+field.getKey()+"--------");
+ printNode(prefix + "." + field.getKey(), field.getValue());
+ }
+
+ if (node.isArray()) {
+ Iterator<JsonNode> elements = node.elements();
+ int i = 0;
+ while (elements.hasNext()) {
+ JsonNode element = elements.next();
+ printNode(prefix + "[" + i + "]", element);
+ i++;
+ }
+ }
+
+ } else {
+ printFlattenSpec(node.getNodeType(), prefix);
+ }
+
+ }
+
+ public void printFlattenSpec(JsonNodeType type, String path) {
+ String name = path.substring(2).replace('.', ':');
+ // lets see what type the node is
+ System.out.println("{");
+ System.out.println("\"type\": \"path\",");
+ System.out.println("\"name\": \"" + name + "\",");
+ System.out.println("\"expr\": \"" + path + "\"");
+ System.out.println("},");
+
+ dimensions.add(new String[] { name, path });
+ /*
+ //for dimensionsSpec
+ if (JsonNodeType.NUMBER.equals(type)) {
+ System.out.println("{");
+ System.out.println("\"type\": \"long\",");
+ System.out.println("\"name\": \"" + name + "\",");
+ System.out.println("},");
+ } else {
+ System.out.println("\"" + name + "\",");
+
+ }
+ */
+ }
+
+ public void doTopic(String topic) throws IOException {
+ dimensions.clear();
+
+ String sampleFileName = "C:\\git\\onap\\datalake\\olap\\src\\main\\resources\\druid\\" + topic + "-sample-format.json";//FIXME hard coded path
+ String outputFileName = "C:\\git\\onap\\datalake\\olap\\src\\main\\resources\\druid\\generated\\" + topic + "-kafka-supervisor.json";
+
+ // Get the contents of json as a string using commons IO IOUTils class.
+ String sampleJson = Util.getTextFromFile(sampleFileName);
+
+ // create an ObjectMapper instance.
+ ObjectMapper mapper = new ObjectMapper();
+ // use the ObjectMapper to read the json string and create a tree
+ JsonNode root = mapper.readTree(sampleJson);
+ printNode("$", root);
+
+ context.put("topic", topic);
+ context.put("timestamp", "event-header:timestamp");//FIXME hard coded, should be topic based
+ context.put("timestampFormat", "yyyyMMdd-HH:mm:ss:SSS");//FIXME hard coded, should be topic based
+
+ context.put("dimensions", dimensions);
+
+ BufferedWriter out = new BufferedWriter(new FileWriter(outputFileName));
+
+ template.merge(context, out);
+ out.close();
+ }
+
+ public static void main(String[] args) throws MalformedURLException, IOException {
+ String[] topics = new String[] { "AAI-EVENT", "msgrtr.apinode.metrics.dmaap", "unauthenticated.DCAE_CL_OUTPUT", "unauthenticated.SEC_FAULT_OUTPUT" };//FIXME hard coded
+
+ DruidSupervisorGenerator p = new DruidSupervisorGenerator();
+
+ for (String topic : topics) {
+ p.doTopic(topic);
+ }
+ }
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/Util.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/Util.java new file mode 100644 index 00000000..89e81af1 --- /dev/null +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/Util.java @@ -0,0 +1,65 @@ +/* +* ============LICENSE_START======================================================= +* ONAP : DCAE +* ================================================================================ +* Copyright 2019 China Mobile +*================================================================================= +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* ============LICENSE_END========================================================= +*/ + +package org.onap.datalake.feeder.util; + +import java.io.File; +import java.io.IOException; + +import org.apache.commons.io.FileUtils; + +/** + * utils + * + * @author Guobiao Mo + * + */ +public class Util { + + // https://commons.apache.org/proper/commons-io/description.html + public static String getTextFromFile(String fileName) throws IOException { + File file = new File(fileName); + String string = FileUtils.readFileToString(file, "UTF-8"); + return string; + } + + /** + * given a json file, remove dot(.) in all keys + * http://www.vogella.com/tutorials/JavaRegularExpressions/article.html + */ + public static String replaceDotInKey(String json) { + String regex = "(\"[\\-\\w]+)(\\.)([\\.\\-\\w]+\\\\?\"\\:)"; + + String newJson = json.replaceAll(regex, "$1_$3"); + + if (json.equals(newJson)) { + return json; + } else { + return replaceDotInKey(newJson);// there maybe more to replace + } + } + + public static void main(String[] args) { + String a = "\"u-y.t.y-t\":\"u.gfh\",\\\"jg.h\\\":\"j_9889\""; + String b = replaceDotInKey(a); + System.out.println(a); + System.out.println(b); + } +} |