From 62841e53e41fd8e12b75641806f76c11878260ee Mon Sep 17 00:00:00 2001 From: sushant53 Date: Fri, 27 Oct 2023 16:44:30 +0530 Subject: [DCAEGEN2] Remove DMaaP dependency in PRH Removed DMaaP dependency in PRH by using new sdk library, which uses Kafka API directly. Issue-ID: DCAEGEN2-3402 Change-Id: I5456ce432a9fd4a58826275a17c603379b0c18ee Signed-off-by: sushant53 --- Changelog.md | 4 ++ pom.xml | 4 +- prh-app-server/pom.xml | 2 +- .../prh/configuration/CbsConfiguration.java | 8 ++- .../prh/controllers/ScheduleController.java | 1 + .../services/prh/tasks/ScheduledTasksRunner.java | 13 +++- ...ConfigurationForAutoCommitDisabledModeTest.java | 4 +- .../prh/configuration/CbsConfigurationTest.java | 30 +++++---- .../prh/configuration/KafkaConfigTest.java | 8 ++- ...rkflowIntegrationForAutoCommitDisabledTest.java | 46 +++++++++++-- .../integration/PrhWorkflowIntegrationTest.java | 76 +++++++++++++++++----- .../tasks/commit/KafkaConsumerTaskImplTest.java | 12 +++- prh-commons/pom.xml | 2 +- version.properties | 2 +- 14 files changed, 165 insertions(+), 47 deletions(-) diff --git a/Changelog.md b/Changelog.md index 2cb689eb..6b93a733 100644 --- a/Changelog.md +++ b/Changelog.md @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). +## [1.10.1] - 2023/10/27 +### Changed +- [DCAEGEN2-3402] Remove DMaaP dependency in PRH + ## [1.10.0] - 2023/09/01 ### Changed - [DCAEGEN2-3365] Code changed so that the autoCommitDisabled mode of PRH use CBSContentParser for environment variables. diff --git a/pom.xml b/pom.xml index e5eacddc..431494db 100644 --- a/pom.xml +++ b/pom.xml @@ -34,7 +34,7 @@ org.onap.dcaegen2.services prh - 1.10.0-SNAPSHOT + 1.10.1-SNAPSHOT dcaegen2-services-prh PNF Registration Handler @@ -59,7 +59,7 @@ 2021.0.3 3.0.0 2.7.5 - 1.9.3 + 1.9.4 29.0-jre ${project.reporting.outputDirectory}/jacoco-ut/jacoco.xml diff --git a/prh-app-server/pom.xml b/prh-app-server/pom.xml index 189a37ca..39249716 100644 --- a/prh-app-server/pom.xml +++ b/prh-app-server/pom.xml @@ -28,7 +28,7 @@ org.onap.dcaegen2.services prh - 1.10.0-SNAPSHOT + 1.10.1-SNAPSHOT org.onap.dcaegen2.services.prh diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java index 64fff9a7..22763e8b 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java @@ -58,7 +58,14 @@ public class CbsConfiguration implements Config { messageRouterSubscriber = DmaapClientFactory .createMessageRouterSubscriber(consulConfigurationParser.getMessageRouterSubscriberConfig()); + String prevTopicUrl = null; + if(messageRouterCBSSubscribeRequest != null) { + prevTopicUrl = messageRouterCBSSubscribeRequest.sourceDefinition().topicUrl(); + } messageRouterCBSSubscribeRequest = consulConfigurationParser.getMessageRouterSubscribeRequest(); + if(!messageRouterCBSSubscribeRequest.sourceDefinition().topicUrl().equals(prevTopicUrl)) { + messageRouterSubscriber.close(); + } } @Override @@ -95,5 +102,4 @@ public class CbsConfiguration implements Config { .orElseThrow(() -> new RuntimeException(CBS_CONFIG_MISSING)); } - } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java index fcbd10a5..aafcd81a 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java @@ -65,6 +65,7 @@ public class ScheduleController { public Mono> stopTask() { LOGGER.trace("Receiving stop scheduling worker request"); return Mono.defer(() -> { + scheduledTasksRunner.closeKafkaPublisherSubscriber(); scheduledTasksRunner.cancelTasks(); return Mono.just(new ResponseEntity<>("PRH Service has been stopped!", HttpStatus.OK)); }); diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java index e90b0271..5a5eb075 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ScheduledFuture; import javax.annotation.PreDestroy; +import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration; import org.onap.dcaegen2.services.prh.configuration.PrhProperties; import org.springframework.boot.context.event.ApplicationStartedEvent; import org.springframework.context.annotation.Configuration; @@ -46,11 +47,13 @@ public class ScheduledTasksRunner { private final TaskScheduler taskScheduler; private final ScheduledTasks scheduledTask; private final PrhProperties prhProperties; + private final CbsConfiguration cbsConfiguration; public ScheduledTasksRunner(TaskScheduler taskScheduler, ScheduledTasks scheduledTask, - PrhProperties prhProperties) { + PrhProperties prhProperties, CbsConfiguration cbsConfiguration) { this.taskScheduler = taskScheduler; this.scheduledTask = scheduledTask; this.prhProperties = prhProperties; + this.cbsConfiguration = cbsConfiguration; } @EventListener @@ -82,4 +85,12 @@ public class ScheduledTasksRunner { return false; } } + + /** + * Function for cleaning resources for kafka subscriber and publisher. + */ + public synchronized void closeKafkaPublisherSubscriber() { + cbsConfiguration.getMessageRouterSubscriber().close(); + cbsConfiguration.getMessageRouterPublisher().close(); + } } diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationForAutoCommitDisabledModeTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationForAutoCommitDisabledModeTest.java index 89f1a043..80a0007f 100644 --- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationForAutoCommitDisabledModeTest.java +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationForAutoCommitDisabledModeTest.java @@ -43,7 +43,9 @@ public class CbsConfigurationForAutoCommitDisabledModeTest { @Test void beforecbsConfigurationForAutoCommitDisabledMode() throws Exception { - withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> { + withEnvironmentVariable("JAAS_CONFIG", "jaas_config") + .and("BOOTSTRAP_SERVERS", "localhost:9092") + .execute(() -> { this.cbsConfigurationForAutoCommitDisabledMode(); }); } diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationTest.java index 4f3cd864..8cd7d5e8 100644 --- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationTest.java +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigurationTest.java @@ -26,6 +26,8 @@ import com.google.gson.JsonObject; import org.junit.jupiter.api.Test; import java.nio.file.Files; import java.nio.file.Paths; + +import static com.github.stefanbirkner.systemlambda.SystemLambda.withEnvironmentVariable; import static java.lang.ClassLoader.getSystemResource; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -52,19 +54,25 @@ class CbsConfigurationTest { @Test void cbsConfigurationShouldExposeDataReceivedAsJsonFromCbs() throws Exception { - JsonObject cbsConfigJson = new Gson().fromJson( - new String(Files.readAllBytes(Paths.get(getSystemResource("configurationFromCbs.json").toURI()))), - JsonObject.class); - CbsConfiguration cbsConfiguration = new CbsConfiguration(); + + withEnvironmentVariable("JAAS_CONFIG", "jaas_config") + .and("BOOTSTRAP_SERVERS", "localhost:9092") + .execute(() -> { + JsonObject cbsConfigJson = new Gson().fromJson( + new String(Files.readAllBytes(Paths.get(getSystemResource("configurationFromCbs.json").toURI()))), + JsonObject.class); + CbsConfiguration cbsConfiguration = new CbsConfiguration(); - cbsConfiguration.parseCBSConfig(cbsConfigJson); + cbsConfiguration.parseCBSConfig(cbsConfigJson); - assertThat(cbsConfiguration.getAaiClientConfiguration()).isNotNull(); - assertThat(cbsConfiguration.getMessageRouterPublisher()).isNotNull(); - assertThat(cbsConfiguration.getMessageRouterSubscriber()).isNotNull(); - assertThat(cbsConfiguration.getMessageRouterPublishRequest()).isNotNull(); - assertThat(cbsConfiguration.getMessageRouterSubscribeRequest()).isNotNull(); - assertThat(cbsConfiguration.getMessageRouterUpdatePublishRequest()).isNotNull(); + assertThat(cbsConfiguration.getAaiClientConfiguration()).isNotNull(); + assertThat(cbsConfiguration.getMessageRouterPublisher()).isNotNull(); + assertThat(cbsConfiguration.getMessageRouterSubscriber()).isNotNull(); + assertThat(cbsConfiguration.getMessageRouterPublishRequest()).isNotNull(); + assertThat(cbsConfiguration.getMessageRouterSubscribeRequest()).isNotNull(); + assertThat(cbsConfiguration.getMessageRouterUpdatePublishRequest()).isNotNull(); + }); + } diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfigTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfigTest.java index bd7d7779..b9a05a99 100644 --- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfigTest.java +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfigTest.java @@ -57,7 +57,9 @@ public class KafkaConfigTest { @Test void beforecbsConfigurationForAutoCommitDisabledMode() throws Exception { - withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> { + withEnvironmentVariable("JAAS_CONFIG", "jaas_config") + .and("BOOTSTRAP_SERVERS", "localhost:9092") + .execute(() -> { this.consumerFactoryTest(); }); } @@ -120,7 +122,9 @@ public class KafkaConfigTest { @Test void beforeKafkaListenerContainerFactoryTest() throws Exception { - withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> { + withEnvironmentVariable("JAAS_CONFIG", "jaas_config") + .and("BOOTSTRAP_SERVERS", "localhost:9092") + .execute(() -> { this.kafkaListenerContainerFactoryTest(); }); } diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationForAutoCommitDisabledTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationForAutoCommitDisabledTest.java index b10c1ad8..21f9d099 100644 --- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationForAutoCommitDisabledTest.java +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationForAutoCommitDisabledTest.java @@ -25,24 +25,35 @@ import com.fasterxml.jackson.databind.JsonMappingException; import com.github.tomakehurst.wiremock.client.WireMock; import com.google.gson.Gson; import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; import com.jayway.jsonpath.JsonPath; + +import io.vavr.collection.List; import reactor.core.publisher.Flux; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mock; import org.onap.dcaegen2.services.prh.MainApp; import org.onap.dcaegen2.services.prh.adapter.aai.api.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.adapter.kafka.ImmutableKafkaConfiguration; import org.onap.dcaegen2.services.prh.adapter.kafka.KafkaConfiguration; +import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration; import org.onap.dcaegen2.services.prh.configuration.CbsConfigurationForAutoCommitDisabledMode; import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser; import org.onap.dcaegen2.services.prh.tasks.commit.KafkaConsumerTaskImpl; import org.onap.dcaegen2.services.prh.tasks.commit.ScheduledTasksRunnerWithCommit; import org.onap.dcaegen2.services.prh.tasks.commit.ScheduledTasksWithCommit; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.configurationprocessor.json.JSONException; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.boot.test.mock.mockito.SpyBean; import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -65,6 +76,9 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; import static java.lang.ClassLoader.getSystemResource; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** @@ -91,6 +105,12 @@ class PrhWorkflowIntegrationForAutoCommitDisabledTest { @Autowired private DmaapConsumerJsonParser dmaapConsumerJsonParser; + + @SpyBean + CbsConfiguration cbsConfiguration; + + @Mock + MessageRouterPublisher publisher; @Configuration @Import(MainApp.class) @@ -112,7 +132,12 @@ class PrhWorkflowIntegrationForAutoCommitDisabledTest { CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabledMode = new CbsConfigurationForAutoCommitDisabledMode(); try { - cbsConfigurationForAutoCommitDisabledMode.parseCBSConfig(cbsConfigJson); + withEnvironmentVariable("JAAS_CONFIG", "jaas_config") + .and("BOOTSTRAP_SERVERS", "localhost:9092") + .execute(() -> { + cbsConfigurationForAutoCommitDisabledMode.parseCBSConfig(cbsConfigJson); + }); + } catch (Exception e) { //Exception is expected as environment variable for JAAS_CONFIG is not available if (e.getMessage() == "kafkaJaasConfig") { @@ -136,7 +161,8 @@ class PrhWorkflowIntegrationForAutoCommitDisabledTest { @Test void beforeCbsConfigurationForAutoCommitDisabledMode() throws Exception { - withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> { + withEnvironmentVariable("JAAS_CONFIG", "jaas_config") + .execute(() -> { this.whenThereAreNoEventsInDmaap_WorkflowShouldFinish(); }); } @@ -153,7 +179,8 @@ class PrhWorkflowIntegrationForAutoCommitDisabledTest { @Test void beforeWhenThereIsAnEventsInDmaap_ShouldSendPnfReadyNotification() throws Exception { - withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> { + withEnvironmentVariable("JAAS_CONFIG", "jaas_config") + .execute(() -> { this.whenThereIsAnEventsInDmaap_ShouldSendPnfReadyNotification(); }); } @@ -175,11 +202,16 @@ class PrhWorkflowIntegrationForAutoCommitDisabledTest { stubFor(post(urlEqualTo("/events/unauthenticated.PNF_READY"))); when(kafkaConsumerTaskImpl.execute()).thenReturn(fluxList); - + + List expectedItems = List.of(event); + Flux pubresp = Flux.just(ImmutableMessageRouterPublishResponse + .builder() + .items(expectedItems.map(JsonPrimitive::new)) + .build()); + when(cbsConfiguration.getMessageRouterPublisher()).thenReturn(publisher); + when(publisher.put(any(MessageRouterPublishRequest.class),any())).thenReturn(pubresp); scheduledTasksWithCommit.scheduleKafkaPrhEventTask(); - - verify(1, postRequestedFor(urlEqualTo("/events/unauthenticated.PNF_READY")) - .withRequestBody(matchingJsonPath("$[0].correlationId", equalTo(pnfName)))); + verify(publisher,times(1)).put(any(MessageRouterPublishRequest.class),any()); } diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationTest.java index f5033ca2..a77fcd75 100644 --- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationTest.java +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/integration/PrhWorkflowIntegrationTest.java @@ -24,23 +24,40 @@ package org.onap.dcaegen2.services.prh.integration; import com.github.tomakehurst.wiremock.client.WireMock; import com.google.gson.Gson; import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; import com.jayway.jsonpath.JsonPath; + +import io.vavr.collection.List; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mock; import org.onap.dcaegen2.services.prh.MainApp; import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration; import org.onap.dcaegen2.services.prh.tasks.ScheduledTasks; import org.onap.dcaegen2.services.prh.tasks.ScheduledTasksRunner; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.boot.test.mock.mockito.SpyBean; import org.springframework.cloud.contract.wiremock.AutoConfigureWireMock; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.test.context.ActiveProfiles; import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import static com.github.stefanbirkner.systemlambda.SystemLambda.withEnvironmentVariable; import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; import static com.github.tomakehurst.wiremock.client.WireMock.urlPathMatching; import static com.github.tomakehurst.wiremock.client.WireMock.ok; @@ -57,6 +74,10 @@ import java.nio.file.Files; import java.nio.file.Paths; import static java.lang.ClassLoader.getSystemResource; import static java.util.Collections.singletonList; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @SpringBootTest @@ -66,27 +87,40 @@ class PrhWorkflowIntegrationTest { @Autowired private ScheduledTasks scheduledTasks; - + + @SpyBean + CbsConfiguration cbsConfiguration; + @MockBean private ScheduledTasksRunner scheduledTasksRunner; // just to disable scheduling - some configurability in ScheduledTaskRunner not to start tasks at app startup would be welcome - - + + @Mock + MessageRouterSubscriber subscriber; + + @Mock + MessageRouterPublisher publisher; + @Configuration @Import(MainApp.class) static class CbsConfigTestConfig { @Value("http://localhost:${wiremock.server.port}") private String wiremockServerAddress; - + @Bean - public CbsConfiguration cbsConfiguration() { + public CbsConfiguration cbsConfiguration() throws Exception { JsonObject cbsConfigJson = new Gson().fromJson(getResourceContent("configurationFromCbs.json") .replaceAll("https?://dmaap-mr[\\w.]*:\\d+", wiremockServerAddress) .replaceAll("https?://aai[\\w.]*:\\d+", wiremockServerAddress), JsonObject.class); - + CbsConfiguration cbsConfiguration = new CbsConfiguration(); - cbsConfiguration.parseCBSConfig(cbsConfigJson); + withEnvironmentVariable("JAAS_CONFIG", "jaas_config") + .and("BOOTSTRAP_SERVERS", "localhost:9092") + .execute(() -> { + cbsConfiguration.parseCBSConfig(cbsConfigJson); + }); + return cbsConfiguration; } @@ -100,7 +134,7 @@ class PrhWorkflowIntegrationTest { @Test - void whenThereAreNoEventsInDmaap_WorkflowShouldFinish() { + void whenThereAreNoEventsInDmaap_WorkflowShouldFinish() { stubFor(get(urlEqualTo("/events/unauthenticated.VES_PNFREG_OUTPUT/OpenDCAE-c12/c12")) .willReturn(aResponse().withBody("[]"))); @@ -115,17 +149,27 @@ class PrhWorkflowIntegrationTest { void whenThereIsAnEventsInDmaap_ShouldSendPnfReadyNotification() { String event = getResourceContent("integration/event.json"); String pnfName = JsonPath.read(event, "$.event.commonEventHeader.sourceName"); - - stubFor(get(urlEqualTo("/events/unauthenticated.VES_PNFREG_OUTPUT/OpenDCAE-c12/c12")) - .willReturn(ok().withBody(new Gson().toJson(singletonList(event))))); stubFor(get(urlEqualTo("/aai/v23/network/pnfs/pnf/" + pnfName)).willReturn(ok().withBody("{}"))); stubFor(patch(urlEqualTo("/aai/v23/network/pnfs/pnf/" + pnfName))); - stubFor(post(urlEqualTo("/events/unauthenticated.PNF_READY"))); - + + List expectedItems = List.of(event); + Mono resp = Mono.just(ImmutableMessageRouterSubscribeResponse + .builder() + .items(expectedItems.map(JsonPrimitive::new)) + .build()); + Flux pubresp = Flux.just(ImmutableMessageRouterPublishResponse + .builder() + .items(expectedItems.map(JsonPrimitive::new)) + .build()); + + when(cbsConfiguration.getMessageRouterSubscriber()).thenReturn(subscriber); + when(cbsConfiguration.getMessageRouterPublisher()).thenReturn(publisher); + when(subscriber.get(any(MessageRouterSubscribeRequest.class))).thenReturn(resp); + when(publisher.put(any(MessageRouterPublishRequest.class),any())).thenReturn(pubresp); + scheduledTasks.scheduleMainPrhEventTask(); - - verify(1, postRequestedFor(urlEqualTo("/events/unauthenticated.PNF_READY")) - .withRequestBody(matchingJsonPath("$[0].correlationId", equalTo(pnfName)))); + verify(subscriber,times(1)).get(any(MessageRouterSubscribeRequest.class)); + verify(publisher,times(1)).put(any(MessageRouterPublishRequest.class),any()); } diff --git a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java index f33ff43e..42a2e7f4 100644 --- a/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java +++ b/prh-app-server/src/test/java/org/onap/dcaegen2/services/prh/tasks/commit/KafkaConsumerTaskImplTest.java @@ -78,7 +78,9 @@ public class KafkaConsumerTaskImplTest { @Test void beforeOnMessageTest() throws Exception { - withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> { + withEnvironmentVariable("JAAS_CONFIG", "jaas_config") + .and("BOOTSTRAP_SERVERS", "localhost:9092") + .execute(() -> { cbsConfigurationForAutoCommitDisabled.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled); kafkaConsumerTask = new KafkaConsumerTaskImpl(cbsConfigurationForAutoCommitDisabled, dmaapConsumerJsonParser, epochDateTimeConversion); @@ -100,7 +102,9 @@ public class KafkaConsumerTaskImplTest { @Test void beforeCommitOffsetTest() throws Exception { - withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> { + withEnvironmentVariable("JAAS_CONFIG", "jaas_config") + .and("BOOTSTRAP_SERVERS", "localhost:9092") + .execute(() -> { cbsConfigurationForAutoCommitDisabled.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled); kafkaConsumerTask = new KafkaConsumerTaskImpl(cbsConfigurationForAutoCommitDisabled, dmaapConsumerJsonParser, epochDateTimeConversion); @@ -110,7 +114,9 @@ public class KafkaConsumerTaskImplTest { @Test void beforeExecuteTest() throws Exception { - withEnvironmentVariable("JAAS_CONFIG", "jaas_config").execute(() -> { + withEnvironmentVariable("JAAS_CONFIG", "jaas_config") + .and("BOOTSTRAP_SERVERS", "localhost:9092") + .execute(() -> { cbsConfigurationForAutoCommitDisabled.parseCBSConfig(cbsConfigJsonForAutoCommitDisabled); kafkaConsumerTask = new KafkaConsumerTaskImpl(cbsConfigurationForAutoCommitDisabled, dmaapConsumerJsonParser, epochDateTimeConversion); diff --git a/prh-commons/pom.xml b/prh-commons/pom.xml index 074ef92e..d4a1c3cd 100644 --- a/prh-commons/pom.xml +++ b/prh-commons/pom.xml @@ -28,7 +28,7 @@ org.onap.dcaegen2.services prh - 1.10.0-SNAPSHOT + 1.10.1-SNAPSHOT org.onap.dcaegen2.services.prh diff --git a/version.properties b/version.properties index e9e55960..81b72c03 100644 --- a/version.properties +++ b/version.properties @@ -1,6 +1,6 @@ major=1 minor=10 -patch=0 +patch=1 base_version=${major}.${minor}.${patch} release_version=${base_version} snapshot_version=${base_version}-SNAPSHOT -- cgit 1.2.3-korg