aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java47
1 files changed, 27 insertions, 20 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index e5dd01e9..1d6baa65 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -25,6 +25,7 @@ import com.google.gson.JsonParser;
import java.io.IOException;
import java.io.InputStream;
+import java.net.MalformedURLException;
import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
@@ -34,6 +35,8 @@ import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.ByteArrayEntity;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.JsonSerializer;
import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
@@ -46,6 +49,7 @@ import org.slf4j.MDC;
import org.springframework.core.io.FileSystemResource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
+import org.springframework.web.util.DefaultUriBuilderFactory;
import reactor.core.publisher.Mono;
@@ -58,12 +62,9 @@ import reactor.core.publisher.Mono;
public class DataRouterPublisher {
private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
private static final String CONTENT_TYPE = "application/octet-stream";
- private static final String PUBLISH_TOPIC = "publish";
- private static final String DEFAULT_FEED_ID = "1";
private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class);
private final AppConfig datafileAppConfig;
- private DmaapProducerHttpClient dmaapProducerReactiveHttpClient;
public DataRouterPublisher(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
@@ -80,7 +81,6 @@ public class DataRouterPublisher {
public Mono<FilePublishInformation> publishFile(FilePublishInformation publishInfo, long numRetries,
Duration firstBackoff) {
MDC.setContextMap(publishInfo.getContext());
- dmaapProducerReactiveHttpClient = resolveClient();
return Mono.just(publishInfo) //
.cache() //
.flatMap(this::publishFile) //
@@ -92,13 +92,14 @@ public class DataRouterPublisher {
MDC.setContextMap(publishInfo.getContext());
logger.trace("Entering publishFile with {}", publishInfo);
try {
+ DmaapProducerHttpClient dmaapProducerHttpClient = resolveClient(publishInfo.getChangeIdentifier());
HttpPut put = new HttpPut();
prepareHead(publishInfo, put);
prepareBody(publishInfo, put);
- dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put);
+ dmaapProducerHttpClient.addUserCredentialsToHead(put);
HttpResponse response =
- dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext());
+ dmaapProducerHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext());
logger.trace("{}", response);
return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
} catch (Exception e) {
@@ -107,11 +108,18 @@ public class DataRouterPublisher {
}
}
- private void prepareHead(FilePublishInformation publishInfo, HttpPut put) {
+ private void prepareHead(FilePublishInformation publishInfo, HttpPut put) throws DatafileTaskException {
+
put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE);
JsonElement metaData = new JsonParser().parse(JsonSerializer.createJsonBodyForDataRouter(publishInfo));
put.addHeader(X_DMAAP_DR_META, metaData.toString());
- put.setURI(getPublishUri(publishInfo.getName()));
+ URI uri = new DefaultUriBuilderFactory(
+ datafileAppConfig.getPublisherConfiguration(publishInfo.getChangeIdentifier()).publishUrl()) //
+ .builder() //
+ .pathSegment(publishInfo.getName()) //
+ .build();
+ put.setURI(uri);
+
MappedDiagnosticContext.appendTraceInfo(put);
}
@@ -122,14 +130,7 @@ public class DataRouterPublisher {
}
}
- private URI getPublishUri(String fileName) {
- return dmaapProducerReactiveHttpClient.getBaseUri() //
- .pathSegment(PUBLISH_TOPIC) //
- .pathSegment(DEFAULT_FEED_ID) //
- .pathSegment(fileName).build();
- }
-
- private Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation publishInfo) {
+ private static Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation publishInfo) {
MDC.setContextMap(publishInfo.getContext());
if (HttpUtils.isSuccessfulResponseCode(response.value())) {
logger.trace("Publish to DR successful!");
@@ -145,11 +146,17 @@ public class DataRouterPublisher {
return realResource.getInputStream();
}
- DmaapPublisherConfiguration resolveConfiguration() {
- return datafileAppConfig.getDmaapPublisherConfiguration();
+ PublisherConfiguration resolveConfiguration(String changeIdentifer) throws DatafileTaskException {
+ return datafileAppConfig.getPublisherConfiguration(changeIdentifer);
}
- DmaapProducerHttpClient resolveClient() {
- return new DmaapProducerHttpClient(resolveConfiguration());
+ DmaapProducerHttpClient resolveClient(String changeIdentifier) throws DatafileTaskException {
+ try {
+ DmaapPublisherConfiguration cfg = resolveConfiguration(changeIdentifier).toDmaap();
+ return new DmaapProducerHttpClient(cfg);
+ } catch (MalformedURLException e) {
+ throw new DatafileTaskException("Cannot resolve producer client", e);
+ }
+
}
}