summaryrefslogtreecommitdiffstats
path: root/components/datalake-handler/feeder
diff options
context:
space:
mode:
authorZhangZihao <zhangzihao@chinamobile.com>2019-07-04 10:33:32 +0800
committerZhangZihao <zhangzihao@chinamobile.com>2019-07-04 10:33:44 +0800
commit027c75f93a010cd115fe5fa882646d6f258b566b (patch)
tree494f62100b15c146dae15080c34b245f55fb8a3d /components/datalake-handler/feeder
parent773360d05595c3d1f8c5d55200f06b0e42e7dc3c (diff)
kafka
Change-Id: I3084858f2ddc06c42e062f65d65b4d3dec620fd7 Issue-ID: DCAEGEN2-1631 Signed-off-by: ZhangZihao <zhangzihao@chinamobile.com>
Diffstat (limited to 'components/datalake-handler/feeder')
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/KafkaController.java149
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java22
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java8
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/KafkaConfig.java66
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java1
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/KafkaService.java87
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java21
7 files changed, 354 insertions, 0 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/KafkaController.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/KafkaController.java
new file mode 100644
index 00000000..901adf29
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/controller/KafkaController.java
@@ -0,0 +1,149 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : DataLake
+ * ================================================================================
+ * Copyright 2019 China Mobile
+ *=================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.datalake.feeder.controller;
+
+import io.swagger.annotations.ApiOperation;
+import org.onap.datalake.feeder.controller.domain.PostReturnBody;
+import org.onap.datalake.feeder.domain.Kafka;
+import org.onap.datalake.feeder.dto.KafkaConfig;
+import org.onap.datalake.feeder.repository.KafkaRepository;
+import org.onap.datalake.feeder.service.KafkaService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.MediaType;
+import org.springframework.validation.BindingResult;
+import org.springframework.web.bind.annotation.*;
+
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * This controller manages kafka settings
+ *
+ * @author guochunmeng
+ */
+@RestController
+@RequestMapping(value = "/kafkas", produces = { MediaType.APPLICATION_JSON_VALUE })
+public class KafkaController {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ @Autowired
+ private KafkaService kafkaService;
+
+ @Autowired
+ private KafkaRepository kafkaRepository;
+
+ @PostMapping("")
+ @ResponseBody
+ @ApiOperation(value="Create a kafka.")
+ public PostReturnBody<KafkaConfig> createKafka(@RequestBody KafkaConfig kafkaConfig, BindingResult result, HttpServletResponse response) throws IOException {
+
+ if (result.hasErrors()) {
+ sendError(response, 400, "Error parsing KafkaConfig : "+result.toString());
+ return null;
+ }
+
+ Kafka oldKafka = kafkaService.getKafkaById(kafkaConfig.getId());
+
+ if (oldKafka != null) {
+ sendError(response, 400, "kafka is exist "+kafkaConfig.getId());
+ return null;
+ } else {
+ Kafka kafka = null;
+ try {
+ kafka = kafkaService.fillKafkaConfiguration(kafkaConfig);
+ } catch (Exception e) {
+ log.debug("FillKafkaConfiguration failed", e.getMessage());
+ sendError(response, 400, "Error FillKafkaConfiguration: "+e.getMessage());
+ return null;
+ }
+ kafkaRepository.save(kafka);
+ log.info("Kafka save successed");
+ return mkPostReturnBody(200, kafka);
+ }
+ }
+
+ @PutMapping("/{id}")
+ @ResponseBody
+ @ApiOperation(value="Update a kafka.")
+ public PostReturnBody<KafkaConfig> updateKafka(@RequestBody KafkaConfig kafkaConfig, BindingResult result, @PathVariable String id, HttpServletResponse response) throws IOException {
+
+ if (result.hasErrors()) {
+ sendError(response, 400, "Error parsing KafkaConfig : "+result.toString());
+ return null;
+ }
+
+ Kafka oldKafka = kafkaService.getKafkaById(id);
+
+ if (oldKafka == null) {
+ sendError(response, 400, "Kafka not found: "+id);
+ return null;
+ } else {
+ try {
+ kafkaService.fillKafkaConfiguration(kafkaConfig, oldKafka);
+ } catch (Exception e) {
+ log.debug("FillKafkaConfiguration failed", e.getMessage());
+ sendError(response, 400, "Error FillKafkaConfiguration: "+e.getMessage());
+ return null;
+ }
+ kafkaRepository.save(oldKafka);
+ log.info("kafka update successed");
+ return mkPostReturnBody(200, oldKafka);
+ }
+ }
+
+ @DeleteMapping("/{id}")
+ @ResponseBody
+ @ApiOperation(value="delete a kafka.")
+ public void deleteKafka(@PathVariable("id") String id, HttpServletResponse response) throws IOException{
+
+ Kafka oldKafka = kafkaService.getKafkaById(id);
+ if (oldKafka == null) {
+ sendError(response, 400, "kafka not found "+id);
+ } else {
+ kafkaRepository.delete(oldKafka);
+ response.setStatus(204);
+ }
+ }
+
+ @GetMapping("")
+ @ResponseBody
+ @ApiOperation(value="List all Kafkas")
+ public List<KafkaConfig> queryAllKafka(){
+ return kafkaService.getAllKafka();
+ }
+
+ private PostReturnBody<KafkaConfig> mkPostReturnBody(int statusCode, Kafka kafka) {
+ PostReturnBody<KafkaConfig> retBody = new PostReturnBody<>();
+ retBody.setStatusCode(statusCode);
+ retBody.setReturnBody(kafka.getKafkaConfig());
+ return retBody;
+ }
+
+ private void sendError(HttpServletResponse response, int sc, String msg) throws IOException {
+ log.info(msg);
+ response.sendError(sc, msg);
+ }
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java
index 26be942a..2741c638 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Kafka.java
@@ -32,6 +32,7 @@ import javax.persistence.Table;
import com.fasterxml.jackson.annotation.JsonBackReference;
import lombok.Getter;
import lombok.Setter;
+import org.onap.datalake.feeder.dto.KafkaConfig;
/**
@@ -123,4 +124,25 @@ public class Kafka {
public int hashCode() {
return id.hashCode();
}
+
+ public KafkaConfig getKafkaConfig() {
+ KafkaConfig kafkaConfig = new KafkaConfig();
+
+ kafkaConfig.setId(getId());
+ kafkaConfig.setBrokerList(getBrokerList());
+ kafkaConfig.setConsumerCount(getConsumerCount());
+ kafkaConfig.setEnabled(isEnabled());
+ kafkaConfig.setExcludedTopic(getExcludedTopic());
+ kafkaConfig.setGroup(getGroup());
+ kafkaConfig.setIncludedTopic(getIncludedTopic());
+ kafkaConfig.setLogin(getLogin());
+ kafkaConfig.setName(getName());
+ kafkaConfig.setPass(getPass());
+ kafkaConfig.setSecure(isSecure());
+ kafkaConfig.setSecurityProtocol(getSecurityProtocol());
+ kafkaConfig.setTimeout(getTimeout());
+ kafkaConfig.setZooKeeper(getZooKeeper());
+
+ return kafkaConfig;
+ }
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
index a27b6756..c680e71b 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
@@ -251,6 +251,14 @@ public class Topic {
tConfig.setSinkdbs(dbList);
tConfig.setEnabledSinkdbs(enabledDbList);
+ Set<Kafka> topicKafka = getKafkas();
+ List<String> kafkaList = new ArrayList<>();
+ if (topicKafka != null) {
+ for (Kafka kafka : topicKafka) {
+ kafkaList.add(kafka.getId());
+ }
+ }
+ tConfig.setSinkKafkas(kafkaList);
return tConfig;
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/KafkaConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/KafkaConfig.java
new file mode 100644
index 00000000..b158d167
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/KafkaConfig.java
@@ -0,0 +1,66 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : DataLake
+ * ================================================================================
+ * Copyright 2019 QCT
+ *=================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.datalake.feeder.dto;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.List;
+
+/**
+ * JSON request body for Kafka Config.
+ *
+ * @author guochunmeng
+ *
+ */
+@Getter
+@Setter
+public class KafkaConfig {
+
+ private String id;
+
+ private String name;
+
+ private boolean enabled;
+
+ private String brokerList;
+
+ private String zooKeeper;
+
+ private String group;
+
+ private boolean secure;
+
+ private String login;
+
+ private String pass;
+
+ private String securityProtocol;
+
+ private String includedTopic;
+
+ private String excludedTopic;
+
+ private Integer consumerCount;
+
+ private Integer timeout;
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
index ace7bfa9..942526d2 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
@@ -55,6 +55,7 @@ public class TopicConfig {
private String messageIdPath;
private String aggregateArrayPath;
private String flattenArrayPath;
+ private List<String> sinkKafkas;
@Override
public String toString() {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/KafkaService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/KafkaService.java
new file mode 100644
index 00000000..58ee9087
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/KafkaService.java
@@ -0,0 +1,87 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : DataLake
+ * ================================================================================
+ * Copyright 2019 China Mobile
+ *=================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.datalake.feeder.service;
+
+import org.onap.datalake.feeder.domain.Kafka;
+import org.onap.datalake.feeder.dto.KafkaConfig;
+import org.onap.datalake.feeder.repository.KafkaRepository;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.*;
+
+/**
+ * Service for kafkas
+ *
+ * @author guochunmeng
+ */
+@Service
+public class KafkaService {
+
+ @Autowired
+ private KafkaRepository kafkaRepository;
+
+ public Kafka getKafkaById(String id) {
+
+ Optional<Kafka> ret = kafkaRepository.findById(id);
+ return ret.isPresent() ? ret.get() : null;
+ }
+
+ public List<KafkaConfig> getAllKafka() {
+
+ List<KafkaConfig> kafkaConfigList = new ArrayList<>();
+ Iterable<Kafka> kafkaIterable = kafkaRepository.findAll();
+ for(Kafka portal : kafkaIterable) {
+ kafkaConfigList.add(portal.getKafkaConfig());
+ }
+ return kafkaConfigList;
+ }
+
+ public Kafka fillKafkaConfiguration(KafkaConfig kafkaConfig) {
+ Kafka kafka = new Kafka();
+ fillKafka(kafkaConfig, kafka);
+ return kafka;
+ }
+
+ public void fillKafkaConfiguration(KafkaConfig kafkaConfig, Kafka kafka) {
+ fillKafka(kafkaConfig, kafka);
+ }
+
+ private void fillKafka(KafkaConfig kafkaConfig, Kafka kafka) {
+
+ kafka.setId(kafkaConfig.getId());
+ kafka.setBrokerList(kafkaConfig.getBrokerList());
+ kafka.setConsumerCount(kafkaConfig.getConsumerCount());
+ kafka.setEnabled(kafkaConfig.isEnabled());
+ kafka.setExcludedTopic(kafkaConfig.getExcludedTopic());
+ kafka.setIncludedTopic(kafkaConfig.getIncludedTopic());
+ kafka.setGroup(kafkaConfig.getGroup());
+ kafka.setLogin(kafkaConfig.getLogin());
+ kafka.setName(kafkaConfig.getName());
+ kafka.setPass(kafkaConfig.getPass());
+ kafka.setSecure(kafkaConfig.isSecure());
+ kafka.setSecurityProtocol(kafkaConfig.getSecurityProtocol());
+ kafka.setTimeout(kafkaConfig.getTimeout());
+ kafka.setZooKeeper(kafkaConfig.getZooKeeper());
+
+ }
+
+}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
index 645160e2..73f6293f 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
@@ -35,6 +35,7 @@ import org.onap.datalake.feeder.domain.EffectiveTopic;
import org.onap.datalake.feeder.domain.Kafka;
import org.onap.datalake.feeder.domain.Topic;
import org.onap.datalake.feeder.repository.DbRepository;
+import org.onap.datalake.feeder.repository.KafkaRepository;
import org.onap.datalake.feeder.repository.TopicNameRepository;
import org.onap.datalake.feeder.repository.TopicRepository;
import org.onap.datalake.feeder.service.db.ElasticsearchService;
@@ -68,6 +69,9 @@ public class TopicService {
@Autowired
private DbService dbService;
+
+ @Autowired
+ private KafkaRepository kafkaRepository;
public List<EffectiveTopic> getEnabledEffectiveTopic(Kafka kafka, String topicStr, boolean ensureTableExist) throws IOException {
@@ -166,6 +170,23 @@ public class TopicService {
} else {
topic.setDbs(relateDb);
}
+
+ Set<Kafka> relateKafka = new HashSet<>();
+ if (tConfig.getSinkKafkas() != null) {
+ for (String item : tConfig.getSinkKafkas()) {
+ Optional<Kafka> sinkKafka = kafkaRepository.findById(item);
+ if (sinkKafka.isPresent()) {
+ relateKafka.add(sinkKafka.get());
+ }
+ }
+ if (relateKafka.size() > 0) {
+ topic.setKafkas(relateKafka);
+ } else if (relateKafka.size() == 0) {
+ topic.getKafkas().clear();
+ }
+ } else {
+ topic.setKafkas(relateKafka);
+ }
}
}