diff options
Diffstat (limited to 'datafile-app-server/src')
5 files changed, 50 insertions, 46 deletions
diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java index 3c606deb..a8f79ea1 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParser.java @@ -123,12 +123,11 @@ public class JsonMessageParser { } private Flux<FileReadyMessage> createMessages(Flux<JsonObject> jsonObject) { - return jsonObject.flatMap(monoJsonP -> !containsNotificationFields(monoJsonP) - ? logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject) - : transformMessages(monoJsonP)); + return jsonObject.flatMap(monoJsonP -> containsNotificationFields(monoJsonP) ? transformMessages(monoJsonP) + : logErrorAndReturnEmptyMessageFlux("Incorrect JsonObject - missing header. " + jsonObject)); } - private Flux<FileReadyMessage> transformMessages(JsonObject message) { + private Mono<FileReadyMessage> transformMessages(JsonObject message) { Optional<MessageMetaData> optionalMessageMetaData = getMessageMetaData(message); if (optionalMessageMetaData.isPresent()) { JsonObject notificationFields = message.getAsJsonObject(EVENT).getAsJsonObject(NOTIFICATION_FIELDS); @@ -138,22 +137,22 @@ public class JsonMessageParser { if (!allFileDataFromJson.isEmpty()) { MessageMetaData messageMetaData = optionalMessageMetaData.get(); // @formatter:off - return Flux.just(ImmutableFileReadyMessage.builder() + return Mono.just(ImmutableFileReadyMessage.builder() .pnfName(messageMetaData.sourceName()) .messageMetaData(messageMetaData) .files(allFileDataFromJson) .build()); // @formatter:on } else { - return Flux.empty(); + return Mono.empty(); } } logger.error("Unable to collect file from xNF. Missing arrayOfNamedHashMap in message. {}", message); - return Flux.empty(); + return Mono.empty(); } logger.error("Unable to collect file from xNF. FileReady event has incorrect JsonObject. {}", message); - return Flux.empty(); + return Mono.empty(); } private Optional<MessageMetaData> getMessageMetaData(JsonObject message) { diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java index 338c8323..4c0dcce5 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisher.java @@ -27,7 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpStatus; -import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 @@ -50,27 +50,27 @@ public class DataRouterPublisher { * @param firstBackoffTimeout the time to delay the first retry * @return the HTTP response status as a string */ - public Flux<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) { + public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel model, long numRetries, Duration firstBackoff) { logger.trace("Method called with arg {}", model); DmaapProducerReactiveHttpClient dmaapProducerReactiveHttpClient = resolveClient(); //@formatter:off - return Flux.just(model) - .cache(1) + return Mono.just(model) + .cache() .flatMap(dmaapProducerReactiveHttpClient::getDmaapProducerResponse) .flatMap(httpStatus -> handleHttpResponse(httpStatus, model)) .retryBackoff(numRetries, firstBackoff); //@formatter:on } - private Flux<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) { + private Mono<ConsumerDmaapModel> handleHttpResponse(HttpStatus response, ConsumerDmaapModel model) { if (HttpUtils.isSuccessfulResponseCode(response.value())) { logger.trace("Publish to DR successful!"); - return Flux.just(model); + return Mono.just(model); } else { - logger.warn("Publish to DR unsuccessful, response code: " + response); - return Flux.error(new Exception("Publish to DR unsuccessful, response code: " + response)); + logger.warn("Publish to DR unsuccessful, response code: {}", response); + return Mono.error(new Exception("Publish to DR unsuccessful, response code: " + response)); } } diff --git a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java index f22c7bf9..37b7a559 100644 --- a/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java +++ b/datafile-app-server/src/main/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasks.java @@ -98,7 +98,7 @@ public class ScheduledTasks { .doOnNext(fileData -> taskCounter.incrementAndGet()) .flatMap(this::collectFileFromXnf) .flatMap(this::publishToDataRouter) - .flatMap(model -> deleteFile(Paths.get(model.getInternalLocation()))) + .doOnNext(model -> deleteFile(Paths.get(model.getInternalLocation()))) .doOnNext(model -> taskCounter.decrementAndGet()) .sequential() .subscribe(this::onSuccess, this::onError, this::onComplete); @@ -109,8 +109,8 @@ public class ScheduledTasks { logger.info("Datafile tasks have been completed"); } - private void onSuccess(Path localFile) { - logger.info("Datafile consumed tasks." + localFile); + private void onSuccess(ConsumerDmaapModel model) { + logger.info("Datafile consumed tasks {}", model.getInternalLocation()); } private void onError(Throwable throwable) { @@ -138,18 +138,19 @@ public class ScheduledTasks { return fileCollect.collectorTask .execute(fileCollect.fileData, fileCollect.metaData, maxNUmberOfRetries, initialRetryTimeout) - .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData, exception)); + .onErrorResume(exception -> handleCollectFailure(fileCollect.fileData)); } - private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData, Throwable exception) { - logger.error("File fetching failed: {}, reason: {}", fileData.name(), exception.getMessage()); - deleteFile(fileData.getLocalFileName()); - alreadyPublishedFiles.remove(fileData.getLocalFileName()); + private Mono<ConsumerDmaapModel> handleCollectFailure(FileData fileData) { + Path localFileName = fileData.getLocalFileName(); + logger.error("File fetching failed: {}", localFileName); + deleteFile(localFileName); + alreadyPublishedFiles.remove(localFileName); taskCounter.decrementAndGet(); return Mono.empty(); } - private Flux<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) { + private Mono<ConsumerDmaapModel> publishToDataRouter(ConsumerDmaapModel model) { final long maxNumberOfRetries = 3; final Duration initialRetryTimeout = Duration.ofSeconds(5); @@ -160,13 +161,13 @@ public class ScheduledTasks { } - private Flux<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) { + private Mono<ConsumerDmaapModel> handlePublishFailure(ConsumerDmaapModel model, Throwable exception) { logger.error("File publishing failed: {}, exception: {}", model.getName(), exception); Path internalFileName = Paths.get(model.getInternalLocation()); deleteFile(internalFileName); alreadyPublishedFiles.remove(internalFileName); taskCounter.decrementAndGet(); - return Flux.empty(); + return Mono.empty(); } private Flux<FileReadyMessage> consumeMessagesFromDmaap() { @@ -179,7 +180,7 @@ public class ScheduledTasks { final DMaaPMessageConsumerTask messageConsumerTask = new DMaaPMessageConsumerTask(this.applicationConfiguration); return messageConsumerTask.execute() - .onErrorResume(exception -> handleConsumeMessageFailure(exception)); + .onErrorResume(this::handleConsumeMessageFailure); } private Flux<FileReadyMessage> handleConsumeMessageFailure(Throwable exception) { @@ -187,13 +188,12 @@ public class ScheduledTasks { return Flux.empty(); } - private Flux<Path> deleteFile(Path localFile) { + private void deleteFile(Path localFile) { logger.trace("Deleting file: {}", localFile); try { Files.delete(localFile); } catch (Exception e) { logger.warn("Could not delete file: {}, {}", localFile, e); } - return Flux.just(localFile); } } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java index 73511d19..d2240f18 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java @@ -37,7 +37,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPub import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; import org.springframework.http.HttpStatus; -import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.test.StepVerifier; /** @@ -96,7 +96,7 @@ class DataRouterPublisherTest { @Test public void whenPassedObjectFits_ReturnsCorrectStatus() { - prepareMocksForTests(Flux.just(HttpStatus.OK)); + prepareMocksForTests(Mono.just(HttpStatus.OK)); StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0))) .expectNext(consumerDmaapModel).verifyComplete(); @@ -107,7 +107,7 @@ class DataRouterPublisherTest { @Test public void whenPassedObjectFits_firstFailsThenSucceeds() { - prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.OK)); + prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.OK)); StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0))) .expectNext(consumerDmaapModel).verifyComplete(); @@ -118,7 +118,7 @@ class DataRouterPublisherTest { @Test public void whenPassedObjectFits_firstFailsThenFails() { - prepareMocksForTests(Flux.just(HttpStatus.BAD_GATEWAY), Flux.just(HttpStatus.BAD_GATEWAY)); + prepareMocksForTests(Mono.just(HttpStatus.BAD_GATEWAY), Mono.just(HttpStatus.BAD_GATEWAY)); StepVerifier.create(dmaapPublisherTask.execute(consumerDmaapModel, 1, Duration.ofSeconds(0))) .expectErrorMessage("Retries exhausted: 1/1").verify(); @@ -128,7 +128,7 @@ class DataRouterPublisherTest { } @SafeVarargs - final void prepareMocksForTests(Flux<HttpStatus> firstResponse, Flux<HttpStatus>... nextHttpResponses) { + final void prepareMocksForTests(Mono<HttpStatus> firstResponse, Mono<HttpStatus>... nextHttpResponses) { dMaaPProducerReactiveHttpClient = mock(DmaapProducerReactiveHttpClient.class); when(dMaaPProducerReactiveHttpClient.getDmaapProducerResponse(any())).thenReturn(firstResponse, nextHttpResponses); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java index 10c5b167..804b46e9 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/XnfCollectorTaskImplTest.java @@ -67,7 +67,12 @@ public class XnfCollectorTaskImplTest { private static final String PWD = "pwd"; private static final String FTPES_LOCATION = FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION; + + private static final String FTPES_LOCATION_NO_PORT = + FTPES_SCHEME + USER + ":" + PWD + "@" + SERVER_ADDRESS + REMOTE_FILE_LOCATION; private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION; + private static final String SFTP_LOCATION_NO_PORT = SFTP_SCHEME + SERVER_ADDRESS + REMOTE_FILE_LOCATION; + private static final String GZIP_COMPRESSION = "gzip"; private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec"; private static final String FILE_FORMAT_VERSION = "V10"; @@ -100,11 +105,11 @@ public class XnfCollectorTaskImplTest { // @formatter:on } - private FileData createFileData() { + private FileData createFileData(String location) { // @formatter:off return ImmutableFileData.builder() .name(PM_FILE_NAME) - .location(FTPES_LOCATION) + .location(location) .compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) @@ -113,7 +118,7 @@ public class XnfCollectorTaskImplTest { // @formatter:on } - private ConsumerDmaapModel createExpectedConsumerDmaapModel() { + private ConsumerDmaapModel createExpectedConsumerDmaapModel(String location) { // @formatter:off return ImmutableConsumerDmaapModel.builder() .productName(PRODUCT_NAME) @@ -123,7 +128,7 @@ public class XnfCollectorTaskImplTest { .startEpochMicrosec(START_EPOCH_MICROSEC) .timeZoneOffset(TIME_ZONE_OFFSET) .name(PM_FILE_NAME) - .location(FTPES_LOCATION) + .location(location) .internalLocation(LOCAL_FILE_LOCATION.toString()) .compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) @@ -146,9 +151,9 @@ public class XnfCollectorTaskImplTest { FileCollector collectorUndetTest = new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock); - FileData fileData = createFileData(); + FileData fileData = createFileData(FTPES_LOCATION_NO_PORT); - ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(); + ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT); StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0))) .expectNext(expectedConsumerDmaapModel).verifyComplete(); @@ -168,7 +173,7 @@ public class XnfCollectorTaskImplTest { // @formatter:off FileData fileData = ImmutableFileData.builder() .name(PM_FILE_NAME) - .location(SFTP_LOCATION) + .location(SFTP_LOCATION_NO_PORT) .compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) .fileFormatVersion(FILE_FORMAT_VERSION) @@ -183,7 +188,7 @@ public class XnfCollectorTaskImplTest { .startEpochMicrosec(START_EPOCH_MICROSEC) .timeZoneOffset(TIME_ZONE_OFFSET) .name(PM_FILE_NAME) - .location(SFTP_LOCATION) + .location(SFTP_LOCATION_NO_PORT) .internalLocation(LOCAL_FILE_LOCATION.toString()) .compression(GZIP_COMPRESSION) .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) @@ -202,7 +207,7 @@ public class XnfCollectorTaskImplTest { public void whenFtpesFileAlwaysFail_retryAndFail() throws Exception { FileCollector collectorUndetTest = new FileCollector(appConfigMock, ftpsClientMock, sftpClientMock); - FileData fileData = createFileData(); + FileData fileData = createFileData(FTPES_LOCATION); doThrow(new DatafileTaskException("Unable to collect file.")).when(ftpsClientMock) .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); @@ -219,9 +224,9 @@ public class XnfCollectorTaskImplTest { doThrow(new DatafileTaskException("Unable to collect file.")).doNothing().when(ftpsClientMock) .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); - ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(); + ConsumerDmaapModel expectedConsumerDmaapModel = createExpectedConsumerDmaapModel(FTPES_LOCATION_NO_PORT); - FileData fileData = createFileData(); + FileData fileData = createFileData(FTPES_LOCATION_NO_PORT); StepVerifier.create(collectorUndetTest.execute(fileData, createMessageMetaData(), 3, Duration.ofSeconds(0))) .expectNext(expectedConsumerDmaapModel).verifyComplete(); |