aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java126
1 files changed, 105 insertions, 21 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index 57edc364..0fef9ab4 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -1,24 +1,46 @@
-/*
- * ============LICENSE_START======================================================================
- * Copyright (C) 2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
*/
package org.onap.dcaegen2.collectors.datafile.tasks;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.REQUEST_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_INVOCATION_ID;
+import static org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables.X_ONAP_REQUEST_ID;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Path;
import java.time.Duration;
import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.model.CommonFunctions;
import org.onap.dcaegen2.collectors.datafile.model.ConsumerDmaapModel;
import org.onap.dcaegen2.collectors.datafile.model.logging.MdcVariables;
import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
@@ -26,17 +48,30 @@ import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerReact
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.springframework.core.io.FileSystemResource;
+import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
+
import reactor.core.publisher.Mono;
/**
+ * Publishes a file to the DataRouter.
+ *
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
public class DataRouterPublisher {
+ private static final String X_DMAAP_DR_META = "X-DMAAP-DR-META";
+ private static final String CONTENT_TYPE = "application/octet-stream";
+ private static final String NAME_JSON_TAG = "name";
+ private static final String INTERNAL_LOCATION_JSON_TAG = "internalLocation";
+ private static final String PUBLISH_TOPIC = "publish";
+ private static final String DEFAULT_FEED_ID = "1";
private static final Logger logger = LoggerFactory.getLogger(DataRouterPublisher.class);
private final AppConfig datafileAppConfig;
+ private DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient;
public DataRouterPublisher(AppConfig datafileAppConfig) {
this.datafileAppConfig = datafileAppConfig;
@@ -44,25 +79,70 @@ public class DataRouterPublisher {
/**
- * Publish one file
- * @param consumerDmaapModel information about the file to publish
- * @param maxNumberOfRetries the maximal number of retries if the publishing fails
- * @param firstBackoffTimeout the time to delay the first retry
+ * Publish one file.
+ *
+ * @param model information about the file to publish
+ * @param numRetries the maximal number of retries if the publishing fails
+ * @param firstBackoff the time to delay the first retry
* @return the HTTP response status as a string
*/
public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff,
Map<String, String> contextMap) {
MdcVariables.setMdcContextMap(contextMap);
logger.trace("Method called with arg {}", model);
- DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient();
+ dmaapProducerReactiveHttpClient = resolveClient();
- //@formatter:off
return Mono.just(model)
.cache()
- .flatMap(m -> dmaapProducerReactiveHttpClient.getDmaapProducerResponse(m, contextMap))
- .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap))
+ .flatMap(m -> publishFile(m, contextMap)) //
+ .flatMap(httpStatus -> handleHttpResponse(httpStatus, model, contextMap)) //
.retryBackoff(numRetries, firstBackoff);
- //@formatter:on
+ }
+
+ private Mono<HttpStatus> publishFile(ConsumerDmaapModel consumerDmaapModel, Map<String, String> contextMap) {
+ logger.trace("Entering publishFile with {}", consumerDmaapModel);
+ try {
+ HttpPut put = new HttpPut();
+ String requestId = MDC.get(REQUEST_ID);
+ put.addHeader(X_ONAP_REQUEST_ID, requestId);
+ String invocationId = UUID.randomUUID().toString();
+ put.addHeader(X_INVOCATION_ID, invocationId);
+
+ prepareHead(consumerDmaapModel, put);
+ prepareBody(consumerDmaapModel, put);
+ dmaapProducerReactiveHttpClient.addUserCredentialsToHead(put);
+
+ HttpResponse response =
+ dmaapProducerReactiveHttpClient.getDmaapProducerResponseWithRedirect(put, contextMap);
+ logger.trace(response.toString());
+ return Mono.just(HttpStatus.valueOf(response.getStatusLine().getStatusCode()));
+ } catch (Exception e) {
+ logger.error("Unable to send file to DataRouter. Data: {}", consumerDmaapModel.getInternalLocation(), e);
+ return Mono.error(e);
+ }
+ }
+
+ private void prepareHead(ConsumerDmaapModel model, HttpPut put) {
+ put.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE);
+ JsonElement metaData = new JsonParser().parse(CommonFunctions.createJsonBody(model));
+ metaData.getAsJsonObject().remove(NAME_JSON_TAG).getAsString();
+ metaData.getAsJsonObject().remove(INTERNAL_LOCATION_JSON_TAG);
+ put.addHeader(X_DMAAP_DR_META, metaData.toString());
+ put.setURI(getPublishUri(model.getInternalLocation().getFileName().toString()));
+ }
+
+ private void prepareBody(ConsumerDmaapModel model, HttpPut put) throws IOException {
+ Path fileLocation = model.getInternalLocation();
+ try (InputStream fileInputStream = createInputStream(fileLocation)) {
+ put.setEntity(new ByteArrayEntity(IOUtils.toByteArray(fileInputStream)));
+ }
+ }
+
+ private URI getPublishUri(String fileName) {
+ return dmaapProducerReactiveHttpClient.getBaseUri() //
+ .pathSegment(PUBLISH_TOPIC) //
+ .pathSegment(DEFAULT_FEED_ID) //
+ .pathSegment(fileName).build();
}
private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model,
@@ -77,6 +157,10 @@ public class DataRouterPublisher {
}
}
+ InputStream createInputStream(Path filePath) throws IOException {
+ FileSystemResource realResource = new FileSystemResource(filePath);
+ return realResource.getInputStream();
+ }
DmaapPublisherConfiguration resolveConfiguration() {
return datafileAppConfig.getDmaapPublisherConfiguration();