From e26f1224fb91c1ca49f4b4b37a4a5de06ba3a7ad Mon Sep 17 00:00:00 2001 From: "k.kedron" Date: Tue, 17 Aug 2021 13:54:29 +0200 Subject: Many topics support Issue-ID: INT-1947 Signed-off-by: Krystian Kedron Change-Id: I1458c853ea7fabe8b393ec6f7e7ce8c5d13953f6 --- .../datacollector/service/VesRetrievalService.java | 20 +++++++++++++------- .../service/configuration/DmaapProperties.java | 10 +++++++--- .../configuration/DmaapRestReaderConfiguration.java | 5 +++-- datacollector/src/main/resources/application.yml | 4 +++- 4 files changed, 26 insertions(+), 13 deletions(-) (limited to 'datacollector/src/main') diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java index 379e32d..a3825f6 100644 --- a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java +++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java @@ -21,6 +21,7 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.onap.rapp.datacollector.entity.ves.AdditionalMeasurementValues; import org.onap.rapp.datacollector.entity.ves.AdditionalMeasurements; import org.onap.rapp.datacollector.entity.ves.Event; @@ -61,20 +62,25 @@ public class VesRetrievalService implements DmaapRestReader { @Override public Collection retrieveEvents() { - logger.info("Reaching from dmaap: {}", config.getMeasurementsTopicUrl()); + logger.info("Reaching from dmaap: {}", config.getMeasurementsTopicUrls()); + return config.getMeasurementsTopicUrls().stream().flatMap(this::retrieveEventsFromTopic) + .collect(Collectors.toList()); + } + + private Stream retrieveEventsFromTopic(String topic) { try { - ResponseEntity responseEntity = - restTemplate.exchange(config.getMeasurementsTopicUrl(), HttpMethod.GET, - new HttpEntity(createHeaders(config.getDmaapProperties().getUsername(), config.getDmaapProperties().getPassword())), - String[].class); + ResponseEntity responseEntity = restTemplate.exchange(topic, HttpMethod.GET, + new HttpEntity(createHeaders(config.getDmaapProperties().getUsername(), + config.getDmaapProperties().getPassword())), String[].class); if (responseEntity.hasBody()) { String[] events = responseEntity.getBody(); - return Arrays.stream(events).collect(Collectors.toList()); + return Arrays.stream(events); } } catch (RestClientException ex) { logger.error("Failed to reach to dmaap", ex); } - return Collections.emptyList(); + + return Arrays.stream(new String[0]); } private HttpHeaders createHeaders(String username, String password) { diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java index 3560e89..a664e8e 100644 --- a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java +++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java @@ -13,6 +13,9 @@ package org.onap.rapp.datacollector.service.configuration; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; import org.springframework.boot.context.properties.ConfigurationProperties; import lombok.Getter; @@ -28,10 +31,11 @@ public class DmaapProperties { private String username; private String password; private int port; - private String measurementsTopic; + private List measurementsTopics = new ArrayList<>(); - public String getMeasurementsTopicUrl() { - return String.format("%s://%s:%d/%s", protocol, host, port, measurementsTopic); + public List getMeasurementsTopicUrls() { + return measurementsTopics.stream().map(topic -> String.format("%s://%s:%d/%s", protocol, host, port, topic)) + .collect(Collectors.toList()); } } diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java index ab1bd0a..a3b2536 100644 --- a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java +++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java @@ -23,6 +23,7 @@ import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; +import java.util.List; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSession; @@ -118,8 +119,8 @@ public class DmaapRestReaderConfiguration { this.databaseProperties = databaseProperties; } - public String getMeasurementsTopicUrl() { - return dmaapProperties.getMeasurementsTopicUrl(); + public List getMeasurementsTopicUrls() { + return dmaapProperties.getMeasurementsTopicUrls(); } public DmaapProperties getDmaapProperties() { diff --git a/datacollector/src/main/resources/application.yml b/datacollector/src/main/resources/application.yml index 502d8de..01919fa 100644 --- a/datacollector/src/main/resources/application.yml +++ b/datacollector/src/main/resources/application.yml @@ -6,7 +6,9 @@ dmaap: port: 8181 username: dcae@dcae.onap.org password: demo123456! - measurements-topic: "measurements" + measurements-topics: + - measurements + - measurements2 database: host: mariadb-host port: 3306 -- cgit 1.2.3-korg