aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java')
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java208
1 files changed, 108 insertions, 100 deletions
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
index f88e301d..a695e20d 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTaskImplTest.java
@@ -28,6 +28,8 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
@@ -74,7 +76,7 @@ public class DMaaPMessageConsumerTaskImplTest {
private static final String PORT_22 = "22";
private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
private static final String REMOTE_FILE_LOCATION = "/ftp/rop/" + PM_FILE_NAME;
- private static final String LOCAL_FILE_LOCATION = "target/" + PM_FILE_NAME;
+ private static final Path LOCAL_FILE_LOCATION = Paths.get("target/" + PM_FILE_NAME);
private static final String FTPES_LOCATION = FTPES_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
private static final String SFTP_LOCATION = SFTP_SCHEME + SERVER_ADDRESS + ":" + PORT_22 + REMOTE_FILE_LOCATION;
private static final String GZIP_COMPRESSION = "gzip";
@@ -86,7 +88,7 @@ public class DMaaPMessageConsumerTaskImplTest {
private static AppConfig appConfig;
private static DmaapConsumerConfiguration dmaapConsumerConfiguration;
private DMaaPMessageConsumerTask messageConsumerTask;
- private DMaaPConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient;
+ private DMaaPConsumerReactiveHttpClient httpClientMock;
private static String ftpesMessageString;
private static FileData ftpesFileData;
@@ -96,156 +98,163 @@ public class DMaaPMessageConsumerTaskImplTest {
private static FileData sftpFileData;
private static FileReadyMessage expectedSftpMessage;
+ /**
+ * Sets up data for the test.
+ */
@BeforeAll
public static void setUp() {
- //@formatter:off
- dmaapConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder()
- .consumerGroup("OpenDCAE-c12")
- .consumerId("c12")
- .dmaapContentType("application/json")
- .dmaapHostName("54.45.33.2")
- .dmaapPortNumber(1234).dmaapProtocol("https")
- .dmaapUserName("Datafile")
- .dmaapUserPassword("Datafile")
- .dmaapTopicName("unauthenticated.NOTIFICATION")
- .timeoutMs(-1)
- .messageLimit(-1)
- .trustStorePath("trustStorePath")
- .trustStorePasswordPath("trustStorePasswordPath")
- .keyStorePath("keyStorePath")
- .keyStorePasswordPath("keyStorePasswordPath")
- .enableDmaapCertAuth(true)
+ dmaapConsumerConfiguration = new ImmutableDmaapConsumerConfiguration.Builder() //
+ .consumerGroup("OpenDCAE-c12") //
+ .consumerId("c12") //
+ .dmaapContentType("application/json") //
+ .dmaapHostName("54.45.33.2") //
+ .dmaapPortNumber(1234).dmaapProtocol("https") //
+ .dmaapUserName("Datafile") //
+ .dmaapUserPassword("Datafile") //
+ .dmaapTopicName("unauthenticated.NOTIFICATION") //
+ .timeoutMs(-1) //
+ .messageLimit(-1) //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
.build();
appConfig = mock(AppConfig.class);
- AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder()
- .location(FTPES_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField ftpesAdditionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .location(FTPES_LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
- .changeType(FILE_READY_CHANGE_TYPE)
- .notificationFieldsVersion("1.0")
- .addAdditionalField(ftpesAdditionalField)
+ JsonMessage ftpesJsonMessage = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) //
+ .changeType(FILE_READY_CHANGE_TYPE) //
+ .notificationFieldsVersion("1.0") //
+ .addAdditionalField(ftpesAdditionalField) //
.build();
ftpesMessageString = ftpesJsonMessage.toString();
- MessageMetaData messageMetaData = ImmutableMessageMetaData.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
- .changeType(FILE_READY_CHANGE_TYPE)
+ MessageMetaData messageMetaData = ImmutableMessageMetaData.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) //
+ .changeType(FILE_READY_CHANGE_TYPE) //
.build();
- ftpesFileData = ImmutableFileData.builder()
- .name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
- .scheme(Scheme.FTPS)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ ftpesFileData = ImmutableFileData.builder() //
+ .name(PM_FILE_NAME) //
+ .location(FTPES_LOCATION) //
+ .scheme(Scheme.FTPS) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
List<FileData> files = new ArrayList<>();
files.add(ftpesFileData);
- expectedFtpesMessage = ImmutableFileReadyMessage.builder()
- .pnfName(SOURCE_NAME)
- .messageMetaData(messageMetaData)
- .files(files)
+ expectedFtpesMessage = ImmutableFileReadyMessage.builder() //
+ .pnfName(SOURCE_NAME) //
+ .messageMetaData(messageMetaData) //
+ .files(files) //
.build();
- AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder()
- .location(SFTP_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ AdditionalField sftpAdditionalField = new JsonMessage.AdditionalFieldBuilder() //
+ .location(SFTP_LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder()
- .eventName(NR_RADIO_ERICSSON_EVENT_NAME)
- .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER)
- .changeType(FILE_READY_CHANGE_TYPE)
- .notificationFieldsVersion("1.0")
- .addAdditionalField(sftpAdditionalField)
+ JsonMessage sftpJsonMessage = new JsonMessage.JsonMessageBuilder() //
+ .eventName(NR_RADIO_ERICSSON_EVENT_NAME) //
+ .changeIdentifier(PM_MEAS_CHANGE_IDENTIFIER) //
+ .changeType(FILE_READY_CHANGE_TYPE) //
+ .notificationFieldsVersion("1.0") //
+ .addAdditionalField(sftpAdditionalField) //
.build();
sftpMessageString = sftpJsonMessage.toString();
- sftpFileData = ImmutableFileData.builder()
- .name(PM_FILE_NAME)
- .location(SFTP_LOCATION)
- .scheme(Scheme.FTPS)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ sftpFileData = ImmutableFileData.builder() //
+ .name(PM_FILE_NAME) //
+ .location(SFTP_LOCATION) //
+ .scheme(Scheme.FTPS) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
- ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder()
- .productName(PRODUCT_NAME)
- .vendorName(VENDOR_NAME)
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC)
- .sourceName(SOURCE_NAME)
- .startEpochMicrosec(START_EPOCH_MICROSEC)
- .timeZoneOffset(TIME_ZONE_OFFSET)
- .name(PM_FILE_NAME)
- .location(FTPES_LOCATION)
- .internalLocation(LOCAL_FILE_LOCATION)
- .compression(GZIP_COMPRESSION)
- .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE)
- .fileFormatVersion(FILE_FORMAT_VERSION)
+ ImmutableConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder() //
+ .productName(PRODUCT_NAME) //
+ .vendorName(VENDOR_NAME) //
+ .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
+ .sourceName(SOURCE_NAME) //
+ .startEpochMicrosec(START_EPOCH_MICROSEC) //
+ .timeZoneOffset(TIME_ZONE_OFFSET) //
+ .name(PM_FILE_NAME) //
+ .location(FTPES_LOCATION) //
+ .internalLocation(LOCAL_FILE_LOCATION) //
+ .compression(GZIP_COMPRESSION) //
+ .fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
+ .fileFormatVersion(FILE_FORMAT_VERSION) //
.build();
listOfConsumerDmaapModel.add(consumerDmaapModel);
files = new ArrayList<>();
files.add(sftpFileData);
- expectedSftpMessage = ImmutableFileReadyMessage.builder()
- .pnfName(SOURCE_NAME)
- .messageMetaData(messageMetaData)
- .files(files)
+ expectedSftpMessage = ImmutableFileReadyMessage.builder() //
+ .pnfName(SOURCE_NAME) //
+ .messageMetaData(messageMetaData) //
+ .files(files) //
.build();
- //@formatter:on
}
@Test
public void whenPassedObjectDoesntFit_ThrowsDatafileTaskException() {
prepareMocksForDmaapConsumer("", null);
- StepVerifier.create(messageConsumerTask.execute()).expectSubscription()
- .expectError(DatafileTaskException.class).verify();
+ StepVerifier.create(messageConsumerTask.execute()) //
+ .expectSubscription() //
+ .expectError(DatafileTaskException.class) //
+ .verify();
- verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
+ verify(httpClientMock, times(1)).getDMaaPConsumerResponse();
}
@Test
public void whenFtpes_ReturnsCorrectResponse() throws DatafileTaskException {
prepareMocksForDmaapConsumer(ftpesMessageString, expectedFtpesMessage);
- StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedFtpesMessage).verifyComplete();
+ StepVerifier.create(messageConsumerTask.execute()) //
+ .expectNext(expectedFtpesMessage) //
+ .verifyComplete();
- verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
- verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
+ verify(httpClientMock, times(1)).getDMaaPConsumerResponse();
+ verifyNoMoreInteractions(httpClientMock);
}
@Test
public void whenSftp_ReturnsCorrectResponse() throws DatafileTaskException {
prepareMocksForDmaapConsumer(sftpMessageString, expectedSftpMessage);
- StepVerifier.create(messageConsumerTask.execute()).expectNext(expectedSftpMessage).verifyComplete();
+ StepVerifier.create(messageConsumerTask.execute()) //
+ .expectNext(expectedSftpMessage) //
+ .verifyComplete();
- verify(dmaapConsumerReactiveHttpClient, times(1)).getDMaaPConsumerResponse();
- verifyNoMoreInteractions(dmaapConsumerReactiveHttpClient);
+ verify(httpClientMock, times(1)).getDMaaPConsumerResponse();
+ verifyNoMoreInteractions(httpClientMock);
}
private void prepareMocksForDmaapConsumer(String message, FileReadyMessage fileReadyMessageAfterConsume) {
Mono<String> messageAsMono = Mono.just(message);
JsonMessageParser jsonMessageParserMock = mock(JsonMessageParser.class);
- dmaapConsumerReactiveHttpClient = mock(DMaaPConsumerReactiveHttpClient.class);
- when(dmaapConsumerReactiveHttpClient.getDMaaPConsumerResponse()).thenReturn(messageAsMono);
+ httpClientMock = mock(DMaaPConsumerReactiveHttpClient.class);
+ when(httpClientMock.getDMaaPConsumerResponse()).thenReturn(messageAsMono);
if (!message.isEmpty()) {
when(jsonMessageParserMock.getMessagesFromJson(messageAsMono))
@@ -255,9 +264,8 @@ public class DMaaPMessageConsumerTaskImplTest {
.thenReturn(Flux.error(new DatafileTaskException("problemas")));
}
- messageConsumerTask =
- spy(new DMaaPMessageConsumerTask(appConfig, dmaapConsumerReactiveHttpClient, jsonMessageParserMock));
+ messageConsumerTask = spy(new DMaaPMessageConsumerTask(appConfig, httpClientMock, jsonMessageParserMock));
when(messageConsumerTask.resolveConfiguration()).thenReturn(dmaapConsumerConfiguration);
- doReturn(dmaapConsumerReactiveHttpClient).when(messageConsumerTask).resolveClient();
+ doReturn(httpClientMock).when(messageConsumerTask).resolveClient();
}
}