Diffstat (limited to 'prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit')
-rw-r--r--  prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/EpochDateTimeConversionTest.java        52
-rw-r--r--  prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java         151
-rw-r--r--  prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommitTest.java  72
-rw-r--r--  prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommitTest.java       263
4 files changed, 538 insertions(+), 0 deletions(-)
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/EpochDateTimeConversionTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/EpochDateTimeConversionTest.java
new file mode 100644
index 00000000..850587e0
--- /dev/null
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/EpochDateTimeConversionTest.java
@@ -0,0 +1,52 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.prh.tasks.commit;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class EpochDateTimeConversionTest {
+
+ private EpochDateTimeConversion epochDateTimeConversion;
+
+ @BeforeEach
+ void setUp() {
+ epochDateTimeConversion = new EpochDateTimeConversion();
+ epochDateTimeConversion.setDaysForRecords("3");
+ }
+
+ @Test
+ public void getStartDateOfTheDayTest(){
+ Assertions.assertNotNull(epochDateTimeConversion.getDaysForRecords());
+ Long day = epochDateTimeConversion.getStartDateOfTheDay();
+ Assertions.assertNotNull(day);
+ }
+
+ @Test
+ public void getEndDateOfTheDayTest(){
+ Long day = epochDateTimeConversion.getEndDateOfTheDay();
+ Assertions.assertNotNull(day);
+ }
+}
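
The class exercised above is not part of this diff; it lives in the main source tree. For orientation, here is a minimal sketch that is consistent with the four accessors the test calls, assuming daysForRecords is held as a String and the window boundaries are derived from the current UTC date. Everything beyond those method names is illustrative, not the committed implementation:

```java
package org.onap.dcaegen2.services.prh.tasks.commit;

import java.time.LocalDate;
import java.time.ZoneOffset;

// Illustrative sketch only; the committed EpochDateTimeConversion is not shown in this diff.
public class EpochDateTimeConversion {

    // In production this is expected to come from configuration; the tests set it directly.
    private String daysForRecords = "1";

    public String getDaysForRecords() {
        return daysForRecords;
    }

    public void setDaysForRecords(String daysForRecords) {
        this.daysForRecords = daysForRecords;
    }

    // Epoch milliseconds at midnight UTC, daysForRecords days in the past (assumed semantics).
    public Long getStartDateOfTheDay() {
        return LocalDate.now(ZoneOffset.UTC)
                .minusDays(Long.parseLong(daysForRecords))
                .atStartOfDay(ZoneOffset.UTC)
                .toInstant()
                .toEpochMilli();
    }

    // Epoch milliseconds at the end of the current UTC day (assumed semantics).
    public Long getEndDateOfTheDay() {
        return LocalDate.now(ZoneOffset.UTC)
                .plusDays(1)
                .atStartOfDay(ZoneOffset.UTC)
                .toInstant()
                .toEpochMilli();
    }
}
```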
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java
new file mode 100644
index 00000000..42a2e7f4
--- /dev/null
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java
@@ -0,0 +1,151 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.tasks.commit;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.configuration.CbsConfigurationForAutoCommitDisabledMode;
+import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
+import org.springframework.kafka.support.Acknowledgment;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonSyntaxException;
+import reactor.core.publisher.Flux;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import static com.github.stefanbirkner.systemlambda.SystemLambda.withEnvironmentVariable;
+import static java.lang.ClassLoader.getSystemResource;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@ExtendWith(MockitoExtension.class)
+public class KafkaConsumerTaskImplTest {
+
+ @Mock
+ private Acknowledgment acknowledgment;
+
+ private KafkaConsumerTaskImpl kafkaConsumerTask;
+
+ private DmaapConsumerJsonParser dmaapConsumerJsonParser;
+
+ private EpochDateTimeConversion epochDateTimeConversion;
+
+ private CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabled;
+
+ private JsonObject cbsConfigJsonForAutoCommitDisabled;
+
+ @BeforeEach
+ void beforeEach() throws JsonSyntaxException, IOException, URISyntaxException {
+ cbsConfigJsonForAutoCommitDisabled = new Gson().fromJson(
+ new String(Files.readAllBytes(
+ Paths.get(getSystemResource("autoCommitDisabledConfigurationFromCbs2.json").toURI()))),
+ JsonObject.class);
+ cbsConfigurationForAutoCommitDisabled = new CbsConfigurationForAutoCommitDisabledMode();
+ dmaapConsumerJsonParser = new DmaapConsumerJsonParser();
+ epochDateTimeConversion = new EpochDateTimeConversion();
+
+ }
+
+ @Test
+ void beforeOnMessageTest() throws Exception {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+ .and("BOOTSTRAP_SERVERS", "localhost:9092")
+ .execute(() -> {
+ cbsConfigurationForAutoCommitDisabled.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled);
+ kafkaConsumerTask = new KafkaConsumerTaskImpl(cbsConfigurationForAutoCommitDisabled,
+ dmaapConsumerJsonParser, epochDateTimeConversion);
+ List<ConsumerRecord<String, String>> list = new ArrayList<>();
+ TimestampType timestampType = TimestampType.NO_TIMESTAMP_TYPE;
+ Headers headers = new RecordHeaders();
+ epochDateTimeConversion.setDaysForRecords("3");
+ ConsumerRecord<String, String> consumerRecord = new ConsumerRecord<>("test-topic", 1, 1L, 0L, timestampType, 1, 1,
+ "test-key", "test-value", headers, null);
+ list.add(consumerRecord);
+ kafkaConsumerTask.onMessage(list, acknowledgment);
+ String actualTopicInList = list.get(0).topic();
+ String expectedTopicInList = "test-topic";
+ assertEquals(expectedTopicInList, actualTopicInList, "topic is not matching");
+ assertThat(kafkaConsumerTask.getOffset()).isEqualTo(acknowledgment);
+ assertThat(kafkaConsumerTask.getJsonEvent()).isNotNull();
+ });
+ }
+
+ @Test
+ void beforeCommitOffsetTest() throws Exception {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+ .and("BOOTSTRAP_SERVERS", "localhost:9092")
+ .execute(() -> {
+ cbsConfigurationForAutoCommitDisabled.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled);
+ kafkaConsumerTask = new KafkaConsumerTaskImpl(cbsConfigurationForAutoCommitDisabled,
+ dmaapConsumerJsonParser, epochDateTimeConversion);
+ kafkaConsumerTask.commitOffset();
+ });
+ }
+
+ @Test
+ void beforeExecuteTest() throws Exception {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+ .and("BOOTSTRAP_SERVERS", "localhost:9092")
+ .execute(() -> {
+ cbsConfigurationForAutoCommitDisabled.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled);
+ kafkaConsumerTask = new KafkaConsumerTaskImpl(cbsConfigurationForAutoCommitDisabled,
+ dmaapConsumerJsonParser, epochDateTimeConversion);
+ String event = getResourceContent("integration/event.json");
+ List<String> eventList = new ArrayList<>();
+ eventList.add(event);
+ kafkaConsumerTask.setJsonEvent(eventList);
+ Flux<ConsumerDmaapModel> flux = kafkaConsumerTask.execute();
+ String expectedSourceName = "NOK6061ZW8";
+ String actualSourceName = flux.blockFirst().getCorrelationId();
+
+ String expectedOamV4IpAddress = "val3";
+ String actualOamV4IpAddress = flux.blockFirst().getIpv4();
+
+ String expectedOamV6IpAddress = "val4";
+ String actualOamV6IpAddress = flux.blockFirst().getIpv6();
+
+ assertEquals(expectedSourceName, actualSourceName, "SourceName is not matching");
+ assertEquals(expectedOamV4IpAddress, actualOamV4IpAddress, "OamV4IpAddress is not matching");
+ assertEquals(expectedOamV6IpAddress, actualOamV6IpAddress, "OamV6IpAddress is not matching");
+ });
+ }
+
+ private static String getResourceContent(String resourceName) {
+ try {
+ return new String(Files.readAllBytes(Paths.get(getSystemResource(resourceName).toURI())));
+ } catch (Exception e) {
+ throw new RuntimeException("failed loading content of '" + resourceName + "'", e);
+ }
+ }
+
+}
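
KafkaConsumerTaskImpl itself is also outside this diff. The tests above rely only on the contract sketched below: a manual-acknowledgment listener that buffers record values falling inside the configured day window, keeps the Acknowledgment for a later commit, and turns buffered JSON events into ConsumerDmaapModel objects on execute(). The method names are taken from the test calls; grouping them into one interface, and the interface name, are illustrative:

```java
package org.onap.dcaegen2.services.prh.tasks.commit;

import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
import org.springframework.kafka.support.Acknowledgment;
import reactor.core.publisher.Flux;

// Illustrative outline of the behaviour the tests above assume; the committed
// KafkaConsumerTaskImpl is in the main source tree and may differ in detail.
public interface KafkaConsumerTaskContract {

    // Listener callback in manual-commit mode: buffer the values of records whose
    // timestamps fall inside the configured day window and keep the Acknowledgment.
    void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment);

    // Acknowledge (commit) the offsets captured by the last onMessage call.
    void commitOffset();

    // Parse the buffered JSON events into PNF registration models.
    Flux<ConsumerDmaapModel> execute();

    // Accessors exercised by the tests.
    Acknowledgment getOffset();

    List<String> getJsonEvent();

    void setJsonEvent(List<String> jsonEvent);
}
```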
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommitTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommitTest.java
new file mode 100644
index 00000000..401e351f
--- /dev/null
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksRunnerWithCommitTest.java
@@ -0,0 +1,72 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.prh.tasks.commit;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.onap.dcaegen2.services.prh.configuration.PrhProperties;
+import org.springframework.boot.context.event.ApplicationStartedEvent;
+import org.springframework.scheduling.TaskScheduler;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+@ExtendWith(MockitoExtension.class)
+public class ScheduledTasksRunnerWithCommitTest {
+
+ @Mock
+ private ScheduledTasksWithCommit scheduledTasksWithCommit;
+
+ @Mock
+ private TaskScheduler taskScheduler;
+
+ @Mock
+ private PrhProperties prhProperties;
+
+ @Mock
+ private ApplicationStartedEvent applicationStartedEvent;
+
+ private ScheduledTasksRunnerWithCommit scheduledTasksRunnerWithCommit;
+
+ @BeforeEach
+ void setUp() {
+ scheduledTasksRunnerWithCommit = new ScheduledTasksRunnerWithCommit(taskScheduler, scheduledTasksWithCommit, prhProperties);
+ }
+
+ @Test
+ void onApplicationStartedEvent() {
+ scheduledTasksRunnerWithCommit.onApplicationStartedEvent(applicationStartedEvent);
+ assertFalse(scheduledTasksRunnerWithCommit.tryToStartTaskWithCommit());
+ }
+
+ @Test
+ void cancelTasks() {
+ scheduledTasksRunnerWithCommit.cancelTasks();
+ }
+
+ @Test
+ void tryToStartTaskWithCommit() {
+ scheduledTasksRunnerWithCommit.tryToStartTaskWithCommit();
+ assertFalse(scheduledTasksRunnerWithCommit.tryToStartTaskWithCommit());
+ }
+}
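
As with the other classes under test, ScheduledTasksRunnerWithCommit is not part of this diff. The assertions above (tryToStartTaskWithCommit() returning false against fully mocked collaborators) are consistent with a runner along the following lines; the property accessor and field names here are assumptions, not the committed code:

```java
package org.onap.dcaegen2.services.prh.tasks.commit;

import java.time.Duration;
import java.util.concurrent.ScheduledFuture;
import org.onap.dcaegen2.services.prh.configuration.PrhProperties;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.TaskScheduler;

// Illustrative sketch; the committed runner is in the main source tree.
public class ScheduledTasksRunnerWithCommit {

    private final TaskScheduler taskScheduler;
    private final ScheduledTasksWithCommit scheduledTasksWithCommit;
    private final PrhProperties prhProperties;
    private ScheduledFuture<?> scheduledFuture;

    public ScheduledTasksRunnerWithCommit(TaskScheduler taskScheduler,
            ScheduledTasksWithCommit scheduledTasksWithCommit, PrhProperties prhProperties) {
        this.taskScheduler = taskScheduler;
        this.scheduledTasksWithCommit = scheduledTasksWithCommit;
        this.prhProperties = prhProperties;
    }

    @EventListener
    public void onApplicationStartedEvent(ApplicationStartedEvent event) {
        tryToStartTaskWithCommit();
    }

    public void cancelTasks() {
        if (scheduledFuture != null) {
            scheduledFuture.cancel(false);
        }
    }

    // Returns false when no scheduling interval is configured, which is what the
    // mocked PrhProperties in the test provides; getWorkflowSchedulingInterval()
    // is an assumed accessor name.
    public boolean tryToStartTaskWithCommit() {
        Duration interval = prhProperties.getWorkflowSchedulingInterval();
        if (interval == null) {
            return false;
        }
        scheduledFuture = taskScheduler.scheduleAtFixedRate(
                scheduledTasksWithCommit::scheduleKafkaPrhEventTask, interval);
        return true;
    }
}
```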
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommitTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommitTest.java
new file mode 100644
index 00000000..64779027
--- /dev/null
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/ScheduledTasksWithCommitTest.java
@@ -0,0 +1,263 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcaegen2.services.prh.tasks.commit;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import java.util.Collections;
+import java.util.Map;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.adapter.aai.api.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
+import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
+import org.onap.dcaegen2.services.prh.tasks.AaiProducerTask;
+import org.onap.dcaegen2.services.prh.tasks.AaiQueryTask;
+import org.onap.dcaegen2.services.prh.tasks.BbsActionsTask;
+import org.onap.dcaegen2.services.prh.tasks.DmaapPublisherTask;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.springframework.boot.configurationprocessor.json.JSONException;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+
+@ExtendWith(MockitoExtension.class)
+class ScheduledTasksWithCommitTest {
+ private final static ConsumerDmaapModel DMAAP_MODEL =
+ ImmutableConsumerDmaapModel
+ .builder()
+ .correlationId("SomeId")
+ .ipv4("ipv4")
+ .ipv6("ipv6")
+ .build();
+
+ @Mock
+ private DmaapPublisherTask readyPublisher;
+
+ @Mock
+ private DmaapPublisherTask updatePublisher;
+
+
+ @Mock
+ private BbsActionsTask bbsActionsTask;
+
+ @Mock
+ private KafkaConsumerTask kafkaConsumerTask;
+
+ @Mock
+ private AaiQueryTask aaiQueryTask;
+
+ @Mock
+ private AaiProducerTask aaiProducerTask;
+
+ private final Map<String, String> context = Collections.emptyMap();
+
+ private ScheduledTasksWithCommit sut;
+
+ @BeforeEach
+ void setUp() {
+ sut = new ScheduledTasksWithCommit(
+ kafkaConsumerTask,
+ readyPublisher,
+ updatePublisher,
+ aaiQueryTask,
+ aaiProducerTask,
+ bbsActionsTask,
+ context);
+ }
+
+ @Test
+ void testQueryAAiForPNFOnSuccess() throws JSONException, PrhTaskException {
+ ScheduledTasksWithCommit.State state = new ScheduledTasksWithCommit.State(DMAAP_MODEL, false);
+ MessageRouterPublishResponse messageRouterPublishResponse = new MessageRouterPublishResponse() {
+ @Override
+ public @Nullable String failReason() {
+ return null;
+ }
+ };
+ when(kafkaConsumerTask.execute()).thenReturn(Flux.just(DMAAP_MODEL));
+ when(aaiQueryTask.findPnfinAAI(DMAAP_MODEL)).thenReturn(Mono.just(DMAAP_MODEL));
+ when(aaiQueryTask.execute(DMAAP_MODEL)).thenReturn(Mono.just(true));
+ when(aaiProducerTask.execute(state.dmaapModel)).thenReturn(Mono.just(DMAAP_MODEL));
+ when(updatePublisher.execute(state.dmaapModel)).thenReturn(Flux.just(messageRouterPublishResponse));
+
+ sut.scheduleKafkaPrhEventTask();
+
+ verifyIfLogicalLinkWasNotCreated();
+ verifyThatPnfModelWasNotSentDmaapPnfReadyTopic();
+ }
+
+ @Test
+ void testQueryAAiForPNF() throws JSONException, PrhTaskException {
+ ScheduledTasksWithCommit.State state = new ScheduledTasksWithCommit.State(DMAAP_MODEL, true);
+ MessageRouterPublishResponse messageRouterPublishResponse = new MessageRouterPublishResponse() {
+ @Override
+ public @Nullable String failReason() {
+ return null;
+ }
+ };
+ when(kafkaConsumerTask.execute()).thenReturn(Flux.just(DMAAP_MODEL));
+ when(aaiQueryTask.findPnfinAAI(DMAAP_MODEL)).thenReturn(Mono.just(DMAAP_MODEL));
+ when(aaiQueryTask.execute(DMAAP_MODEL)).thenReturn(Mono.just(true));
+ when(aaiProducerTask.execute(state.dmaapModel)).thenReturn(Mono.just(DMAAP_MODEL));
+ when(updatePublisher.execute(state.dmaapModel)).thenReturn(Flux.just(messageRouterPublishResponse));
+
+ sut.scheduleKafkaPrhEventTask();
+
+ verifyIfLogicalLinkWasNotCreated();
+ verifyThatPnfModelWasNotSentDmaapPnfReadyTopic();
+ }
+
+ @Test
+ void testQueryAAiForPNFOnError() throws JSONException, PrhTaskException {
+ when(kafkaConsumerTask.execute()).thenReturn(Flux.just(DMAAP_MODEL));
+
+ sut.scheduleKafkaPrhEventTask();
+
+ verifyThatPnfUpdateWasNotSentToAai();
+
+ verifyIfLogicalLinkWasNotCreated();
+ verifyThatPnfModelWasNotSentDmaapPnfReadyTopic();
+ verifyThatPnfModelWasNotSentDmaapPnfUpdateTopic();
+ }
+
+ @Test
+ void testQueryAAiForPNFOnPRHException() throws JSONException, PrhTaskException {
+ ScheduledTasksWithCommit.State state = new ScheduledTasksWithCommit.State(DMAAP_MODEL, false);
+ MessageRouterPublishResponse messageRouterPublishResponse = new MessageRouterPublishResponse() {
+ @Override
+ public @Nullable String failReason() {
+ return null;
+ }
+ };
+ when(kafkaConsumerTask.execute()).thenReturn(Flux.just(DMAAP_MODEL));
+ when(aaiQueryTask.findPnfinAAI(DMAAP_MODEL)).thenReturn(Mono.just(DMAAP_MODEL));
+ when(aaiQueryTask.execute(DMAAP_MODEL)).thenReturn(Mono.just(true));
+ when(aaiProducerTask.execute(state.dmaapModel)).thenThrow(new PrhTaskException());
+
+ sut.scheduleKafkaPrhEventTask();
+
+ verifyIfLogicalLinkWasNotCreated();
+ verifyThatPnfModelWasNotSentDmaapPnfReadyTopic();
+ }
+
+ @Test
+ void queryAAiForPNFOnPRHExceptionTest() throws JSONException, PrhTaskException {
+ ScheduledTasksWithCommit.State state = new ScheduledTasksWithCommit.State(DMAAP_MODEL, true);
+ MessageRouterPublishResponse messageRouterPublishResponse = new MessageRouterPublishResponse() {
+ @Override
+ public @Nullable String failReason() {
+ return null;
+ }
+ };
+ when(kafkaConsumerTask.execute()).thenReturn(Flux.just(DMAAP_MODEL));
+ when(aaiQueryTask.findPnfinAAI(DMAAP_MODEL)).thenReturn(Mono.just(DMAAP_MODEL));
+ when(aaiQueryTask.execute(DMAAP_MODEL)).thenReturn(Mono.just(true));
+ when(aaiProducerTask.execute(state.dmaapModel)).thenReturn(Mono.just(DMAAP_MODEL));
+ when(updatePublisher.execute(state.dmaapModel)).thenThrow(new PrhTaskException());
+
+ sut.scheduleKafkaPrhEventTask();
+
+ verifyIfLogicalLinkWasNotCreated();
+ verifyThatPnfModelWasNotSentDmaapPnfReadyTopic();
+ }
+
+ @Test
+ void queryAAiForPNFOnPRHExceptionOnDmaapEmptyResponseExceptionTest() throws JSONException, PrhTaskException {
+ ScheduledTasksWithCommit.State state = new ScheduledTasksWithCommit.State(DMAAP_MODEL, true);
+ MessageRouterPublishResponse messageRouterPublishResponse = new MessageRouterPublishResponse() {
+ @Override
+ public @Nullable String failReason() {
+ return null;
+ }
+ };
+ when(kafkaConsumerTask.execute()).thenReturn(Flux.just(DMAAP_MODEL));
+ when(aaiQueryTask.findPnfinAAI(DMAAP_MODEL)).thenReturn(Mono.just(DMAAP_MODEL));
+ when(aaiQueryTask.execute(DMAAP_MODEL)).thenReturn(Mono.just(true));
+ when(aaiProducerTask.execute(state.dmaapModel)).thenReturn(Mono.just(DMAAP_MODEL));
+ when(updatePublisher.execute(state.dmaapModel)).thenThrow(new DmaapEmptyResponseException());
+
+ sut.scheduleKafkaPrhEventTask();
+
+ verifyIfLogicalLinkWasNotCreated();
+ verifyThatPnfModelWasNotSentDmaapPnfReadyTopic();
+ }
+
+ @Test
+ void queryAAiForPNFOnPRHExceptionOnFalseTest() throws JSONException, PrhTaskException {
+ ScheduledTasksWithCommit.State state = new ScheduledTasksWithCommit.State(DMAAP_MODEL, false);
+ MessageRouterPublishResponse messageRouterPublishResponse = new MessageRouterPublishResponse() {
+ @Override
+ public @Nullable String failReason() {
+ return null;
+ }
+ };
+ when(kafkaConsumerTask.execute()).thenReturn(Flux.just(DMAAP_MODEL));
+ when(aaiQueryTask.findPnfinAAI(DMAAP_MODEL)).thenReturn(Mono.just(DMAAP_MODEL));
+ when(aaiQueryTask.execute(DMAAP_MODEL)).thenReturn(Mono.just(false));
+ when(aaiProducerTask.execute(state.dmaapModel)).thenReturn(Mono.just(DMAAP_MODEL));
+
+ sut.scheduleKafkaPrhEventTask();
+ verifyThatPnfModelWasNotSentDmaapPnfReadyTopic();
+ }
+
+ @Test
+ void queryAAiForPNFOnPRHExceptionOnJSONExceptionTest() throws PrhTaskException, JSONException {
+ ScheduledTasksWithCommit.State state = new ScheduledTasksWithCommit.State(DMAAP_MODEL, true);
+ MessageRouterPublishResponse messageRouterPublishResponse = new MessageRouterPublishResponse() {
+ @Override
+ public @Nullable String failReason() {
+ return null;
+ }
+ };
+ when(kafkaConsumerTask.execute()).thenThrow(new JSONException("json format exception"));
+
+ sut.scheduleKafkaPrhEventTask();
+
+ verifyIfLogicalLinkWasNotCreated();
+ verifyThatPnfModelWasNotSentDmaapPnfReadyTopic();
+ }
+
+ private void verifyThatPnfModelWasNotSentDmaapPnfReadyTopic() throws PrhTaskException {
+ verify(readyPublisher, never()).execute(DMAAP_MODEL);
+ }
+
+ private void verifyThatPnfModelWasNotSentDmaapPnfUpdateTopic() throws PrhTaskException {
+ verify(updatePublisher, never()).execute(DMAAP_MODEL);
+ }
+
+ private void verifyThatPnfUpdateWasNotSentToAai() throws PrhTaskException {
+ verify(aaiProducerTask, never()).execute(DMAAP_MODEL);
+ }
+
+ private void verifyIfLogicalLinkWasNotCreated(){
+ verify(bbsActionsTask, never()).execute(DMAAP_MODEL);
+ }
+}
+
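
The State objects constructed throughout this last test refer to a nested holder on ScheduledTasksWithCommit that is likewise outside this diff. From the two-argument constructor calls and the state.dmaapModel accesses above, it is assumed to look roughly like this; the name of the boolean field is a guess:

```java
package org.onap.dcaegen2.services.prh.tasks.commit;

import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;

// Illustrative sketch of the nested ScheduledTasksWithCommit.State holder used above;
// "activationStatus" is an assumed name for the second constructor argument.
public class State {
    public final ConsumerDmaapModel dmaapModel;
    public final Boolean activationStatus;

    public State(ConsumerDmaapModel dmaapModel, Boolean activationStatus) {
        this.dmaapModel = dmaapModel;
        this.activationStatus = activationStatus;
    }
}
```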