aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server')
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java15
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java16
-rw-r--r--datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java28
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java10
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java27
5 files changed, 50 insertions, 46 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
index 3c606deb..a8f79ea1 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java
@@ -123,12 +123,11 @@ public class JsonMessageParser {
}
private Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) {
- return jsonObject.flatMap(monoJsonP -> !containsNotificationFields(monoJsonP)
- ? logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject)
- : transformMessages(monoJsonP));
+ return jsonObject.flatMap(monoJsonP -> containsNotificationFields(monoJsonP) ? transformMessages(monoJsonP)
+ : logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject));
}
- private Flux<FileReadyMessage> transformMessages(JsonObject message) {
+ private Mono<FileReadyMessage> transformMessages(JsonObject message) {
Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message);
if (optionalMessageMetaData.isPresent()) {
JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS);
@@ -138,22 +137,22 @@ public class JsonMessageParser {
if (!allFileDataFromJson.isEmpty()) {
MessageMetaData messageMetaData = optionalMessageMetaData.get();
// @formatter:off
- return Flux.just(ImmutableFileReadyMessage.builder()
+ return Mono.just(ImmutableFileReadyMessage.builder()
.pnfName(messageMetaData.sourceName())
.messageMetaData(messageMetaData)
.files(allFileDataFromJson)
.build());
// @formatter:on
} else {
- return Flux.empty();
+ return Mono.empty();
}
}
logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message);
- return Flux.empty();
+ return Mono.empty();
}
logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message);
- return Flux.empty();
+ return Mono.empty();
}
private Optional<MessageMetaData> getMessageMetaData(JsonObject message) {
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
index 338c8323..4c0dcce5 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java
@@ -27,7 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
-import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
@@ -50,27 +50,27 @@ public class DataRouterPublisher {
* @param firstBackoffTimeout the time to delay the first retry
* @return the HTTP response status as a string
*/
- public Flux<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) {
+ public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) {
logger.trace("Method called with arg {}", model);
DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient();
//@formatter:off
- return Flux.just(model)
- .cache(1)
+ return Mono.just(model)
+ .cache()
.flatMap(dmaapProducerReactiveHttpClient::getDmaapProducerResponse)
.flatMap(httpStatus -> handleHttpResponse(httpStatus, model))
.retryBackoff(numRetries, firstBackoff);
//@formatter:on
}
- private Flux<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) {
+ private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) {
if (HttpUtils.isSuccessfulResponseCode(response.value())) {
logger.trace("Publish to DR successful!");
- return Flux.just(model);
+ return Mono.just(model);
} else {
- logger.warn("Publish to DR unsuccessful, response code: " + response);
- return Flux.error(new Exception("Publish to DR unsuccessful, response code: " + response));
+ logger.warn("Publish to DR unsuccessful, response code: {}", response);
+ return Mono.error(new Exception("Publish to DR unsuccessful, response code: " + response));
}
}
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
index f22c7bf9..37b7a559 100644
--- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
+++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java
@@ -98,7 +98,7 @@ public class ScheduledTasks {
.doOnNext(fileData -> taskCounter.incrementAndGet())
.flatMap(this::collectFileFromXnf)
.flatMap(this::publishToDataRouter)
- .flatMap(model -> deleteFile(Paths.get(model.getInternalLocation())))
+ .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation())))
.doOnNext(model -> taskCounter.decrementAndGet())
.sequential()
.subscribe(this::onSuccess, this::onError, this::onComplete);
@@ -109,8 +109,8 @@ public class ScheduledTasks {
logger.info("Datafile tasks have been completed");
}
- private void onSuccess(Path localFile) {
- logger.info("Datafile consumed tasks." + localFile);
+ private void onSuccess(ConsumerDmaapModel model) {
+ logger.info("Datafile consumed tasks {}", model.getInternalLocation());
}
private void onError(Throwable throwable) {
@@ -138,18 +138,19 @@ public class ScheduledTasks {
return fileCollect.collectorTask
.execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout)
- .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, exception));
+ .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData));
}
- private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Throwable exception) {
- logger.error("File fetching failed: {}, reason: {}", fileData.name(), exception.getMessage());
- deleteFile(fileData.getLocalFileName());
- alreadyPublishedFiles.remove(fileData.getLocalFileName());
+ private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData) {
+ Path localFileName = fileData.getLocalFileName();
+ logger.error("File fetching failed: {}", localFileName);
+ deleteFile(localFileName);
+ alreadyPublishedFiles.remove(localFileName);
taskCounter.decrementAndGet();
return Mono.empty();
}
- private Flux<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) {
+ private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) {
final long maxNumberOfRetries = 3;
final Duration initialRetryTimeout = Duration.ofSeconds(5);
@@ -160,13 +161,13 @@ public class ScheduledTasks {
}
- private Flux<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) {
+ private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) {
logger.error("File publishing failed: {}, exception: {}", model.getName(), exception);
Path internalFileName = Paths.get(model.getInternalLocation());
deleteFile(internalFileName);
alreadyPublishedFiles.remove(internalFileName);
taskCounter.decrementAndGet();
- return Flux.empty();
+ return Mono.empty();
}
private Flux<FileReadyMessage> consumeMessagesFromDmaap() {
@@ -179,7 +180,7 @@ public class ScheduledTasks {
final DMaaPMessageConsumerTask messageConsumerTask =
new DMaaPMessageConsumerTask(this.applicationConfiguration);
return messageConsumerTask.execute()
- .onErrorResume(exception -> handleConsumeMessageFailure(exception));
+ .onErrorResume(this::handleConsumeMessageFailure);
}
private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception) {
@@ -187,13 +188,12 @@ public class ScheduledTasks {
return Flux.empty();
}
- private Flux<Path> deleteFile(Path localFile) {
+ private void deleteFile(Path localFile) {
logger.trace("Deleting file: {}", localFile);
try {
Files.delete(localFile);
} catch (Exception e) {
logger.warn("Could not delete file: {}, {}", localFile, e);
}
- return Flux.just(localFile);
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
index 73511d19..d2240f18 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
@@ -37,7 +37,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPub
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
import org.springframework.http.HttpStatus;
-import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
/**
@@ -96,7 +96,7 @@ class DataRouterPublisherTest {
@Test
public void whenPassedObjectFits_ReturnsCorrectStatus() {
- prepareMocksForTests(Flux.just(HttpStatus.OK));
+ prepareMocksForTests(Mono.just(HttpStatus.OK));
StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
.expectNext(consumerDmaapModel).verifyComplete();
@@ -107,7 +107,7 @@ class DataRouterPublisherTest {
@Test
public void whenPassedObjectFits_firstFailsThenSucceeds() {
- prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.OK));
+ prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.OK));
StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
.expectNext(consumerDmaapModel).verifyComplete();
@@ -118,7 +118,7 @@ class DataRouterPublisherTest {
@Test
public void whenPassedObjectFits_firstFailsThenFails() {
- prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.BAD_GATEWAY));
+ prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.BAD_GATEWAY));
StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0)))
.expectErrorMessage("Retries exhausted: 1/1").verify();
@@ -128,7 +128,7 @@ class DataRouterPublisherTest {
}
@SafeVarargs
- final void prepareMocksForTests(Flux<HttpStatus> firstResponse, Flux<HttpStatus>... nextHttpResponses) {
+ final void prepareMocksForTests(Mono<HttpStatus> firstResponse, Mono<HttpStatus>... nextHttpResponses) {
dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class);
when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())).thenReturn(firstResponse,
nextHttpResponses);
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
index 10c5b167..804b46e9 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java
@@ -67,7 +67,12 @@ public class XnfCollectorTaskImplTest {
private static final String PWD = "pwd";
private static final String FTPES_LOCATION =
FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+
+ private static final String FTPES_LOCATION_NO_PORT =
+ FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + REMOTE_FILE_LOCATION;
private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
+ private static final String SFTP_LOCATION_NO_PORT = SFTP_SCHEME + SERVER_ADDRESS + REMOTE_FILE_LOCATION;
+
private static final String GZIP_COMPRESSION = "gzip";
private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
private static final String FILE_FORMAT_VERSION = "V10";
@@ -100,11 +105,11 @@ public class XnfCollectorTaskImplTest {
// @formatter:on
}
- private FileData createFileData() {
+ private FileData createFileData(String location) {
// @formatter:off
return ImmutableFileData.builder()
.name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
+ .location(location)
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
@@ -113,7 +118,7 @@ public class XnfCollectorTaskImplTest {
// @formatter:on
}
- private ConsumerDmaapModel createExpectedConsumerDmaapModel() {
+ private ConsumerDmaapModel createExpectedConsumerDmaapModel(String location) {
// @formatter:off
return ImmutableConsumerDmaapModel.builder()
.productName(PRODUCT_NAME)
@@ -123,7 +128,7 @@ public class XnfCollectorTaskImplTest {
.startEpochMicrosec(START_EPOCH_MICROSEC)
.timeZoneOffset(TIME_ZONE_OFFSET)
.name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
+ .location(location)
.internalLocation(LOCAL_FILE_LOCATION.toString())
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
@@ -146,9 +151,9 @@ public class XnfCollectorTaskImplTest {
FileCollector collectorUndetTest =
new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
- FileData fileData = createFileData();
+ FileData fileData = createFileData(FTPES_LOCATION_NO_PORT);
- ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel();
+ ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
.expectNext(expectedConsumerDmaapModel).verifyComplete();
@@ -168,7 +173,7 @@ public class XnfCollectorTaskImplTest {
// @formatter:off
FileData fileData = ImmutableFileData.builder()
.name(PM_FILE_NAME)
- .location(SFTP_LOCATION)
+ .location(SFTP_LOCATION_NO_PORT)
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
.fileFormatVersion(FILE_FORMAT_VERSION)
@@ -183,7 +188,7 @@ public class XnfCollectorTaskImplTest {
.startEpochMicrosec(START_EPOCH_MICROSEC)
.timeZoneOffset(TIME_ZONE_OFFSET)
.name(PM_FILE_NAME)
- .location(SFTP_LOCATION)
+ .location(SFTP_LOCATION_NO_PORT)
.internalLocation(LOCAL_FILE_LOCATION.toString())
.compression(GZIP_COMPRESSION)
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
@@ -202,7 +207,7 @@ public class XnfCollectorTaskImplTest {
public void whenFtpesFileAlwaysFail_retryAndFail() throws Exception {
FileCollector collectorUndetTest =
new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock);
- FileData fileData = createFileData();
+ FileData fileData = createFileData(FTPES_LOCATION);
doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpsClientMock)
.collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
@@ -219,9 +224,9 @@ public class XnfCollectorTaskImplTest {
doThrow(new DatafileTaskException("Unable to collect file.")).doNothing().when(ftpsClientMock)
.collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION);
- ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel();
+ ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT);
- FileData fileData = createFileData();
+ FileData fileData = createFileData(FTPES_LOCATION_NO_PORT);
StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0)))
.expectNext(expectedConsumerDmaapModel).verifyComplete();