diff options
author | k.kedron <k.kedron@partner.samsung.com> | 2021-08-17 13:54:29 +0200 |
---|---|---|
committer | Pawel Slowikowski <p.slowikows2@samsung.com> | 2021-08-31 08:57:17 +0000 |
commit | e26f1224fb91c1ca49f4b4b37a4a5de06ba3a7ad (patch) | |
tree | 12cd3614320a00dfb676449fe0599077315424b6 /datacollector/src/main | |
parent | 007b856430d1dbc11e21192913326071d3f3055f (diff) |
Many topics support
Issue-ID: INT-1947
Signed-off-by: Krystian Kedron <k.kedron@partner.samsung.com>
Change-Id: I1458c853ea7fabe8b393ec6f7e7ce8c5d13953f6
Diffstat (limited to 'datacollector/src/main')
4 files changed, 26 insertions, 13 deletions
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java index 379e32d..a3825f6 100644 --- a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java +++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java @@ -21,6 +21,7 @@ import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.onap.rapp.datacollector.entity.ves.AdditionalMeasurementValues; import org.onap.rapp.datacollector.entity.ves.AdditionalMeasurements; import org.onap.rapp.datacollector.entity.ves.Event; @@ -61,20 +62,25 @@ public class VesRetrievalService implements DmaapRestReader { @Override public Collection<String> retrieveEvents() { - logger.info("Reaching from dmaap: {}", config.getMeasurementsTopicUrl()); + logger.info("Reaching from dmaap: {}", config.getMeasurementsTopicUrls()); + return config.getMeasurementsTopicUrls().stream().flatMap(this::retrieveEventsFromTopic) + .collect(Collectors.toList()); + } + + private Stream<String> retrieveEventsFromTopic(String topic) { try { - ResponseEntity<String[]> responseEntity = - restTemplate.exchange(config.getMeasurementsTopicUrl(), HttpMethod.GET, - new HttpEntity<String>(createHeaders(config.getDmaapProperties().getUsername(), config.getDmaapProperties().getPassword())), - String[].class); + ResponseEntity<String[]> responseEntity = restTemplate.exchange(topic, HttpMethod.GET, + new HttpEntity<String>(createHeaders(config.getDmaapProperties().getUsername(), + config.getDmaapProperties().getPassword())), String[].class); if (responseEntity.hasBody()) { String[] events = responseEntity.getBody(); - return Arrays.stream(events).collect(Collectors.toList()); + return Arrays.stream(events); } } catch (RestClientException ex) { logger.error("Failed to reach to dmaap", ex); } - return Collections.emptyList(); + + return Arrays.stream(new String[0]); } private HttpHeaders createHeaders(String username, String password) { diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java index 3560e89..a664e8e 100644 --- a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java +++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java @@ -13,6 +13,9 @@ package org.onap.rapp.datacollector.service.configuration; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; import org.springframework.boot.context.properties.ConfigurationProperties; import lombok.Getter; @@ -28,10 +31,11 @@ public class DmaapProperties { private String username; private String password; private int port; - private String measurementsTopic; + private List<String> measurementsTopics = new ArrayList<>(); - public String getMeasurementsTopicUrl() { - return String.format("%s://%s:%d/%s", protocol, host, port, measurementsTopic); + public List<String> getMeasurementsTopicUrls() { + return measurementsTopics.stream().map(topic -> String.format("%s://%s:%d/%s", protocol, host, port, topic)) + .collect(Collectors.toList()); } } diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java index ab1bd0a..a3b2536 100644 --- a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java +++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java @@ -23,6 +23,7 @@ import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; +import java.util.List; import javax.net.ssl.HostnameVerifier; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSession; @@ -118,8 +119,8 @@ public class DmaapRestReaderConfiguration { this.databaseProperties = databaseProperties; } - public String getMeasurementsTopicUrl() { - return dmaapProperties.getMeasurementsTopicUrl(); + public List<String> getMeasurementsTopicUrls() { + return dmaapProperties.getMeasurementsTopicUrls(); } public DmaapProperties getDmaapProperties() { diff --git a/datacollector/src/main/resources/application.yml b/datacollector/src/main/resources/application.yml index 502d8de..01919fa 100644 --- a/datacollector/src/main/resources/application.yml +++ b/datacollector/src/main/resources/application.yml @@ -6,7 +6,9 @@ dmaap: port: 8181 username: dcae@dcae.onap.org password: demo123456! - measurements-topic: "measurements" + measurements-topics: + - measurements + - measurements2 database: host: mariadb-host port: 3306 |