summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2020-10-14 14:14:06 +0200
committerPatrikBuhr <patrik.buhr@est.tech>2020-10-19 08:34:58 +0200
commitf86407dfdf0e04979a6765da4eb13f9983e1150e (patch)
treed5a2fb87cfbbd062bf0e3d613cc8a0ee04e7b18d
parentdf373ec4c902a2596dd2dfe957425af1e3113b17 (diff)
Made DmaapMessageConsumer asynchronuous
Change-Id: Ib3d4951f3f9b2061353b5e50f427559a3781b10e Issue-ID: CCSDK-2502 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java2
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java174
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java15
-rw-r--r--a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumerTest.java190
-rw-r--r--a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandlerTest.java15
-rw-r--r--a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java11
6 files changed, 184 insertions, 223 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java
index 8409f45c..7f453a27 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java
@@ -188,7 +188,7 @@ public class AsyncRestClient {
logger.debug("{} HTTP error status = '{}', body '{}'", traceTag, exception.getStatusCode(),
exception.getResponseBodyAsString());
} else {
- logger.debug("{} HTTP error", traceTag, t);
+ logger.debug("{} HTTP error {}", traceTag, t.getMessage());
}
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java
index 3a365178..f948e5f5 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java
@@ -20,7 +20,6 @@
package org.onap.ccsdk.oran.a1policymanagementservice.dmaap;
-import com.google.common.collect.Iterables;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
@@ -37,15 +36,17 @@ import java.util.ServiceLoader;
import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient;
import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClientFactory;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
-import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+
/**
* The class fetches incoming requests from DMAAP. It uses the timeout parameter
* that lets the MessageRouter keep the connection with the Kafka open until
@@ -74,60 +75,82 @@ public class DmaapMessageConsumer {
private final AsyncRestClientFactory restClientFactory;
+ private final InfiniteFlux infiniteSubmitter = new InfiniteFlux();
+
@Value("${server.http-port}")
private int localServerHttpPort;
+ private static class InfiniteFlux {
+ private FluxSink<Integer> sink;
+ private int counter = 0;
+
+ public synchronized Flux<Integer> start() {
+ stop();
+ return Flux.create(this::next).doOnRequest(this::onRequest);
+ }
+
+ public synchronized void stop() {
+ if (this.sink != null) {
+ this.sink.complete();
+ this.sink = null;
+ }
+ }
+
+ void onRequest(long no) {
+ logger.debug("InfiniteFlux.onRequest {}", no);
+ for (long i = 0; i < no; ++i) {
+ sink.next(counter++);
+ }
+ }
+
+ void next(FluxSink<Integer> sink) {
+ logger.debug("InfiniteFlux.next");
+ this.sink = sink;
+ sink.next(counter++);
+ }
+
+ }
+
@Autowired
public DmaapMessageConsumer(ApplicationConfig applicationConfig) {
this.applicationConfig = applicationConfig;
GsonBuilder gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
- gson = gsonBuilder.create();
+ this.gson = gsonBuilder.create();
this.restClientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
}
/**
- * Starts the consumer. If there is a DMaaP configuration, it will start polling
- * for messages. Otherwise it will check regularly for the configuration.
+ * Starts the DMAAP consumer. If there is a DMaaP configuration, it will start
+ * polling for messages. Otherwise it will check regularly for the
+ * configuration.
*
- * @return the running thread, for test purposes.
*/
- public Thread start() {
- Thread thread = new Thread(this::messageHandlingLoop);
- thread.start();
- return thread;
+ public void start() {
+ infiniteSubmitter.stop();
+
+ createTask().subscribe(//
+ value -> logger.debug("DmaapMessageConsumer next: {}", value), //
+ throwable -> logger.error("DmaapMessageConsumer error: {}", throwable), //
+ () -> logger.warn("DmaapMessageConsumer stopped") //
+ );
}
- private void messageHandlingLoop() {
- while (!isStopped()) {
- try {
- if (isDmaapConfigured()) {
- Iterable<DmaapRequestMessage> dmaapMsgs = fetchAllMessages();
- if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) {
- logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
- for (DmaapRequestMessage msg : dmaapMsgs) {
- processMsg(msg);
- }
- }
- } else {
- sleep(TIME_BETWEEN_DMAAP_RETRIES); // wait for configuration
- }
- } catch (Exception e) {
- logger.warn("{}", e.getMessage());
- sleep(TIME_BETWEEN_DMAAP_RETRIES);
- }
- }
+ protected Flux<String> createTask() {
+ return infiniteFlux() //
+ .flatMap(notUsed -> fetchFromDmaap(), 1) //
+ .doOnNext(message -> logger.debug("Message Reveived from DMAAP : {}", message)) //
+ .flatMap(this::parseReceivedMessage, 1)//
+ .flatMap(this::handleDmaapMsg, 1) //
+ .onErrorResume(throwable -> Mono.empty());
}
- protected boolean isStopped() {
- return false;
+ protected Flux<Integer> infiniteFlux() {
+ return infiniteSubmitter.start();
}
- protected boolean isDmaapConfigured() {
- String producerTopicUrl = applicationConfig.getDmaapProducerTopicUrl();
- String consumerTopicUrl = applicationConfig.getDmaapConsumerTopicUrl();
- return (producerTopicUrl != null && consumerTopicUrl != null && !producerTopicUrl.isEmpty()
- && !consumerTopicUrl.isEmpty());
+ protected Mono<Object> delay() {
+ return Mono.delay(TIME_BETWEEN_DMAAP_RETRIES).flatMap(o -> Mono.empty());
}
private <T> List<T> parseList(String jsonString, Class<T> clazz) {
@@ -146,7 +169,36 @@ public class DmaapMessageConsumer {
return result;
}
- private void sendErrorResponse(String response) {
+ protected boolean isDmaapConfigured() {
+ String producerTopicUrl = applicationConfig.getDmaapProducerTopicUrl();
+ String consumerTopicUrl = applicationConfig.getDmaapConsumerTopicUrl();
+ return (producerTopicUrl != null && consumerTopicUrl != null && !producerTopicUrl.isEmpty()
+ && !consumerTopicUrl.isEmpty());
+ }
+
+ protected Mono<String> handleDmaapMsg(DmaapRequestMessage dmaapRequestMessage) {
+ return getDmaapMessageHandler().handleDmaapMsg(dmaapRequestMessage);
+ }
+
+ protected Mono<String> getFromMessageRouter(String topicUrl) {
+ logger.trace("getFromMessageRouter {}", topicUrl);
+ AsyncRestClient c = restClientFactory.createRestClient("");
+ return c.get(topicUrl);
+ }
+
+ protected Flux<DmaapRequestMessage> parseReceivedMessage(String jsonString) {
+ try {
+ logger.trace("parseMessages {}", jsonString);
+ return Flux.fromIterable(parseList(jsonString, DmaapRequestMessage.class));
+ } catch (Exception e) {
+ logger.error("parseMessages error {}", jsonString);
+ return sendErrorResponse("Could not parse: " + jsonString) //
+ .flatMapMany(s -> Flux.empty());
+ }
+ }
+
+ protected Mono<String> sendErrorResponse(String response) {
+ logger.debug("sendErrorResponse {}", response);
DmaapRequestMessage fakeRequest = ImmutableDmaapRequestMessage.builder() //
.apiVersion("") //
.correlationId("") //
@@ -158,37 +210,23 @@ public class DmaapMessageConsumer {
.timestamp("") //
.url("URL") //
.build();
- getDmaapMessageHandler().sendDmaapResponse(response, fakeRequest, HttpStatus.BAD_REQUEST).block();
+ return getDmaapMessageHandler().sendDmaapResponse(response, fakeRequest, HttpStatus.BAD_REQUEST) //
+ .onErrorResume(e -> Mono.empty());
}
- List<DmaapRequestMessage> parseMessages(String jsonString) throws ServiceException {
- try {
- return parseList(jsonString, DmaapRequestMessage.class);
- } catch (Exception e) {
- sendErrorResponse("Not parsable request received, reason:" + e.toString() + ", input :" + jsonString);
- throw new ServiceException("Could not parse incomming request. Reason :" + e.getMessage());
+ private Mono<String> fetchFromDmaap() {
+ if (!this.isDmaapConfigured()) {
+ logger.debug("fetchFromDmaap, no action DMAAP not configured");
+ return delay().flatMap(o -> Mono.empty());
}
- }
-
- protected Iterable<DmaapRequestMessage> fetchAllMessages() throws ServiceException {
+ logger.debug("fetchFromDmaap");
String topicUrl = this.applicationConfig.getDmaapConsumerTopicUrl();
- AsyncRestClient consumer = getMessageRouterConsumer();
- ResponseEntity<String> response = consumer.getForEntity(topicUrl).block();
- logger.debug("DMaaP consumer received {} : {}", response.getStatusCode(), response.getBody());
- if (response.getStatusCode().is2xxSuccessful()) {
- return parseMessages(response.getBody());
- } else {
- throw new ServiceException("Cannot fetch because of Error respons: " + response.getStatusCode().toString()
- + " " + response.getBody());
- }
- }
- private void processMsg(DmaapRequestMessage msg) {
- logger.debug("Message Reveived from DMAAP : {}", msg);
- getDmaapMessageHandler().handleDmaapMsg(msg);
+ return getFromMessageRouter(topicUrl) //
+ .onErrorResume(throwable -> delay().flatMap(o -> Mono.empty()));
}
- protected DmaapMessageHandler getDmaapMessageHandler() {
+ private DmaapMessageHandler getDmaapMessageHandler() {
if (this.dmaapMessageHandler == null) {
String pmsBaseUrl = "http://localhost:" + this.localServerHttpPort;
AsyncRestClient pmsClient = restClientFactory.createRestClient(pmsBaseUrl);
@@ -199,16 +237,4 @@ public class DmaapMessageConsumer {
return this.dmaapMessageHandler;
}
- protected void sleep(Duration duration) {
- try {
- Thread.sleep(duration.toMillis());
- } catch (Exception e) {
- logger.error("Failed to put the thread to sleep", e);
- }
- }
-
- protected AsyncRestClient getMessageRouterConsumer() {
- return restClientFactory.createRestClient("");
- }
-
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java
index 967cab1d..c77087a5 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java
@@ -54,20 +54,13 @@ public class DmaapMessageHandler {
this.dmaapClient = dmaapClient;
}
- public void handleDmaapMsg(DmaapRequestMessage msg) {
- try {
- String result = this.createTask(msg).block();
- logger.debug("handleDmaapMsg: {}", result);
- } catch (Exception throwable) {
- logger.warn("handleDmaapMsg failure {}", throwable.getMessage());
- }
- }
-
- Mono<String> createTask(DmaapRequestMessage dmaapRequestMessage) {
+ public Mono<String> handleDmaapMsg(DmaapRequestMessage dmaapRequestMessage) {
return this.invokePolicyManagementService(dmaapRequestMessage) //
.onErrorResume(t -> handlePolicyManagementServiceCallError(t, dmaapRequestMessage)) //
.flatMap(response -> sendDmaapResponse(response.getBody(), dmaapRequestMessage,
- response.getStatusCode()));
+ response.getStatusCode()))
+ .doOnError(t -> logger.warn("Failed to handle DMAAP message : {}", t.getMessage()))//
+ .onErrorResume(t -> Mono.empty());
}
private Mono<ResponseEntity<String>> handlePolicyManagementServiceCallError(Throwable error,
diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumerTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumerTest.java
index b6d3cc07..72ca84a5 100644
--- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumerTest.java
+++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumerTest.java
@@ -20,36 +20,26 @@
package org.onap.ccsdk.oran.a1policymanagementservice.dmaap;
-import static ch.qos.logback.classic.Level.WARN;
-import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.core.read.ListAppender;
-
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
-import java.time.Duration;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.ArrayList;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.ArgumentCaptor;
-import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient;
@@ -57,8 +47,8 @@ import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationCo
import org.onap.ccsdk.oran.a1policymanagementservice.dmaap.DmaapRequestMessage.Operation;
import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
import org.onap.ccsdk.oran.a1policymanagementservice.utils.LoggingUtils;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
+
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ExtendWith(MockitoExtension.class)
@@ -79,151 +69,113 @@ class DmaapMessageConsumerTest {
LoggingUtils.getLogListAppender(DmaapMessageConsumer.class);
}
- @Test
- void dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception {
- messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
-
- doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
- doReturn(false, false, false, true).when(messageConsumerUnderTest).isStopped();
- doReturn(false, true, true).when(messageConsumerUnderTest).isDmaapConfigured();
- doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages();
-
- messageConsumerUnderTest.start().join();
-
- InOrder orderVerifier = inOrder(messageConsumerUnderTest);
- orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
- orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
+ private void setTaskNumberOfLoops(int number) {
+ ArrayList<Integer> l = new ArrayList<>();
+ for (int i = 0; i < number; ++i) {
+ l.add(i);
+ }
+ Flux<Integer> f = Flux.fromIterable(l);
+ doReturn(f).when(messageConsumerUnderTest).infiniteFlux();
}
- @Test
- void dmaapConfigurationRemoved_thenStopPollingDmaapSleepAndRetry() throws Exception {
- messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
-
- doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
- doReturn(false, false, false, false, true).when(messageConsumerUnderTest).isStopped();
- doReturn(true, true, false).when(messageConsumerUnderTest).isDmaapConfigured();
- doReturn(new LinkedList<>()).when(messageConsumerUnderTest).fetchAllMessages();
-
- messageConsumerUnderTest.start().join();
-
- InOrder orderVerifier = inOrder(messageConsumerUnderTest);
- orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
- orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
+ private void disableTaskDelay() {
+ doReturn(Mono.empty()).when(messageConsumerUnderTest).delay();
}
@Test
- void dmaapConfiguredAndNoMessages_thenPollOnce() throws Exception {
- setUpMrConfig();
-
+ void successfulCase_dmaapNotConfigured_thenSleepAndRetryUntilConfig() throws Exception {
messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
- Mono<ResponseEntity<String>> response = Mono.empty();
+ setTaskNumberOfLoops(3);
+ disableTaskDelay();
- doReturn(false, true).when(messageConsumerUnderTest).isStopped();
- doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
- doReturn(response).when(messageRouterConsumerMock).getForEntity(any());
+ when(this.applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("getDmaapConsumerTopicUrl");
+ doReturn(false, false, true).when(messageConsumerUnderTest).isDmaapConfigured();
+ doReturn(Mono.just(dmaapRequestMessageString())).when(messageConsumerUnderTest)
+ .getFromMessageRouter(anyString());
- messageConsumerUnderTest.start().join();
+ doReturn(Mono.just("responseFromHandler")).when(messageConsumerUnderTest).handleDmaapMsg(any());
- verify(messageRouterConsumerMock).getForEntity(any());
- verifyNoMoreInteractions(messageRouterConsumerMock);
+ String s = messageConsumerUnderTest.createTask().blockLast();
+ assertEquals("responseFromHandler", s);
+ verify(messageConsumerUnderTest, times(2)).delay();
+ verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(dmaapRequestMessage());
}
@Test
- void dmaapConfiguredAndErrorGettingMessages_thenLogWarningAndSleep() throws Exception {
- setUpMrConfig();
-
+ void returnErrorFromDmapp_thenSleepAndRetry() throws Exception {
messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
- doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
- doReturn(false, true).when(messageConsumerUnderTest).isStopped();
- doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
-
- Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>("Error", HttpStatus.BAD_REQUEST));
- when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
+ setTaskNumberOfLoops(2);
+ disableTaskDelay();
+ setUpMrConfig();
- final ListAppender<ILoggingEvent> logAppender =
- LoggingUtils.getLogListAppender(DmaapMessageConsumer.class, WARN);
+ {
+ Mono<String> dmaapError = Mono.error(new ServiceException("dmaapError"));
+ Mono<String> dmaapResponse = Mono.just(dmaapRequestMessageString());
+ doReturn(dmaapError, dmaapResponse).when(messageConsumerUnderTest).getFromMessageRouter(anyString());
+ }
- messageConsumerUnderTest.start().join();
+ doReturn(Mono.just("response1")).when(messageConsumerUnderTest).handleDmaapMsg(any());
- assertThat(logAppender.list.get(0).getFormattedMessage())
- .isEqualTo("Cannot fetch because of Error respons: 400 BAD_REQUEST Error");
+ String s = messageConsumerUnderTest.createTask().blockLast();
- verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
+ verify(messageConsumerUnderTest, times(2)).getFromMessageRouter(anyString());
+ verify(messageConsumerUnderTest, times(0)).sendErrorResponse(anyString());
+ verify(messageConsumerUnderTest, times(1)).delay();
+ verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(dmaapRequestMessage());
+ assertEquals("response1", s);
}
@Test
- void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception {
- // The message from MR is here an array of Json objects
- setUpMrConfig();
+ void unParsableMessage_thenSendResponseAndContinue() throws Exception {
messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
+ setTaskNumberOfLoops(2);
+ setUpMrConfig();
- String messages = jsonArray(gson.toJson(dmaapRequestMessage(Operation.PUT)));
-
- doReturn(false, true).when(messageConsumerUnderTest).isStopped();
- doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
-
- Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>(messages, HttpStatus.OK));
- when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
+ {
+ Mono<String> dmaapError = Mono.just("Non valid JSON \"");
+ Mono<String> dmaapResponse = Mono.just(dmaapRequestMessageString());
+ doReturn(dmaapError, dmaapResponse).when(messageConsumerUnderTest).getFromMessageRouter(anyString());
+ }
- doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
+ doReturn(Mono.just("response1")).when(messageConsumerUnderTest).handleDmaapMsg(any());
- messageConsumerUnderTest.start().join();
+ String s = messageConsumerUnderTest.createTask().blockLast();
+ assertEquals("response1", s);
- ArgumentCaptor<DmaapRequestMessage> captor = ArgumentCaptor.forClass(DmaapRequestMessage.class);
- verify(messageHandlerMock).handleDmaapMsg(captor.capture());
- DmaapRequestMessage messageAfterJsonParsing = captor.getValue();
- assertThat(messageAfterJsonParsing.apiVersion()).isNotEmpty();
+ verify(messageConsumerUnderTest, times(2)).getFromMessageRouter(anyString());
+ verify(messageConsumerUnderTest, times(1)).sendErrorResponse(anyString());
+ verify(messageConsumerUnderTest, times(0)).delay();
+ verify(messageConsumerUnderTest, times(1)).handleDmaapMsg(dmaapRequestMessage());
+ }
- verifyNoMoreInteractions(messageHandlerMock);
+ private String dmaapRequestMessageString() {
+ String json = gson.toJson(dmaapRequestMessage());
+ return jsonArray(json);
}
@Test
void testMessageParsing() throws ServiceException {
messageConsumerUnderTest = new DmaapMessageConsumer(applicationConfigMock);
- String json = gson.toJson(dmaapRequestMessage(Operation.PUT));
+ String json = gson.toJson(dmaapRequestMessage());
{
String jsonArrayOfObject = jsonArray(json);
- List<DmaapRequestMessage> parsedMessage = messageConsumerUnderTest.parseMessages(jsonArrayOfObject);
+ DmaapRequestMessage parsedMessage =
+ messageConsumerUnderTest.parseReceivedMessage(jsonArrayOfObject).blockLast();
assertNotNull(parsedMessage);
- assertTrue(parsedMessage.get(0).payload().isPresent());
+ assertTrue(parsedMessage.payload().isPresent());
}
{
String jsonArrayOfString = jsonArray(quote(json));
- List<DmaapRequestMessage> parsedMessage = messageConsumerUnderTest.parseMessages(jsonArrayOfString);
+ DmaapRequestMessage parsedMessage =
+ messageConsumerUnderTest.parseReceivedMessage(jsonArrayOfString).blockLast();
assertNotNull(parsedMessage);
- assertTrue(parsedMessage.get(0).payload().isPresent());
+ assertTrue(parsedMessage.payload().isPresent());
}
}
- @Test
- void incomingUnparsableRequest_thenSendResponse() throws Exception {
- messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
- doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
- doReturn(Mono.just("OK")).when(messageHandlerMock).sendDmaapResponse(any(), any(), any());
- Exception actualException =
- assertThrows(ServiceException.class, () -> messageConsumerUnderTest.parseMessages("[\"abc:\"def\"]"));
- assertThat(actualException.getMessage())
- .contains("Could not parse incomming request. Reason :com.google.gson.stream.MalformedJsonException");
-
- verify(messageHandlerMock).sendDmaapResponse(any(), any(), any());
- }
-
- @Test
- void incomingUnparsableRequest_thenSendingResponseFailed() throws Exception {
- messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
- doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
- doReturn(Mono.error(new Exception("Sending response failed"))).when(messageHandlerMock).sendDmaapResponse(any(),
- any(), any());
- Exception actualException =
- assertThrows(Exception.class, () -> messageConsumerUnderTest.parseMessages("[\"abc:\"def\"]"));
- assertThat(actualException.getMessage()).contains("Sending response failed");
-
- verify(messageHandlerMock).sendDmaapResponse(any(), any(), any());
- }
-
private void setUpMrConfig() {
when(applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("url");
when(applicationConfigMock.getDmaapProducerTopicUrl()).thenReturn("url");
@@ -237,11 +189,11 @@ class DmaapMessageConsumerTest {
return "\"" + s.replace("\"", "\\\"") + "\"";
}
- private DmaapRequestMessage dmaapRequestMessage(Operation operation) {
+ private DmaapRequestMessage dmaapRequestMessage() {
return ImmutableDmaapRequestMessage.builder() //
.apiVersion("apiVersion") //
.correlationId("correlationId") //
- .operation(operation) //
+ .operation(Operation.PUT) //
.originatorId("originatorId") //
.payload(new JsonObject()) //
.requestId("requestId") //
diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandlerTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandlerTest.java
index 99468811..df84ae05 100644
--- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandlerTest.java
+++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandlerTest.java
@@ -110,7 +110,7 @@ class DmaapMessageHandlerTest {
DmaapRequestMessage message = dmaapRequestMessage(Operation.DELETE);
StepVerifier //
- .create(testedObject.createTask(message)) //
+ .create(testedObject.handleDmaapMsg(message)) //
.expectSubscription() //
.expectNext("OK") //
.verifyComplete(); //
@@ -130,7 +130,7 @@ class DmaapMessageHandlerTest {
DmaapRequestMessage message = dmaapRequestMessage(Operation.GET);
StepVerifier //
- .create(testedObject.createTask(message)) //
+ .create(testedObject.handleDmaapMsg(message)) //
.expectSubscription() //
.expectNext("OK") //
.verifyComplete(); //
@@ -152,7 +152,7 @@ class DmaapMessageHandlerTest {
DmaapRequestMessage message = dmaapRequestMessage(Operation.GET);
StepVerifier //
- .create(testedObject.createTask(message)) //
+ .create(testedObject.handleDmaapMsg(message)) //
.expectSubscription() //
.verifyComplete(); //
@@ -170,7 +170,7 @@ class DmaapMessageHandlerTest {
DmaapRequestMessage message = dmaapRequestMessage(Operation.PUT);
StepVerifier //
- .create(testedObject.createTask(message)) //
+ .create(testedObject.handleDmaapMsg(message)) //
.expectSubscription() //
.expectNext("OK") //
.verifyComplete(); //
@@ -189,7 +189,7 @@ class DmaapMessageHandlerTest {
DmaapRequestMessage message = dmaapRequestMessage(Operation.POST);
StepVerifier //
- .create(testedObject.createTask(message)) //
+ .create(testedObject.handleDmaapMsg(message)) //
.expectSubscription() //
.expectNext("OK") //
.verifyComplete(); //
@@ -208,7 +208,7 @@ class DmaapMessageHandlerTest {
doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
DmaapRequestMessage message = dmaapRequestMessage(Operation.PUT);
- testedObject.createTask(message).block();
+ testedObject.handleDmaapMsg(message).block();
verify(pmsClient).putForEntity(anyString(), anyString());
verifyNoMoreInteractions(pmsClient);
@@ -239,7 +239,8 @@ class DmaapMessageHandlerTest {
final ListAppender<ILoggingEvent> logAppender =
LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN);
- testedObject.handleDmaapMsg(message);
+ doReturn(notOkResponse()).when(pmsClient).putForEntity(anyString(), anyString());
+ testedObject.handleDmaapMsg(message).block();
assertThat(logAppender.list.get(0).getFormattedMessage())
.startsWith("Expected payload in message from DMAAP: ");
diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java
index 2e96b680..4cc23607 100644
--- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java
+++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTaskTest.java
@@ -134,17 +134,6 @@ class RefreshConfigTaskTest {
}
@Test
- void stop_thenTaskIsDisposed() throws Exception {
- refreshTaskUnderTest = this.createTestObject(CONFIG_FILE_DOES_NOT_EXIST, null, null, false);
- refreshTaskUnderTest.systemEnvironment = new Properties();
-
- refreshTaskUnderTest.start();
- refreshTaskUnderTest.stop();
-
- assertThat(refreshTaskUnderTest.getRefreshTask().isDisposed()).as("Refresh task is disposed").isTrue();
- }
-
- @Test
void whenTheConfigurationFits_thenConfiguredRicsArePutInRepository() throws Exception {
refreshTaskUnderTest = this.createTestObject(CONFIG_FILE_EXISTS);
refreshTaskUnderTest.systemEnvironment = new Properties();