summaryrefslogtreecommitdiffstats
path: root/datacollector/src/main
diff options
context:
space:
mode:
authork.kedron <k.kedron@partner.samsung.com>2021-08-17 13:54:29 +0200
committerPawel Slowikowski <p.slowikows2@samsung.com>2021-08-31 08:57:17 +0000
commite26f1224fb91c1ca49f4b4b37a4a5de06ba3a7ad (patch)
tree12cd3614320a00dfb676449fe0599077315424b6 /datacollector/src/main
parent007b856430d1dbc11e21192913326071d3f3055f (diff)
Many topics support
Issue-ID: INT-1947 Signed-off-by: Krystian Kedron <k.kedron@partner.samsung.com> Change-Id: I1458c853ea7fabe8b393ec6f7e7ce8c5d13953f6
Diffstat (limited to 'datacollector/src/main')
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java20
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java10
-rw-r--r--datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java5
-rw-r--r--datacollector/src/main/resources/application.yml4
4 files changed, 26 insertions, 13 deletions
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java
index 379e32d..a3825f6 100644
--- a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java
@@ -21,6 +21,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.onap.rapp.datacollector.entity.ves.AdditionalMeasurementValues;
import org.onap.rapp.datacollector.entity.ves.AdditionalMeasurements;
import org.onap.rapp.datacollector.entity.ves.Event;
@@ -61,20 +62,25 @@ public class VesRetrievalService implements DmaapRestReader {
@Override
public Collection<String> retrieveEvents() {
- logger.info("Reaching from dmaap: {}", config.getMeasurementsTopicUrl());
+ logger.info("Reaching from dmaap: {}", config.getMeasurementsTopicUrls());
+ return config.getMeasurementsTopicUrls().stream().flatMap(this::retrieveEventsFromTopic)
+ .collect(Collectors.toList());
+ }
+
+ private Stream<String> retrieveEventsFromTopic(String topic) {
try {
- ResponseEntity<String[]> responseEntity =
- restTemplate.exchange(config.getMeasurementsTopicUrl(), HttpMethod.GET,
- new HttpEntity<String>(createHeaders(config.getDmaapProperties().getUsername(), config.getDmaapProperties().getPassword())),
- String[].class);
+ ResponseEntity<String[]> responseEntity = restTemplate.exchange(topic, HttpMethod.GET,
+ new HttpEntity<String>(createHeaders(config.getDmaapProperties().getUsername(),
+ config.getDmaapProperties().getPassword())), String[].class);
if (responseEntity.hasBody()) {
String[] events = responseEntity.getBody();
- return Arrays.stream(events).collect(Collectors.toList());
+ return Arrays.stream(events);
}
} catch (RestClientException ex) {
logger.error("Failed to reach to dmaap", ex);
}
- return Collections.emptyList();
+
+ return Arrays.stream(new String[0]);
}
private HttpHeaders createHeaders(String username, String password) {
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java
index 3560e89..a664e8e 100644
--- a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java
@@ -13,6 +13,9 @@
package org.onap.rapp.datacollector.service.configuration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
import org.springframework.boot.context.properties.ConfigurationProperties;
import lombok.Getter;
@@ -28,10 +31,11 @@ public class DmaapProperties {
private String username;
private String password;
private int port;
- private String measurementsTopic;
+ private List<String> measurementsTopics = new ArrayList<>();
- public String getMeasurementsTopicUrl() {
- return String.format("%s://%s:%d/%s", protocol, host, port, measurementsTopic);
+ public List<String> getMeasurementsTopicUrls() {
+ return measurementsTopics.stream().map(topic -> String.format("%s://%s:%d/%s", protocol, host, port, topic))
+ .collect(Collectors.toList());
}
}
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java
index ab1bd0a..a3b2536 100644
--- a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java
+++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java
@@ -23,6 +23,7 @@ import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
+import java.util.List;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
@@ -118,8 +119,8 @@ public class DmaapRestReaderConfiguration {
this.databaseProperties = databaseProperties;
}
- public String getMeasurementsTopicUrl() {
- return dmaapProperties.getMeasurementsTopicUrl();
+ public List<String> getMeasurementsTopicUrls() {
+ return dmaapProperties.getMeasurementsTopicUrls();
}
public DmaapProperties getDmaapProperties() {
diff --git a/datacollector/src/main/resources/application.yml b/datacollector/src/main/resources/application.yml
index 502d8de..01919fa 100644
--- a/datacollector/src/main/resources/application.yml
+++ b/datacollector/src/main/resources/application.yml
@@ -6,7 +6,9 @@ dmaap:
port: 8181
username: dcae@dcae.onap.org
password: demo123456!
- measurements-topic: "measurements"
+ measurements-topics:
+ - measurements
+ - measurements2
database:
host: mariadb-host
port: 3306