summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java232
-rw-r--r--rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java4
2 files changed, 220 insertions, 16 deletions
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
index ab51bfef..a2c000f5 100644
--- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
+++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java
@@ -20,8 +20,10 @@
package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api;
+import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError;
import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource;
+import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import java.time.Duration;
import org.junit.jupiter.api.BeforeAll;
@@ -30,9 +32,13 @@ import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouter
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.netty.http.server.HttpServerRoutes;
import reactor.test.StepVerifier;
/**
@@ -40,36 +46,230 @@ import reactor.test.StepVerifier;
* @since May 2019
*/
class MessageRouterSubscriberIT {
- private MessageRouterSubscriber sut = DmaapClientFactory.createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
- private static DummyHttpServer server;
+ private static final Duration TIMEOUT = Duration.ofSeconds(10);
+ private static final String ERROR_MESSAGE = "Something went wrong";
+ private static final String CONSUMER_GROUP = "group1";
+ private static final String SUCCESS_CONSUMER_ID = "consumer200";
+ private static final String FAILING_WITH_401_CONSUMER_ID = "consumer401";
+ private static final String FAILING_WITH_403_CONSUMER_ID = "consumer403";
+ private static final String FAILING_WITH_409_CONSUMER_ID = "consumer409";
+ private static final String FAILING_WITH_429_CONSUMER_ID = "consumer429";
+ private static final String FAILING_WITH_500_CONSUMER_ID = "consumer500";
+
+ private static final String CONSUMER_PATH = String.format("/events/TOPIC/%s", CONSUMER_GROUP);
+
+ private static final String SUCCESS_RESP_PATH = String
+ .format("%s/%s", CONSUMER_PATH, SUCCESS_CONSUMER_ID);
+ private static final String FAILING_WITH_401_RESP_PATH = String
+ .format("%s/%s", CONSUMER_PATH, FAILING_WITH_401_CONSUMER_ID);
+ private static final String FAILING_WITH_403_RESP_PATH = String
+ .format("%s/%s", CONSUMER_PATH, FAILING_WITH_403_CONSUMER_ID);
+ private static final String FAILING_WITH_409_RESP_PATH = String
+ .format("%s/%s", CONSUMER_PATH, FAILING_WITH_409_CONSUMER_ID);
+ private static final String FAILING_WITH_429_RESP_PATH = String
+ .format("%s/%s", CONSUMER_PATH, FAILING_WITH_429_CONSUMER_ID);
+ private static final String FAILING_WITH_500_RESP_PATH = String
+ .format("%s/%s", CONSUMER_PATH, FAILING_WITH_500_CONSUMER_ID);
+
+ private static MessageRouterSubscribeRequest mrSuccessRequest;
+ private static MessageRouterSubscribeRequest mrFailingRequest;
+ private MessageRouterSubscriber sut = DmaapClientFactory
+ .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault());
private static MessageRouterSource sourceDefinition;
+
@BeforeAll
static void setUp() {
- server = DummyHttpServer.start(routes ->
- routes.get("/events/TOPIC/group1/consumer8", (req, resp) -> sendResource(resp, "/sample-mr-subscribe-response.json"))
- );
- sourceDefinition = ImmutableMessageRouterSource.builder()
- .name("the topic")
- .topicUrl(String.format("http://%s:%d/events/TOPIC", server.host(), server.port()))
- .build();
+ DummyHttpServer server = DummyHttpServer.start(MessageRouterSubscriberIT::setRoutes);
+
+ sourceDefinition = createMessageRouterSource(server);
+
+ mrSuccessRequest = createSuccessRequest();
+
+ mrFailingRequest = createFailingRequest(FAILING_WITH_500_CONSUMER_ID);
}
@Test
- void testStub() {
- final MessageRouterSubscribeRequest mrRequest = ImmutableMessageRouterSubscribeRequest.builder()
- .sourceDefinition(sourceDefinition)
- .consumerGroup("group1")
- .consumerId("consumer8")
+ void subscriber_shouldGetCorrectResponse(){
+ Mono<MessageRouterSubscribeResponse> response = sut
+ .get(mrSuccessRequest);
+
+ JsonArray expectedItems = new JsonArray();
+ expectedItems.add("I");
+ expectedItems.add("like");
+ expectedItems.add("pizza");
+
+ MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse
+ .builder()
+ .items(expectedItems)
.build();
+ StepVerifier.create(response)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
+ @Test
+ void subscriber_shouldGetUnauthorizedErrorResponse(){
+ MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_401_CONSUMER_ID);
+ Mono<MessageRouterSubscribeResponse> response = sut.get(request);
+
+ MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
+ .format("401 Unauthorized\n%s", ERROR_MESSAGE));
+
+ StepVerifier.create(response)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
+ @Test
+ void subscriber_shouldGetForbiddenErrorResponse(){
+ MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_403_CONSUMER_ID);
+ Mono<MessageRouterSubscribeResponse> response = sut.get(request);
+
+ MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
+ .format("403 Forbidden\n%s", ERROR_MESSAGE));
+
+ StepVerifier.create(response)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
+ @Test
+ void subscriber_shouldGetConflictErrorResponse(){
+ MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_409_CONSUMER_ID);
+ Mono<MessageRouterSubscribeResponse> response = sut.get(request);
+
+ MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
+ .format("409 Conflict\n%s", ERROR_MESSAGE));
+
+ StepVerifier.create(response)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
+ @Test
+ void subscriber_shouldGetTooManyRequestsErrorResponse(){
+ MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_429_CONSUMER_ID);
+ Mono<MessageRouterSubscribeResponse> response = sut.get(request);
+
+ MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
+ .format("429 Too Many Requests\n%s", ERROR_MESSAGE));
+
+ StepVerifier.create(response)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
+ @Test
+ void subscriber_shouldGetInternalServerErrorResponse(){
+ Mono<MessageRouterSubscribeResponse> response = sut
+ .get(mrFailingRequest);
+
+ MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String
+ .format("500 Internal Server Error\n%s", ERROR_MESSAGE));
+
+ StepVerifier.create(response)
+ .expectNext(expectedResponse)
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
+ @Test
+ void subscriber_shouldParseCorrectResponse() {
final Flux<String> result = sut
- .getElements(mrRequest)
+ .getElements(mrSuccessRequest)
.map(JsonElement::getAsString);
StepVerifier.create(result)
.expectNext("I", "like", "pizza")
.expectComplete()
- .verify(Duration.ofSeconds(10));
+ .verify(TIMEOUT);
+ }
+
+ @Test
+ void subscriber_shouldParseErrorResponse(){
+ Flux<String> result = sut
+ .getElements(mrFailingRequest)
+ .map(JsonElement::getAsString);
+
+ StepVerifier.create(result)
+ .expectError(IllegalStateException.class)
+ .verify(TIMEOUT);
+ }
+
+ @Test
+ void subscriber_shouldSubscribeCorrectly(){
+ Flux<String> subscriptionForElements = sut
+ .subscribeForElements(mrSuccessRequest, Duration.ofSeconds(1))
+ .map(JsonElement:: getAsString);
+
+ StepVerifier.create(subscriptionForElements.take(2))
+ .expectNext("I", "like")
+ .expectComplete()
+ .verify(TIMEOUT);
+ }
+
+ @Test
+ void subscriber_shouldParseErrorWhenSubscribed(){
+ Flux<String> subscriptionForElements = sut
+ .subscribeForElements(mrFailingRequest, Duration.ofSeconds(1))
+ .map(JsonElement:: getAsString);
+
+ StepVerifier.create(subscriptionForElements.take(2))
+ .expectError(IllegalStateException.class)
+ .verify(TIMEOUT);
+ }
+
+ private static HttpServerRoutes setRoutes(HttpServerRoutes routes){
+ return routes
+ .get(SUCCESS_RESP_PATH, (req, resp) ->
+ sendResource(resp, "/sample-mr-subscribe-response.json"))
+ .get(FAILING_WITH_401_RESP_PATH, (req, resp) ->
+ sendError(resp, 401, ERROR_MESSAGE))
+ .get(FAILING_WITH_403_RESP_PATH, (req, resp) ->
+ sendError(resp, 403, ERROR_MESSAGE))
+ .get(FAILING_WITH_409_RESP_PATH, (req, resp) ->
+ sendError(resp, 409, ERROR_MESSAGE))
+ .get(FAILING_WITH_429_RESP_PATH, (req, resp) ->
+ sendError(resp, 429, ERROR_MESSAGE))
+ .get(FAILING_WITH_500_RESP_PATH, (req, resp) ->
+ sendError(resp, 500, ERROR_MESSAGE));
+ }
+
+ private static MessageRouterSource createMessageRouterSource(DummyHttpServer server){
+ return ImmutableMessageRouterSource.builder()
+ .name("the topic")
+ .topicUrl(String.format("http://%s:%d/events/TOPIC", server.host(), server.port()))
+ .build();
+ }
+
+ private static MessageRouterSubscribeRequest createSuccessRequest(){
+ return ImmutableMessageRouterSubscribeRequest.builder()
+ .sourceDefinition(sourceDefinition)
+ .consumerGroup(CONSUMER_GROUP)
+ .consumerId(SUCCESS_CONSUMER_ID)
+ .build();
+ }
+
+ private static MessageRouterSubscribeRequest createFailingRequest(String consumerId){
+ return ImmutableMessageRouterSubscribeRequest
+ .builder()
+ .sourceDefinition(sourceDefinition)
+ .consumerGroup(CONSUMER_GROUP)
+ .consumerId(consumerId)
+ .build();
+ }
+
+ private static MessageRouterSubscribeResponse createErrorResponse(String failReason){
+ return ImmutableMessageRouterSubscribeResponse
+ .builder()
+ .failReason(failReason)
+ .build();
}
}
diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java
index 2a1ba7a6..4795b00f 100644
--- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java
+++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java
@@ -67,6 +67,10 @@ public class DummyHttpServer {
return sendString(httpServerResponse, Mono.fromCallable(() -> readResource(resourcePath)));
}
+ public static Publisher<Void> sendError(HttpServerResponse httpServerResponse, int statusCode, String message){
+ return sendString(httpServerResponse.status(statusCode), Mono.just(message));
+ }
+
public static Publisher<Void> sendString(HttpServerResponse httpServerResponse, Publisher<String> content) {
return httpServerResponse.sendString(content);
}