From dafd553cf1694585b35fd7132c6bafdef1e98ed6 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Fri, 14 Jun 2019 06:38:21 +0000 Subject: Bugfix, improved behaviour for large files Previously files was read into a buffer for publishing. This does not work when files are bigger than the available memory. After the fix , files are streamed instead. Implemented a new REST primitive for exposing status and statistics. To be used for test and trouble shooting. Change-Id: Iab5a1ee9ffcbf6836fcf709d115bf25ab0391732 Issue-ID: DCAEGEN2-1532 Signed-off-by: PatrikBuhr --- .../controller/HeartbeatControllerTest.java | 76 ----------------- .../datafile/controller/StatusControllerTest.java | 95 ++++++++++++++++++++++ .../collectors/datafile/ftp/FtpsClientTest.java | 3 +- .../collectors/datafile/ftp/SftpClientTest.java | 8 +- .../datafile/tasks/DataRouterPublisherTest.java | 12 +-- .../datafile/tasks/FileCollectorTest.java | 18 ++-- 6 files changed, 117 insertions(+), 95 deletions(-) delete mode 100644 datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controller/HeartbeatControllerTest.java create mode 100644 datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controller/StatusControllerTest.java (limited to 'datafile-app-server/src/test/java') diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controller/HeartbeatControllerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controller/HeartbeatControllerTest.java deleted file mode 100644 index 012a6b3d..00000000 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controller/HeartbeatControllerTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * Copyright (C) 2019 Nordix Foundation. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.dcaegen2.collectors.datafile.controller; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import ch.qos.logback.classic.spi.ILoggingEvent; -import ch.qos.logback.core.read.ListAppender; -import org.apache.commons.lang3.StringUtils; -import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.collectors.datafile.controllers.HeartbeatController; -import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; -import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils; -import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables; -import org.slf4j.MDC; -import org.springframework.http.HttpHeaders; -import org.springframework.http.ResponseEntity; -import reactor.core.publisher.Mono; - -public class HeartbeatControllerTest { - @Test - public void heartbeat_success() { - ScheduledTasks scheduledTasksMock = mock(ScheduledTasks.class); - when(scheduledTasksMock.getCurrentNumberOfTasks()).thenReturn(10); - when(scheduledTasksMock.publishedFilesCacheSize()).thenReturn(20); - - HttpHeaders httpHeaders = new HttpHeaders(); - - HeartbeatController controllerUnderTest = new HeartbeatController(scheduledTasksMock); - - ListAppender logAppender = LoggingUtils.getLogListAppender(HeartbeatController.class); - Mono> result = controllerUnderTest.heartbeat(httpHeaders); - - validateLogging(logAppender); - - String body = result.block().getBody(); - assertTrue(body.startsWith("I'm living! Status: ")); - assertTrue(body.contains("numberOfFileCollectionTasks=10")); - assertTrue(body.contains("fileCacheSize=20")); - - assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.REQUEST_ID))); - assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.INVOCATION_ID))); - } - - private void validateLogging(ListAppender logAppender) { - assertEquals(logAppender.list.get(0).getMarker().getName(), "ENTRY"); - assertNotNull(logAppender.list.get(0).getMDCPropertyMap().get("InvocationID")); - assertNotNull(logAppender.list.get(0).getMDCPropertyMap().get("RequestID")); - assertTrue("Info missing in log", logAppender.list.toString().contains("[INFO] Heartbeat request")); - assertEquals(logAppender.list.get(1).getMarker().getName(), "EXIT"); - logAppender.stop(); - } -} diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controller/StatusControllerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controller/StatusControllerTest.java new file mode 100644 index 00000000..51097f52 --- /dev/null +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/controller/StatusControllerTest.java @@ -0,0 +1,95 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.collectors.datafile.controller; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.read.ListAppender; +import org.apache.commons.lang3.StringUtils; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.collectors.datafile.controllers.StatusController; +import org.onap.dcaegen2.collectors.datafile.model.Counters; +import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks; +import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables; +import org.slf4j.MDC; +import org.springframework.http.HttpHeaders; +import org.springframework.http.ResponseEntity; +import reactor.core.publisher.Mono; + +public class StatusControllerTest { + @Test + public void heartbeat_success() { + ScheduledTasks scheduledTasksMock = mock(ScheduledTasks.class); + + HttpHeaders httpHeaders = new HttpHeaders(); + + StatusController controllerUnderTest = new StatusController(scheduledTasksMock); + + ListAppender logAppender = LoggingUtils.getLogListAppender(StatusController.class); + Mono> result = controllerUnderTest.heartbeat(httpHeaders); + + validateLogging(logAppender); + + String body = result.block().getBody(); + assertTrue(body.startsWith("I'm living!")); + + assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.REQUEST_ID))); + assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.INVOCATION_ID))); + } + + + @Test + public void status() { + ScheduledTasks scheduledTasksMock = mock(ScheduledTasks.class); + Counters counters = new Counters(); + doReturn(counters).when(scheduledTasksMock).getCounters(); + + HttpHeaders httpHeaders = new HttpHeaders(); + + StatusController controllerUnderTest = new StatusController(scheduledTasksMock); + + Mono> result = controllerUnderTest.status(httpHeaders); + + String body = result.block().getBody(); + System.out.println(body); + + assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.REQUEST_ID))); + assertFalse(StringUtils.isBlank(MDC.get(MdcVariables.INVOCATION_ID))); + } + + + private void validateLogging(ListAppender logAppender) { + assertEquals(logAppender.list.get(0).getMarker().getName(), "ENTRY"); + assertNotNull(logAppender.list.get(0).getMDCPropertyMap().get("InvocationID")); + assertNotNull(logAppender.list.get(0).getMDCPropertyMap().get("RequestID")); + assertTrue("Info missing in log", logAppender.list.toString().contains("[INFO] Heartbeat request")); + assertEquals(logAppender.list.get(1).getMarker().getName(), "EXIT"); + logAppender.stop(); + } +} diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java index e0182560..f4e814f4 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/FtpsClientTest.java @@ -205,7 +205,8 @@ public class FtpsClientTest { doReturn(false).when(ftpsClientMock).retrieveFile(REMOTE_FILE_PATH, outputStreamMock); assertThatThrownBy(() -> clientUnderTestSpy.collectFile(REMOTE_FILE_PATH, LOCAL_FILE_PATH)) - .hasMessage("Could not retrieve file /dir/sample.txt"); + .hasMessageContaining(REMOTE_FILE_PATH) + .hasMessageContaining("No retry"); verifyFtpsClientMock_openOk(); verify(ftpsClientMock, times(1)).retrieveFile(ArgumentMatchers.eq(REMOTE_FILE_PATH), any()); diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java index cb3735be..693806c2 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/ftp/SftpClientTest.java @@ -121,8 +121,7 @@ public class SftpClientTest { doReturn(jschMock).when(sftpClientSpy).createJsch(); when(jschMock.getSession(anyString(), anyString(), anyInt())).thenThrow(new JSchException("Failed")); - assertThatThrownBy(() -> sftpClientSpy.open()) - .hasMessageStartingWith("Could not open Sftp client. com.jcraft.jsch.JSchException: Failed"); + assertThatThrownBy(() -> sftpClientSpy.open()).hasMessageStartingWith("Could not open Sftp client."); } @SuppressWarnings("resource") @@ -161,8 +160,9 @@ public class SftpClientTest { assertThatThrownBy(() -> sftpClient.collectFile("remoteFile", Paths.get("localFile"))) .isInstanceOf(DatafileTaskException.class) - .hasMessageStartingWith("Unable to get file from xNF. Data: FileServerData{serverAddress=" + HOST - + ", " + "userId=" + USERNAME + ", password=####, port=" + SFTP_PORT); + .hasMessageStartingWith("Unable to get file from xNF. No retry attempts will be done") + .hasMessageContaining("Data: FileServerData{serverAddress=" + HOST + ", " + "userId=" + USERNAME + + ", password=####, port=" + SFTP_PORT); } } diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java index 4da22cbf..6a9dccda 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java @@ -27,10 +27,10 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import java.io.File; + import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.read.ListAppender; -import java.io.ByteArrayInputStream; -import java.io.InputStream; import java.net.URI; import java.nio.file.Path; import java.nio.file.Paths; @@ -50,6 +50,7 @@ import org.mockito.ArgumentCaptor; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.model.Counters; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation; import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient; @@ -86,7 +87,6 @@ class DataRouterPublisherTest { private static final String APPLICATION_OCTET_STREAM_CONTENT_TYPE = "application/octet-stream"; private static final String PUBLISH_TOPIC = "publish"; private static final String FEED_ID = "1"; - private static final String FILE_CONTENT = "Just a string."; private static FilePublishInformation filePublishInformation; private static DmaapProducerHttpClient httpClientMock; @@ -120,7 +120,7 @@ class DataRouterPublisherTest { .changeIdentifier(CHANGE_IDENTIFIER) // .build(); // appConfig = mock(AppConfig.class); - publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig)); + publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig, new Counters())); } @Test @@ -236,8 +236,8 @@ class DataRouterPublisherTest { when(httpResponseMock.getStatusLine()).thenReturn(statusLineMock); when(statusLineMock.getStatusCode()).thenReturn(firstResponse, nextHttpResponses); - InputStream fileStream = new ByteArrayInputStream(FILE_CONTENT.getBytes()); - doReturn(fileStream).when(publisherTaskUnderTestSpy).createInputStream(Paths.get("target", PM_FILE_NAME)); + File file = File.createTempFile("DFC", "tmp"); + doReturn(file).when(publisherTaskUnderTestSpy).createInputFile(Paths.get("target", PM_FILE_NAME)); } private Map getMetaDataAsMap(Header[] metaHeaders) { diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java index 299a0238..99e92bd2 100644 --- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java +++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java @@ -37,9 +37,11 @@ import org.junit.jupiter.api.Test; import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig; import org.onap.dcaegen2.collectors.datafile.configuration.FtpesConfig; import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException; +import org.onap.dcaegen2.collectors.datafile.exceptions.NonRetryableDatafileTaskException; import org.onap.dcaegen2.collectors.datafile.ftp.FtpsClient; import org.onap.dcaegen2.collectors.datafile.ftp.Scheme; import org.onap.dcaegen2.collectors.datafile.ftp.SftpClient; +import org.onap.dcaegen2.collectors.datafile.model.Counters; import org.onap.dcaegen2.collectors.datafile.model.FileData; import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation; import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileData; @@ -93,6 +95,7 @@ public class FileCollectorTest { private SftpClient sftpClientMock = mock(SftpClient.class); private final Map contextMap = new HashMap<>(); + private final Counters counters = new Counters(); private MessageMetaData createMessageMetaData() { return ImmutableMessageMetaData.builder() // @@ -133,7 +136,7 @@ public class FileCollectorTest { .compression(GZIP_COMPRESSION) // .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) // .fileFormatVersion(FILE_FORMAT_VERSION) // - .context(new HashMap()) // + .context(new HashMap()) // .changeIdentifier(CHANGE_IDENTIFIER) // .build(); } @@ -152,7 +155,7 @@ public class FileCollectorTest { @Test public void whenFtpesFile_returnCorrectResponse() throws Exception { - FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock)); + FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock, counters)); doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any()); FileData fileData = createFileData(FTPES_LOCATION_NO_PORT, Scheme.FTPS); @@ -173,7 +176,7 @@ public class FileCollectorTest { @Test public void whenSftpFile_returnCorrectResponse() throws Exception { - FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock)); + FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock, counters)); doReturn(sftpClientMock).when(collectorUndetTest).createSftpClient(any()); @@ -201,7 +204,7 @@ public class FileCollectorTest { @Test public void whenFtpesFileAlwaysFail_retryAndFail() throws Exception { - FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock)); + FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock, counters)); doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any()); FileData fileData = createFileData(FTPES_LOCATION, Scheme.FTPS); @@ -217,12 +220,11 @@ public class FileCollectorTest { @Test public void whenFtpesFileAlwaysFail_failWithoutRetry() throws Exception { - FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock)); + FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock, new Counters())); doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any()); - final boolean retry = false; FileData fileData = createFileData(FTPES_LOCATION, Scheme.FTPS); - doThrow(new DatafileTaskException("Unable to collect file.", retry)).when(ftpsClientMock) + doThrow(new NonRetryableDatafileTaskException("Unable to collect file.")).when(ftpsClientMock) .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); StepVerifier.create(collectorUndetTest.collectFile(fileData, 3, Duration.ofSeconds(0), contextMap)) @@ -234,7 +236,7 @@ public class FileCollectorTest { @Test public void whenFtpesFileFailOnce_retryAndReturnCorrectResponse() throws Exception { - FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock)); + FileCollector collectorUndetTest = spy(new FileCollector(appConfigMock, counters)); doReturn(ftpsClientMock).when(collectorUndetTest).createFtpsClient(any()); doThrow(new DatafileTaskException("Unable to collect file.")).doNothing().when(ftpsClientMock) .collectFile(REMOTE_FILE_LOCATION, LOCAL_FILE_LOCATION); -- cgit 1.2.3-korg