aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src
diff options
context:
space:
mode:
authorsushant53 <sushant.jadhav@t-systems.com>2023-10-27 16:44:30 +0530
committersushant53 <sushant.jadhav@t-systems.com>2023-10-27 16:45:02 +0530
commit62841e53e41fd8e12b75641806f76c11878260ee (patch)
tree624bc532ae85647cb8b5cb470ad3549be167473e /prh-app-server/src
parentd82c53bd799b22660be17219da516415d4c56b46 (diff)
[DCAEGEN2] Remove DMaaP dependency in PRH1.10.1
Removed DMaaP dependency in PRH by using new sdk library, which uses Kafka API directly. Issue-ID: DCAEGEN2-3402 Change-Id: I5456ce432a9fd4a58826275a17c603379b0c18ee Signed-off-by: sushant53 <sushant.jadhav@t-systems.com>
Diffstat (limited to 'prh-app-server/src')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java8
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java1
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java13
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationForAutoCommitDisabledModeTest.java4
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationTest.java30
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfigTest.java8
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationForAutoCommitDisabledTest.java46
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationTest.java76
-rw-r--r--prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java12
9 files changed, 156 insertions, 42 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java
index 64fff9a7..22763e8b 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java
@@ -58,7 +58,14 @@ public class CbsConfiguration implements Config {
messageRouterSubscriber = DmaapClientFactory
.createMessageRouterSubscriber(consulConfigurationParser.getMessageRouterSubscriberConfig());
+ String prevTopicUrl = null;
+ if(messageRouterCBSSubscribeRequest != null) {
+ prevTopicUrl = messageRouterCBSSubscribeRequest.sourceDefinition().topicUrl();
+ }
messageRouterCBSSubscribeRequest = consulConfigurationParser.getMessageRouterSubscribeRequest();
+ if(!messageRouterCBSSubscribeRequest.sourceDefinition().topicUrl().equals(prevTopicUrl)) {
+ messageRouterSubscriber.close();
+ }
}
@Override
@@ -95,5 +102,4 @@ public class CbsConfiguration implements Config {
.orElseThrow(() -> new RuntimeException(CBS_CONFIG_MISSING));
}
-
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java
index fcbd10a5..aafcd81a 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java
@@ -65,6 +65,7 @@ public class ScheduleController {
public Mono<ResponseEntity<String>> stopTask() {
LOGGER.trace("Receiving stop scheduling worker request");
return Mono.defer(() -> {
+ scheduledTasksRunner.closeKafkaPublisherSubscriber();
scheduledTasksRunner.cancelTasks();
return Mono.just(new ResponseEntity<>("PRH Service has been stopped!", HttpStatus.OK));
});
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java
index e90b0271..5a5eb075 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.PreDestroy;
+import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration;
import org.onap.dcaegen2.services.prh.configuration.PrhProperties;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.annotation.Configuration;
@@ -46,11 +47,13 @@ public class ScheduledTasksRunner {
private final TaskScheduler taskScheduler;
private final ScheduledTasks scheduledTask;
private final PrhProperties prhProperties;
+ private final CbsConfiguration cbsConfiguration;
public ScheduledTasksRunner(TaskScheduler taskScheduler, ScheduledTasks scheduledTask,
- PrhProperties prhProperties) {
+ PrhProperties prhProperties, CbsConfiguration cbsConfiguration) {
this.taskScheduler = taskScheduler;
this.scheduledTask = scheduledTask;
this.prhProperties = prhProperties;
+ this.cbsConfiguration = cbsConfiguration;
}
@EventListener
@@ -82,4 +85,12 @@ public class ScheduledTasksRunner {
return false;
}
}
+
+ /**
+ * Function for cleaning resources for kafka subscriber and publisher.
+ */
+ public synchronized void closeKafkaPublisherSubscriber() {
+ cbsConfiguration.getMessageRouterSubscriber().close();
+ cbsConfiguration.getMessageRouterPublisher().close();
+ }
}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationForAutoCommitDisabledModeTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationForAutoCommitDisabledModeTest.java
index 89f1a043..80a0007f 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationForAutoCommitDisabledModeTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationForAutoCommitDisabledModeTest.java
@@ -43,7 +43,9 @@ public class CbsConfigurationForAutoCommitDisabledModeTest {
@Test
void beforecbsConfigurationForAutoCommitDisabledMode() throws Exception {
- withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+ .and("BOOTSTRAP_SERVERS", "localhost:9092")
+ .execute(() -> {
this.cbsConfigurationForAutoCommitDisabledMode();
});
}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationTest.java
index 4f3cd864..8cd7d5e8 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationTest.java
@@ -26,6 +26,8 @@ import com.google.gson.JsonObject;
import org.junit.jupiter.api.Test;
import java.nio.file.Files;
import java.nio.file.Paths;
+
+import static com.github.stefanbirkner.systemlambda.SystemLambda.withEnvironmentVariable;
import static java.lang.ClassLoader.getSystemResource;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -52,19 +54,25 @@ class CbsConfigurationTest {
@Test
void cbsConfigurationShouldExposeDataReceivedAsJsonFromCbs() throws Exception {
- JsonObject cbsConfigJson = new Gson().fromJson(
- new String(Files.readAllBytes(Paths.get(getSystemResource("configurationFromCbs.json").toURI()))),
- JsonObject.class);
- CbsConfiguration cbsConfiguration = new CbsConfiguration();
+
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+ .and("BOOTSTRAP_SERVERS", "localhost:9092")
+ .execute(() -> {
+ JsonObject cbsConfigJson = new Gson().fromJson(
+ new String(Files.readAllBytes(Paths.get(getSystemResource("configurationFromCbs.json").toURI()))),
+ JsonObject.class);
+ CbsConfiguration cbsConfiguration = new CbsConfiguration();
- cbsConfiguration.parseCBSConfig(cbsConfigJson);
+ cbsConfiguration.parseCBSConfig(cbsConfigJson);
- assertThat(cbsConfiguration.getAaiClientConfiguration()).isNotNull();
- assertThat(cbsConfiguration.getMessageRouterPublisher()).isNotNull();
- assertThat(cbsConfiguration.getMessageRouterSubscriber()).isNotNull();
- assertThat(cbsConfiguration.getMessageRouterPublishRequest()).isNotNull();
- assertThat(cbsConfiguration.getMessageRouterSubscribeRequest()).isNotNull();
- assertThat(cbsConfiguration.getMessageRouterUpdatePublishRequest()).isNotNull();
+ assertThat(cbsConfiguration.getAaiClientConfiguration()).isNotNull();
+ assertThat(cbsConfiguration.getMessageRouterPublisher()).isNotNull();
+ assertThat(cbsConfiguration.getMessageRouterSubscriber()).isNotNull();
+ assertThat(cbsConfiguration.getMessageRouterPublishRequest()).isNotNull();
+ assertThat(cbsConfiguration.getMessageRouterSubscribeRequest()).isNotNull();
+ assertThat(cbsConfiguration.getMessageRouterUpdatePublishRequest()).isNotNull();
+ });
+
}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfigTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfigTest.java
index bd7d7779..b9a05a99 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfigTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfigTest.java
@@ -57,7 +57,9 @@ public class KafkaConfigTest {
@Test
void beforecbsConfigurationForAutoCommitDisabledMode() throws Exception {
- withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+ .and("BOOTSTRAP_SERVERS", "localhost:9092")
+ .execute(() -> {
this.consumerFactoryTest();
});
}
@@ -120,7 +122,9 @@ public class KafkaConfigTest {
@Test
void beforeKafkaListenerContainerFactoryTest() throws Exception {
- withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+ .and("BOOTSTRAP_SERVERS", "localhost:9092")
+ .execute(() -> {
this.kafkaListenerContainerFactoryTest();
});
}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationForAutoCommitDisabledTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationForAutoCommitDisabledTest.java
index b10c1ad8..21f9d099 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationForAutoCommitDisabledTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationForAutoCommitDisabledTest.java
@@ -25,24 +25,35 @@ import com.fasterxml.jackson.databind.JsonMappingException;
import com.github.tomakehurst.wiremock.client.WireMock;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
import com.jayway.jsonpath.JsonPath;
+
+import io.vavr.collection.List;
import reactor.core.publisher.Flux;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
import org.onap.dcaegen2.services.prh.MainApp;
import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.adapter.kafka.ImmutableKafkaConfiguration;
import org.onap.dcaegen2.services.prh.adapter.kafka.KafkaConfiguration;
+import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration;
import org.onap.dcaegen2.services.prh.configuration.CbsConfigurationForAutoCommitDisabledMode;
import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
import org.onap.dcaegen2.services.prh.tasks.commit.KafkaConsumerTaskImpl;
import org.onap.dcaegen2.services.prh.tasks.commit.ScheduledTasksRunnerWithCommit;
import org.onap.dcaegen2.services.prh.tasks.commit.ScheduledTasksWithCommit;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.configurationprocessor.json.JSONException;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -65,6 +76,9 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import static java.lang.ClassLoader.getSystemResource;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
@@ -91,6 +105,12 @@ class PrhWorkflowIntegrationForAutoCommitDisabledTest {
@Autowired
private DmaapConsumerJsonParser dmaapConsumerJsonParser;
+
+ @SpyBean
+ CbsConfiguration cbsConfiguration;
+
+ @Mock
+ MessageRouterPublisher publisher;
@Configuration
@Import(MainApp.class)
@@ -112,7 +132,12 @@ class PrhWorkflowIntegrationForAutoCommitDisabledTest {
CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabledMode = new CbsConfigurationForAutoCommitDisabledMode();
try {
- cbsConfigurationForAutoCommitDisabledMode.parseCBSConfig(cbsConfigJson);
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+ .and("BOOTSTRAP_SERVERS", "localhost:9092")
+ .execute(() -> {
+ cbsConfigurationForAutoCommitDisabledMode.parseCBSConfig(cbsConfigJson);
+ });
+
} catch (Exception e) {
//Exception is expected as environment variable for JAAS_CONFIG is not available
if (e.getMessage() == "kafkaJaasConfig") {
@@ -136,7 +161,8 @@ class PrhWorkflowIntegrationForAutoCommitDisabledTest {
@Test
void beforeCbsConfigurationForAutoCommitDisabledMode() throws Exception {
- withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+ .execute(() -> {
this.whenThereAreNoEventsInDmaap_WorkflowShouldFinish();
});
}
@@ -153,7 +179,8 @@ class PrhWorkflowIntegrationForAutoCommitDisabledTest {
@Test
void beforeWhenThereIsAnEventsInDmaap_ShouldSendPnfReadyNotification() throws Exception {
- withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+ .execute(() -> {
this.whenThereIsAnEventsInDmaap_ShouldSendPnfReadyNotification();
});
}
@@ -175,11 +202,16 @@ class PrhWorkflowIntegrationForAutoCommitDisabledTest {
stubFor(post(urlEqualTo("/events/unauthenticated.PNF_READY")));
when(kafkaConsumerTaskImpl.execute()).thenReturn(fluxList);
-
+
+ List<String> expectedItems = List.of(event);
+ Flux<MessageRouterPublishResponse> pubresp = Flux.just(ImmutableMessageRouterPublishResponse
+ .builder()
+ .items(expectedItems.map(JsonPrimitive::new))
+ .build());
+ when(cbsConfiguration.getMessageRouterPublisher()).thenReturn(publisher);
+ when(publisher.put(any(MessageRouterPublishRequest.class),any())).thenReturn(pubresp);
scheduledTasksWithCommit.scheduleKafkaPrhEventTask();
-
- verify(1, postRequestedFor(urlEqualTo("/events/unauthenticated.PNF_READY"))
- .withRequestBody(matchingJsonPath("$[0].correlationId", equalTo(pnfName))));
+ verify(publisher,times(1)).put(any(MessageRouterPublishRequest.class),any());
}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationTest.java
index f5033ca2..a77fcd75 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationTest.java
@@ -24,23 +24,40 @@ package org.onap.dcaegen2.services.prh.integration;
import com.github.tomakehurst.wiremock.client.WireMock;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
import com.jayway.jsonpath.JsonPath;
+
+import io.vavr.collection.List;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
import org.onap.dcaegen2.services.prh.MainApp;
import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration;
import org.onap.dcaegen2.services.prh.tasks.ScheduledTasks;
import org.onap.dcaegen2.services.prh.tasks.ScheduledTasksRunner;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.test.context.ActiveProfiles;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
+import static com.github.stefanbirkner.systemlambda.SystemLambda.withEnvironmentVariable;
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching;
import static com.github.tomakehurst.wiremock.client.WireMock.ok;
@@ -57,6 +74,10 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import static java.lang.ClassLoader.getSystemResource;
import static java.util.Collections.singletonList;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
@SpringBootTest
@@ -66,27 +87,40 @@ class PrhWorkflowIntegrationTest {
@Autowired
private ScheduledTasks scheduledTasks;
-
+
+ @SpyBean
+ CbsConfiguration cbsConfiguration;
+
@MockBean
private ScheduledTasksRunner scheduledTasksRunner; // just to disable scheduling - some configurability in ScheduledTaskRunner not to start tasks at app startup would be welcome
-
-
+
+ @Mock
+ MessageRouterSubscriber subscriber;
+
+ @Mock
+ MessageRouterPublisher publisher;
+
@Configuration
@Import(MainApp.class)
static class CbsConfigTestConfig {
@Value("http://localhost:${wiremock.server.port}")
private String wiremockServerAddress;
-
+
@Bean
- public CbsConfiguration cbsConfiguration() {
+ public CbsConfiguration cbsConfiguration() throws Exception {
JsonObject cbsConfigJson = new Gson().fromJson(getResourceContent("configurationFromCbs.json")
.replaceAll("https?://dmaap-mr[\\w.]*:\\d+", wiremockServerAddress)
.replaceAll("https?://aai[\\w.]*:\\d+", wiremockServerAddress),
JsonObject.class);
-
+
CbsConfiguration cbsConfiguration = new CbsConfiguration();
- cbsConfiguration.parseCBSConfig(cbsConfigJson);
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+ .and("BOOTSTRAP_SERVERS", "localhost:9092")
+ .execute(() -> {
+ cbsConfiguration.parseCBSConfig(cbsConfigJson);
+ });
+
return cbsConfiguration;
}
@@ -100,7 +134,7 @@ class PrhWorkflowIntegrationTest {
@Test
- void whenThereAreNoEventsInDmaap_WorkflowShouldFinish() {
+ void whenThereAreNoEventsInDmaap_WorkflowShouldFinish() {
stubFor(get(urlEqualTo("/events/unauthenticated.VES_PNFREG_OUTPUT/OpenDCAE-c12/c12"))
.willReturn(aResponse().withBody("[]")));
@@ -115,17 +149,27 @@ class PrhWorkflowIntegrationTest {
void whenThereIsAnEventsInDmaap_ShouldSendPnfReadyNotification() {
String event = getResourceContent("integration/event.json");
String pnfName = JsonPath.read(event, "$.event.commonEventHeader.sourceName");
-
- stubFor(get(urlEqualTo("/events/unauthenticated.VES_PNFREG_OUTPUT/OpenDCAE-c12/c12"))
- .willReturn(ok().withBody(new Gson().toJson(singletonList(event)))));
stubFor(get(urlEqualTo("/aai/v23/network/pnfs/pnf/" + pnfName)).willReturn(ok().withBody("{}")));
stubFor(patch(urlEqualTo("/aai/v23/network/pnfs/pnf/" + pnfName)));
- stubFor(post(urlEqualTo("/events/unauthenticated.PNF_READY")));
-
+
+ List<String> expectedItems = List.of(event);
+ Mono<MessageRouterSubscribeResponse> resp = Mono.just(ImmutableMessageRouterSubscribeResponse
+ .builder()
+ .items(expectedItems.map(JsonPrimitive::new))
+ .build());
+ Flux<MessageRouterPublishResponse> pubresp = Flux.just(ImmutableMessageRouterPublishResponse
+ .builder()
+ .items(expectedItems.map(JsonPrimitive::new))
+ .build());
+
+ when(cbsConfiguration.getMessageRouterSubscriber()).thenReturn(subscriber);
+ when(cbsConfiguration.getMessageRouterPublisher()).thenReturn(publisher);
+ when(subscriber.get(any(MessageRouterSubscribeRequest.class))).thenReturn(resp);
+ when(publisher.put(any(MessageRouterPublishRequest.class),any())).thenReturn(pubresp);
+
scheduledTasks.scheduleMainPrhEventTask();
-
- verify(1, postRequestedFor(urlEqualTo("/events/unauthenticated.PNF_READY"))
- .withRequestBody(matchingJsonPath("$[0].correlationId", equalTo(pnfName))));
+ verify(subscriber,times(1)).get(any(MessageRouterSubscribeRequest.class));
+ verify(publisher,times(1)).put(any(MessageRouterPublishRequest.class),any());
}
diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java
index f33ff43e..42a2e7f4 100644
--- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java
+++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java
@@ -78,7 +78,9 @@ public class KafkaConsumerTaskImplTest {
@Test
void beforeOnMessageTest() throws Exception {
- withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+ .and("BOOTSTRAP_SERVERS", "localhost:9092")
+ .execute(() -> {
cbsConfigurationForAutoCommitDisabled.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled);
kafkaConsumerTask = new KafkaConsumerTaskImpl(cbsConfigurationForAutoCommitDisabled,
dmaapConsumerJsonParser, epochDateTimeConversion);
@@ -100,7 +102,9 @@ public class KafkaConsumerTaskImplTest {
@Test
void beforeCommitOffsetTest() throws Exception {
- withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+ .and("BOOTSTRAP_SERVERS", "localhost:9092")
+ .execute(() -> {
cbsConfigurationForAutoCommitDisabled.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled);
kafkaConsumerTask = new KafkaConsumerTaskImpl(cbsConfigurationForAutoCommitDisabled,
dmaapConsumerJsonParser, epochDateTimeConversion);
@@ -110,7 +114,9 @@ public class KafkaConsumerTaskImplTest {
@Test
void beforeExecuteTest() throws Exception {
- withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> {
+ withEnvironmentVariable("JAAS_CONFIG", "jaas_config")
+ .and("BOOTSTRAP_SERVERS", "localhost:9092")
+ .execute(() -> {
cbsConfigurationForAutoCommitDisabled.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled);
kafkaConsumerTask = new KafkaConsumerTaskImpl(cbsConfigurationForAutoCommitDisabled,
dmaapConsumerJsonParser, epochDateTimeConversion);