aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/test/java/org/onap/dcaegen2/services
diff options
context:
space:
mode:
Diffstat (limited to 'prh-app-server/src/test/java/org/onap/dcaegen2/services')
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/bootstrap/CbsClientConfigurationResolverTest.java11
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/bootstrap/CbsPropertySourceLocatorForAutoCommitDisabledTest.java112
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/bootstrap/CbsPropertySourceLocatorTest.java61
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationForAutoCommitDisabledModeTest.java111
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationTest.java36
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfigTest.java140
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/AppInfoControllerTest.java24
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/ScheduleControllerForAutoCommitDisabledTest.java96
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/ScheduleControllerTest.java10
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationForAutoCommitDisabledTest.java225
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationTest.java97
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowSchedulingIntegrationForAutoCommitDisabledTest.java100
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowSchedulingIntegrationTest.java8
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java28
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTaskImplTest.java24
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/EpochDateTimeConversionTest.java52
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java151
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommitTest.java72
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommitTest.java263
19 files changed, 1515 insertions, 106 deletions
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/bootstrap/CbsClientConfigurationResolverTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/bootstrap/CbsClientConfigurationResolverTest.java
index 87dd18ca..ca1413a0 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/bootstrap/CbsClientConfigurationResolverTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/bootstrap/CbsClientConfigurationResolverTest.java
@@ -3,6 +3,7 @@
* PNF-REGISTRATION-HANDLER
* ================================================================================
* Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,7 +25,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration;
-
import static org.assertj.core.api.Assertions.assertThat;
class CbsClientConfigurationResolverTest {
@@ -34,18 +34,15 @@ class CbsClientConfigurationResolverTest {
@BeforeEach
void setUp() {
cbsProperties = new CbsProperties();
- cbsProperties.setHostname("some-cbs-host");
- cbsProperties.setPort(123);
cbsProperties.setAppName("client-app-name");
}
@Test
@DisabledIfEnvironmentVariable(named = "CONFIG_BINDING_SERVICE", matches = ".+")
void whenCbsEnvPropertiesAreNotePresentInEnvironment_ShouldFallbackToLoadingDefaultsFromCbsProperties() {
- CbsClientConfiguration config = new CbsClientConfigurationResolver(cbsProperties).resolveCbsClientConfiguration();
+ CbsClientConfiguration config = new CbsClientConfigurationResolver(cbsProperties)
+ .resolveCbsClientConfiguration();
- assertThat(config.hostname()).isEqualTo(cbsProperties.getHostname());
- assertThat(config.port()).isEqualTo(cbsProperties.getPort());
assertThat(config.appName()).isEqualTo(cbsProperties.getAppName());
}
-} \ No newline at end of file
+}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/bootstrap/CbsPropertySourceLocatorForAutoCommitDisabledTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/bootstrap/CbsPropertySourceLocatorForAutoCommitDisabledTest.java
new file mode 100644
index 00000000..f5863ac5
--- /dev/null
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/bootstrap/CbsPropertySourceLocatorForAutoCommitDisabledTest.java
@@ -0,0 +1,112 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.bootstrap;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.util.Map;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration;
+import org.onap.dcaegen2.services.prh.configuration.CbsConfigurationForAutoCommitDisabledMode;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.RequestPath;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
+import org.springframework.core.env.Environment;
+import org.springframework.test.context.ActiveProfiles;
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.JsonObject;
+import reactor.core.publisher.Mono;
+import reactor.test.scheduler.VirtualTimeScheduler;
+
+/**
+ * * @author <a href="mailto:PRANIT.KAPDULE@t-systems.com">Pranit Kapdule</a> on
+ * * 24/08/23
+ * */
+
+@ExtendWith(MockitoExtension.class)
+@ActiveProfiles(value = "autoCommitDisabled")
+public class CbsPropertySourceLocatorForAutoCommitDisabledTest {
+ private static final RequestPath GET_ALL_REQUEST_PATH = CbsRequests.getAll(RequestDiagnosticContext.create())
+ .requestPath();
+
+ private CbsProperties cbsProperties = new CbsProperties();
+ @Mock
+ private CbsJsonToPropertyMapConverter cbsJsonToPropertyMapConverter;
+ @Mock
+ private CbsClientConfigurationResolver cbsClientConfigurationResolver;
+ @Mock
+ private CbsClientFactoryFacade cbsClientFactoryFacade;
+ @Mock
+ private CbsConfiguration cbsConfiguration;
+ @Mock
+ private CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabledMode;
+ @Mock
+ private Environment environment;
+ @Mock
+ private CbsClient cbsClient;
+ @Mock
+ private JsonObject cbsConfigJsonObject;
+
+ private Map<String, Object> cbsConfigMap = ImmutableMap.of("foo", "bar");
+
+ private VirtualTimeScheduler virtualTimeScheduler;
+
+ private CbsPropertySourceLocatorForAutoCommitDisabled cbsPropertySourceLocatorACDM;
+
+ @BeforeEach
+ void setup() {
+ virtualTimeScheduler = VirtualTimeScheduler.getOrSet(true);
+
+ cbsPropertySourceLocatorACDM = new CbsPropertySourceLocatorForAutoCommitDisabled(cbsProperties,
+ cbsJsonToPropertyMapConverter, cbsClientConfigurationResolver, cbsClientFactoryFacade,
+ cbsConfigurationForAutoCommitDisabledMode);
+
+ }
+
+ @AfterEach
+ void cleanup() {
+ virtualTimeScheduler.dispose();
+ }
+
+ @Test
+ void cbsProperySourceLocatorForAutoCommitDisabledTest() throws Exception {
+
+ Mono<CbsClient> just = Mono.just(cbsClient);
+ when(cbsClientFactoryFacade.createCbsClient(any())).thenReturn(just);
+ when(cbsClient.get(argThat(request -> request.requestPath().equals(GET_ALL_REQUEST_PATH))))
+ .thenReturn(Mono.just(cbsConfigJsonObject));
+ when(cbsJsonToPropertyMapConverter.convertToMap(cbsConfigJsonObject)).thenReturn(cbsConfigMap);
+
+ cbsPropertySourceLocatorACDM.locate(environment);
+
+ verify(cbsConfigurationForAutoCommitDisabledMode).parseCBSConfig(cbsConfigJsonObject);
+
+
+ }
+
+}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/bootstrap/CbsPropertySourceLocatorTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/bootstrap/CbsPropertySourceLocatorTest.java
index 22a11ed6..c1e938e5 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/bootstrap/CbsPropertySourceLocatorTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/bootstrap/CbsPropertySourceLocatorTest.java
@@ -3,6 +3,7 @@
* PNF-REGISTRATION-HANDLER
* ================================================================================
* Copyright (C) 2019-2021 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,6 +30,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration;
+import org.onap.dcaegen2.services.prh.configuration.CbsConfigurationForAutoCommitDisabledMode;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration;
@@ -39,20 +41,21 @@ import org.springframework.core.env.Environment;
import org.springframework.core.env.PropertySource;
import reactor.core.publisher.Mono;
import reactor.test.scheduler.VirtualTimeScheduler;
-
import java.util.Map;
-
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
-import static org.mockito.Mockito.*;
-
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.doThrow;
@ExtendWith(MockitoExtension.class)
class CbsPropertySourceLocatorTest {
- private static final RequestPath GET_ALL_REQUEST_PATH = CbsRequests.getAll(RequestDiagnosticContext.create()).requestPath();
+ private static final RequestPath GET_ALL_REQUEST_PATH = CbsRequests.getAll(RequestDiagnosticContext.create())
+ .requestPath();
private CbsProperties cbsProperties = new CbsProperties();
@Mock
@@ -66,28 +69,24 @@ class CbsPropertySourceLocatorTest {
@Mock
private CbsConfiguration cbsConfiguration;
@Mock
+ private CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabledMode;
+ @Mock
private Environment environment;
@Mock
private CbsClient cbsClient;
@Mock
private JsonObject cbsConfigJsonObject;
private Map<String, Object> cbsConfigMap = ImmutableMap.of("foo", "bar");
-
private VirtualTimeScheduler virtualTimeScheduler;
-
private CbsPropertySourceLocator cbsPropertySourceLocator;
-
@BeforeEach
void setup() {
virtualTimeScheduler = VirtualTimeScheduler.getOrSet(true);
- when(cbsClientConfigurationResolver.resolveCbsClientConfiguration()).thenReturn(cbsClientConfiguration);
- when(cbsClientFactoryFacade.createCbsClient(cbsClientConfiguration)).thenReturn(Mono.just(cbsClient));
+ cbsPropertySourceLocator = new CbsPropertySourceLocator(cbsProperties, cbsJsonToPropertyMapConverter,
+ cbsClientConfigurationResolver, cbsClientFactoryFacade, cbsConfiguration);
- cbsPropertySourceLocator = new CbsPropertySourceLocator(
- cbsProperties, cbsJsonToPropertyMapConverter, cbsClientConfigurationResolver,
- cbsClientFactoryFacade, cbsConfiguration);
}
@AfterEach
@@ -95,9 +94,10 @@ class CbsPropertySourceLocatorTest {
virtualTimeScheduler.dispose();
}
-
@Test
void shouldBuildCbsPropertySourceBasedOnDataFetchedUsingCbsClient() {
+ Mono<CbsClient> just = Mono.just(cbsClient);
+ when(cbsClientFactoryFacade.createCbsClient(any())).thenReturn(just);
when(cbsClient.get(argThat(request -> request.requestPath().equals(GET_ALL_REQUEST_PATH))))
.thenReturn(Mono.just(cbsConfigJsonObject));
when(cbsJsonToPropertyMapConverter.convertToMap(cbsConfigJsonObject)).thenReturn(cbsConfigMap);
@@ -108,9 +108,10 @@ class CbsPropertySourceLocatorTest {
assertThat(propertySource).extracting(s -> s.getProperty("foo")).isEqualTo("bar");
}
-
@Test
void shouldUpdateCbsConfigurationStateBasedOnDataFetchedUsingCbsClient() {
+ Mono<CbsClient> just = Mono.just(cbsClient);
+ when(cbsClientFactoryFacade.createCbsClient(any())).thenReturn(just);
when(cbsClient.get(argThat(request -> request.requestPath().equals(GET_ALL_REQUEST_PATH))))
.thenReturn(Mono.just(cbsConfigJsonObject));
when(cbsJsonToPropertyMapConverter.convertToMap(cbsConfigJsonObject)).thenReturn(cbsConfigMap);
@@ -120,9 +121,10 @@ class CbsPropertySourceLocatorTest {
verify(cbsConfiguration).parseCBSConfig(cbsConfigJsonObject);
}
-
@Test
void shouldPropagateExceptionWhenCbsConfigurationParsingFails() {
+ Mono<CbsClient> just = Mono.just(cbsClient);
+ when(cbsClientFactoryFacade.createCbsClient(any())).thenReturn(just);
when(cbsClient.get(any(CbsRequest.class))).thenReturn(Mono.just(cbsConfigJsonObject));
RuntimeException someCbsConfigParsingException = new RuntimeException("boom!");
@@ -134,13 +136,13 @@ class CbsPropertySourceLocatorTest {
@Test
void shouldRetryFetchingConfigFromCbsInCaseOfFailure() {
+ Mono<CbsClient> just = Mono.just(cbsClient);
+ when(cbsClientFactoryFacade.createCbsClient(any())).thenReturn(just);
assumeThat(cbsProperties.getFetchRetries().getMaxAttempts()).isGreaterThan(1);
- when(cbsClient.get(any(CbsRequest.class)))
- .thenReturn(Mono.defer(() -> {
- virtualTimeScheduler.advanceTimeBy(cbsProperties.getFetchRetries().getMaxBackoff());
- return Mono.error(new RuntimeException("some connection failure"));
- }))
- .thenReturn(Mono.just(cbsConfigJsonObject));
+ when(cbsClient.get(any(CbsRequest.class))).thenReturn(Mono.defer(() -> {
+ virtualTimeScheduler.advanceTimeBy(cbsProperties.getFetchRetries().getMaxBackoff());
+ return Mono.error(new RuntimeException("some connection failure"));
+ })).thenReturn(Mono.just(cbsConfigJsonObject));
when(cbsJsonToPropertyMapConverter.convertToMap(cbsConfigJsonObject)).thenReturn(cbsConfigMap);
PropertySource<?> propertySource = cbsPropertySourceLocator.locate(environment);
@@ -150,15 +152,16 @@ class CbsPropertySourceLocatorTest {
@Test
void shouldFailAfterExhaustingAllOfConfiguredRetryAttempts() {
+ Mono<CbsClient> just = Mono.just(cbsClient);
+ when(cbsClientFactoryFacade.createCbsClient(any())).thenReturn(just);
assumeThat(cbsProperties.getFetchRetries().getMaxAttempts()).isGreaterThan(1);
- when(cbsClient.get(any(CbsRequest.class)))
- .thenReturn(Mono.defer(() -> {
- virtualTimeScheduler.advanceTimeBy(cbsProperties.getFetchRetries().getMaxBackoff());
- return Mono.error(new RuntimeException("some connection failure"));
- }));
+ when(cbsClient.get(any(CbsRequest.class))).thenReturn(Mono.defer(() -> {
+ virtualTimeScheduler.advanceTimeBy(cbsProperties.getFetchRetries().getMaxBackoff());
+ return Mono.error(new RuntimeException("some connection failure"));
+ }));
- assertThatThrownBy(() -> cbsPropertySourceLocator.locate(environment))
- .hasMessageContaining("Retries exhausted")
+ assertThatThrownBy(() -> cbsPropertySourceLocator.locate(environment)).hasMessageContaining("Retries exhausted")
.hasMessageContaining(cbsProperties.getFetchRetries().getMaxAttempts().toString());
}
+
}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationForAutoCommitDisabledModeTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationForAutoCommitDisabledModeTest.java
new file mode 100644
index 00000000..80a0007f
--- /dev/null
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationForAutoCommitDisabledModeTest.java
@@ -0,0 +1,111 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.prh.configuration;
+
+import static com.github.stefanbirkner.systemlambda.SystemLambda.withEnvironmentVariable;
+import static java.lang.ClassLoader.getSystemResource;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import org.junit.jupiter.api.Test;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+
+/**
+ * * @author <a href="mailto:PRANIT.KAPDULE@t-systems.com">Pranit Kapdule</a> on
+ * * 24/08/23
+ * */
+
+public class CbsConfigurationForAutoCommitDisabledModeTest {
+
+ /**
+ * Testcase is used to check correctness of values provided by
+ * autoCommitDisabledConfigurationFromCbs2.json
+ */
+
+ @Test
+ void beforecbsConfigurationForAutoCommitDisabledMode() throws Exception {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+ .and("BOOTSTRAP_SERVERS", "localhost:9092")
+ .execute(() -> {
+ this.cbsConfigurationForAutoCommitDisabledMode();
+ });
+ }
+
+ void cbsConfigurationForAutoCommitDisabledMode() throws Exception {
+
+ JsonObject cbsConfigJsonForAutoCommitDisabled = new Gson().fromJson(
+ new String(Files.readAllBytes(
+ Paths.get(getSystemResource("autoCommitDisabledConfigurationFromCbs2.json").toURI()))),
+ JsonObject.class);
+ CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabled = new CbsConfigurationForAutoCommitDisabledMode();
+
+ cbsConfigurationForAutoCommitDisabled.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled);
+
+ String expectedKafKaBoostrapServerConfig = "onap-strimzi-kafka-bootstrap:9092";
+ String actualKafkaBoostrapServerConfig = (cbsConfigurationForAutoCommitDisabled.getKafkaConfig()
+ .kafkaBoostrapServerConfig());
+
+ String expectedGroupIdConfig = "OpenDCAE-c12";
+ String actualGroupIdConfig = (cbsConfigurationForAutoCommitDisabled.getKafkaConfig().groupIdConfig());
+
+ String expectedKafkaSecurityProtocol = "SASL_PLAINTEXT";
+ String actualKafkaSecurityProtocol = (cbsConfigurationForAutoCommitDisabled.getKafkaConfig()
+ .kafkaSecurityProtocol());
+
+ String expectedKafkaSaslMechanism = "SCRAM-SHA-512";
+ String actualKafkaSaslMechanism = (cbsConfigurationForAutoCommitDisabled.getKafkaConfig().kafkaSaslMechanism());
+
+ String expectedKafkaJaasConfig = "jaas_config";
+ String actualKafkaJaasConfig = (cbsConfigurationForAutoCommitDisabled.getKafkaConfig().kafkaJaasConfig());
+
+ String expectedAaiUserName = "AAI";
+ String actualAaiUserName = (cbsConfigurationForAutoCommitDisabled.getAaiClientConfiguration().aaiUserName());
+
+ String expectedConsumerGroup = "OpenDCAE-c12";
+ String actualConsumerGroup = (cbsConfigurationForAutoCommitDisabled.getMessageRouterSubscribeRequest()
+ .consumerGroup());
+
+ assertEquals(expectedKafKaBoostrapServerConfig, actualKafkaBoostrapServerConfig,
+ "Expected value of KafKaBoostrapServerConfig is not matching with actual value");
+ assertEquals(expectedGroupIdConfig, actualGroupIdConfig,
+ "Expected value of GroupIdConfig is not matching with actual value");
+ assertEquals(expectedKafkaSecurityProtocol, actualKafkaSecurityProtocol,
+ "Expected value of KafkaSecurityProtocol is not matching with actual value");
+ assertEquals(expectedKafkaSaslMechanism, actualKafkaSaslMechanism,
+ "Expected value of KafkaSaslMechanism is not matching with actual value");
+ assertEquals(expectedKafkaJaasConfig, actualKafkaJaasConfig,
+ "Expected value of KafkaJaasConfig is not matching with actual value");
+ assertEquals(expectedAaiUserName, actualAaiUserName,
+ "Expected value of AaiUserName is not matching with actual value");
+ assertEquals(expectedConsumerGroup, actualConsumerGroup,
+ "Expected value of ConsumerGroup is not matching with actual value");
+
+ assertThat((cbsConfigurationForAutoCommitDisabled).getAaiClientConfiguration()).isNotNull();
+ assertThat((cbsConfigurationForAutoCommitDisabled).getMessageRouterPublisher()).isNotNull();
+ assertThat((cbsConfigurationForAutoCommitDisabled).getMessageRouterSubscriber()).isNotNull();
+ assertThat((cbsConfigurationForAutoCommitDisabled).getMessageRouterPublishRequest()).isNotNull();
+ assertThat((cbsConfigurationForAutoCommitDisabled).getMessageRouterSubscribeRequest()).isNotNull();
+ assertThat((cbsConfigurationForAutoCommitDisabled).getMessageRouterUpdatePublishRequest()).isNotNull();
+
+ }
+
+}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationTest.java
index 7f5d26fc..8cd7d5e8 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationTest.java
@@ -3,6 +3,7 @@
* PNF-REGISTRATION-HANDLER
* ================================================================================
* Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,10 +24,10 @@ package org.onap.dcaegen2.services.prh.configuration;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.junit.jupiter.api.Test;
-
import java.nio.file.Files;
import java.nio.file.Paths;
+import static com.github.stefanbirkner.systemlambda.SystemLambda.withEnvironmentVariable;
import static java.lang.ClassLoader.getSystemResource;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -51,20 +52,29 @@ class CbsConfigurationTest {
.hasMessage(EXPECTED_ERROR_MESSAGE_WHEN_CBS_CONFIG_IS_NOT_INITIALIZED);
}
-
@Test
void cbsConfigurationShouldExposeDataReceivedAsJsonFromCbs() throws Exception {
- JsonObject cbsConfigJson = new Gson().fromJson(new String(Files.readAllBytes(Paths.get(
- getSystemResource("configurationFromCbs.json").toURI()))), JsonObject.class);
- CbsConfiguration cbsConfiguration = new CbsConfiguration();
+
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+ .and("BOOTSTRAP_SERVERS", "localhost:9092")
+ .execute(() -> {
+ JsonObject cbsConfigJson = new Gson().fromJson(
+ new String(Files.readAllBytes(Paths.get(getSystemResource("configurationFromCbs.json").toURI()))),
+ JsonObject.class);
+ CbsConfiguration cbsConfiguration = new CbsConfiguration();
- cbsConfiguration.parseCBSConfig(cbsConfigJson);
+ cbsConfiguration.parseCBSConfig(cbsConfigJson);
+
+ assertThat(cbsConfiguration.getAaiClientConfiguration()).isNotNull();
+ assertThat(cbsConfiguration.getMessageRouterPublisher()).isNotNull();
+ assertThat(cbsConfiguration.getMessageRouterSubscriber()).isNotNull();
+ assertThat(cbsConfiguration.getMessageRouterPublishRequest()).isNotNull();
+ assertThat(cbsConfiguration.getMessageRouterSubscribeRequest()).isNotNull();
+ assertThat(cbsConfiguration.getMessageRouterUpdatePublishRequest()).isNotNull();
+ });
+
- assertThat(cbsConfiguration.getAaiClientConfiguration()).isNotNull();
- assertThat(cbsConfiguration.getMessageRouterPublisher()).isNotNull();
- assertThat(cbsConfiguration.getMessageRouterSubscriber()).isNotNull();
- assertThat(cbsConfiguration.getMessageRouterPublishRequest()).isNotNull();
- assertThat(cbsConfiguration.getMessageRouterSubscribeRequest()).isNotNull();
- assertThat(cbsConfiguration.getMessageRouterUpdatePublishRequest()).isNotNull();
}
-} \ No newline at end of file
+
+
+}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfigTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfigTest.java
new file mode 100644
index 00000000..b9a05a99
--- /dev/null
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfigTest.java
@@ -0,0 +1,140 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.prh.configuration;
+
+import static com.github.stefanbirkner.systemlambda.SystemLambda.withEnvironmentVariable;
+import static java.lang.ClassLoader.getSystemResource;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.kafka.core.ConsumerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+
+@ExtendWith(MockitoExtension.class)
+public class KafkaConfigTest {
+
+ KafkaConfig kafkaConfig = new KafkaConfig();
+
+ CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabledMode = new
+ CbsConfigurationForAutoCommitDisabledMode();
+
+
+// @BeforeEach
+// void setUp() {
+// kafkaConfig.kafkaBoostrapServerConfig = "0.0.0.0";
+// kafkaConfig.groupIdConfig = "consumer-test";
+// kafkaConfig.kafkaSecurityProtocol = "test";
+// kafkaConfig.kafkaSaslMechanism = "test";
+// kafkaConfig.kafkaUsername = "test";
+// kafkaConfig.kafkaPassword = "test";
+// kafkaConfig.kafkaJaasConfig = null;
+// kafkaConfig.kafkaLoginModuleClassConfig = "test";
+// kafkaConfig.kafkaJaasConfig = "test";
+// }
+
+ @Test
+ void beforecbsConfigurationForAutoCommitDisabledMode() throws Exception {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+ .and("BOOTSTRAP_SERVERS", "localhost:9092")
+ .execute(() -> {
+ this.consumerFactoryTest();
+ });
+ }
+
+ void consumerFactoryTest() throws Exception {
+ JsonObject cbsConfigJsonForAutoCommitDisabled = new Gson().fromJson(
+ new String(Files.readAllBytes(
+ Paths.get(getSystemResource("autoCommitDisabledConfigurationFromCbs2.json").toURI()))),
+ JsonObject.class);
+ cbsConfigurationForAutoCommitDisabledMode.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled);
+ ConsumerFactory<String, String> consumerFactory = kafkaConfig
+ .consumerFactory(cbsConfigurationForAutoCommitDisabledMode);
+
+ String expectedKafkaBoostrapServerConfig = "onap-strimzi-kafka-bootstrap:9092";
+ String actualKafkaBoostrapServerConfig = consumerFactory.getConfigurationProperties().get("bootstrap.servers")
+ .toString();
+
+ String expectedGroupIdConfig = "OpenDCAE-c12";
+ String actualGroupIdConfig = consumerFactory.getConfigurationProperties().get("group.id").toString();
+
+ String expectedKafkaSecurityProtocol = "SASL_PLAINTEXT";
+ String actualKafkaSecurityProtocol = consumerFactory.getConfigurationProperties().get("security.protocol")
+ .toString();
+
+ String expectedKafkaSaslMechanism = "SCRAM-SHA-512";
+ String actualKafkaSaslMechanism = consumerFactory.getConfigurationProperties().get("sasl.mechanism").toString();
+
+ String expectedKafkaJaasConfig = "jaas_config";
+ String actualKafkaJaasConfig = consumerFactory.getConfigurationProperties().get("sasl.jaas.config").toString();
+
+ String expectedKeyDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
+ String actualKeyDeserializer = consumerFactory.getConfigurationProperties().get("key.deserializer").toString();
+
+ String expectedValueDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";
+ String actualValueDeserializer = consumerFactory.getConfigurationProperties().get("value.deserializer")
+ .toString();
+
+ String expectedEnableAutoCommit = "false";
+ String actualEnableAutoCommit = consumerFactory.getConfigurationProperties().get("enable.auto.commit")
+ .toString();
+
+ assertEquals(expectedKafkaBoostrapServerConfig, actualKafkaBoostrapServerConfig,
+ "Expected value of KafKaBoostrapServerConfig is not matching with actual value");
+ assertEquals(expectedGroupIdConfig, actualGroupIdConfig,
+ "Expected value of GroupIdConfig is not matching with actual value");
+ assertEquals(expectedKafkaSecurityProtocol, actualKafkaSecurityProtocol,
+ "Expected value of KafkaSecurityProtocol is not matching with actual value");
+ assertEquals(expectedKafkaSaslMechanism, actualKafkaSaslMechanism,
+ "Expected value of KafkaSaslMechanism is not matching with actual value");
+ assertEquals(expectedKafkaJaasConfig, actualKafkaJaasConfig,
+ "Expected value of KafkaJaasConfig is not matching with actual value");
+ assertEquals(expectedKeyDeserializer, actualKeyDeserializer,
+ "Expected value of KeyDeserializer is not matching with actual value");
+ assertEquals(expectedValueDeserializer, actualValueDeserializer,
+ "Expected value of ValueDeserializer is not matching with actual value");
+ assertEquals(expectedEnableAutoCommit, actualEnableAutoCommit,
+ "Expected value of EnableAutoCommit is not matching with actual value");
+
+ }
+
+ @Test
+ void beforeKafkaListenerContainerFactoryTest() throws Exception {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+ .and("BOOTSTRAP_SERVERS", "localhost:9092")
+ .execute(() -> {
+ this.kafkaListenerContainerFactoryTest();
+ });
+ }
+
+ public void kafkaListenerContainerFactoryTest() throws Exception {
+ JsonObject cbsConfigJsonForAutoCommitDisabled = new Gson().fromJson(
+ new String(Files.readAllBytes(
+ Paths.get(getSystemResource("autoCommitDisabledConfigurationFromCbs2.json").toURI()))),
+ JsonObject.class);
+ cbsConfigurationForAutoCommitDisabledMode.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled);
+ kafkaConfig.kafkaListenerContainerFactory(cbsConfigurationForAutoCommitDisabledMode);
+ }
+}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/AppInfoControllerTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/AppInfoControllerTest.java
index 1dba66a1..40b42480 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/AppInfoControllerTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/AppInfoControllerTest.java
@@ -3,6 +3,7 @@
* PNF-REGISTRATION-HANDLER
* ================================================================================
* Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,19 +22,28 @@
package org.onap.dcaegen2.services.prh.controllers;
import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.prh.configuration.CbsConfigurationForAutoCommitDisabledMode;
+import org.onap.dcaegen2.services.prh.configuration.KafkaConfig;
import org.onap.dcaegen2.services.prh.configuration.PrhAppConfig;
+import org.onap.dcaegen2.services.prh.tasks.DmaapConsumerTaskImpl;
+import org.onap.dcaegen2.services.prh.tasks.ScheduledTasks;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.context.annotation.Profile;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.http.MediaType;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.annotation.IfProfileValue;
+import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.web.reactive.server.WebTestClient;
-
import static org.mockito.Mockito.when;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@DirtiesContext
+@ActiveProfiles(value = "prod")
class AppInfoControllerTest {
private static final String SAMPLE_GIT_INFO_CONTENT = "{ \"git.commit.id\" : \"37444e\" }";
@@ -44,17 +54,15 @@ class AppInfoControllerTest {
@Autowired
private WebTestClient webTestClient;
+ @MockBean
+ private ScheduledTasks scheduledTasks;
+
@Test
void shouldProvideHeartbeatResponse() {
- webTestClient
- .get().uri("/heartbeat")
- .accept(MediaType.TEXT_PLAIN)
- .exchange()
- .expectStatus().isOk()
+ webTestClient.get().uri("/heartbeat").accept(MediaType.TEXT_PLAIN).exchange().expectStatus().isOk()
.expectBody(String.class).isEqualTo("alive");
}
-
@Test
void shouldProvideVersionInfo() {
when(prhAppConfig.getGitInfo()).thenReturn(new ByteArrayResource(SAMPLE_GIT_INFO_CONTENT.getBytes()));
@@ -66,4 +74,4 @@ class AppInfoControllerTest {
.expectStatus().isOk()
.expectBody(String.class).isEqualTo(SAMPLE_GIT_INFO_CONTENT);
}
-} \ No newline at end of file
+}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/ScheduleControllerForAutoCommitDisabledTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/ScheduleControllerForAutoCommitDisabledTest.java
new file mode 100644
index 00000000..43dcadf9
--- /dev/null
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/ScheduleControllerForAutoCommitDisabledTest.java
@@ -0,0 +1,96 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.controllers;
+
+import org.junit.Ignore;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.onap.dcaegen2.services.prh.configuration.KafkaConfig;
+import org.onap.dcaegen2.services.prh.tasks.commit.KafkaConsumerTaskImpl;
+import org.onap.dcaegen2.services.prh.tasks.commit.ScheduledTasksRunnerWithCommit;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.http.HttpStatus;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.annotation.DirtiesContext.ClassMode;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.web.reactive.server.WebTestClient;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
+@DirtiesContext(classMode = ClassMode.AFTER_CLASS)
+@ActiveProfiles(value = "autoCommitDisabled")
+class ScheduleControllerForAutoCommitDisabledTest {
+
+ @MockBean
+ private ScheduledTasksRunnerWithCommit scheduledTasksRunnerWithCommit;
+
+ @MockBean
+ private KafkaConfig kafkaConfig;
+
+ @MockBean
+ private ConsumerFactory<String, String> consumerFactory;
+
+ @MockBean
+ private ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory;
+
+ @MockBean
+ private KafkaConsumerTaskImpl kafkaConsumerTaskImpl;
+
+ @Autowired
+ private WebTestClient webTestClient;
+
+
+ @Test
+ void startEndpointShouldAllowStartingPrhTasks() {
+ when(scheduledTasksRunnerWithCommit.tryToStartTaskWithCommit()).thenReturn(true);
+ webTestClient
+ .get().uri("/start")
+ .exchange()
+ .expectStatus().isCreated()
+ .expectBody(String.class).isEqualTo("PRH Service has been started!");
+ }
+
+ @Test
+ void whenPrhTasksAreAlreadyStarted_shouldRespondThatRequestWasNotAccepted() {
+ when(scheduledTasksRunnerWithCommit.tryToStartTaskWithCommit()).thenReturn(false);
+ webTestClient
+ .get().uri("/start")
+ .exchange()
+ .expectStatus().isEqualTo(HttpStatus.NOT_ACCEPTABLE)
+ .expectBody(String.class).isEqualTo("PRH Service is already running!");
+ }
+
+ @Test
+ void stopEndpointShouldAllowStoppingPrhTasks() {
+ webTestClient
+ .get().uri("/stopPrh")
+ .exchange()
+ .expectStatus().isOk()
+ .expectBody(String.class).isEqualTo("PRH Service has been stopped!");
+
+ verify(scheduledTasksRunnerWithCommit).cancelTasks();
+ }
+}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/ScheduleControllerTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/ScheduleControllerTest.java
index ebdec09e..92a527b7 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/ScheduleControllerTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/controllers/ScheduleControllerTest.java
@@ -3,6 +3,7 @@
* PNF-REGISTRATION-HANDLER
* ================================================================================
* Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,18 +23,21 @@ package org.onap.dcaegen2.services.prh.controllers;
import org.junit.jupiter.api.Test;
import org.onap.dcaegen2.services.prh.tasks.ScheduledTasksRunner;
+import org.onap.dcaegen2.services.prh.tasks.commit.ScheduledTasksRunnerWithCommit;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.http.HttpStatus;
import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.annotation.IfProfileValue;
+import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.web.reactive.server.WebTestClient;
-
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@DirtiesContext
+@ActiveProfiles("prod")
class ScheduleControllerTest {
@MockBean
@@ -42,7 +46,7 @@ class ScheduleControllerTest {
@Autowired
private WebTestClient webTestClient;
- @Test
+ @Test
void startEndpointShouldAllowStartingPrhTasks() {
when(scheduledTasksRunner.tryToStartTask()).thenReturn(true);
webTestClient
@@ -72,4 +76,4 @@ class ScheduleControllerTest {
verify(scheduledTasksRunner).cancelTasks();
}
-} \ No newline at end of file
+}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationForAutoCommitDisabledTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationForAutoCommitDisabledTest.java
new file mode 100644
index 00000000..21f9d099
--- /dev/null
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationForAutoCommitDisabledTest.java
@@ -0,0 +1,225 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.integration;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.github.tomakehurst.wiremock.client.WireMock;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
+import com.jayway.jsonpath.JsonPath;
+
+import io.vavr.collection.List;
+import reactor.core.publisher.Flux;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.onap.dcaegen2.services.prh.MainApp;
+import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.adapter.kafka.ImmutableKafkaConfiguration;
+import org.onap.dcaegen2.services.prh.adapter.kafka.KafkaConfiguration;
+import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration;
+import org.onap.dcaegen2.services.prh.configuration.CbsConfigurationForAutoCommitDisabledMode;
+import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
+import org.onap.dcaegen2.services.prh.tasks.commit.KafkaConsumerTaskImpl;
+import org.onap.dcaegen2.services.prh.tasks.commit.ScheduledTasksRunnerWithCommit;
+import org.onap.dcaegen2.services.prh.tasks.commit.ScheduledTasksWithCommit;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.configurationprocessor.json.JSONException;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.boot.test.mock.mockito.SpyBean;
+import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+import org.springframework.test.context.ActiveProfiles;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
+import static com.github.stefanbirkner.systemlambda.SystemLambda.withEnvironmentVariable;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching;
+import static com.github.tomakehurst.wiremock.client.WireMock.ok;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.anyRequestedFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.matchingJsonPath;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.verify;
+import static com.github.tomakehurst.wiremock.client.WireMock.patch;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import static java.lang.ClassLoader.getSystemResource;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * * @author <a href="mailto:PRANIT.KAPDULE@t-systems.com">Pranit Kapdule</a> on
+ * * 24/08/23
+ * */
+
+@SpringBootTest
+@AutoConfigureWireMock(port = 0)
+@ActiveProfiles(value = "autoCommitDisabled")
+class PrhWorkflowIntegrationForAutoCommitDisabledTest {
+
+ @Autowired
+ private ScheduledTasksWithCommit scheduledTasksWithCommit;
+
+ @MockBean
+ private ScheduledTasksRunnerWithCommit scheduledTasksRunnerWithCommit; // just to disable scheduling - some
+ // configurability in ScheduledTaskRunner not
+ // to start tasks at app startup would be
+ // welcome
+
+ @MockBean
+ private KafkaConsumerTaskImpl kafkaConsumerTaskImpl;
+
+ @Autowired
+ private DmaapConsumerJsonParser dmaapConsumerJsonParser;
+
+ @SpyBean
+ CbsConfiguration cbsConfiguration;
+
+ @Mock
+ MessageRouterPublisher publisher;
+
+ @Configuration
+ @Import(MainApp.class)
+ static class CbsConfigTestConfig {
+
+ @Value("http://localhost:${wiremock.server.port}")
+ private String wiremockServerAddress;
+
+ protected KafkaConfiguration kafkaConfiguration;
+
+ @Bean
+ public CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabledMode() {
+
+ JsonObject cbsConfigJson = new Gson()
+ .fromJson(getResourceContent("autoCommitDisabledConfigurationFromCbs2.json")
+ .replaceAll("https?://dmaap-mr[\\w.]*:\\d+", wiremockServerAddress)
+ .replaceAll("https?://aai[\\w.]*:\\d+", wiremockServerAddress), JsonObject.class);
+
+ CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabledMode = new CbsConfigurationForAutoCommitDisabledMode();
+
+ try {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+ .and("BOOTSTRAP_SERVERS", "localhost:9092")
+ .execute(() -> {
+ cbsConfigurationForAutoCommitDisabledMode.parseCBSConfig(cbsConfigJson);
+ });
+
+ } catch (Exception e) {
+ //Exception is expected as environment variable for JAAS_CONFIG is not available
+ if (e.getMessage() == "kafkaJaasConfig") {
+ kafkaConfiguration = new ImmutableKafkaConfiguration.Builder().kafkaBoostrapServerConfig("0.0.0.0")
+ .groupIdConfig("CG1").kafkaSaslMechanism("SASL_MECHANISM")
+ .kafkaSecurityProtocol("SEC-PROTOCOL").kafkaJaasConfig("JAAS_CONFIG").build();
+ cbsConfigurationForAutoCommitDisabledMode.setKafkaConfiguration(kafkaConfiguration);
+
+ }
+
+ }
+ return cbsConfigurationForAutoCommitDisabledMode;
+ };
+
+ }
+
+ @BeforeEach
+ void resetWireMock() {
+ WireMock.reset();
+ }
+
+ @Test
+ void beforeCbsConfigurationForAutoCommitDisabledMode() throws Exception {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+ .execute(() -> {
+ this.whenThereAreNoEventsInDmaap_WorkflowShouldFinish();
+ });
+ }
+
+ void whenThereAreNoEventsInDmaap_WorkflowShouldFinish() throws JSONException {
+
+ when(kafkaConsumerTaskImpl.execute()).thenReturn(Flux.empty());
+
+ scheduledTasksWithCommit.scheduleKafkaPrhEventTask();
+
+ verify(0, anyRequestedFor(urlPathMatching("/aai.*")));
+ verify(0, postRequestedFor(urlPathMatching("/events.*")));
+ }
+
+ @Test
+ void beforeWhenThereIsAnEventsInDmaap_ShouldSendPnfReadyNotification() throws Exception {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+ .execute(() -> {
+ this.whenThereIsAnEventsInDmaap_ShouldSendPnfReadyNotification();
+ });
+ }
+
+ void whenThereIsAnEventsInDmaap_ShouldSendPnfReadyNotification()
+ throws JSONException, JsonMappingException, JsonProcessingException {
+
+ String event = getResourceContent("integration/event.json");
+ String pnfName = JsonPath.read(event, "$.event.commonEventHeader.sourceName");
+
+ java.util.List<String> eventList = new ArrayList<>();
+ eventList.add(event);
+
+ Flux<ConsumerDmaapModel> fluxList = dmaapConsumerJsonParser
+ .getConsumerDmaapModelFromKafkaConsumerRecord(eventList);
+
+ stubFor(get(urlEqualTo("/aai/v23/network/pnfs/pnf/" + pnfName)).willReturn(ok().withBody("{}")));
+ stubFor(patch(urlEqualTo("/aai/v23/network/pnfs/pnf/" + pnfName)));
+ stubFor(post(urlEqualTo("/events/unauthenticated.PNF_READY")));
+
+ when(kafkaConsumerTaskImpl.execute()).thenReturn(fluxList);
+
+ List<String> expectedItems = List.of(event);
+ Flux<MessageRouterPublishResponse> pubresp = Flux.just(ImmutableMessageRouterPublishResponse
+ .builder()
+ .items(expectedItems.map(JsonPrimitive::new))
+ .build());
+ when(cbsConfiguration.getMessageRouterPublisher()).thenReturn(publisher);
+ when(publisher.put(any(MessageRouterPublishRequest.class),any())).thenReturn(pubresp);
+ scheduledTasksWithCommit.scheduleKafkaPrhEventTask();
+ verify(publisher,times(1)).put(any(MessageRouterPublishRequest.class),any());
+
+ }
+
+ private static String getResourceContent(String resourceName) {
+ try {
+ return new String(Files.readAllBytes(Paths.get(getSystemResource(resourceName).toURI())));
+ } catch (Exception e) {
+ throw new RuntimeException("failed loading content of '" + resourceName + "'", e);
+ }
+ }
+}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationTest.java
index 01beb88b..a77fcd75 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationTest.java
@@ -3,6 +3,7 @@
* PNF-REGISTRATION-HANDLER
* ================================================================================
* Copyright (C) 2019-2021 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,59 +24,107 @@ package org.onap.dcaegen2.services.prh.integration;
import com.github.tomakehurst.wiremock.client.WireMock;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
import com.jayway.jsonpath.JsonPath;
+
+import io.vavr.collection.List;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
import org.onap.dcaegen2.services.prh.MainApp;
import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration;
import org.onap.dcaegen2.services.prh.tasks.ScheduledTasks;
import org.onap.dcaegen2.services.prh.tasks.ScheduledTasksRunner;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
-
+import org.springframework.test.context.ActiveProfiles;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
+import static com.github.stefanbirkner.systemlambda.SystemLambda.withEnvironmentVariable;
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching;
+import static com.github.tomakehurst.wiremock.client.WireMock.ok;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.anyRequestedFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.matchingJsonPath;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.verify;
+import static com.github.tomakehurst.wiremock.client.WireMock.patch;
import java.nio.file.Files;
import java.nio.file.Paths;
-
-import static com.github.tomakehurst.wiremock.client.WireMock.*;
import static java.lang.ClassLoader.getSystemResource;
import static java.util.Collections.singletonList;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
@SpringBootTest
@AutoConfigureWireMock(port = 0)
+@ActiveProfiles(value = "prod")
class PrhWorkflowIntegrationTest {
@Autowired
private ScheduledTasks scheduledTasks;
-
+
+ @SpyBean
+ CbsConfiguration cbsConfiguration;
+
@MockBean
private ScheduledTasksRunner scheduledTasksRunner; // just to disable scheduling - some configurability in ScheduledTaskRunner not to start tasks at app startup would be welcome
-
-
+
+ @Mock
+ MessageRouterSubscriber subscriber;
+
+ @Mock
+ MessageRouterPublisher publisher;
+
@Configuration
@Import(MainApp.class)
static class CbsConfigTestConfig {
@Value("http://localhost:${wiremock.server.port}")
private String wiremockServerAddress;
-
+
@Bean
- public CbsConfiguration cbsConfiguration() {
+ public CbsConfiguration cbsConfiguration() throws Exception {
JsonObject cbsConfigJson = new Gson().fromJson(getResourceContent("configurationFromCbs.json")
.replaceAll("https?://dmaap-mr[\\w.]*:\\d+", wiremockServerAddress)
.replaceAll("https?://aai[\\w.]*:\\d+", wiremockServerAddress),
JsonObject.class);
-
+
CbsConfiguration cbsConfiguration = new CbsConfiguration();
- cbsConfiguration.parseCBSConfig(cbsConfigJson);
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+ .and("BOOTSTRAP_SERVERS", "localhost:9092")
+ .execute(() -> {
+ cbsConfiguration.parseCBSConfig(cbsConfigJson);
+ });
+
return cbsConfiguration;
}
+
+
}
@BeforeEach
@@ -85,7 +134,7 @@ class PrhWorkflowIntegrationTest {
@Test
- void whenThereAreNoEventsInDmaap_WorkflowShouldFinish() {
+ void whenThereAreNoEventsInDmaap_WorkflowShouldFinish() {
stubFor(get(urlEqualTo("/events/unauthenticated.VES_PNFREG_OUTPUT/OpenDCAE-c12/c12"))
.willReturn(aResponse().withBody("[]")));
@@ -100,17 +149,27 @@ class PrhWorkflowIntegrationTest {
void whenThereIsAnEventsInDmaap_ShouldSendPnfReadyNotification() {
String event = getResourceContent("integration/event.json");
String pnfName = JsonPath.read(event, "$.event.commonEventHeader.sourceName");
-
- stubFor(get(urlEqualTo("/events/unauthenticated.VES_PNFREG_OUTPUT/OpenDCAE-c12/c12"))
- .willReturn(ok().withBody(new Gson().toJson(singletonList(event)))));
stubFor(get(urlEqualTo("/aai/v23/network/pnfs/pnf/" + pnfName)).willReturn(ok().withBody("{}")));
stubFor(patch(urlEqualTo("/aai/v23/network/pnfs/pnf/" + pnfName)));
- stubFor(post(urlEqualTo("/events/unauthenticated.PNF_READY")));
-
+
+ List<String> expectedItems = List.of(event);
+ Mono<MessageRouterSubscribeResponse> resp = Mono.just(ImmutableMessageRouterSubscribeResponse
+ .builder()
+ .items(expectedItems.map(JsonPrimitive::new))
+ .build());
+ Flux<MessageRouterPublishResponse> pubresp = Flux.just(ImmutableMessageRouterPublishResponse
+ .builder()
+ .items(expectedItems.map(JsonPrimitive::new))
+ .build());
+
+ when(cbsConfiguration.getMessageRouterSubscriber()).thenReturn(subscriber);
+ when(cbsConfiguration.getMessageRouterPublisher()).thenReturn(publisher);
+ when(subscriber.get(any(MessageRouterSubscribeRequest.class))).thenReturn(resp);
+ when(publisher.put(any(MessageRouterPublishRequest.class),any())).thenReturn(pubresp);
+
scheduledTasks.scheduleMainPrhEventTask();
-
- verify(1, postRequestedFor(urlEqualTo("/events/unauthenticated.PNF_READY"))
- .withRequestBody(matchingJsonPath("$[0].correlationId", equalTo(pnfName))));
+ verify(subscriber,times(1)).get(any(MessageRouterSubscribeRequest.class));
+ verify(publisher,times(1)).put(any(MessageRouterPublishRequest.class),any());
}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowSchedulingIntegrationForAutoCommitDisabledTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowSchedulingIntegrationForAutoCommitDisabledTest.java
new file mode 100644
index 00000000..a623e24c
--- /dev/null
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowSchedulingIntegrationForAutoCommitDisabledTest.java
@@ -0,0 +1,100 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.integration;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.mockito.stubbing.Answer;
+import org.onap.dcaegen2.services.prh.configuration.KafkaConfig;
+import org.onap.dcaegen2.services.prh.tasks.commit.KafkaConsumerTask;
+import org.onap.dcaegen2.services.prh.tasks.commit.ScheduledTasksRunnerWithCommit;
+import org.onap.dcaegen2.services.prh.tasks.commit.ScheduledTasksWithCommit;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.test.context.TestPropertySource;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.doAnswer;
+
+
+@SpringBootTest
+@TestPropertySource (properties = {"prh.workflow-scheduling-interval=20ms","spring.main.allow-bean-definition-overriding=true"})
+@DirtiesContext
+@ActiveProfiles(value = "autoCommitDisabled")
+@Disabled
+class PrhWorkflowSchedulingIntegrationForAutoCommitDisabledTest {
+
+ private static final int EXPECTED_INVOCATIONS_NUMBER = 1;
+ private static final int REMAINING_INVOCATIONS_NUMBER = 0;
+
+ @MockBean
+ private ScheduledTasksRunnerWithCommit scheduledTasksRunnerWithCommit;
+
+ @MockBean
+ private ScheduledTasksWithCommit scheduledTasksWithCommit;
+
+ @MockBean
+ private KafkaConsumerTask kafkaConsumerTask;
+
+
+ @MockBean
+ private KafkaConfig kafkaConfig;
+
+ @MockBean
+ private ConsumerFactory<String, String> consumerFactory;
+
+ @MockBean
+ private ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory;
+
+ private CountDownLatch invocationLatch;
+
+ @Test
+ void prhWorkflowShouldBeExecutedRightAfterApplicationStart() {
+ try {
+
+ invocationLatch = new CountDownLatch(EXPECTED_INVOCATIONS_NUMBER);
+ doAnswer(registerInvocation(invocationLatch)).when(scheduledTasksWithCommit).scheduleKafkaPrhEventTask();
+ assertThatMethodWasInvokedOnce();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void assertThatMethodWasInvokedOnce() throws InterruptedException {
+ boolean awaitResult = invocationLatch.await(1, TimeUnit.SECONDS);
+ System.out.println("###awaitResult="+awaitResult);
+ assertEquals(REMAINING_INVOCATIONS_NUMBER, invocationLatch.getCount());
+ }
+
+ private static Answer registerInvocation(CountDownLatch invocationLatch) {
+ return invocation -> {
+ System.out.println("###before countDown:"+invocationLatch.getCount());
+ invocationLatch.countDown();
+ System.out.println("###after countDown:"+invocationLatch.getCount());
+ return null;
+ };
+ }
+}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowSchedulingIntegrationTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowSchedulingIntegrationTest.java
index 939dd2a3..8fb952d0 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowSchedulingIntegrationTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowSchedulingIntegrationTest.java
@@ -3,6 +3,7 @@
* PNF-REGISTRATION-HANDLER
* ================================================================================
* Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,14 +28,17 @@ import org.mockito.stubbing.Answer;
import org.onap.dcaegen2.services.prh.tasks.ScheduledTasks;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.TestPropertySource;
-
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.doAnswer;
@SpringBootTest
@TestPropertySource (properties = {"prh.workflow-scheduling-interval=20ms"})
+@ActiveProfiles(value = "prod")
+@DirtiesContext
class PrhWorkflowSchedulingIntegrationTest {
private static final int EXPECTED_INVOCATIONS_NUMBER = 1;
@@ -61,4 +65,4 @@ class PrhWorkflowSchedulingIntegrationTest {
return null;
};
}
-} \ No newline at end of file
+}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java
index 9dab7aaa..ba759354 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParserTest.java
@@ -3,6 +3,7 @@
* PNF-REGISTRATION-HANDLER
* ================================================================================
* Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,16 +21,12 @@
package org.onap.dcaegen2.services.prh.service;
-import static org.mockito.Mockito.spy;
-
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import io.vavr.collection.List;
-import java.util.Optional;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-import org.mockito.Mockito;
import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.adapter.aai.api.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
@@ -37,6 +34,11 @@ import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRo
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
+import java.util.Optional;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
*/
@@ -108,7 +110,7 @@ class DmaapConsumerJsonParserTest {
//when
DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
JsonElement jsonElement = jsonParser.parse(parsed);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
+ doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser
@@ -171,7 +173,7 @@ class DmaapConsumerJsonParserTest {
//when
DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
JsonElement jsonElement = new JsonParser().parse(parsed);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
+ doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser
.getJsonObject(Mono.just((response))).blockFirst();
@@ -238,7 +240,7 @@ class DmaapConsumerJsonParserTest {
//when
DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
JsonElement jsonElement = jsonParser.parse(parsed);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
+ doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser
@@ -302,7 +304,7 @@ class DmaapConsumerJsonParserTest {
//when
DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
JsonElement jsonElement = jsonParser.parse(parsed);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
+ doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser
@@ -334,7 +336,7 @@ class DmaapConsumerJsonParserTest {
DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
JsonElement jsonElement = jsonParser.parse(parsed);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
+ doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
String incorrectMessage = "{\"event\": {"
+ "\"commonEventHeader\": {},"
@@ -380,7 +382,7 @@ class DmaapConsumerJsonParserTest {
DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
JsonElement jsonElement = jsonParser.parse(parsed);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
+ doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
String jsonWithoutSourceName =
"{\"event\": {"
@@ -430,7 +432,7 @@ class DmaapConsumerJsonParserTest {
DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
JsonElement jsonElement = jsonParser.parse(parsed);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
+ doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
String jsonWithoutIpInformation =
"{\"event\": {"
@@ -497,7 +499,7 @@ class DmaapConsumerJsonParserTest {
//when
DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
JsonElement jsonElement = jsonParser.parse(parsed);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
+ doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
dmaapConsumerJsonParser.getJsonObject(Mono.just((response)));
@@ -573,7 +575,7 @@ class DmaapConsumerJsonParserTest {
//when
DmaapConsumerJsonParser dmaapConsumerJsonParser = spy(new DmaapConsumerJsonParser());
JsonElement jsonElement = jsonParser.parse(parsed);
- Mockito.doReturn(Optional.of(jsonElement.getAsJsonObject()))
+ doReturn(Optional.of(jsonElement.getAsJsonObject()))
.when(dmaapConsumerJsonParser).getJsonObjectFromAnArray(jsonElement);
ConsumerDmaapModel consumerDmaapModel = dmaapConsumerJsonParser.getJsonObject(Mono.just(response))
.blockFirst();
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTaskImplTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTaskImplTest.java
index e81b3746..517fe73a 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTaskImplTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTaskImplTest.java
@@ -3,6 +3,7 @@
* PROJECT
* ================================================================================
* Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,6 +27,8 @@ import static org.mockito.Mockito.mock;
import java.util.Collections;
import java.util.List;
+
+
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -38,6 +41,7 @@ import org.onap.dcaegen2.services.prh.adapter.aai.api.AaiPnfResultModel;
import org.onap.dcaegen2.services.prh.adapter.aai.api.AaiServiceInstanceQueryModel;
import org.onap.dcaegen2.services.prh.adapter.aai.api.AaiServiceInstanceResultModel;
import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.adapter.aai.api.ImmutableConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ImmutableRelationshipData;
import org.onap.dcaegen2.services.prh.model.Relationship;
import org.onap.dcaegen2.services.prh.model.RelationshipData;
@@ -85,18 +89,6 @@ class AaiQueryTaskImplTest {
}
@Test
- void whenPnfIsUnavailable_ShouldThrowException() {
- //given
- given(getPnfModelClient.getAaiResponse(aaiModel)).willReturn(Mono.error(new Exception("404")));
-
- //when
- final Mono<Boolean> task = sut.execute(aaiModel);
-
- //then
- Assertions.assertThrows(Exception.class, task::block);
- }
-
- @Test
void whenPnfIsAvailableButRelationshipIsNull_ShouldReturnFalse() {
//given
given(pnfResultModel.getRelationshipList()).willReturn(null);
@@ -203,4 +195,12 @@ class AaiQueryTaskImplTest {
private void configurePnfClient(final ConsumerDmaapModel aaiModel, final AaiPnfResultModel pnfResultModel) {
given(getPnfModelClient.getAaiResponse(aaiModel)).willReturn(Mono.just(pnfResultModel));
}
+
+ @Test
+ void testFindPnfInAAIActive(){
+ ConsumerDmaapModel model = ImmutableConsumerDmaapModel.builder().correlationId("123").build();
+ configurePnfClient(model, pnfResultModel);
+ Mono<ConsumerDmaapModel> test = sut.findPnfinAAI(model);
+ Assertions.assertNotNull(test);
+ }
}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/EpochDateTimeConversionTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/EpochDateTimeConversionTest.java
new file mode 100644
index 00000000..850587e0
--- /dev/null
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/EpochDateTimeConversionTest.java
@@ -0,0 +1,52 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.prh.tasks.commit;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class EpochDateTimeConversionTest {
+
+ private EpochDateTimeConversion epochDateTimeConversion;
+
+ @BeforeEach
+ void setUp() {
+ epochDateTimeConversion = new EpochDateTimeConversion();
+ epochDateTimeConversion.setDaysForRecords("3");
+ }
+
+ @Test
+ public void getStartDateOfTheDayTest(){
+ epochDateTimeConversion.getDaysForRecords();
+ Long day = epochDateTimeConversion.getStartDateOfTheDay();
+ Assertions.assertNotNull(day);
+ }
+
+ @Test
+ public void getEndDateOfTheDayTest(){
+ Long day = epochDateTimeConversion.getEndDateOfTheDay();
+ Assertions.assertNotNull(day);
+ }
+}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java
new file mode 100644
index 00000000..42a2e7f4
--- /dev/null
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java
@@ -0,0 +1,151 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.tasks.commit;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.configuration.CbsConfigurationForAutoCommitDisabledMode;
+import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
+import org.springframework.kafka.support.Acknowledgment;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonSyntaxException;
+import reactor.core.publisher.Flux;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import static com.github.stefanbirkner.systemlambda.SystemLambda.withEnvironmentVariable;
+import static java.lang.ClassLoader.getSystemResource;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@ExtendWith(MockitoExtension.class)
+public class KafkaConsumerTaskImplTest {
+
+ @Mock
+ private Acknowledgment acknowledgment;
+
+ private KafkaConsumerTaskImpl kafkaConsumerTask;
+
+ private DmaapConsumerJsonParser dmaapConsumerJsonParser;
+
+ private EpochDateTimeConversion epochDateTimeConversion;
+
+ private CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabled;
+
+ private JsonObject cbsConfigJsonForAutoCommitDisabled;
+
+ @BeforeEach
+ void beforeEach() throws JsonSyntaxException, IOException, URISyntaxException {
+ cbsConfigJsonForAutoCommitDisabled = new Gson().fromJson(
+ new String(Files.readAllBytes(
+ Paths.get(getSystemResource("autoCommitDisabledConfigurationFromCbs2.json").toURI()))),
+ JsonObject.class);
+ cbsConfigurationForAutoCommitDisabled = new CbsConfigurationForAutoCommitDisabledMode();
+ dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+ epochDateTimeConversion = new EpochDateTimeConversion();
+
+ }
+
+ @Test
+ void beforeOnMessageTest() throws Exception {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+ .and("BOOTSTRAP_SERVERS", "localhost:9092")
+ .execute(() -> {
+ cbsConfigurationForAutoCommitDisabled.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled);
+ kafkaConsumerTask = new KafkaConsumerTaskImpl(cbsConfigurationForAutoCommitDisabled,
+ dmaapConsumerJsonParser, epochDateTimeConversion);
+ List<ConsumerRecord<String, String>> list = new ArrayList<>();
+ TimestampType timestampType = null;
+ Headers headers = new RecordHeaders();
+ epochDateTimeConversion.setDaysForRecords("3");
+ ConsumerRecord<String, String> records = new ConsumerRecord<>("test-topic", 1, 1l, 0l, timestampType, 1, 1,
+ "test-key", "test-value", headers, null);
+ list.add(records);
+ kafkaConsumerTask.onMessage(list, acknowledgment);
+ String actualTopicInList = list.get(0).topic();
+ String expectedTopicInList = "test-topic";
+ assertEquals(expectedTopicInList, actualTopicInList, "topic is not matching");
+ assertThat(kafkaConsumerTask.getOffset().equals(acknowledgment));
+ assertThat(kafkaConsumerTask.getJsonEvent().contains("test-topic"));
+ });
+ }
+
+ @Test
+ void beforeCommitOffsetTest() throws Exception {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+ .and("BOOTSTRAP_SERVERS", "localhost:9092")
+ .execute(() -> {
+ cbsConfigurationForAutoCommitDisabled.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled);
+ kafkaConsumerTask = new KafkaConsumerTaskImpl(cbsConfigurationForAutoCommitDisabled,
+ dmaapConsumerJsonParser, epochDateTimeConversion);
+ kafkaConsumerTask.commitOffset();
+ });
+ }
+
+ @Test
+ void beforeExecuteTest() throws Exception {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+ .and("BOOTSTRAP_SERVERS", "localhost:9092")
+ .execute(() -> {
+ cbsConfigurationForAutoCommitDisabled.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled);
+ kafkaConsumerTask = new KafkaConsumerTaskImpl(cbsConfigurationForAutoCommitDisabled,
+ dmaapConsumerJsonParser, epochDateTimeConversion);
+ String event = getResourceContent("integration/event.json");
+ java.util.List<String> eventList = new ArrayList<>();
+ eventList.add(event);
+ kafkaConsumerTask.setJsonEvent(eventList);
+ Flux<ConsumerDmaapModel> flux = kafkaConsumerTask.execute();
+ String expectedSourceName = "NOK6061ZW8";
+ String actualSourceName = flux.blockFirst().getCorrelationId();
+
+ String expectedOamV4IpAddress = "val3";
+ String actualOamV4IpAddress = flux.blockFirst().getIpv4();
+
+ String expectedOamV6IpAddress = "val4";
+ String actualOamV6IpAddress = flux.blockFirst().getIpv6();
+
+ assertEquals(expectedSourceName, actualSourceName, "SourceName is not matching");
+ assertEquals(expectedOamV4IpAddress, actualOamV4IpAddress, "OamV4IpAddress is not matching");
+ assertEquals(expectedOamV6IpAddress, actualOamV6IpAddress, "OamV6IpAddress is not matching");
+ });
+ }
+
+ private static String getResourceContent(String resourceName) {
+ try {
+ return new String(Files.readAllBytes(Paths.get(getSystemResource(resourceName).toURI())));
+ } catch (Exception e) {
+ throw new RuntimeException("failed loading content of '" + resourceName + "'", e);
+ }
+ }
+
+}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommitTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommitTest.java
new file mode 100644
index 00000000..401e351f
--- /dev/null
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommitTest.java
@@ -0,0 +1,72 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.prh.tasks.commit;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.onap.dcaegen2.services.prh.configuration.PrhProperties;
+import org.springframework.boot.context.event.ApplicationStartedEvent;
+import org.springframework.scheduling.TaskScheduler;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+@ExtendWith(MockitoExtension.class)
+public class ScheduledTasksRunnerWithCommitTest {
+
+ @Mock
+ private ScheduledTasksWithCommit scheduledTasksWithCommit;
+
+ @Mock
+ private TaskScheduler taskScheduler;
+
+ @Mock
+ private PrhProperties prhProperties;
+
+ @Mock
+ private ApplicationStartedEvent applicationStartedEvent;
+
+ private ScheduledTasksRunnerWithCommit scheduledTasksRunnerWithCommit;
+
+ @BeforeEach
+ void setUp() {
+ scheduledTasksRunnerWithCommit = new ScheduledTasksRunnerWithCommit(taskScheduler, scheduledTasksWithCommit, prhProperties);
+ }
+
+ @Test
+ void onApplicationStartedEvent() {
+ scheduledTasksRunnerWithCommit.onApplicationStartedEvent(applicationStartedEvent);
+ assertFalse(scheduledTasksRunnerWithCommit.tryToStartTaskWithCommit());
+ }
+
+ @Test
+ void cancelTasks() {
+ scheduledTasksRunnerWithCommit.cancelTasks();
+ }
+
+ @Test
+ void tryToStartTaskWithCommit() {
+ scheduledTasksRunnerWithCommit.tryToStartTaskWithCommit();
+ assertFalse(scheduledTasksRunnerWithCommit.tryToStartTaskWithCommit());
+ }
+}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommitTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommitTest.java
new file mode 100644
index 00000000..64779027
--- /dev/null
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommitTest.java
@@ -0,0 +1,263 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.prh.tasks.commit;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.util.Collections;
+import java.util.Map;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.adapter.aai.api.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
+import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
+import org.onap.dcaegen2.services.prh.tasks.AaiProducerTask;
+import org.onap.dcaegen2.services.prh.tasks.AaiQueryTask;
+import org.onap.dcaegen2.services.prh.tasks.BbsActionsTask;
+import org.onap.dcaegen2.services.prh.tasks.DmaapPublisherTask;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.springframework.boot.configurationprocessor.json.JSONException;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+
+@ExtendWith(MockitoExtension.class)
+class ScheduledTasksWithCommitTest {
+ private final static ConsumerDmaapModel DMAAP_MODEL =
+ ImmutableConsumerDmaapModel
+ .builder()
+ .correlationId("SomeId")
+ .ipv4("ipv4")
+ .ipv6("ipv6")
+ .build();
+
+ @Mock
+ private DmaapPublisherTask readyPublisher;
+
+ @Mock
+ private DmaapPublisherTask updatePublisher;
+
+
+ @Mock
+ private BbsActionsTask bbsActionsTask;
+
+ @Mock
+ private KafkaConsumerTask kafkaConsumerTask;
+
+ @Mock
+ private AaiQueryTask aaiQueryTask;
+
+ @Mock
+ private AaiProducerTask aaiProducerTask;
+
+ private final Map<String, String> context = Collections.emptyMap();
+
+ private ScheduledTasksWithCommit sut;
+
+ @BeforeEach
+ void setUp() {
+ sut = new ScheduledTasksWithCommit(
+ kafkaConsumerTask,
+ readyPublisher,
+ updatePublisher,
+ aaiQueryTask,
+ aaiProducerTask,
+ bbsActionsTask,
+ context);
+ }
+
+ @Test
+ void testQueryAAiForPNFOnSuccess() throws JSONException, PrhTaskException {
+ ScheduledTasksWithCommit.State state = new ScheduledTasksWithCommit.State(DMAAP_MODEL, false );
+ MessageRouterPublishResponse messageRouterPublishResponse = new MessageRouterPublishResponse() {
+ @Override
+ public @Nullable String failReason() {
+ return null;
+ }
+ };
+ when(kafkaConsumerTask.execute()).thenReturn(Flux.just(DMAAP_MODEL));
+ when(aaiQueryTask.findPnfinAAI(DMAAP_MODEL)).thenReturn(Mono.just(DMAAP_MODEL));
+ when(aaiQueryTask.execute(DMAAP_MODEL)).thenReturn(Mono.just(true));
+ when(aaiProducerTask.execute(state.dmaapModel)).thenReturn(Mono.just(DMAAP_MODEL));
+ when(updatePublisher.execute(state.dmaapModel)).thenReturn(Flux.just(messageRouterPublishResponse));
+
+ sut.scheduleKafkaPrhEventTask();
+
+ verifyIfLogicalLinkWasNotCreated();
+ verifyThatPnfModelWasNotSentDmaapPnfReadyTopic();
+ }
+
+ @Test
+ void testQueryAAiForPNF() throws JSONException, PrhTaskException {
+ ScheduledTasksWithCommit.State state = new ScheduledTasksWithCommit.State(DMAAP_MODEL, true);
+ MessageRouterPublishResponse messageRouterPublishResponse = new MessageRouterPublishResponse() {
+ @Override
+ public @Nullable String failReason() {
+ return null;
+ }
+ };
+ when(kafkaConsumerTask.execute()).thenReturn(Flux.just(DMAAP_MODEL));
+ when(aaiQueryTask.findPnfinAAI(DMAAP_MODEL)).thenReturn(Mono.just(DMAAP_MODEL));
+ when(aaiQueryTask.execute(DMAAP_MODEL)).thenReturn(Mono.just(true));
+ when(aaiProducerTask.execute(state.dmaapModel)).thenReturn(Mono.just(DMAAP_MODEL));
+ when(updatePublisher.execute(state.dmaapModel)).thenReturn(Flux.just(messageRouterPublishResponse));
+
+ sut.scheduleKafkaPrhEventTask();
+
+ verifyIfLogicalLinkWasNotCreated();
+ verifyThatPnfModelWasNotSentDmaapPnfReadyTopic();
+ }
+
+ @Test
+ void testQueryAAiForPNFOnError() throws JSONException, PrhTaskException {
+ when(kafkaConsumerTask.execute()).thenReturn(Flux.just(DMAAP_MODEL));
+
+ sut.scheduleKafkaPrhEventTask();
+
+ verifyThatPnfUpdateWasNotSentToAai();
+
+ verifyIfLogicalLinkWasNotCreated();
+ verifyThatPnfModelWasNotSentDmaapPnfReadyTopic();
+ verifyThatPnfModelWasNotSentDmaapPnfUpdateTopic();
+ }
+
+ @Test
+ void testQueryAAiForPNFOnPRHException() throws JSONException, PrhTaskException {
+ ScheduledTasksWithCommit.State state = new ScheduledTasksWithCommit.State(DMAAP_MODEL, false );
+ MessageRouterPublishResponse messageRouterPublishResponse = new MessageRouterPublishResponse() {
+ @Override
+ public @Nullable String failReason() {
+ return null;
+ }
+ };
+ when(kafkaConsumerTask.execute()).thenReturn(Flux.just(DMAAP_MODEL));
+ when(aaiQueryTask.findPnfinAAI(DMAAP_MODEL)).thenReturn(Mono.just(DMAAP_MODEL));
+ when(aaiQueryTask.execute(DMAAP_MODEL)).thenReturn(Mono.just(true));
+ when(aaiProducerTask.execute(state.dmaapModel)).thenThrow(new PrhTaskException());
+
+ sut.scheduleKafkaPrhEventTask();
+
+ verifyIfLogicalLinkWasNotCreated();
+ verifyThatPnfModelWasNotSentDmaapPnfReadyTopic();
+ }
+
+ @Test
+ void queryAAiForPNFOnPRHExceptionTest() throws JSONException, PrhTaskException {
+ ScheduledTasksWithCommit.State state = new ScheduledTasksWithCommit.State(DMAAP_MODEL, true);
+ MessageRouterPublishResponse messageRouterPublishResponse = new MessageRouterPublishResponse() {
+ @Override
+ public @Nullable String failReason() {
+ return null;
+ }
+ };
+ when(kafkaConsumerTask.execute()).thenReturn(Flux.just(DMAAP_MODEL));
+ when(aaiQueryTask.findPnfinAAI(DMAAP_MODEL)).thenReturn(Mono.just(DMAAP_MODEL));
+ when(aaiQueryTask.execute(DMAAP_MODEL)).thenReturn(Mono.just(true));
+ when(aaiProducerTask.execute(state.dmaapModel)).thenReturn(Mono.just(DMAAP_MODEL));
+ when(updatePublisher.execute(state.dmaapModel)).thenThrow(new PrhTaskException());
+
+ sut.scheduleKafkaPrhEventTask();
+
+ verifyIfLogicalLinkWasNotCreated();
+ verifyThatPnfModelWasNotSentDmaapPnfReadyTopic();
+ }
+
+ @Test
+ void queryAAiForPNFOnPRHExceptionOnDmaapEmptyResponseExceptionTest() throws JSONException, PrhTaskException {
+ ScheduledTasksWithCommit.State state = new ScheduledTasksWithCommit.State(DMAAP_MODEL, true);
+ MessageRouterPublishResponse messageRouterPublishResponse = new MessageRouterPublishResponse() {
+ @Override
+ public @Nullable String failReason() {
+ return null;
+ }
+ };
+ when(kafkaConsumerTask.execute()).thenReturn(Flux.just(DMAAP_MODEL));
+ when(aaiQueryTask.findPnfinAAI(DMAAP_MODEL)).thenReturn(Mono.just(DMAAP_MODEL));
+ when(aaiQueryTask.execute(DMAAP_MODEL)).thenReturn(Mono.just(true));
+ when(aaiProducerTask.execute(state.dmaapModel)).thenReturn(Mono.just(DMAAP_MODEL));
+ when(updatePublisher.execute(state.dmaapModel)).thenThrow(new DmaapEmptyResponseException());
+
+ sut.scheduleKafkaPrhEventTask();
+
+ verifyIfLogicalLinkWasNotCreated();
+ verifyThatPnfModelWasNotSentDmaapPnfReadyTopic();
+ }
+
+ @Test
+ void queryAAiForPNFOnPRHExceptionOnFalseTest() throws JSONException, PrhTaskException {
+ ScheduledTasksWithCommit.State state = new ScheduledTasksWithCommit.State(DMAAP_MODEL, false);
+ MessageRouterPublishResponse messageRouterPublishResponse = new MessageRouterPublishResponse() {
+ @Override
+ public @Nullable String failReason() {
+ return null;
+ }
+ };
+ when(kafkaConsumerTask.execute()).thenReturn(Flux.just(DMAAP_MODEL));
+ when(aaiQueryTask.findPnfinAAI(DMAAP_MODEL)).thenReturn(Mono.just(DMAAP_MODEL));
+ when(aaiQueryTask.execute(DMAAP_MODEL)).thenReturn(Mono.just(false));
+ when(aaiProducerTask.execute(state.dmaapModel)).thenReturn(Mono.just(DMAAP_MODEL));
+
+ sut.scheduleKafkaPrhEventTask();
+ verifyThatPnfModelWasNotSentDmaapPnfReadyTopic();
+ }
+
+ @Test
+ void queryAAiForPNFOnPRHExceptionOnJSONExceptionTest() throws PrhTaskException, JSONException {
+ ScheduledTasksWithCommit.State state = new ScheduledTasksWithCommit.State(DMAAP_MODEL, true);
+ MessageRouterPublishResponse messageRouterPublishResponse = new MessageRouterPublishResponse() {
+ @Override
+ public @Nullable String failReason() {
+ return null;
+ }
+ };
+ when(kafkaConsumerTask.execute()).thenThrow(new JSONException("json format exception"));
+
+ sut.scheduleKafkaPrhEventTask();
+
+ verifyIfLogicalLinkWasNotCreated();
+ verifyThatPnfModelWasNotSentDmaapPnfReadyTopic();
+ }
+
+ private void verifyThatPnfModelWasNotSentDmaapPnfReadyTopic() throws PrhTaskException {
+ verify(readyPublisher, never()).execute(DMAAP_MODEL);
+ }
+
+ private void verifyThatPnfModelWasNotSentDmaapPnfUpdateTopic() throws PrhTaskException {
+ verify(updatePublisher, never()).execute(DMAAP_MODEL);
+ }
+
+ private void verifyThatPnfUpdateWasNotSentToAai() throws PrhTaskException {
+ verify(aaiProducerTask, never()).execute(DMAAP_MODEL);
+ }
+
+ private void verifyIfLogicalLinkWasNotCreated(){
+ verify(bbsActionsTask, never()).execute(DMAAP_MODEL);
+ }
+}
+