aboutsummaryrefslogtreecommitdiffstats
path: root/datafile-app-server/src/test/java/org/onap
diff options
context:
space:
mode:
Diffstat (limited to 'datafile-app-server/src/test/java/org/onap')
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java356
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java133
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java38
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java63
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java72
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java6
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java10
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java2
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java33
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java4
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java62
-rw-r--r--datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java61
12 files changed, 364 insertions, 476 deletions
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
index 5be75ab3..b1148a6a 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/AppConfigTest.java
@@ -16,25 +16,51 @@
package org.onap.dcaegen2.collectors.datafile.configuration;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Resources;
import com.google.gson.JsonElement;
+import com.google.gson.JsonIOException;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
+import com.google.gson.JsonSyntaxException;
+
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
import java.nio.charset.StandardCharsets;
-import java.util.Objects;
+import java.util.Map;
+import java.util.Properties;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
+import org.onap.dcaegen2.collectors.datafile.model.logging.MappedDiagnosticContext;
+import org.onap.dcaegen2.collectors.datafile.utils.LoggingUtils;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationProvider;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
+
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
/**
* Tests the AppConfig.
@@ -44,167 +70,285 @@ import org.junit.jupiter.api.Test;
*/
class AppConfigTest {
- private static final String DATAFILE_ENDPOINTS = "datafile_endpoints.json";
- private static final boolean CORRECT_JSON = true;
- private static final boolean INCORRECT_JSON = false;
+ private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
+
+
+ private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = //
+ new ImmutableDmaapConsumerConfiguration.Builder() //
+ .timeoutMs(-1) //
+ .dmaapHostName("message-router.onap.svc.cluster.local") //
+ .dmaapUserName("admin") //
+ .dmaapUserPassword("admin") //
+ .dmaapTopicName("events/unauthenticated.VES_NOTIFICATION_OUTPUT") //
+ .dmaapPortNumber(2222) //
+ .dmaapContentType("application/json") //
+ .messageLimit(-1) //
+ .dmaapProtocol("http") //
+ .consumerId("C12") //
+ .consumerGroup("OpenDcae-c12") //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .build();
+
+ private static final ConsumerConfiguration CORRECT_CONSUMER_CONFIG = ImmutableConsumerConfiguration.builder() //
+ .topicUrl(
+ "http://admin:admin@message-router.onap.svc.cluster.local:2222/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12")
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .build();
+
+ private static final PublisherConfiguration CORRECT_PUBLISHER_CONFIG = //
+ ImmutablePublisherConfiguration.builder() //
+ .publishUrl("https://message-router.onap.svc.cluster.local:3907/publish/1") //
+ .logUrl("https://dmaap.example.com/feedlog/972").trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .changeIdentifier("PM_MEAS_FILES") //
+ .userName("user") //
+ .passWord("password") //
+ .build();
+
+ private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = //
+ new ImmutableFtpesConfig.Builder() //
+ .keyCert("/config/dfc.jks") //
+ .keyPassword("secret") //
+ .trustedCa("config/ftp.jks") //
+ .trustedCaPassword("secret") //
+ .build();
+
+ private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = //
+ new ImmutableDmaapPublisherConfiguration.Builder() //
+ .dmaapTopicName("/publish/1") //
+ .dmaapUserPassword("password") //
+ .dmaapPortNumber(3907) //
+ .dmaapProtocol("https") //
+ .dmaapContentType("application/octet-stream") //
+ .dmaapHostName("message-router.onap.svc.cluster.local") //
+ .dmaapUserName("user") //
+ .trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .build();
+
+ private static EnvProperties properties() {
+ return ImmutableEnvProperties.builder() //
+ .consulHost("host") //
+ .consulPort(123) //
+ .cbsName("cbsName") //
+ .appName("appName") //
+ .build();
+ }
- private static AppConfig appConfigUnderTest;
+ private AppConfig appConfigUnderTest;
+ private CloudConfigurationProvider cloudConfigurationProvider = mock(CloudConfigurationProvider.class);
+ private final Map<String, String> context = MappedDiagnosticContext.initializeTraceContext();
- private static String filePath =
- Objects.requireNonNull(AppConfigTest.class.getClassLoader().getResource(DATAFILE_ENDPOINTS)).getFile();
@BeforeEach
public void setUp() {
appConfigUnderTest = spy(AppConfig.class);
+ appConfigUnderTest.setCloudConfigurationProvider(cloudConfigurationProvider);
+ appConfigUnderTest.systemEnvironment = new Properties();
}
@Test
- public void whenApplicationWasStarted_FilePathIsSet() {
+ public void whenTheConfigurationFits() throws IOException, DatafileTaskException {
// When
- appConfigUnderTest.setFilepath(filePath);
+ doReturn(getCorrectJson()).when(appConfigUnderTest).createInputStream(any());
+ appConfigUnderTest.initialize();
// Then
- verify(appConfigUnderTest, times(1)).setFilepath(anyString());
- verify(appConfigUnderTest, times(0)).loadConfigurationFromFile();
- Assertions.assertEquals(filePath, appConfigUnderTest.getFilepath());
+ verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
+
+ ConsumerConfiguration consumerCfg = appConfigUnderTest.getDmaapConsumerConfiguration();
+ Assertions.assertNotNull(consumerCfg);
+ assertThat(consumerCfg.toDmaap()).isEqualToComparingFieldByField(CORRECT_DMAAP_CONSUMER_CONFIG);
+ assertThat(consumerCfg).isEqualToComparingFieldByField(CORRECT_CONSUMER_CONFIG);
+
+ PublisherConfiguration publisherCfg = appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER);
+ Assertions.assertNotNull(publisherCfg);
+ assertThat(publisherCfg).isEqualToComparingFieldByField(CORRECT_PUBLISHER_CONFIG);
+ assertThat(publisherCfg.toDmaap()).isEqualToComparingFieldByField(CORRECT_DMAAP_PUBLISHER_CONFIG);
+
+ FtpesConfig ftpesConfig = appConfigUnderTest.getFtpesConfiguration();
+ assertThat(ftpesConfig).isNotNull();
+ assertThat(ftpesConfig).isEqualToComparingFieldByField(CORRECT_FTPES_CONFIGURATION);
}
@Test
- public void whenTheConfigurationFits_GetFtpsAndDmaapObjectRepresentationConfiguration() throws IOException {
- // Given
- InputStream inputStream =
- new ByteArrayInputStream((getJsonConfig(CORRECT_JSON).getBytes(StandardCharsets.UTF_8)));
-
+ public void whenTheConfigurationFits_twoProducers() throws IOException, DatafileTaskException {
// When
- appConfigUnderTest.setFilepath(filePath);
- doReturn(inputStream).when(appConfigUnderTest).createInputStream(any());
+ doReturn(getCorrectJsonTwoProducers()).when(appConfigUnderTest).createInputStream(any());
appConfigUnderTest.loadConfigurationFromFile();
// Then
- verify(appConfigUnderTest, times(1)).setFilepath(anyString());
verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
- Assertions.assertNotNull(appConfigUnderTest.getDmaapPublisherConfiguration());
- Assertions.assertEquals(appConfigUnderTest.getDmaapPublisherConfiguration(),
- appConfigUnderTest.getDmaapPublisherConfiguration());
- Assertions.assertEquals(appConfigUnderTest.getDmaapConsumerConfiguration(),
- appConfigUnderTest.getDmaapConsumerConfiguration());
- Assertions.assertEquals(appConfigUnderTest.getFtpesConfiguration(), appConfigUnderTest.getFtpesConfiguration());
+ Assertions.assertNotNull(appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER));
+ Assertions.assertNotNull(appConfigUnderTest.getPublisherConfiguration("XX_FILES"));
+ Assertions.assertNotNull(appConfigUnderTest.getPublisherConfiguration("YY_FILES"));
+
+ assertThat(appConfigUnderTest.getPublisherConfiguration("XX_FILES").publishUrl())
+ .isEqualTo("feed01::publish_url");
+ assertThat(appConfigUnderTest.getPublisherConfiguration("YY_FILES").publishUrl())
+ .isEqualTo("feed01::publish_url");
}
@Test
- public void whenFileIsNotExist_ThrowIoException() {
+ public void whenFileIsNotExist_ThrowException() throws DatafileTaskException {
// Given
- filePath = "/temp.json";
- appConfigUnderTest.setFilepath(filePath);
+ appConfigUnderTest.setFilepath("/temp.json");
// When
appConfigUnderTest.loadConfigurationFromFile();
// Then
- verify(appConfigUnderTest, times(1)).setFilepath(anyString());
- verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration());
- Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration());
- Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration());
+ assertThatThrownBy(() -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER))
+ .hasMessageContaining("No PublishingConfiguration loaded, changeIdentifier: PM_MEAS_FILES");
+ Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration());
}
@Test
- public void whenFileIsExistsButJsonIsIncorrect() throws IOException {
- // Given
- InputStream inputStream =
- new ByteArrayInputStream((getJsonConfig(INCORRECT_JSON).getBytes(StandardCharsets.UTF_8)));
+ public void whenFileIsExistsButJsonIsIncorrect() throws IOException, DatafileTaskException {
// When
- appConfigUnderTest.setFilepath(filePath);
- doReturn(inputStream).when(appConfigUnderTest).createInputStream(any());
+ doReturn(getIncorrectJson()).when(appConfigUnderTest).createInputStream(any());
appConfigUnderTest.loadConfigurationFromFile();
// Then
- verify(appConfigUnderTest, times(1)).setFilepath(anyString());
verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration());
- Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration());
+ assertThatThrownBy(() -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER))
+ .hasMessageContaining(CHANGE_IDENTIFIER);
Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration());
-
}
-
@Test
- public void whenTheConfigurationFits_ButRootElementIsNotAJsonObject() throws IOException {
- // Given
- InputStream inputStream =
- new ByteArrayInputStream((getJsonConfig(CORRECT_JSON).getBytes(StandardCharsets.UTF_8)));
+ public void whenTheConfigurationFits_ButRootElementIsNotAJsonObject() throws IOException, DatafileTaskException {
+
// When
- appConfigUnderTest.setFilepath(filePath);
- doReturn(inputStream).when(appConfigUnderTest).createInputStream(any());
+ doReturn(getCorrectJson()).when(appConfigUnderTest).createInputStream(any());
JsonElement jsonElement = mock(JsonElement.class);
when(jsonElement.isJsonObject()).thenReturn(false);
doReturn(jsonElement).when(appConfigUnderTest).getJsonElement(any(JsonParser.class), any(InputStream.class));
appConfigUnderTest.loadConfigurationFromFile();
// Then
- verify(appConfigUnderTest, times(1)).setFilepath(anyString());
verify(appConfigUnderTest, times(1)).loadConfigurationFromFile();
Assertions.assertNull(appConfigUnderTest.getDmaapConsumerConfiguration());
- Assertions.assertNull(appConfigUnderTest.getDmaapPublisherConfiguration());
+ assertThatThrownBy(() -> appConfigUnderTest.getPublisherConfiguration(CHANGE_IDENTIFIER))
+ .hasMessageContaining(CHANGE_IDENTIFIER);
Assertions.assertNull(appConfigUnderTest.getFtpesConfiguration());
}
- private String getJsonConfig(boolean correct) {
- JsonObject dmaapConsumerConfigData = new JsonObject();
- dmaapConsumerConfigData.addProperty("dmaapHostName", "localhost");
- dmaapConsumerConfigData.addProperty("dmaapPortNumber", 2222);
- dmaapConsumerConfigData.addProperty("dmaapTopicName", "/events/unauthenticated.VES_NOTIFICATION_OUTPUT");
- dmaapConsumerConfigData.addProperty("dmaapProtocol", "http");
- dmaapConsumerConfigData.addProperty("dmaapUserName", "admin");
- dmaapConsumerConfigData.addProperty("dmaapUserPassword", "admin");
- dmaapConsumerConfigData.addProperty("dmaapContentType", "application/json");
- dmaapConsumerConfigData.addProperty("consumerId", "C12");
- dmaapConsumerConfigData.addProperty("consumerGroup", "OpenDcae-c12");
- dmaapConsumerConfigData.addProperty("timeoutMs", -1);
- dmaapConsumerConfigData.addProperty("messageLimit", 1);
-
- JsonObject dmaapProducerConfigData = new JsonObject();
- dmaapProducerConfigData.addProperty("dmaapHostName", "localhost");
- dmaapProducerConfigData.addProperty("dmaapPortNumber", 3907);
- dmaapProducerConfigData.addProperty("dmaapTopicName", "publish");
- dmaapProducerConfigData.addProperty("dmaapProtocol", "https");
- if (correct) {
- dmaapProducerConfigData.addProperty("dmaapUserName", "dradmin");
- dmaapProducerConfigData.addProperty("dmaapUserPassword", "dradmin");
- dmaapProducerConfigData.addProperty("dmaapContentType", "application/octet-stream");
- }
-
- JsonObject dmaapConfigs = new JsonObject();
- dmaapConfigs.add("dmaapConsumerConfiguration", dmaapConsumerConfigData);
- dmaapConfigs.add("dmaapProducerConfiguration", dmaapProducerConfigData);
-
- JsonObject ftpesConfigData = new JsonObject();
- ftpesConfigData.addProperty("keyCert", "config/dfc.jks");
- ftpesConfigData.addProperty("keyPassword", "secret");
- ftpesConfigData.addProperty("trustedCa", "config/ftp.jks");
- ftpesConfigData.addProperty("trustedCaPassword", "secret");
-
- JsonObject security = new JsonObject();
- security.addProperty("trustStorePath", "trustStorePath");
- security.addProperty("trustStorePasswordPath", "trustStorePasswordPath");
- security.addProperty("keyStorePath", "keyStorePath");
- security.addProperty("keyStorePasswordPath", "keyStorePasswordPath");
- security.addProperty("enableDmaapCertAuth", "enableDmaapCertAuth");
-
- JsonObject ftpesConfiguration = new JsonObject();
- ftpesConfiguration.add("ftpesConfiguration", ftpesConfigData);
-
- JsonObject configs = new JsonObject();
- configs.add("dmaap", dmaapConfigs);
- configs.add("ftp", ftpesConfiguration);
- configs.add("security", security);
-
- JsonObject completeJson = new JsonObject();
- completeJson.add("configs", configs);
-
- return completeJson.toString();
+ @Test
+ public void whenPeriodicConfigRefreshNoEnvironmentVariables() {
+ ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class);
+
+ Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context);
+
+ StepVerifier //
+ .create(task) //
+ .expectSubscription() //
+ .expectNextCount(0) //
+ .verifyComplete();
+
+ assertTrue(logAppender.list.toString().contains("$CONSUL_HOST environment has not been defined"));
+ }
+
+ @Test
+ public void whenPeriodicConfigRefreshNoConsul() {
+ ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(AppConfig.class);
+
+ doReturn(Mono.just(properties())).when(appConfigUnderTest).readEnvironmentVariables(any(), any());
+ Mono<JsonObject> err = Mono.error(new IOException());
+ doReturn(err).when(cloudConfigurationProvider).callForServiceConfigurationReactive(any());
+
+ Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context);
+
+ StepVerifier //
+ .create(task) //
+ .expectSubscription() //
+ .expectNextCount(0) //
+ .verifyComplete();
+
+ assertTrue(logAppender.list.toString()
+ .contains("Could not refresh application configuration java.io.IOException"));
+ }
+
+ @Test
+ public void whenPeriodicConfigRefreshSuccess() throws JsonIOException, JsonSyntaxException, IOException {
+ doReturn(Mono.just(properties())).when(appConfigUnderTest).readEnvironmentVariables(any(), any());
+
+ Mono<JsonObject> json = Mono.just(getJsonRootObject());
+
+ doReturn(json, json).when(cloudConfigurationProvider).callForServiceConfigurationReactive(any());
+
+ Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context);
+
+ StepVerifier //
+ .create(task) //
+ .expectSubscription() //
+ .expectNext(appConfigUnderTest) //
+ .verifyComplete();
+
+ Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
+ }
+
+ @Test
+ public void whenPeriodicConfigRefreshSuccess2() throws JsonIOException, JsonSyntaxException, IOException {
+ doReturn(Mono.just(properties())).when(appConfigUnderTest).readEnvironmentVariables(any(), any());
+
+ Mono<JsonObject> json = Mono.just(getJsonRootObject());
+ Mono<JsonObject> err = Mono.error(new IOException()); // no config entry created by the
+ // dmaap plugin
+
+ doReturn(json, err).when(cloudConfigurationProvider).callForServiceConfigurationReactive(any());
+
+ Flux<AppConfig> task = appConfigUnderTest.createRefreshConfigurationTask(1L, context);
+
+ StepVerifier //
+ .create(task) //
+ .expectSubscription() //
+ .expectNext(appConfigUnderTest) //
+ .verifyComplete();
+
+ Assertions.assertNotNull(appConfigUnderTest.getDmaapConsumerConfiguration());
+ }
+
+ private JsonObject getJsonRootObject() throws JsonIOException, JsonSyntaxException, IOException {
+ JsonObject rootObject = (new JsonParser()).parse(new InputStreamReader(getCorrectJson())).getAsJsonObject();
+ return rootObject;
+ }
+
+ private static InputStream getCorrectJson() throws IOException {
+ URL url = CloudConfigParser.class.getClassLoader().getResource("datafile_endpoints_test.json");
+ String string = Resources.toString(url, Charsets.UTF_8);
+ return new ByteArrayInputStream((string.getBytes(StandardCharsets.UTF_8)));
+ }
+
+ private static InputStream getCorrectJsonTwoProducers() throws IOException {
+ URL url = CloudConfigParser.class.getClassLoader().getResource("datafile_endpoints_test_2producers.json");
+ String string = Resources.toString(url, Charsets.UTF_8);
+ return new ByteArrayInputStream((string.getBytes(StandardCharsets.UTF_8)));
+ }
+
+ private static InputStream getIncorrectJson() {
+ String string = "{" + //
+ " \"configs\": {" + //
+ " \"dmaap\": {"; //
+ return new ByteArrayInputStream((string.getBytes(StandardCharsets.UTF_8)));
}
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
deleted file mode 100644
index 07233d95..00000000
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/CloudConfigParserTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*-
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.configuration;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import com.google.gson.JsonObject;
-import org.junit.jupiter.api.Test;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
-
-
-class CloudConfigParserTest {
- private static final ImmutableDmaapConsumerConfiguration CORRECT_DMAAP_CONSUMER_CONFIG = //
- new ImmutableDmaapConsumerConfiguration.Builder() //
- .timeoutMs(-1) //
- .dmaapHostName("message-router.onap.svc.cluster.local") //
- .dmaapUserName("admin") //
- .dmaapUserPassword("admin") //
- .dmaapTopicName("/events/unauthenticated.VES_NOTIFICATION_OUTPUT") //
- .dmaapPortNumber(2222) //
- .dmaapContentType("application/json") //
- .messageLimit(-1) //
- .dmaapProtocol("http") //
- .consumerId("C12") //
- .consumerGroup("OpenDCAE-c12") //
- .trustStorePath("trustStorePath") //
- .trustStorePasswordPath("trustStorePasswordPath") //
- .keyStorePath("keyStorePath") //
- .keyStorePasswordPath("keyStorePasswordPath") //
- .enableDmaapCertAuth(true) //
- .build();
-
- private static final ImmutableDmaapPublisherConfiguration CORRECT_DMAAP_PUBLISHER_CONFIG = //
- new ImmutableDmaapPublisherConfiguration.Builder() //
- .dmaapTopicName("publish") //
- .dmaapUserPassword("dradmin") //
- .dmaapPortNumber(3907) //
- .dmaapProtocol("https") //
- .dmaapContentType("application/json") //
- .dmaapHostName("message-router.onap.svc.cluster.local") //
- .dmaapUserName("dradmin") //
- .trustStorePath("trustStorePath") //
- .trustStorePasswordPath("trustStorePasswordPath") //
- .keyStorePath("keyStorePath") //
- .keyStorePasswordPath("keyStorePasswordPath") //
- .enableDmaapCertAuth(true) //
- .build();
-
- private static final ImmutableFtpesConfig CORRECT_FTPES_CONFIGURATION = //
- new ImmutableFtpesConfig.Builder() //
- .keyCert("/config/dfc.jks") //
- .keyPassword("secret") //
- .trustedCa("config/ftp.jks") //
- .trustedCaPassword("secret") //
- .build();
-
- private CloudConfigParser cloudConfigParser = new CloudConfigParser(getCloudConfigJsonObject());
-
- @Test
- public void shouldCreateDmaapConsumerConfigurationCorrectly() {
- DmaapConsumerConfiguration dmaapConsumerConfig = cloudConfigParser.getDmaapConsumerConfig();
-
- assertThat(dmaapConsumerConfig).isNotNull();
- assertThat(dmaapConsumerConfig).isEqualToComparingFieldByField(CORRECT_DMAAP_CONSUMER_CONFIG);
- }
-
- @Test
- public void shouldCreateDmaapPublisherConfigurationCorrectly() {
- DmaapPublisherConfiguration dmaapPublisherConfig = cloudConfigParser.getDmaapPublisherConfig();
-
- assertThat(dmaapPublisherConfig).isNotNull();
- assertThat(dmaapPublisherConfig).isEqualToComparingFieldByField(CORRECT_DMAAP_PUBLISHER_CONFIG);
- }
-
- @Test
- public void shouldCreateFtpesConfigurationCorrectly() {
- FtpesConfig ftpesConfig = cloudConfigParser.getFtpesConfig();
-
- assertThat(ftpesConfig).isNotNull();
- assertThat(ftpesConfig).isEqualToComparingFieldByField(CORRECT_FTPES_CONFIGURATION);
- }
-
- public JsonObject getCloudConfigJsonObject() {
- JsonObject config = new JsonObject();
- config.addProperty("dmaap.dmaapConsumerConfiguration.timeoutMs", -1);
- config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapHostName", "message-router.onap.svc.cluster.local");
- config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapUserName", "admin");
- config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapUserPassword", "admin");
- config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapTopicName",
- "/events/unauthenticated.VES_NOTIFICATION_OUTPUT");
- config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapPortNumber", 2222);
- config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapContentType", "application/json");
- config.addProperty("dmaap.dmaapConsumerConfiguration.messageLimit", -1);
- config.addProperty("dmaap.dmaapConsumerConfiguration.dmaapProtocol", "http");
- config.addProperty("dmaap.dmaapConsumerConfiguration.consumerId", "C12");
- config.addProperty("dmaap.dmaapConsumerConfiguration.consumerGroup", "OpenDCAE-c12");
- config.addProperty("dmaap.dmaapProducerConfiguration.dmaapTopicName", "publish");
- config.addProperty("dmaap.dmaapProducerConfiguration.dmaapProtocol", "https");
- config.addProperty("dmaap.dmaapProducerConfiguration.dmaapContentType", "application/json");
- config.addProperty("dmaap.dmaapProducerConfiguration.dmaapHostName", "message-router.onap.svc.cluster.local");
- config.addProperty("dmaap.dmaapProducerConfiguration.dmaapPortNumber", 3907);
- config.addProperty("dmaap.dmaapProducerConfiguration.dmaapUserName", "dradmin");
- config.addProperty("dmaap.dmaapProducerConfiguration.dmaapUserPassword", "dradmin");
- config.addProperty("dmaap.ftpesConfig.keyCert", "/config/dfc.jks");
- config.addProperty("dmaap.ftpesConfig.keyPassword", "secret");
- config.addProperty("dmaap.ftpesConfig.trustedCa", "config/ftp.jks");
- config.addProperty("dmaap.ftpesConfig.trustedCaPassword", "secret");
-
- config.addProperty("dmaap.security.trustStorePath", "trustStorePath");
- config.addProperty("dmaap.security.trustStorePasswordPath", "trustStorePasswordPath");
- config.addProperty("dmaap.security.keyStorePath", "keyStorePath");
- config.addProperty("dmaap.security.keyStorePasswordPath", "keyStorePasswordPath");
- config.addProperty("dmaap.security.enableDmaapCertAuth", "true");
-
- return config;
- }
-}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java
index 6e2140b4..eba88c33 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/configuration/SchedulerConfigTest.java
@@ -25,6 +25,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
@@ -35,26 +36,40 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
+
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.TaskScheduler;
+
import reactor.test.StepVerifier;
public class SchedulerConfigTest {
+ private final AppConfig appConfigurationMock = mock(AppConfig.class);
+ private final TaskScheduler taskSchedulerMock = mock(TaskScheduler.class);
+ private final ScheduledTasks scheduledTasksMock = mock(ScheduledTasks.class);
+ private final SchedulerConfig schedulerUnderTest =
+ spy(new SchedulerConfig(taskSchedulerMock, scheduledTasksMock, appConfigurationMock));
+
+ @BeforeEach
+ public void setUp() {
+ doNothing().when(appConfigurationMock).stop();
+ doNothing().when(appConfigurationMock).initialize();
+ }
+
@Test
public void getResponseFromCancellationOfTasks_success() {
+
List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
ScheduledFuture<?> scheduledFutureMock = mock(ScheduledFuture.class);
scheduledFutureList.add(scheduledFutureMock);
SchedulerConfig.setScheduledFutureList(scheduledFutureList);
- SchedulerConfig schedulerUnderTest = new SchedulerConfig(null, null, null);
-
String msg = "Datafile Service has already been stopped!";
StepVerifier.create(schedulerUnderTest.getResponseFromCancellationOfTasks())
.expectNext(new ResponseEntity<String>(msg, HttpStatus.CREATED)) //
@@ -68,24 +83,17 @@ public class SchedulerConfigTest {
@Test
public void tryToStartTaskWhenNotStarted_success() {
- TaskScheduler taskSchedulerMock = mock(TaskScheduler.class);
- ScheduledTasks scheduledTasksMock = mock(ScheduledTasks.class);
- CloudConfiguration cloudConfigurationMock = mock(CloudConfiguration.class);
List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
SchedulerConfig.setScheduledFutureList(scheduledFutureList);
SchedulerConfig schedulerUnderTestSpy =
- spy(new SchedulerConfig(taskSchedulerMock, scheduledTasksMock, cloudConfigurationMock));
+ spy(new SchedulerConfig(taskSchedulerMock, scheduledTasksMock, appConfigurationMock));
boolean actualResult = schedulerUnderTestSpy.tryToStartTask();
assertTrue(actualResult);
- ArgumentCaptor<Runnable> runTaskRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
- verify(taskSchedulerMock).scheduleAtFixedRate(runTaskRunnableCaptor.capture(), any(Instant.class),
- eq(Duration.ofMinutes(5)));
-
ArgumentCaptor<Runnable> scheduleMainDatafileEventTaskCaptor = ArgumentCaptor.forClass(Runnable.class);
verify(taskSchedulerMock).scheduleWithFixedDelay(scheduleMainDatafileEventTaskCaptor.capture(),
eq(Duration.ofSeconds(15)));
@@ -100,22 +108,22 @@ public class SchedulerConfigTest {
verify(scheduledTasksMock).executeDatafileMainTask();
verifyNoMoreInteractions(scheduledTasksMock);
- runTaskRunnableCaptor.getValue().run();
- verify(cloudConfigurationMock).runTask();
- verifyNoMoreInteractions(cloudConfigurationMock);
+ verify(appConfigurationMock).initialize();
+ verifyNoMoreInteractions(appConfigurationMock);
- assertEquals(3, scheduledFutureList.size());
+ assertEquals(2, scheduledFutureList.size());
}
@Test
public void tryToStartTaskWhenAlreadyStarted_shouldReturnFalse() {
+ doNothing().when(appConfigurationMock).loadConfigurationFromFile();
List<ScheduledFuture<?>> scheduledFutureList = new ArrayList<>();
ScheduledFuture<?> scheduledFutureMock = mock(ScheduledFuture.class);
scheduledFutureList.add(scheduledFutureMock);
SchedulerConfig.setScheduledFutureList(scheduledFutureList);
- SchedulerConfig schedulerUnderTest = new SchedulerConfig(null, null, null);
+ SchedulerConfig schedulerUnderTest = new SchedulerConfig(null, null, appConfigurationMock);
boolean actualResult = schedulerUnderTest.tryToStartTask();
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
deleted file mode 100644
index 7f6b8c51..00000000
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/integration/ScheduledXmlContextITest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*-
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.integration;
-
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.verify;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.onap.dcaegen2.collectors.datafile.tasks.ScheduledTasks;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.test.context.ContextConfiguration;
-import org.springframework.test.context.junit.jupiter.SpringExtension;
-import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
-
-/**
- * Integration test for the ScheduledXmlContext.
- *
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/27/18
- * @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
- */
-
-@Configuration
-@ComponentScan
-@ExtendWith({ SpringExtension.class })
-@ContextConfiguration(locations = { "classpath:scheduled-context.xml" })
-class ScheduledXmlContextITest extends AbstractTestNGSpringContextTests {
-
- private static final int WAIT_FOR_SCHEDULING = 1;
-
- @Autowired
- private ScheduledTasks scheduledTask;
-
- @Test
- void testScheduling() {
- final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
- executorService.scheduleWithFixedDelay(this::verifyDmaapConsumerTask, 0, WAIT_FOR_SCHEDULING, TimeUnit.SECONDS);
- }
-
- private void verifyDmaapConsumerTask() {
- verify(scheduledTask, atLeast(1)).executeDatafileMainTask();
- }
-}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java
deleted file mode 100644
index 83c92ef4..00000000
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/model/FilePublishInformationTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*-
- * ============LICENSE_START======================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property, 2018-2019 Nordix Foundation. All rights reserved.
- * ===============================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END========================================================================
- */
-
-package org.onap.dcaegen2.collectors.datafile.model;
-
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.HashMap;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-public class FilePublishInformationTest {
- private static final String PRODUCT_NAME = "NrRadio";
- private static final String VENDOR_NAME = "Ericsson";
- private static final String LAST_EPOCH_MICROSEC = "8745745764578";
- private static final String SOURCE_NAME = "oteNB5309";
- private static final String START_EPOCH_MICROSEC = "8745745764578";
- private static final String TIME_ZONE_OFFSET = "UTC+05:00";
- private static final String NAME = "A20161224.1030-1045.bin.gz";
- private static final String LOCATION = "ftpes://192.168.0.101:22/ftp/rop/A20161224.1030-1145.bin.gz";
- private static final Path INTERNAL_LOCATION = Paths.get("target/A20161224.1030-1045.bin.gz");
- private static final String COMPRESSION = "gzip";
- private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
- private static final String FILE_FORMAT_VERSION = "V10";
-
- @Test
- public void filePublishInformationBuilder_shouldBuildAnObject() {
- FilePublishInformation filePublishInformation = ImmutableFilePublishInformation.builder() //
- .productName(PRODUCT_NAME) //
- .vendorName(VENDOR_NAME) //
- .lastEpochMicrosec(LAST_EPOCH_MICROSEC) //
- .sourceName(SOURCE_NAME) //
- .startEpochMicrosec(START_EPOCH_MICROSEC) //
- .timeZoneOffset(TIME_ZONE_OFFSET) //
- .name(NAME) //
- .location(LOCATION) //
- .internalLocation(INTERNAL_LOCATION) //
- .compression(COMPRESSION) //
- .fileFormatType(FILE_FORMAT_TYPE) //
- .fileFormatVersion(FILE_FORMAT_VERSION) //
- .context(new HashMap<String,String>()) //
- .build();
-
- Assertions.assertNotNull(filePublishInformation);
- Assertions.assertEquals(PRODUCT_NAME, filePublishInformation.getProductName());
- Assertions.assertEquals(VENDOR_NAME, filePublishInformation.getVendorName());
- Assertions.assertEquals(LAST_EPOCH_MICROSEC, filePublishInformation.getLastEpochMicrosec());
- Assertions.assertEquals(SOURCE_NAME, filePublishInformation.getSourceName());
- Assertions.assertEquals(START_EPOCH_MICROSEC, filePublishInformation.getStartEpochMicrosec());
- Assertions.assertEquals(TIME_ZONE_OFFSET, filePublishInformation.getTimeZoneOffset());
- Assertions.assertEquals(NAME, filePublishInformation.getName());
- Assertions.assertEquals(LOCATION, filePublishInformation.getLocation());
- Assertions.assertEquals(INTERNAL_LOCATION, filePublishInformation.getInternalLocation());
- Assertions.assertEquals(COMPRESSION, filePublishInformation.getCompression());
- Assertions.assertEquals(FILE_FORMAT_TYPE, filePublishInformation.getFileFormatType());
- Assertions.assertEquals(FILE_FORMAT_VERSION, filePublishInformation.getFileFormatVersion());
- }
-}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
index becfba31..8c7938bf 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/JsonMessageParserTest.java
@@ -22,9 +22,12 @@ import static org.mockito.Mockito.spy;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
+
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
@@ -36,6 +39,7 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage;
import org.onap.dcaegen2.collectors.datafile.utils.JsonMessage.AdditionalField;
+
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -65,7 +69,7 @@ class JsonMessageParserTest {
private static final String NOTIFICATION_FIELDS_VERSION = "1.0";
@Test
- void whenPassingCorrectJson_oneFileReadyMessage() {
+ void whenPassingCorrectJson_oneFileReadyMessage() throws URISyntaxException {
AdditionalField additionalField = new JsonMessage.AdditionalFieldBuilder() //
.name(PM_FILE_NAME) //
.location(LOCATION) //
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java
index e21bbd7b..a71521cd 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/service/producer/DmaapProducerHttpClientTest.java
@@ -26,7 +26,7 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import java.net.URI;
+
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
@@ -35,7 +35,9 @@ import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
+
import javax.net.ssl.SSLContext;
+
import org.apache.commons.codec.binary.Base64;
import org.apache.http.Header;
import org.apache.http.HttpResponse;
@@ -192,10 +194,4 @@ class DmaapProducerHttpClientTest {
Header[] authorizationHeaders = request.getHeaders("Authorization");
assertEquals(base64Creds, authorizationHeaders[0].getValue());
}
-
- @Test
- public void getBaseUri_success() {
- URI uri = producerClientUnderTestSpy.getBaseUri().build();
- assertEquals(HTTPS_SCHEME + "://" + HOST + ":" + PORT, uri.toString());
- }
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
index 5e737253..574ad18e 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DMaaPMessageConsumerTest.java
@@ -77,6 +77,7 @@ public class DMaaPMessageConsumerTest {
private static final String MEAS_COLLECT_FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
private static final String FILE_FORMAT_VERSION = "V10";
private static List<FilePublishInformation> listOfFilePublishInformation = new ArrayList<FilePublishInformation>();
+ private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
private DMaaPConsumerReactiveHttpClient httpClientMock;
@@ -173,6 +174,7 @@ public class DMaaPMessageConsumerTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
.context(new HashMap<String,String>()) //
.build();
listOfFilePublishInformation.add(filePublishInformation);
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
index 8f768d38..463c62c9 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/DataRouterPublisherTest.java
@@ -47,14 +47,12 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformation;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.springframework.http.HttpStatus;
-import org.springframework.web.util.DefaultUriBuilderFactory;
-import org.springframework.web.util.UriBuilder;
import reactor.test.StepVerifier;
@@ -65,6 +63,7 @@ import reactor.test.StepVerifier;
* @author <a href="mailto:henrik.b.andersson@est.tech">Henrik Andersson</a>
*/
class DataRouterPublisherTest {
+
private static final String PRODUCT_NAME = "NrRadio";
private static final String VENDOR_NAME = "Ericsson";
private static final String LAST_EPOCH_MICROSEC = "8745745764578";
@@ -73,6 +72,7 @@ class DataRouterPublisherTest {
private static final String TIME_ZONE_OFFSET = "UTC+05:00";
private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
private static final String FTPES_ADDRESS = "ftpes://192.168.0.101:22/ftp/rop/" + PM_FILE_NAME;
+ private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
private static final String COMPRESSION = "gzip";
private static final String FILE_FORMAT_TYPE = "org.3GPP.32.435#measCollec";
@@ -90,15 +90,17 @@ class DataRouterPublisherTest {
private static FilePublishInformation filePublishInformation;
private static DmaapProducerHttpClient httpClientMock;
private static AppConfig appConfig;
- private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
+ private static PublisherConfiguration publisherConfigurationMock = mock(PublisherConfiguration.class);
private static Map<String, String> context = new HashMap<>();
private static DataRouterPublisher publisherTaskUnderTestSpy;
+ // "https://54.45.333.2:1234/publish/1";
+ private static final String PUBLISH_URL =
+ HTTPS_SCHEME + "://" + HOST + ":" + PORT + "/" + PUBLISH_TOPIC + "/" + FEED_ID;
+
@BeforeAll
public static void setUp() {
- when(publisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
- when(publisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME);
- when(publisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
+ when(publisherConfigurationMock.publishUrl()).thenReturn(PUBLISH_URL);
filePublishInformation = ImmutableFilePublishInformation.builder() //
.productName(PRODUCT_NAME) //
@@ -114,6 +116,7 @@ class DataRouterPublisherTest {
.fileFormatType(FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
.context(context) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
.build(); //
appConfig = mock(AppConfig.class);
publisherTaskUnderTestSpy = spy(new DataRouterPublisher(appConfig));
@@ -128,7 +131,6 @@ class DataRouterPublisherTest {
.verifyComplete();
ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class);
- verify(httpClientMock).getBaseUri();
verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class));
verify(httpClientMock).getDmaapProducerResponseWithRedirect(requestCaptor.capture(), any());
verifyNoMoreInteractions(httpClientMock);
@@ -138,6 +140,7 @@ class DataRouterPublisherTest {
assertEquals(HTTPS_SCHEME, actualUri.getScheme());
assertEquals(HOST, actualUri.getHost());
assertEquals(PORT, actualUri.getPort());
+
Path actualPath = Paths.get(actualUri.getPath());
assertTrue(PUBLISH_TOPIC.equals(actualPath.getName(0).toString()));
assertTrue(FEED_ID.equals(actualPath.getName(1).toString()));
@@ -160,7 +163,8 @@ class DataRouterPublisherTest {
assertEquals(FILE_FORMAT_TYPE, metaHash.get("fileFormatType"));
assertEquals(FILE_FORMAT_VERSION, metaHash.get("fileFormatVersion"));
- // Note that the following line checks the number of properties that are sent to the data router.
+ // Note that the following line checks the number of properties that are sent to the data
+ // router.
// This should be 10 unless the API is updated (which is the fields checked above)
assertEquals(10, metaHash.size());
}
@@ -185,7 +189,6 @@ class DataRouterPublisherTest {
.expectNext(filePublishInformation) //
.verifyComplete();
- verify(httpClientMock, times(2)).getBaseUri();
verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class));
verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any());
verifyNoMoreInteractions(httpClientMock);
@@ -201,7 +204,6 @@ class DataRouterPublisherTest {
.expectErrorMessage("Retries exhausted: 1/1") //
.verify();
- verify(httpClientMock, times(2)).getBaseUri();
verify(httpClientMock, times(2)).addUserCredentialsToHead(any(HttpUriRequest.class));
verify(httpClientMock, times(2)).getDmaapProducerResponseWithRedirect(any(HttpUriRequest.class), any());
verifyNoMoreInteractions(httpClientMock);
@@ -211,12 +213,9 @@ class DataRouterPublisherTest {
final void prepareMocksForTests(Exception exception, Integer firstResponse, Integer... nextHttpResponses)
throws Exception {
httpClientMock = mock(DmaapProducerHttpClient.class);
- when(appConfig.getDmaapPublisherConfiguration()).thenReturn(publisherConfigurationMock);
- doReturn(publisherConfigurationMock).when(publisherTaskUnderTestSpy).resolveConfiguration();
- doReturn(httpClientMock).when(publisherTaskUnderTestSpy).resolveClient();
-
- UriBuilder uriBuilder = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT);
- when(httpClientMock.getBaseUri()).thenReturn(uriBuilder);
+ when(appConfig.getPublisherConfiguration(CHANGE_IDENTIFIER)).thenReturn(publisherConfigurationMock);
+ doReturn(publisherConfigurationMock).when(publisherTaskUnderTestSpy).resolveConfiguration(CHANGE_IDENTIFIER);
+ doReturn(httpClientMock).when(publisherTaskUnderTestSpy).resolveClient(CHANGE_IDENTIFIER);
HttpResponse httpResponseMock = mock(HttpResponse.class);
if (exception == null) {
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
index cad3486d..299a0238 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/FileCollectorTest.java
@@ -83,6 +83,7 @@ public class FileCollectorTest {
private static final String FTP_KEY_PASSWORD = "ftpKeyPassword";
private static final String TRUSTED_CA_PATH = "trustedCAPath";
private static final String TRUSTED_CA_PASSWORD = "trustedCAPassword";
+ private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
private static AppConfig appConfigMock = mock(AppConfig.class);
private static FtpesConfig ftpesConfigMock = mock(FtpesConfig.class);
@@ -132,7 +133,8 @@ public class FileCollectorTest {
.compression(GZIP_COMPRESSION) //
.fileFormatType(MEAS_COLLECT_FILE_FORMAT_TYPE) //
.fileFormatVersion(FILE_FORMAT_VERSION) //
- .context(new HashMap<String,String>())
+ .context(new HashMap<String,String>()) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
.build();
}
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java
index 83643637..44755814 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/PublishedCheckerTest.java
@@ -34,10 +34,9 @@ import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.net.URI;
-import java.nio.file.Path;
-import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
+
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.StatusLine;
@@ -47,55 +46,47 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.service.HttpUtils;
import org.onap.dcaegen2.collectors.datafile.service.producer.DmaapProducerHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.springframework.web.util.DefaultUriBuilderFactory;
-import org.springframework.web.util.UriBuilder;
public class PublishedCheckerTest {
+ private static final String PUBLISH_URL = "https://54.45.33.2:1234/";
private static final String EMPTY_CONTENT = "[]";
- private static final String FEEDLOG_TOPIC = "feedlog";
- private static final String FEED_ID = "1";
- private static final String HTTPS_SCHEME = "https";
- private static final String HOST = "54.45.33.2";
- private static final int PORT = 1234;
private static final String SOURCE_NAME = "oteNB5309";
private static final String FILE_NAME = "A20161224.1030-1045.bin.gz";
private static final String LOCAL_FILE_NAME = SOURCE_NAME + "_" + FILE_NAME;
+ private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
+ private static final String LOG_URI = "https://localhost:3907/feedlog/1";
private static final Map<String, String> CONTEXT_MAP = new HashMap<>();
- private static DmaapPublisherConfiguration publisherConfigurationMock = mock(DmaapPublisherConfiguration.class);
+ private static PublisherConfiguration publisherConfigurationMock = mock(PublisherConfiguration.class);
private static AppConfig appConfigMock;
private DmaapProducerHttpClient httpClientMock = mock(DmaapProducerHttpClient.class);
private PublishedChecker publishedCheckerUnderTestSpy;
- /**
- * Sets up data for the tests.
- */
+
@BeforeAll
- public static void setUp() {
- when(publisherConfigurationMock.dmaapHostName()).thenReturn(HOST);
- when(publisherConfigurationMock.dmaapProtocol()).thenReturn(HTTPS_SCHEME);
- when(publisherConfigurationMock.dmaapPortNumber()).thenReturn(PORT);
+ public static void setUp() throws DatafileTaskException {
+ when(publisherConfigurationMock.publishUrl()).thenReturn(PUBLISH_URL);
appConfigMock = mock(AppConfig.class);
- when(appConfigMock.getDmaapPublisherConfiguration()).thenReturn(publisherConfigurationMock);
+ when(appConfigMock.getPublisherConfiguration(CHANGE_IDENTIFIER)).thenReturn(publisherConfigurationMock);
}
@Test
public void executeWhenNotPublished_returnsFalse() throws Exception {
prepareMocksForTests(HttpUtils.SC_OK, EMPTY_CONTENT, null);
- boolean isPublished = publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CONTEXT_MAP);
+ boolean isPublished =
+ publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CHANGE_IDENTIFIER, CONTEXT_MAP);
assertFalse(isPublished);
ArgumentCaptor<HttpUriRequest> requestCaptor = ArgumentCaptor.forClass(HttpUriRequest.class);
- verify(httpClientMock).getBaseUri();
verify(httpClientMock).addUserCredentialsToHead(any(HttpUriRequest.class));
verify(httpClientMock).getDmaapProducerResponseWithCustomTimeout(requestCaptor.capture(), any(), any());
verifyNoMoreInteractions(httpClientMock);
@@ -103,22 +94,17 @@ public class PublishedCheckerTest {
HttpUriRequest getRequest = requestCaptor.getValue();
assertTrue(getRequest instanceof HttpGet);
URI actualUri = getRequest.getURI();
- assertEquals(HTTPS_SCHEME, actualUri.getScheme());
- assertEquals(HOST, actualUri.getHost());
- assertEquals(PORT, actualUri.getPort());
- Path actualPath = Paths.get(actualUri.getPath());
- assertTrue(FEEDLOG_TOPIC.equals(actualPath.getName(0).toString()));
- assertTrue(FEED_ID.equals(actualPath.getName(1).toString()));
- String actualQuery = actualUri.getQuery();
- assertTrue(actualQuery.contains("type=pub"));
- assertTrue(actualQuery.contains("filename=" + LOCAL_FILE_NAME));
+ // https://localhost:3907/feedlog/1?type=pub&filename=oteNB5309_A20161224.1030-1045.bin.gz
+ String expUri = LOG_URI + "?type=pub&filename=" + LOCAL_FILE_NAME;
+ assertEquals(expUri, actualUri.toString());
}
@Test
public void executeWhenDataRouterReturnsNok_returnsFalse() throws Exception {
prepareMocksForTests(HttpUtils.SC_BAD_REQUEST, EMPTY_CONTENT, null);
- boolean isPublished = publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CONTEXT_MAP);
+ boolean isPublished =
+ publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CHANGE_IDENTIFIER, CONTEXT_MAP);
assertFalse(isPublished);
}
@@ -127,7 +113,8 @@ public class PublishedCheckerTest {
public void executeWhenPublished_returnsTrue() throws Exception {
prepareMocksForTests(HttpUtils.SC_OK, "[" + LOCAL_FILE_NAME + "]", null);
- boolean isPublished = publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CONTEXT_MAP);
+ boolean isPublished =
+ publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CHANGE_IDENTIFIER, CONTEXT_MAP);
assertTrue(isPublished);
}
@@ -136,7 +123,8 @@ public class PublishedCheckerTest {
public void executeWhenErrorInDataRouter_returnsFalse() throws Exception {
prepareMocksForTests(HttpUtils.SC_OK, EMPTY_CONTENT, new DatafileTaskException(""));
- boolean isPublished = publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CONTEXT_MAP);
+ boolean isPublished =
+ publishedCheckerUnderTestSpy.isFilePublished(LOCAL_FILE_NAME, CHANGE_IDENTIFIER, CONTEXT_MAP);
assertFalse(isPublished);
}
@@ -144,11 +132,9 @@ public class PublishedCheckerTest {
final void prepareMocksForTests(int responseCode, String content, Exception exception) throws Exception {
publishedCheckerUnderTestSpy = spy(new PublishedChecker(appConfigMock));
- doReturn(publisherConfigurationMock).when(publishedCheckerUnderTestSpy).resolveConfiguration();
- doReturn(httpClientMock).when(publishedCheckerUnderTestSpy).resolveClient();
-
- UriBuilder uriBuilder = new DefaultUriBuilderFactory().builder().scheme(HTTPS_SCHEME).host(HOST).port(PORT);
- when(httpClientMock.getBaseUri()).thenReturn(uriBuilder);
+ doReturn(publisherConfigurationMock).when(publishedCheckerUnderTestSpy).resolveConfiguration(CHANGE_IDENTIFIER);
+ doReturn(LOG_URI).when(publisherConfigurationMock).logUrl();
+ doReturn(httpClientMock).when(publishedCheckerUnderTestSpy).resolveClient(publisherConfigurationMock);
HttpResponse httpResponseMock = mock(HttpResponse.class);
if (exception == null) {
diff --git a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
index 0d5a4231..a1021868 100644
--- a/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
+++ b/datafile-app-server/src/test/java/org/onap/dcaegen2/collectors/datafile/tasks/ScheduledTasksTest.java
@@ -42,6 +42,11 @@ import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.collectors.datafile.configuration.AppConfig;
+import org.onap.dcaegen2.collectors.datafile.configuration.ConsumerConfiguration;
+import org.onap.dcaegen2.collectors.datafile.configuration.ImmutableConsumerConfiguration;
+import org.onap.dcaegen2.collectors.datafile.configuration.ImmutablePublisherConfiguration;
+import org.onap.dcaegen2.collectors.datafile.configuration.PublisherConfiguration;
+import org.onap.dcaegen2.collectors.datafile.exceptions.DatafileTaskException;
import org.onap.dcaegen2.collectors.datafile.ftp.Scheme;
import org.onap.dcaegen2.collectors.datafile.model.FileData;
import org.onap.dcaegen2.collectors.datafile.model.FilePublishInformation;
@@ -51,8 +56,6 @@ import org.onap.dcaegen2.collectors.datafile.model.ImmutableFilePublishInformati
import org.onap.dcaegen2.collectors.datafile.model.ImmutableFileReadyMessage;
import org.onap.dcaegen2.collectors.datafile.model.ImmutableMessageMetaData;
import org.onap.dcaegen2.collectors.datafile.model.MessageMetaData;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -61,6 +64,7 @@ import reactor.test.StepVerifier;
public class ScheduledTasksTest {
private static final String PM_FILE_NAME = "A20161224.1030-1045.bin.gz";
+ private static final String CHANGE_IDENTIFIER = "PM_MEAS_FILES";
private AppConfig appConfig = mock(AppConfig.class);
private ScheduledTasks testedObject = spy(new ScheduledTasks(appConfig));
@@ -72,23 +76,33 @@ public class ScheduledTasksTest {
private DataRouterPublisher dataRouterMock;
private Map<String, String> contextMap = new HashMap<String, String>();
+ private final String publishUrl = "https://54.45.33.2:1234/unauthenticated.VES_NOTIFICATION_OUTPUT";
+
@BeforeEach
- private void setUp() {
- DmaapPublisherConfiguration dmaapPublisherConfiguration = new ImmutableDmaapPublisherConfiguration.Builder() //
- .dmaapContentType("application/json") //
- .dmaapHostName("54.45.33.2") //
- .dmaapPortNumber(1234) //
- .dmaapProtocol("https") //
- .dmaapUserName("DFC") //
- .dmaapUserPassword("DFC") //
- .dmaapTopicName("unauthenticated.VES_NOTIFICATION_OUTPUT") //
+ private void setUp() throws DatafileTaskException {
+ final PublisherConfiguration dmaapPublisherConfiguration = ImmutablePublisherConfiguration.builder() //
+ .publishUrl(publishUrl) //
+ .logUrl("") //
+ .userName("userName") //
+ .passWord("passWord") //
.trustStorePath("trustStorePath") //
.trustStorePasswordPath("trustStorePasswordPath") //
.keyStorePath("keyStorePath") //
.keyStorePasswordPath("keyStorePasswordPath") //
.enableDmaapCertAuth(true) //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
.build(); //
- doReturn(dmaapPublisherConfiguration).when(appConfig).getDmaapPublisherConfiguration();
+ final ConsumerConfiguration dmaapConsumerConfiguration = ImmutableConsumerConfiguration.builder() //
+ .topicUrl("topicUrl").trustStorePath("trustStorePath") //
+ .trustStorePasswordPath("trustStorePasswordPath") //
+ .keyStorePath("keyStorePath") //
+ .keyStorePasswordPath("keyStorePasswordPath") //
+ .enableDmaapCertAuth(true) //
+ .build();
+
+ doReturn(dmaapPublisherConfiguration).when(appConfig).getPublisherConfiguration(CHANGE_IDENTIFIER);
+ doReturn(dmaapConsumerConfiguration).when(appConfig).getDmaapConsumerConfiguration();
+ doReturn(true).when(appConfig).isFeedConfigured(CHANGE_IDENTIFIER);
consumerMock = mock(DMaaPMessageConsumer.class);
publishedCheckerMock = mock(PublishedChecker.class);
@@ -109,7 +123,7 @@ public class ScheduledTasksTest {
.sourceName("") //
.startEpochMicrosec("") //
.timeZoneOffset("") //
- .changeIdentifier("") //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
.changeType("") //
.build();
}
@@ -164,11 +178,12 @@ public class ScheduledTasksTest {
.compression("") //
.fileFormatType("") //
.fileFormatVersion("") //
+ .changeIdentifier(CHANGE_IDENTIFIER) //
.context(new HashMap<String, String>()).build();
}
@Test
- public void notingToConsume() {
+ public void notingToConsume() throws DatafileTaskException {
doReturn(consumerMock).when(testedObject).createConsumerTask();
doReturn(Flux.empty()).when(consumerMock).getMessageRouterResponse();
@@ -180,7 +195,7 @@ public class ScheduledTasksTest {
}
@Test
- public void consume_successfulCase() {
+ public void consume_successfulCase() throws DatafileTaskException {
final int noOfEvents = 200;
final int noOfFilesPerEvent = 200;
final int noOfFiles = noOfEvents * noOfFilesPerEvent;
@@ -188,7 +203,7 @@ public class ScheduledTasksTest {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, true);
doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
- doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
+ doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
@@ -212,11 +227,11 @@ public class ScheduledTasksTest {
}
@Test
- public void consume_fetchFailedOnce() {
+ public void consume_fetchFailedOnce() throws DatafileTaskException {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
- doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
+ doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
Mono<Object> error = Mono.error(new Exception("problem"));
@@ -246,12 +261,12 @@ public class ScheduledTasksTest {
}
@Test
- public void consume_publishFailedOnce() {
+ public void consume_publishFailedOnce() throws DatafileTaskException {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(2, 2, true); // 4 files
doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
- doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
+ doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
@@ -279,7 +294,7 @@ public class ScheduledTasksTest {
}
@Test
- public void consume_successfulCase_sameFileNames() {
+ public void consume_successfulCase_sameFileNames() throws DatafileTaskException {
final int noOfEvents = 1;
final int noOfFilesPerEvent = 100;
@@ -287,7 +302,7 @@ public class ScheduledTasksTest {
Flux<FileReadyMessage> fileReadyMessages = fileReadyMessageFlux(noOfEvents, noOfFilesPerEvent, false);
doReturn(fileReadyMessages).when(consumerMock).getMessageRouterResponse();
- doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), any());
+ doReturn(false).when(publishedCheckerMock).isFilePublished(anyString(), anyString(), any());
Mono<FilePublishInformation> collectedFile = Mono.just(filePublishInformation());
doReturn(collectedFile).when(fileCollectorMock).collectFile(notNull(), anyLong(), notNull(), notNull());
@@ -303,7 +318,7 @@ public class ScheduledTasksTest {
verify(consumerMock, times(1)).getMessageRouterResponse();
verify(fileCollectorMock, times(1)).collectFile(notNull(), anyLong(), notNull(), notNull());
verify(dataRouterMock, times(1)).publishFile(notNull(), anyLong(), notNull());
- verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), notNull());
+ verify(publishedCheckerMock, times(1)).isFilePublished(notNull(), anyString(), notNull());
verifyNoMoreInteractions(dataRouterMock);
verifyNoMoreInteractions(fileCollectorMock);
verifyNoMoreInteractions(consumerMock);