summaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/test/java/org
diff options
context:
space:
mode:
authorpranitk1905 <pranit.kapdule@t-systems.com>2023-09-01 14:52:39 +0530
committerpranitk1905 <pranit.kapdule@t-systems.com>2023-09-04 13:25:24 +0530
commitd82c53bd799b22660be17219da516415d4c56b46 (patch)
tree852f840dfaf3947a5f3c6266beb16ece581a7757 /prh-app-server/src/test/java/org
parentbe165c3b701c8c08b4ad4a895d988b33e14a01e0 (diff)
[DCAEGEN2] Pass autoCommitDisabled mode configuration via CBSContentParser
Pass autoCommitDisabled mode configuration via CBSContentParser instead of environment variables. Issue-ID: DCAEGEN2-3365 Change-Id: I3c56153d66a7291966d298874f12a01839f3ee3f Signed-off-by: pranitk1905 <pranit.kapdule@t-systems.com> [DCAEGEN2] Pass autoCommitDisabled mode configuration via CBSContentParser Mentioned Copyrights and removed tabbed indentations Issue-ID: DCAEGEN2-3365 Change-Id: I3c56153d66a7291966d298874f12a01839f3ee3f Signed-off-by: pranitk1905 <pranit.kapdule@t-systems.com> [DCAEGEN2] Pass autoCommitDisabled mode configuration via CBSContentParser Added missing copyrights Issue-ID: DCAEGEN2-3365 Change-Id: I3c56153d66a7291966d298874f12a01839f3ee3f Signed-off-by: pranitk1905 <pranit.kapdule@t-systems.com> [DCAEGEN2] Pass autoCommitDisabled mode configuration via CBSContentParser Added missing copyrights Issue-ID: DCAEGEN2-3365 Change-Id: I3c56153d66a7291966d298874f12a01839f3ee3f Signed-off-by: pranitk1905 <pranit.kapdule@t-systems.com> [DCAEGEN2] Pass autoCommitDisabled mode configuration via CBSContentParser Removed Docs folder from the project as it is maintained in different place Issue-ID: DCAEGEN2-3365 Change-Id: I3c56153d66a7291966d298874f12a01839f3ee3f Signed-off-by: pranitk1905 <pranit.kapdule@t-systems.com>
Diffstat (limited to 'prh-app-server/src/test/java/org')
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/bootstrap/CbsClientConfigurationResolverTest.java11
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/bootstrap/CbsPropertySourceLocatorForAutoCommitDisabledTest.java112
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/bootstrap/CbsPropertySourceLocatorTest.java61
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationForAutoCommitDisabledModeTest.java109
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationTest.java14
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfigTest.java122
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/AppInfoControllerTest.java24
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/ScheduleControllerForAutoCommitDisabledTest.java96
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/ScheduleControllerTest.java5
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationForAutoCommitDisabledTest.java193
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationTest.java12
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowSchedulingIntegrationForAutoCommitDisabledTest.java100
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowSchedulingIntegrationTest.java8
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java119
14 files changed, 874 insertions, 112 deletions
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/bootstrap/CbsClientConfigurationResolverTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/bootstrap/CbsClientConfigurationResolverTest.java
index 87dd18ca..ca1413a0 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/bootstrap/CbsClientConfigurationResolverTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/bootstrap/CbsClientConfigurationResolverTest.java
@@ -3,6 +3,7 @@
* PNF-REGISTRATION-HANDLER
* ================================================================================
* Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,7 +25,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration;
-
import static org.assertj.core.api.Assertions.assertThat;
class CbsClientConfigurationResolverTest {
@@ -34,18 +34,15 @@ class CbsClientConfigurationResolverTest {
@BeforeEach
void setUp() {
cbsProperties = new CbsProperties();
- cbsProperties.setHostname("some-cbs-host");
- cbsProperties.setPort(123);
cbsProperties.setAppName("client-app-name");
}
@Test
@DisabledIfEnvironmentVariable(named = "CONFIG_BINDING_SERVICE", matches = ".+")
void whenCbsEnvPropertiesAreNotePresentInEnvironment_ShouldFallbackToLoadingDefaultsFromCbsProperties() {
- CbsClientConfiguration config = new CbsClientConfigurationResolver(cbsProperties).resolveCbsClientConfiguration();
+ CbsClientConfiguration config = new CbsClientConfigurationResolver(cbsProperties)
+ .resolveCbsClientConfiguration();
- assertThat(config.hostname()).isEqualTo(cbsProperties.getHostname());
- assertThat(config.port()).isEqualTo(cbsProperties.getPort());
assertThat(config.appName()).isEqualTo(cbsProperties.getAppName());
}
-} \ No newline at end of file
+}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/bootstrap/CbsPropertySourceLocatorForAutoCommitDisabledTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/bootstrap/CbsPropertySourceLocatorForAutoCommitDisabledTest.java
new file mode 100644
index 00000000..f5863ac5
--- /dev/null
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/bootstrap/CbsPropertySourceLocatorForAutoCommitDisabledTest.java
@@ -0,0 +1,112 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.bootstrap;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.util.Map;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration;
+import org.onap.dcaegen2.services.prh.configuration.CbsConfigurationForAutoCommitDisabledMode;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.RequestPath;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
+import org.springframework.core.env.Environment;
+import org.springframework.test.context.ActiveProfiles;
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.JsonObject;
+import reactor.core.publisher.Mono;
+import reactor.test.scheduler.VirtualTimeScheduler;
+
+/**
+ * * @author <a href="mailto:PRANIT.KAPDULE@t-systems.com">Pranit Kapdule</a> on
+ * * 24/08/23
+ * */
+
+@ExtendWith(MockitoExtension.class)
+@ActiveProfiles(value = "autoCommitDisabled")
+public class CbsPropertySourceLocatorForAutoCommitDisabledTest {
+ private static final RequestPath GET_ALL_REQUEST_PATH = CbsRequests.getAll(RequestDiagnosticContext.create())
+ .requestPath();
+
+ private CbsProperties cbsProperties = new CbsProperties();
+ @Mock
+ private CbsJsonToPropertyMapConverter cbsJsonToPropertyMapConverter;
+ @Mock
+ private CbsClientConfigurationResolver cbsClientConfigurationResolver;
+ @Mock
+ private CbsClientFactoryFacade cbsClientFactoryFacade;
+ @Mock
+ private CbsConfiguration cbsConfiguration;
+ @Mock
+ private CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabledMode;
+ @Mock
+ private Environment environment;
+ @Mock
+ private CbsClient cbsClient;
+ @Mock
+ private JsonObject cbsConfigJsonObject;
+
+ private Map<String, Object> cbsConfigMap = ImmutableMap.of("foo", "bar");
+
+ private VirtualTimeScheduler virtualTimeScheduler;
+
+ private CbsPropertySourceLocatorForAutoCommitDisabled cbsPropertySourceLocatorACDM;
+
+ @BeforeEach
+ void setup() {
+ virtualTimeScheduler = VirtualTimeScheduler.getOrSet(true);
+
+ cbsPropertySourceLocatorACDM = new CbsPropertySourceLocatorForAutoCommitDisabled(cbsProperties,
+ cbsJsonToPropertyMapConverter, cbsClientConfigurationResolver, cbsClientFactoryFacade,
+ cbsConfigurationForAutoCommitDisabledMode);
+
+ }
+
+ @AfterEach
+ void cleanup() {
+ virtualTimeScheduler.dispose();
+ }
+
+ @Test
+ void cbsProperySourceLocatorForAutoCommitDisabledTest() throws Exception {
+
+ Mono<CbsClient> just = Mono.just(cbsClient);
+ when(cbsClientFactoryFacade.createCbsClient(any())).thenReturn(just);
+ when(cbsClient.get(argThat(request -> request.requestPath().equals(GET_ALL_REQUEST_PATH))))
+ .thenReturn(Mono.just(cbsConfigJsonObject));
+ when(cbsJsonToPropertyMapConverter.convertToMap(cbsConfigJsonObject)).thenReturn(cbsConfigMap);
+
+ cbsPropertySourceLocatorACDM.locate(environment);
+
+ verify(cbsConfigurationForAutoCommitDisabledMode).parseCBSConfig(cbsConfigJsonObject);
+
+
+ }
+
+}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/bootstrap/CbsPropertySourceLocatorTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/bootstrap/CbsPropertySourceLocatorTest.java
index 22a11ed6..c1e938e5 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/bootstrap/CbsPropertySourceLocatorTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/bootstrap/CbsPropertySourceLocatorTest.java
@@ -3,6 +3,7 @@
* PNF-REGISTRATION-HANDLER
* ================================================================================
* Copyright (C) 2019-2021 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,6 +30,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration;
+import org.onap.dcaegen2.services.prh.configuration.CbsConfigurationForAutoCommitDisabledMode;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration;
@@ -39,20 +41,21 @@ import org.springframework.core.env.Environment;
import org.springframework.core.env.PropertySource;
import reactor.core.publisher.Mono;
import reactor.test.scheduler.VirtualTimeScheduler;
-
import java.util.Map;
-
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
-import static org.mockito.Mockito.*;
-
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.doThrow;
@ExtendWith(MockitoExtension.class)
class CbsPropertySourceLocatorTest {
- private static final RequestPath GET_ALL_REQUEST_PATH = CbsRequests.getAll(RequestDiagnosticContext.create()).requestPath();
+ private static final RequestPath GET_ALL_REQUEST_PATH = CbsRequests.getAll(RequestDiagnosticContext.create())
+ .requestPath();
private CbsProperties cbsProperties = new CbsProperties();
@Mock
@@ -66,28 +69,24 @@ class CbsPropertySourceLocatorTest {
@Mock
private CbsConfiguration cbsConfiguration;
@Mock
+ private CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabledMode;
+ @Mock
private Environment environment;
@Mock
private CbsClient cbsClient;
@Mock
private JsonObject cbsConfigJsonObject;
private Map<String, Object> cbsConfigMap = ImmutableMap.of("foo", "bar");
-
private VirtualTimeScheduler virtualTimeScheduler;
-
private CbsPropertySourceLocator cbsPropertySourceLocator;
-
@BeforeEach
void setup() {
virtualTimeScheduler = VirtualTimeScheduler.getOrSet(true);
- when(cbsClientConfigurationResolver.resolveCbsClientConfiguration()).thenReturn(cbsClientConfiguration);
- when(cbsClientFactoryFacade.createCbsClient(cbsClientConfiguration)).thenReturn(Mono.just(cbsClient));
+ cbsPropertySourceLocator = new CbsPropertySourceLocator(cbsProperties, cbsJsonToPropertyMapConverter,
+ cbsClientConfigurationResolver, cbsClientFactoryFacade, cbsConfiguration);
- cbsPropertySourceLocator = new CbsPropertySourceLocator(
- cbsProperties, cbsJsonToPropertyMapConverter, cbsClientConfigurationResolver,
- cbsClientFactoryFacade, cbsConfiguration);
}
@AfterEach
@@ -95,9 +94,10 @@ class CbsPropertySourceLocatorTest {
virtualTimeScheduler.dispose();
}
-
@Test
void shouldBuildCbsPropertySourceBasedOnDataFetchedUsingCbsClient() {
+ Mono<CbsClient> just = Mono.just(cbsClient);
+ when(cbsClientFactoryFacade.createCbsClient(any())).thenReturn(just);
when(cbsClient.get(argThat(request -> request.requestPath().equals(GET_ALL_REQUEST_PATH))))
.thenReturn(Mono.just(cbsConfigJsonObject));
when(cbsJsonToPropertyMapConverter.convertToMap(cbsConfigJsonObject)).thenReturn(cbsConfigMap);
@@ -108,9 +108,10 @@ class CbsPropertySourceLocatorTest {
assertThat(propertySource).extracting(s -> s.getProperty("foo")).isEqualTo("bar");
}
-
@Test
void shouldUpdateCbsConfigurationStateBasedOnDataFetchedUsingCbsClient() {
+ Mono<CbsClient> just = Mono.just(cbsClient);
+ when(cbsClientFactoryFacade.createCbsClient(any())).thenReturn(just);
when(cbsClient.get(argThat(request -> request.requestPath().equals(GET_ALL_REQUEST_PATH))))
.thenReturn(Mono.just(cbsConfigJsonObject));
when(cbsJsonToPropertyMapConverter.convertToMap(cbsConfigJsonObject)).thenReturn(cbsConfigMap);
@@ -120,9 +121,10 @@ class CbsPropertySourceLocatorTest {
verify(cbsConfiguration).parseCBSConfig(cbsConfigJsonObject);
}
-
@Test
void shouldPropagateExceptionWhenCbsConfigurationParsingFails() {
+ Mono<CbsClient> just = Mono.just(cbsClient);
+ when(cbsClientFactoryFacade.createCbsClient(any())).thenReturn(just);
when(cbsClient.get(any(CbsRequest.class))).thenReturn(Mono.just(cbsConfigJsonObject));
RuntimeException someCbsConfigParsingException = new RuntimeException("boom!");
@@ -134,13 +136,13 @@ class CbsPropertySourceLocatorTest {
@Test
void shouldRetryFetchingConfigFromCbsInCaseOfFailure() {
+ Mono<CbsClient> just = Mono.just(cbsClient);
+ when(cbsClientFactoryFacade.createCbsClient(any())).thenReturn(just);
assumeThat(cbsProperties.getFetchRetries().getMaxAttempts()).isGreaterThan(1);
- when(cbsClient.get(any(CbsRequest.class)))
- .thenReturn(Mono.defer(() -> {
- virtualTimeScheduler.advanceTimeBy(cbsProperties.getFetchRetries().getMaxBackoff());
- return Mono.error(new RuntimeException("some connection failure"));
- }))
- .thenReturn(Mono.just(cbsConfigJsonObject));
+ when(cbsClient.get(any(CbsRequest.class))).thenReturn(Mono.defer(() -> {
+ virtualTimeScheduler.advanceTimeBy(cbsProperties.getFetchRetries().getMaxBackoff());
+ return Mono.error(new RuntimeException("some connection failure"));
+ })).thenReturn(Mono.just(cbsConfigJsonObject));
when(cbsJsonToPropertyMapConverter.convertToMap(cbsConfigJsonObject)).thenReturn(cbsConfigMap);
PropertySource<?> propertySource = cbsPropertySourceLocator.locate(environment);
@@ -150,15 +152,16 @@ class CbsPropertySourceLocatorTest {
@Test
void shouldFailAfterExhaustingAllOfConfiguredRetryAttempts() {
+ Mono<CbsClient> just = Mono.just(cbsClient);
+ when(cbsClientFactoryFacade.createCbsClient(any())).thenReturn(just);
assumeThat(cbsProperties.getFetchRetries().getMaxAttempts()).isGreaterThan(1);
- when(cbsClient.get(any(CbsRequest.class)))
- .thenReturn(Mono.defer(() -> {
- virtualTimeScheduler.advanceTimeBy(cbsProperties.getFetchRetries().getMaxBackoff());
- return Mono.error(new RuntimeException("some connection failure"));
- }));
+ when(cbsClient.get(any(CbsRequest.class))).thenReturn(Mono.defer(() -> {
+ virtualTimeScheduler.advanceTimeBy(cbsProperties.getFetchRetries().getMaxBackoff());
+ return Mono.error(new RuntimeException("some connection failure"));
+ }));
- assertThatThrownBy(() -> cbsPropertySourceLocator.locate(environment))
- .hasMessageContaining("Retries exhausted")
+ assertThatThrownBy(() -> cbsPropertySourceLocator.locate(environment)).hasMessageContaining("Retries exhausted")
.hasMessageContaining(cbsProperties.getFetchRetries().getMaxAttempts().toString());
}
+
}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationForAutoCommitDisabledModeTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationForAutoCommitDisabledModeTest.java
new file mode 100644
index 00000000..89f1a043
--- /dev/null
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationForAutoCommitDisabledModeTest.java
@@ -0,0 +1,109 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.prh.configuration;
+
+import static com.github.stefanbirkner.systemlambda.SystemLambda.withEnvironmentVariable;
+import static java.lang.ClassLoader.getSystemResource;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import org.junit.jupiter.api.Test;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+
+/**
+ * * @author <a href="mailto:PRANIT.KAPDULE@t-systems.com">Pranit Kapdule</a> on
+ * * 24/08/23
+ * */
+
+public class CbsConfigurationForAutoCommitDisabledModeTest {
+
+ /**
+ * Testcase is used to check correctness of values provided by
+ * autoCommitDisabledConfigurationFromCbs2.json
+ */
+
+ @Test
+ void beforecbsConfigurationForAutoCommitDisabledMode() throws Exception {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
+ this.cbsConfigurationForAutoCommitDisabledMode();
+ });
+ }
+
+ void cbsConfigurationForAutoCommitDisabledMode() throws Exception {
+
+ JsonObject cbsConfigJsonForAutoCommitDisabled = new Gson().fromJson(
+ new String(Files.readAllBytes(
+ Paths.get(getSystemResource("autoCommitDisabledConfigurationFromCbs2.json").toURI()))),
+ JsonObject.class);
+ CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabled = new CbsConfigurationForAutoCommitDisabledMode();
+
+ cbsConfigurationForAutoCommitDisabled.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled);
+
+ String expectedKafKaBoostrapServerConfig = "onap-strimzi-kafka-bootstrap:9092";
+ String actualKafkaBoostrapServerConfig = (cbsConfigurationForAutoCommitDisabled.getKafkaConfig()
+ .kafkaBoostrapServerConfig());
+
+ String expectedGroupIdConfig = "OpenDCAE-c12";
+ String actualGroupIdConfig = (cbsConfigurationForAutoCommitDisabled.getKafkaConfig().groupIdConfig());
+
+ String expectedKafkaSecurityProtocol = "SASL_PLAINTEXT";
+ String actualKafkaSecurityProtocol = (cbsConfigurationForAutoCommitDisabled.getKafkaConfig()
+ .kafkaSecurityProtocol());
+
+ String expectedKafkaSaslMechanism = "SCRAM-SHA-512";
+ String actualKafkaSaslMechanism = (cbsConfigurationForAutoCommitDisabled.getKafkaConfig().kafkaSaslMechanism());
+
+ String expectedKafkaJaasConfig = "jaas_config";
+ String actualKafkaJaasConfig = (cbsConfigurationForAutoCommitDisabled.getKafkaConfig().kafkaJaasConfig());
+
+ String expectedAaiUserName = "AAI";
+ String actualAaiUserName = (cbsConfigurationForAutoCommitDisabled.getAaiClientConfiguration().aaiUserName());
+
+ String expectedConsumerGroup = "OpenDCAE-c12";
+ String actualConsumerGroup = (cbsConfigurationForAutoCommitDisabled.getMessageRouterSubscribeRequest()
+ .consumerGroup());
+
+ assertEquals(expectedKafKaBoostrapServerConfig, actualKafkaBoostrapServerConfig,
+ "Expected value of KafKaBoostrapServerConfig is not matching with actual value");
+ assertEquals(expectedGroupIdConfig, actualGroupIdConfig,
+ "Expected value of GroupIdConfig is not matching with actual value");
+ assertEquals(expectedKafkaSecurityProtocol, actualKafkaSecurityProtocol,
+ "Expected value of KafkaSecurityProtocol is not matching with actual value");
+ assertEquals(expectedKafkaSaslMechanism, actualKafkaSaslMechanism,
+ "Expected value of KafkaSaslMechanism is not matching with actual value");
+ assertEquals(expectedKafkaJaasConfig, actualKafkaJaasConfig,
+ "Expected value of KafkaJaasConfig is not matching with actual value");
+ assertEquals(expectedAaiUserName, actualAaiUserName,
+ "Expected value of AaiUserName is not matching with actual value");
+ assertEquals(expectedConsumerGroup, actualConsumerGroup,
+ "Expected value of ConsumerGroup is not matching with actual value");
+
+ assertThat((cbsConfigurationForAutoCommitDisabled).getAaiClientConfiguration()).isNotNull();
+ assertThat((cbsConfigurationForAutoCommitDisabled).getMessageRouterPublisher()).isNotNull();
+ assertThat((cbsConfigurationForAutoCommitDisabled).getMessageRouterSubscriber()).isNotNull();
+ assertThat((cbsConfigurationForAutoCommitDisabled).getMessageRouterPublishRequest()).isNotNull();
+ assertThat((cbsConfigurationForAutoCommitDisabled).getMessageRouterSubscribeRequest()).isNotNull();
+ assertThat((cbsConfigurationForAutoCommitDisabled).getMessageRouterUpdatePublishRequest()).isNotNull();
+
+ }
+
+}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationTest.java
index 7f5d26fc..4f3cd864 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationTest.java
@@ -3,6 +3,7 @@
* PNF-REGISTRATION-HANDLER
* ================================================================================
* Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,10 +24,8 @@ package org.onap.dcaegen2.services.prh.configuration;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.junit.jupiter.api.Test;
-
import java.nio.file.Files;
import java.nio.file.Paths;
-
import static java.lang.ClassLoader.getSystemResource;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -51,11 +50,11 @@ class CbsConfigurationTest {
.hasMessage(EXPECTED_ERROR_MESSAGE_WHEN_CBS_CONFIG_IS_NOT_INITIALIZED);
}
-
@Test
void cbsConfigurationShouldExposeDataReceivedAsJsonFromCbs() throws Exception {
- JsonObject cbsConfigJson = new Gson().fromJson(new String(Files.readAllBytes(Paths.get(
- getSystemResource("configurationFromCbs.json").toURI()))), JsonObject.class);
+ JsonObject cbsConfigJson = new Gson().fromJson(
+ new String(Files.readAllBytes(Paths.get(getSystemResource("configurationFromCbs.json").toURI()))),
+ JsonObject.class);
CbsConfiguration cbsConfiguration = new CbsConfiguration();
cbsConfiguration.parseCBSConfig(cbsConfigJson);
@@ -66,5 +65,8 @@ class CbsConfigurationTest {
assertThat(cbsConfiguration.getMessageRouterPublishRequest()).isNotNull();
assertThat(cbsConfiguration.getMessageRouterSubscribeRequest()).isNotNull();
assertThat(cbsConfiguration.getMessageRouterUpdatePublishRequest()).isNotNull();
+
}
-} \ No newline at end of file
+
+
+}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfigTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfigTest.java
index 22b82e3d..bd7d7779 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfigTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfigTest.java
@@ -19,38 +19,118 @@
*/
package org.onap.dcaegen2.services.prh.configuration;
-import org.junit.jupiter.api.BeforeEach;
+import static com.github.stefanbirkner.systemlambda.SystemLambda.withEnvironmentVariable;
+import static java.lang.ClassLoader.getSystemResource;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.InjectMocks;
+import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.kafka.core.ConsumerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
@ExtendWith(MockitoExtension.class)
public class KafkaConfigTest {
- @InjectMocks
- KafkaConfig kafkaConfig;
-
- @BeforeEach
- void setUp() {
- kafkaConfig.kafkaBoostrapServerConfig = "0.0.0.0";
- kafkaConfig.groupIdConfig = "consumer-test";
- kafkaConfig.kafkaSecurityProtocol = "test";
- kafkaConfig.kafkaSaslMechanism = "test";
- kafkaConfig.kafkaUsername = "test";
- kafkaConfig.kafkaPassword = "test";
- kafkaConfig.kafkaJaasConfig = null;
- kafkaConfig.kafkaLoginModuleClassConfig = "test";
- kafkaConfig.kafkaJaasConfig = "test";
- }
+ KafkaConfig kafkaConfig = new KafkaConfig();
+
+ CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabledMode = new
+ CbsConfigurationForAutoCommitDisabledMode();
+
+
+// @BeforeEach
+// void setUp() {
+// kafkaConfig.kafkaBoostrapServerConfig = "0.0.0.0";
+// kafkaConfig.groupIdConfig = "consumer-test";
+// kafkaConfig.kafkaSecurityProtocol = "test";
+// kafkaConfig.kafkaSaslMechanism = "test";
+// kafkaConfig.kafkaUsername = "test";
+// kafkaConfig.kafkaPassword = "test";
+// kafkaConfig.kafkaJaasConfig = null;
+// kafkaConfig.kafkaLoginModuleClassConfig = "test";
+// kafkaConfig.kafkaJaasConfig = "test";
+// }
@Test
- public void consumerFactoryTest(){
- kafkaConfig.consumerFactory();
+ void beforecbsConfigurationForAutoCommitDisabledMode() throws Exception {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
+ this.consumerFactoryTest();
+ });
+ }
+
+ void consumerFactoryTest() throws Exception {
+ JsonObject cbsConfigJsonForAutoCommitDisabled = new Gson().fromJson(
+ new String(Files.readAllBytes(
+ Paths.get(getSystemResource("autoCommitDisabledConfigurationFromCbs2.json").toURI()))),
+ JsonObject.class);
+ cbsConfigurationForAutoCommitDisabledMode.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled);
+ ConsumerFactory<String, String> consumerFactory = kafkaConfig
+ .consumerFactory(cbsConfigurationForAutoCommitDisabledMode);
+
+ String expectedKafkaBoostrapServerConfig = "onap-strimzi-kafka-bootstrap:9092";
+ String actualKafkaBoostrapServerConfig = consumerFactory.getConfigurationProperties().get("bootstrap.servers")
+ .toString();
+
+ String expectedGroupIdConfig = "OpenDCAE-c12";
+ String actualGroupIdConfig = consumerFactory.getConfigurationProperties().get("group.id").toString();
+
+ String expectedKafkaSecurityProtocol = "SASL_PLAINTEXT";
+ String actualKafkaSecurityProtocol = consumerFactory.getConfigurationProperties().get("security.protocol")
+ .toString();
+
+ String expectedKafkaSaslMechanism = "SCRAM-SHA-512";
+ String actualKafkaSaslMechanism = consumerFactory.getConfigurationProperties().get("sasl.mechanism").toString();
+
+ String expectedKafkaJaasConfig = "jaas_config";
+ String actualKafkaJaasConfig = consumerFactory.getConfigurationProperties().get("sasl.jaas.config").toString();
+
+ String expectedKeyDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
+ String actualKeyDeserializer = consumerFactory.getConfigurationProperties().get("key.deserializer").toString();
+
+ String expectedValueDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
+ String actualValueDeserializer = consumerFactory.getConfigurationProperties().get("value.deserializer")
+ .toString();
+
+ String expectedEnableAutoCommit = "false";
+ String actualEnableAutoCommit = consumerFactory.getConfigurationProperties().get("enable.auto.commit")
+ .toString();
+
+ assertEquals(expectedKafkaBoostrapServerConfig, actualKafkaBoostrapServerConfig,
+ "Expected value of KafKaBoostrapServerConfig is not matching with actual value");
+ assertEquals(expectedGroupIdConfig, actualGroupIdConfig,
+ "Expected value of GroupIdConfig is not matching with actual value");
+ assertEquals(expectedKafkaSecurityProtocol, actualKafkaSecurityProtocol,
+ "Expected value of KafkaSecurityProtocol is not matching with actual value");
+ assertEquals(expectedKafkaSaslMechanism, actualKafkaSaslMechanism,
+ "Expected value of KafkaSaslMechanism is not matching with actual value");
+ assertEquals(expectedKafkaJaasConfig, actualKafkaJaasConfig,
+ "Expected value of KafkaJaasConfig is not matching with actual value");
+ assertEquals(expectedKeyDeserializer, actualKeyDeserializer,
+ "Expected value of KeyDeserializer is not matching with actual value");
+ assertEquals(expectedValueDeserializer, actualValueDeserializer,
+ "Expected value of ValueDeserializer is not matching with actual value");
+ assertEquals(expectedEnableAutoCommit, actualEnableAutoCommit,
+ "Expected value of EnableAutoCommit is not matching with actual value");
+
}
@Test
- public void kafkaListenerContainerFactoryTest(){
- kafkaConfig.kafkaListenerContainerFactory();
+ void beforeKafkaListenerContainerFactoryTest() throws Exception {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
+ this.kafkaListenerContainerFactoryTest();
+ });
+ }
+
+ public void kafkaListenerContainerFactoryTest() throws Exception {
+ JsonObject cbsConfigJsonForAutoCommitDisabled = new Gson().fromJson(
+ new String(Files.readAllBytes(
+ Paths.get(getSystemResource("autoCommitDisabledConfigurationFromCbs2.json").toURI()))),
+ JsonObject.class);
+ cbsConfigurationForAutoCommitDisabledMode.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled);
+ kafkaConfig.kafkaListenerContainerFactory(cbsConfigurationForAutoCommitDisabledMode);
}
}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/AppInfoControllerTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/AppInfoControllerTest.java
index 1dba66a1..40b42480 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/AppInfoControllerTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/AppInfoControllerTest.java
@@ -3,6 +3,7 @@
* PNF-REGISTRATION-HANDLER
* ================================================================================
* Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,19 +22,28 @@
package org.onap.dcaegen2.services.prh.controllers;
import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.prh.configuration.CbsConfigurationForAutoCommitDisabledMode;
+import org.onap.dcaegen2.services.prh.configuration.KafkaConfig;
import org.onap.dcaegen2.services.prh.configuration.PrhAppConfig;
+import org.onap.dcaegen2.services.prh.tasks.DmaapConsumerTaskImpl;
+import org.onap.dcaegen2.services.prh.tasks.ScheduledTasks;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.context.annotation.Profile;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.http.MediaType;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.annotation.IfProfileValue;
+import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.web.reactive.server.WebTestClient;
-
import static org.mockito.Mockito.when;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@DirtiesContext
+@ActiveProfiles(value = "prod")
class AppInfoControllerTest {
private static final String SAMPLE_GIT_INFO_CONTENT = "{ \"git.commit.id\" : \"37444e\" }";
@@ -44,17 +54,15 @@ class AppInfoControllerTest {
@Autowired
private WebTestClient webTestClient;
+ @MockBean
+ private ScheduledTasks scheduledTasks;
+
@Test
void shouldProvideHeartbeatResponse() {
- webTestClient
- .get().uri("/heartbeat")
- .accept(MediaType.TEXT_PLAIN)
- .exchange()
- .expectStatus().isOk()
+ webTestClient.get().uri("/heartbeat").accept(MediaType.TEXT_PLAIN).exchange().expectStatus().isOk()
.expectBody(String.class).isEqualTo("alive");
}
-
@Test
void shouldProvideVersionInfo() {
when(prhAppConfig.getGitInfo()).thenReturn(new ByteArrayResource(SAMPLE_GIT_INFO_CONTENT.getBytes()));
@@ -66,4 +74,4 @@ class AppInfoControllerTest {
.expectStatus().isOk()
.expectBody(String.class).isEqualTo(SAMPLE_GIT_INFO_CONTENT);
}
-} \ No newline at end of file
+}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/ScheduleControllerForAutoCommitDisabledTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/ScheduleControllerForAutoCommitDisabledTest.java
new file mode 100644
index 00000000..43dcadf9
--- /dev/null
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/ScheduleControllerForAutoCommitDisabledTest.java
@@ -0,0 +1,96 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.controllers;
+
+import org.junit.Ignore;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.prh.configuration.KafkaConfig;
+import org.onap.dcaegen2.services.prh.tasks.commit.KafkaConsumerTaskImpl;
+import org.onap.dcaegen2.services.prh.tasks.commit.ScheduledTasksRunnerWithCommit;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.http.HttpStatus;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.annotation.DirtiesContext.ClassMode;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.web.reactive.server.WebTestClient;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
+@DirtiesContext(classMode = ClassMode.AFTER_CLASS)
+@ActiveProfiles(value = "autoCommitDisabled")
+class ScheduleControllerForAutoCommitDisabledTest {
+
+ @MockBean
+ private ScheduledTasksRunnerWithCommit scheduledTasksRunnerWithCommit;
+
+ @MockBean
+ private KafkaConfig kafkaConfig;
+
+ @MockBean
+ private ConsumerFactory<String, String> consumerFactory;
+
+ @MockBean
+ private ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory;
+
+ @MockBean
+ private KafkaConsumerTaskImpl kafkaConsumerTaskImpl;
+
+ @Autowired
+ private WebTestClient webTestClient;
+
+
+ @Test
+ void startEndpointShouldAllowStartingPrhTasks() {
+ when(scheduledTasksRunnerWithCommit.tryToStartTaskWithCommit()).thenReturn(true);
+ webTestClient
+ .get().uri("/start")
+ .exchange()
+ .expectStatus().isCreated()
+ .expectBody(String.class).isEqualTo("PRH Service has been started!");
+ }
+
+ @Test
+ void whenPrhTasksAreAlreadyStarted_shouldRespondThatRequestWasNotAccepted() {
+ when(scheduledTasksRunnerWithCommit.tryToStartTaskWithCommit()).thenReturn(false);
+ webTestClient
+ .get().uri("/start")
+ .exchange()
+ .expectStatus().isEqualTo(HttpStatus.NOT_ACCEPTABLE)
+ .expectBody(String.class).isEqualTo("PRH Service is already running!");
+ }
+
+ @Test
+ void stopEndpointShouldAllowStoppingPrhTasks() {
+ webTestClient
+ .get().uri("/stopPrh")
+ .exchange()
+ .expectStatus().isOk()
+ .expectBody(String.class).isEqualTo("PRH Service has been stopped!");
+
+ verify(scheduledTasksRunnerWithCommit).cancelTasks();
+ }
+}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/ScheduleControllerTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/ScheduleControllerTest.java
index bbc6b968..92a527b7 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/ScheduleControllerTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/ScheduleControllerTest.java
@@ -23,18 +23,21 @@ package org.onap.dcaegen2.services.prh.controllers;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.prh.tasks.ScheduledTasksRunner;
+import org.onap.dcaegen2.services.prh.tasks.commit.ScheduledTasksRunnerWithCommit;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.http.HttpStatus;
import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.annotation.IfProfileValue;
+import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.web.reactive.server.WebTestClient;
-
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@DirtiesContext
+@ActiveProfiles("prod")
class ScheduleControllerTest {
@MockBean
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationForAutoCommitDisabledTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationForAutoCommitDisabledTest.java
new file mode 100644
index 00000000..b10c1ad8
--- /dev/null
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationForAutoCommitDisabledTest.java
@@ -0,0 +1,193 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.integration;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.github.tomakehurst.wiremock.client.WireMock;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.jayway.jsonpath.JsonPath;
+import reactor.core.publisher.Flux;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.prh.MainApp;
+import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.adapter.kafka.ImmutableKafkaConfiguration;
+import org.onap.dcaegen2.services.prh.adapter.kafka.KafkaConfiguration;
+import org.onap.dcaegen2.services.prh.configuration.CbsConfigurationForAutoCommitDisabledMode;
+import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
+import org.onap.dcaegen2.services.prh.tasks.commit.KafkaConsumerTaskImpl;
+import org.onap.dcaegen2.services.prh.tasks.commit.ScheduledTasksRunnerWithCommit;
+import org.onap.dcaegen2.services.prh.tasks.commit.ScheduledTasksWithCommit;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.configurationprocessor.json.JSONException;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+import org.springframework.test.context.ActiveProfiles;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
+import static com.github.stefanbirkner.systemlambda.SystemLambda.withEnvironmentVariable;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching;
+import static com.github.tomakehurst.wiremock.client.WireMock.ok;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.anyRequestedFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.matchingJsonPath;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.verify;
+import static com.github.tomakehurst.wiremock.client.WireMock.patch;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import static java.lang.ClassLoader.getSystemResource;
+import static org.mockito.Mockito.when;
+
+/**
+ * * @author <a href="mailto:PRANIT.KAPDULE@t-systems.com">Pranit Kapdule</a> on
+ * * 24/08/23
+ * */
+
+@SpringBootTest
+@AutoConfigureWireMock(port = 0)
+@ActiveProfiles(value = "autoCommitDisabled")
+class PrhWorkflowIntegrationForAutoCommitDisabledTest {
+
+ @Autowired
+ private ScheduledTasksWithCommit scheduledTasksWithCommit;
+
+ @MockBean
+ private ScheduledTasksRunnerWithCommit scheduledTasksRunnerWithCommit; // just to disable scheduling - some
+ // configurability in ScheduledTaskRunner not
+ // to start tasks at app startup would be
+ // welcome
+
+ @MockBean
+ private KafkaConsumerTaskImpl kafkaConsumerTaskImpl;
+
+ @Autowired
+ private DmaapConsumerJsonParser dmaapConsumerJsonParser;
+
+ @Configuration
+ @Import(MainApp.class)
+ static class CbsConfigTestConfig {
+
+ @Value("http://localhost:${wiremock.server.port}")
+ private String wiremockServerAddress;
+
+ protected KafkaConfiguration kafkaConfiguration;
+
+ @Bean
+ public CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabledMode() {
+
+ JsonObject cbsConfigJson = new Gson()
+ .fromJson(getResourceContent("autoCommitDisabledConfigurationFromCbs2.json")
+ .replaceAll("https?://dmaap-mr[\\w.]*:\\d+", wiremockServerAddress)
+ .replaceAll("https?://aai[\\w.]*:\\d+", wiremockServerAddress), JsonObject.class);
+
+ CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabledMode = new CbsConfigurationForAutoCommitDisabledMode();
+
+ try {
+ cbsConfigurationForAutoCommitDisabledMode.parseCBSConfig(cbsConfigJson);
+ } catch (Exception e) {
+ //Exception is expected as environment variable for JAAS_CONFIG is not available
+ if (e.getMessage() == "kafkaJaasConfig") {
+ kafkaConfiguration = new ImmutableKafkaConfiguration.Builder().kafkaBoostrapServerConfig("0.0.0.0")
+ .groupIdConfig("CG1").kafkaSaslMechanism("SASL_MECHANISM")
+ .kafkaSecurityProtocol("SEC-PROTOCOL").kafkaJaasConfig("JAAS_CONFIG").build();
+ cbsConfigurationForAutoCommitDisabledMode.setKafkaConfiguration(kafkaConfiguration);
+
+ }
+
+ }
+ return cbsConfigurationForAutoCommitDisabledMode;
+ };
+
+ }
+
+ @BeforeEach
+ void resetWireMock() {
+ WireMock.reset();
+ }
+
+ @Test
+ void beforeCbsConfigurationForAutoCommitDisabledMode() throws Exception {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
+ this.whenThereAreNoEventsInDmaap_WorkflowShouldFinish();
+ });
+ }
+
+ void whenThereAreNoEventsInDmaap_WorkflowShouldFinish() throws JSONException {
+
+ when(kafkaConsumerTaskImpl.execute()).thenReturn(Flux.empty());
+
+ scheduledTasksWithCommit.scheduleKafkaPrhEventTask();
+
+ verify(0, anyRequestedFor(urlPathMatching("/aai.*")));
+ verify(0, postRequestedFor(urlPathMatching("/events.*")));
+ }
+
+ @Test
+ void beforeWhenThereIsAnEventsInDmaap_ShouldSendPnfReadyNotification() throws Exception {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
+ this.whenThereIsAnEventsInDmaap_ShouldSendPnfReadyNotification();
+ });
+ }
+
+ void whenThereIsAnEventsInDmaap_ShouldSendPnfReadyNotification()
+ throws JSONException, JsonMappingException, JsonProcessingException {
+
+ String event = getResourceContent("integration/event.json");
+ String pnfName = JsonPath.read(event, "$.event.commonEventHeader.sourceName");
+
+ java.util.List<String> eventList = new ArrayList<>();
+ eventList.add(event);
+
+ Flux<ConsumerDmaapModel> fluxList = dmaapConsumerJsonParser
+ .getConsumerDmaapModelFromKafkaConsumerRecord(eventList);
+
+ stubFor(get(urlEqualTo("/aai/v23/network/pnfs/pnf/" + pnfName)).willReturn(ok().withBody("{}")));
+ stubFor(patch(urlEqualTo("/aai/v23/network/pnfs/pnf/" + pnfName)));
+ stubFor(post(urlEqualTo("/events/unauthenticated.PNF_READY")));
+
+ when(kafkaConsumerTaskImpl.execute()).thenReturn(fluxList);
+
+ scheduledTasksWithCommit.scheduleKafkaPrhEventTask();
+
+ verify(1, postRequestedFor(urlEqualTo("/events/unauthenticated.PNF_READY"))
+ .withRequestBody(matchingJsonPath("$[0].correlationId", equalTo(pnfName))));
+
+ }
+
+ private static String getResourceContent(String resourceName) {
+ try {
+ return new String(Files.readAllBytes(Paths.get(getSystemResource(resourceName).toURI())));
+ } catch (Exception e) {
+ throw new RuntimeException("failed loading content of '" + resourceName + "'", e);
+ }
+ }
+}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationTest.java
index 1a6c76c4..f5033ca2 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationTest.java
@@ -39,6 +39,7 @@ import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
+import org.springframework.test.context.ActiveProfiles;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching;
@@ -52,22 +53,15 @@ import static com.github.tomakehurst.wiremock.client.WireMock.matchingJsonPath;
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
import static com.github.tomakehurst.wiremock.client.WireMock.verify;
import static com.github.tomakehurst.wiremock.client.WireMock.patch;
-
-
-
-
-
-
import java.nio.file.Files;
import java.nio.file.Paths;
-
-
import static java.lang.ClassLoader.getSystemResource;
import static java.util.Collections.singletonList;
@SpringBootTest
@AutoConfigureWireMock(port = 0)
+@ActiveProfiles(value = "prod")
class PrhWorkflowIntegrationTest {
@Autowired
@@ -95,6 +89,8 @@ class PrhWorkflowIntegrationTest {
cbsConfiguration.parseCBSConfig(cbsConfigJson);
return cbsConfiguration;
}
+
+
}
@BeforeEach
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowSchedulingIntegrationForAutoCommitDisabledTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowSchedulingIntegrationForAutoCommitDisabledTest.java
new file mode 100644
index 00000000..a623e24c
--- /dev/null
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowSchedulingIntegrationForAutoCommitDisabledTest.java
@@ -0,0 +1,100 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.integration;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.mockito.stubbing.Answer;
+import org.onap.dcaegen2.services.prh.configuration.KafkaConfig;
+import org.onap.dcaegen2.services.prh.tasks.commit.KafkaConsumerTask;
+import org.onap.dcaegen2.services.prh.tasks.commit.ScheduledTasksRunnerWithCommit;
+import org.onap.dcaegen2.services.prh.tasks.commit.ScheduledTasksWithCommit;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.TestPropertySource;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.doAnswer;
+
+
+@SpringBootTest
+@TestPropertySource (properties = {"prh.workflow-scheduling-interval=20ms","spring.main.allow-bean-definition-overriding=true"})
+@DirtiesContext
+@ActiveProfiles(value = "autoCommitDisabled")
+@Disabled
+class PrhWorkflowSchedulingIntegrationForAutoCommitDisabledTest {
+
+ private static final int EXPECTED_INVOCATIONS_NUMBER = 1;
+ private static final int REMAINING_INVOCATIONS_NUMBER = 0;
+
+ @MockBean
+ private ScheduledTasksRunnerWithCommit scheduledTasksRunnerWithCommit;
+
+ @MockBean
+ private ScheduledTasksWithCommit scheduledTasksWithCommit;
+
+ @MockBean
+ private KafkaConsumerTask kafkaConsumerTask;
+
+
+ @MockBean
+ private KafkaConfig kafkaConfig;
+
+ @MockBean
+ private ConsumerFactory<String, String> consumerFactory;
+
+ @MockBean
+ private ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory;
+
+ private CountDownLatch invocationLatch;
+
+ @Test
+ void prhWorkflowShouldBeExecutedRightAfterApplicationStart() {
+ try {
+
+ invocationLatch = new CountDownLatch(EXPECTED_INVOCATIONS_NUMBER);
+ doAnswer(registerInvocation(invocationLatch)).when(scheduledTasksWithCommit).scheduleKafkaPrhEventTask();
+ assertThatMethodWasInvokedOnce();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void assertThatMethodWasInvokedOnce() throws InterruptedException {
+ boolean awaitResult = invocationLatch.await(1, TimeUnit.SECONDS);
+ System.out.println("###awaitResult="+awaitResult);
+ assertEquals(REMAINING_INVOCATIONS_NUMBER, invocationLatch.getCount());
+ }
+
+ private static Answer registerInvocation(CountDownLatch invocationLatch) {
+ return invocation -> {
+ System.out.println("###before countDown:"+invocationLatch.getCount());
+ invocationLatch.countDown();
+ System.out.println("###after countDown:"+invocationLatch.getCount());
+ return null;
+ };
+ }
+}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowSchedulingIntegrationTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowSchedulingIntegrationTest.java
index 939dd2a3..8fb952d0 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowSchedulingIntegrationTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowSchedulingIntegrationTest.java
@@ -3,6 +3,7 @@
* PNF-REGISTRATION-HANDLER
* ================================================================================
* Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,14 +28,17 @@ import org.mockito.stubbing.Answer;
import org.onap.dcaegen2.services.prh.tasks.ScheduledTasks;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.TestPropertySource;
-
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.doAnswer;
@SpringBootTest
@TestPropertySource (properties = {"prh.workflow-scheduling-interval=20ms"})
+@ActiveProfiles(value = "prod")
+@DirtiesContext
class PrhWorkflowSchedulingIntegrationTest {
private static final int EXPECTED_INVOCATIONS_NUMBER = 1;
@@ -61,4 +65,4 @@ class PrhWorkflowSchedulingIntegrationTest {
return null;
};
}
-} \ No newline at end of file
+}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java
index c23a1886..f33ff43e 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java
@@ -18,30 +18,35 @@
* ============LICENSE_END=========================================================
*/
-package org.onap.dcaegen2.services.prh.tasks;
+package org.onap.dcaegen2.services.prh.tasks.commit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.adapter.aai.api.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.configuration.CbsConfigurationForAutoCommitDisabledMode;
import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.prh.tasks.commit.EpochDateTimeConversion;
-import org.onap.dcaegen2.services.prh.tasks.commit.KafkaConsumerTaskImpl;
-import org.springframework.boot.configurationprocessor.json.JSONException;
import org.springframework.kafka.support.Acknowledgment;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonSyntaxException;
import reactor.core.publisher.Flux;
-
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
-
-import static org.mockito.Mockito.when;
+import static com.github.stefanbirkner.systemlambda.SystemLambda.withEnvironmentVariable;
+import static java.lang.ClassLoader.getSystemResource;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
@ExtendWith(MockitoExtension.class)
public class KafkaConsumerTaskImplTest {
@@ -49,38 +54,92 @@ public class KafkaConsumerTaskImplTest {
@Mock
private Acknowledgment acknowledgment;
- @Mock
+ private KafkaConsumerTaskImpl kafkaConsumerTask;
+
private DmaapConsumerJsonParser dmaapConsumerJsonParser;
- @Mock
private EpochDateTimeConversion epochDateTimeConversion;
- @InjectMocks
- private KafkaConsumerTaskImpl kafkaConsumerTask;
+ private CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabled;
+
+ private JsonObject cbsConfigJsonForAutoCommitDisabled;
+
+ @BeforeEach
+ void beforeEach() throws JsonSyntaxException, IOException, URISyntaxException {
+ cbsConfigJsonForAutoCommitDisabled = new Gson().fromJson(
+ new String(Files.readAllBytes(
+ Paths.get(getSystemResource("autoCommitDisabledConfigurationFromCbs2.json").toURI()))),
+ JsonObject.class);
+ cbsConfigurationForAutoCommitDisabled = new CbsConfigurationForAutoCommitDisabledMode();
+ dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+ epochDateTimeConversion = new EpochDateTimeConversion();
+
+ }
@Test
- public void onMessageTest(){
- List<ConsumerRecord<String, String>> list = new ArrayList<>();
- TimestampType timestampType = null;
- Headers headers = new RecordHeaders();
- epochDateTimeConversion.setDaysForRecords("3");
- ConsumerRecord<String, String> records = new ConsumerRecord<>
- ("test-topic", 1, 1l, 0l, timestampType, 1, 1, "test-key", "test-value", headers
- , null);
- list.add(records);
- kafkaConsumerTask.onMessage(list, acknowledgment);
+ void beforeOnMessageTest() throws Exception {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
+ cbsConfigurationForAutoCommitDisabled.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled);
+ kafkaConsumerTask = new KafkaConsumerTaskImpl(cbsConfigurationForAutoCommitDisabled,
+ dmaapConsumerJsonParser, epochDateTimeConversion);
+ List<ConsumerRecord<String, String>> list = new ArrayList<>();
+ TimestampType timestampType = null;
+ Headers headers = new RecordHeaders();
+ epochDateTimeConversion.setDaysForRecords("3");
+ ConsumerRecord<String, String> records = new ConsumerRecord<>("test-topic", 1, 1l, 0l, timestampType, 1, 1,
+ "test-key", "test-value", headers, null);
+ list.add(records);
+ kafkaConsumerTask.onMessage(list, acknowledgment);
+ String actualTopicInList = list.get(0).topic();
+ String expectedTopicInList = "test-topic";
+ assertEquals(expectedTopicInList, actualTopicInList, "topic is not matching");
+ assertThat(kafkaConsumerTask.getOffset().equals(acknowledgment));
+ assertThat(kafkaConsumerTask.getJsonEvent().contains("test-topic"));
+ });
}
@Test
- public void commitOffsetTest(){
- kafkaConsumerTask.commitOffset();
+ void beforeCommitOffsetTest() throws Exception {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
+ cbsConfigurationForAutoCommitDisabled.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled);
+ kafkaConsumerTask = new KafkaConsumerTaskImpl(cbsConfigurationForAutoCommitDisabled,
+ dmaapConsumerJsonParser, epochDateTimeConversion);
+ kafkaConsumerTask.commitOffset();
+ });
}
@Test
- public void executeTest() throws JSONException {
- List<String> jsonEvent = new ArrayList<>();
- ConsumerDmaapModel consumerDmaapModel = ImmutableConsumerDmaapModel.builder().correlationId("123").build();
- when(dmaapConsumerJsonParser.getConsumerDmaapModelFromKafkaConsumerRecord(jsonEvent)).thenReturn(Flux.just(consumerDmaapModel));
- kafkaConsumerTask.execute();
+ void beforeExecuteTest() throws Exception {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
+ cbsConfigurationForAutoCommitDisabled.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled);
+ kafkaConsumerTask = new KafkaConsumerTaskImpl(cbsConfigurationForAutoCommitDisabled,
+ dmaapConsumerJsonParser, epochDateTimeConversion);
+ String event = getResourceContent("integration/event.json");
+ java.util.List<String> eventList = new ArrayList<>();
+ eventList.add(event);
+ kafkaConsumerTask.setJsonEvent(eventList);
+ Flux<ConsumerDmaapModel> flux = kafkaConsumerTask.execute();
+ String expectedSourceName = "NOK6061ZW8";
+ String actualSourceName = flux.blockFirst().getCorrelationId();
+
+ String expectedOamV4IpAddress = "val3";
+ String actualOamV4IpAddress = flux.blockFirst().getIpv4();
+
+ String expectedOamV6IpAddress = "val4";
+ String actualOamV6IpAddress = flux.blockFirst().getIpv6();
+
+ assertEquals(expectedSourceName, actualSourceName, "SourceName is not matching");
+ assertEquals(expectedOamV4IpAddress, actualOamV4IpAddress, "OamV4IpAddress is not matching");
+ assertEquals(expectedOamV6IpAddress, actualOamV6IpAddress, "OamV6IpAddress is not matching");
+ });
+ }
+
+ private static String getResourceContent(String resourceName) {
+ try {
+ return new String(Files.readAllBytes(Paths.get(getSystemResource(resourceName).toURI())));
+ } catch (Exception e) {
+ throw new RuntimeException("failed loading content of '" + resourceName + "'", e);
+ }
}
+
}