diff options
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java')
-rw-r--r-- | datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java | 47 |
1 files changed, 27 insertions, 20 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index e5dd01e9..1d6baa65 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -25,6 +25,7 @@ import com.google.gson.JsonParser; import java.io.IOException; import java.io.InputStream; +import java.net.MalformedURLException; import java.net.URI; import java.nio.file.Path; import java.time.Duration; @@ -34,6 +35,8 @@ import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpPut; import org.apache.http.entity.ByteArrayEntity; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; +import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration; +import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.JsonSerializer; import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext; @@ -46,6 +49,7 @@ import org.slf4j.MDC; import org.springframework.core.io.FileSystemResource; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; +import org.springframework.web.util.DefaultUriBuilderFactory; import reactor.core.publisher.Mono; @@ -58,12 +62,9 @@ import reactor.core.publisher.Mono; public class DataRouterPublisher { private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META"; private static final String CONTENT_TYPE = "application/octet-stream"; - private static final String PUBLISH_TOPIC = "publish"; - private static final String DEFAULT_FEED_ID = "1"; private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class); private final AppConfig datafileAppConfig; - private DmaapProducerHttpClient dmaapProducerReactiveHttpClient; public DataRouterPublisher(AppConfig datafileAppConfig) { this.datafileAppConfig = datafileAppConfig; @@ -80,7 +81,6 @@ public class DataRouterPublisher { public Mono<FilePublishInformation> publishFile(FilePublishInformation publishInfo, long numRetries, Duration firstBackoff) { MDC.setContextMap(publishInfo.getContext()); - dmaapProducerReactiveHttpClient = resolveClient(); return Mono.just(publishInfo) // .cache() // .flatMap(this::publishFile) // @@ -92,13 +92,14 @@ public class DataRouterPublisher { MDC.setContextMap(publishInfo.getContext()); logger.trace("Entering publishFile with {}", publishInfo); try { + DmaapProducerHttpClient dmaapProducerHttpClient = resolveClient(publishInfo.getChangeIdentifier()); HttpPut put = new HttpPut(); prepareHead(publishInfo, put); prepareBody(publishInfo, put); - dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put); + dmaapProducerHttpClient.addUserCredentialsToHead(put); HttpResponse response = - dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext()); + dmaapProducerHttpClient.getDmaapProducerResponseWithRedirect(put, publishInfo.getContext()); logger.trace("{}", response); return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode())); } catch (Exception e) { @@ -107,11 +108,18 @@ public class DataRouterPublisher { } } - private void prepareHead(FilePublishInformation publishInfo, HttpPut put) { + private void prepareHead(FilePublishInformation publishInfo, HttpPut put) throws DatafileTaskException { + put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE); JsonElement metaData = new JsonParser().parse(JsonSerializer.createJsonBodyForDataRouter(publishInfo)); put.addHeader(X_DMAAP_DR_META, metaData.toString()); - put.setURI(getPublishUri(publishInfo.getName())); + URI uri = new DefaultUriBuilderFactory( + datafileAppConfig.getPublisherConfiguration(publishInfo.getChangeIdentifier()).publishUrl()) // + .builder() // + .pathSegment(publishInfo.getName()) // + .build(); + put.setURI(uri); + MappedDiagnosticContext.appendTraceInfo(put); } @@ -122,14 +130,7 @@ public class DataRouterPublisher { } } - private URI getPublishUri(String fileName) { - return dmaapProducerReactiveHttpClient.getBaseUri() // - .pathSegment(PUBLISH_TOPIC) // - .pathSegment(DEFAULT_FEED_ID) // - .pathSegment(fileName).build(); - } - - private Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation publishInfo) { + private static Mono<FilePublishInformation> handleHttpResponse(HttpStatus response, FilePublishInformation publishInfo) { MDC.setContextMap(publishInfo.getContext()); if (HttpUtils.isSuccessfulResponseCode(response.value())) { logger.trace("Publish to DR successful!"); @@ -145,11 +146,17 @@ public class DataRouterPublisher { return realResource.getInputStream(); } - DmaapPublisherConfiguration resolveConfiguration() { - return datafileAppConfig.getDmaapPublisherConfiguration(); + PublisherConfiguration resolveConfiguration(String changeIdentifer) throws DatafileTaskException { + return datafileAppConfig.getPublisherConfiguration(changeIdentifer); } - DmaapProducerHttpClient resolveClient() { - return new DmaapProducerHttpClient(resolveConfiguration()); + DmaapProducerHttpClient resolveClient(String changeIdentifier) throws DatafileTaskException { + try { + DmaapPublisherConfiguration cfg = resolveConfiguration(changeIdentifier).toDmaap(); + return new DmaapProducerHttpClient(cfg); + } catch (MalformedURLException e) { + throw new DatafileTaskException("Cannot resolve producer client", e); + } + } } |