diff options
2 files changed, 220 insertions, 16 deletions
diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java index ab51bfef..a2c000f5 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberIT.java @@ -20,8 +20,10 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; +import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendError; import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource; +import com.google.gson.JsonArray; import com.google.gson.JsonElement; import java.time.Duration; import org.junit.jupiter.api.BeforeAll; @@ -30,9 +32,13 @@ import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouter import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.netty.http.server.HttpServerRoutes; import reactor.test.StepVerifier; /** @@ -40,36 +46,230 @@ import reactor.test.StepVerifier; * @since May 2019 */ class MessageRouterSubscriberIT { - private MessageRouterSubscriber sut = DmaapClientFactory.createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); - private static DummyHttpServer server; + private static final Duration TIMEOUT = Duration.ofSeconds(10); + private static final String ERROR_MESSAGE = "Something went wrong"; + private static final String CONSUMER_GROUP = "group1"; + private static final String SUCCESS_CONSUMER_ID = "consumer200"; + private static final String FAILING_WITH_401_CONSUMER_ID = "consumer401"; + private static final String FAILING_WITH_403_CONSUMER_ID = "consumer403"; + private static final String FAILING_WITH_409_CONSUMER_ID = "consumer409"; + private static final String FAILING_WITH_429_CONSUMER_ID = "consumer429"; + private static final String FAILING_WITH_500_CONSUMER_ID = "consumer500"; + + private static final String CONSUMER_PATH = String.format("/events/TOPIC/%s", CONSUMER_GROUP); + + private static final String SUCCESS_RESP_PATH = String + .format("%s/%s", CONSUMER_PATH, SUCCESS_CONSUMER_ID); + private static final String FAILING_WITH_401_RESP_PATH = String + .format("%s/%s", CONSUMER_PATH, FAILING_WITH_401_CONSUMER_ID); + private static final String FAILING_WITH_403_RESP_PATH = String + .format("%s/%s", CONSUMER_PATH, FAILING_WITH_403_CONSUMER_ID); + private static final String FAILING_WITH_409_RESP_PATH = String + .format("%s/%s", CONSUMER_PATH, FAILING_WITH_409_CONSUMER_ID); + private static final String FAILING_WITH_429_RESP_PATH = String + .format("%s/%s", CONSUMER_PATH, FAILING_WITH_429_CONSUMER_ID); + private static final String FAILING_WITH_500_RESP_PATH = String + .format("%s/%s", CONSUMER_PATH, FAILING_WITH_500_CONSUMER_ID); + + private static MessageRouterSubscribeRequest mrSuccessRequest; + private static MessageRouterSubscribeRequest mrFailingRequest; + private MessageRouterSubscriber sut = DmaapClientFactory + .createMessageRouterSubscriber(MessageRouterSubscriberConfig.createDefault()); private static MessageRouterSource sourceDefinition; + @BeforeAll static void setUp() { - server = DummyHttpServer.start(routes -> - routes.get("/events/TOPIC/group1/consumer8", (req, resp) -> sendResource(resp, "/sample-mr-subscribe-response.json")) - ); - sourceDefinition = ImmutableMessageRouterSource.builder() - .name("the topic") - .topicUrl(String.format("http://%s:%d/events/TOPIC", server.host(), server.port())) - .build(); + DummyHttpServer server = DummyHttpServer.start(MessageRouterSubscriberIT::setRoutes); + + sourceDefinition = createMessageRouterSource(server); + + mrSuccessRequest = createSuccessRequest(); + + mrFailingRequest = createFailingRequest(FAILING_WITH_500_CONSUMER_ID); } @Test - void testStub() { - final MessageRouterSubscribeRequest mrRequest = ImmutableMessageRouterSubscribeRequest.builder() - .sourceDefinition(sourceDefinition) - .consumerGroup("group1") - .consumerId("consumer8") + void subscriber_shouldGetCorrectResponse(){ + Mono<MessageRouterSubscribeResponse> response = sut + .get(mrSuccessRequest); + + JsonArray expectedItems = new JsonArray(); + expectedItems.add("I"); + expectedItems.add("like"); + expectedItems.add("pizza"); + + MessageRouterSubscribeResponse expectedResponse = ImmutableMessageRouterSubscribeResponse + .builder() + .items(expectedItems) .build(); + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(TIMEOUT); + } + + @Test + void subscriber_shouldGetUnauthorizedErrorResponse(){ + MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_401_CONSUMER_ID); + Mono<MessageRouterSubscribeResponse> response = sut.get(request); + + MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String + .format("401 Unauthorized\n%s", ERROR_MESSAGE)); + + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(TIMEOUT); + } + + @Test + void subscriber_shouldGetForbiddenErrorResponse(){ + MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_403_CONSUMER_ID); + Mono<MessageRouterSubscribeResponse> response = sut.get(request); + + MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String + .format("403 Forbidden\n%s", ERROR_MESSAGE)); + + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(TIMEOUT); + } + + @Test + void subscriber_shouldGetConflictErrorResponse(){ + MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_409_CONSUMER_ID); + Mono<MessageRouterSubscribeResponse> response = sut.get(request); + + MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String + .format("409 Conflict\n%s", ERROR_MESSAGE)); + + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(TIMEOUT); + } + + @Test + void subscriber_shouldGetTooManyRequestsErrorResponse(){ + MessageRouterSubscribeRequest request = createFailingRequest(FAILING_WITH_429_CONSUMER_ID); + Mono<MessageRouterSubscribeResponse> response = sut.get(request); + + MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String + .format("429 Too Many Requests\n%s", ERROR_MESSAGE)); + + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(TIMEOUT); + } + + @Test + void subscriber_shouldGetInternalServerErrorResponse(){ + Mono<MessageRouterSubscribeResponse> response = sut + .get(mrFailingRequest); + + MessageRouterSubscribeResponse expectedResponse = createErrorResponse(String + .format("500 Internal Server Error\n%s", ERROR_MESSAGE)); + + StepVerifier.create(response) + .expectNext(expectedResponse) + .expectComplete() + .verify(TIMEOUT); + } + + @Test + void subscriber_shouldParseCorrectResponse() { final Flux<String> result = sut - .getElements(mrRequest) + .getElements(mrSuccessRequest) .map(JsonElement::getAsString); StepVerifier.create(result) .expectNext("I", "like", "pizza") .expectComplete() - .verify(Duration.ofSeconds(10)); + .verify(TIMEOUT); + } + + @Test + void subscriber_shouldParseErrorResponse(){ + Flux<String> result = sut + .getElements(mrFailingRequest) + .map(JsonElement::getAsString); + + StepVerifier.create(result) + .expectError(IllegalStateException.class) + .verify(TIMEOUT); + } + + @Test + void subscriber_shouldSubscribeCorrectly(){ + Flux<String> subscriptionForElements = sut + .subscribeForElements(mrSuccessRequest, Duration.ofSeconds(1)) + .map(JsonElement:: getAsString); + + StepVerifier.create(subscriptionForElements.take(2)) + .expectNext("I", "like") + .expectComplete() + .verify(TIMEOUT); + } + + @Test + void subscriber_shouldParseErrorWhenSubscribed(){ + Flux<String> subscriptionForElements = sut + .subscribeForElements(mrFailingRequest, Duration.ofSeconds(1)) + .map(JsonElement:: getAsString); + + StepVerifier.create(subscriptionForElements.take(2)) + .expectError(IllegalStateException.class) + .verify(TIMEOUT); + } + + private static HttpServerRoutes setRoutes(HttpServerRoutes routes){ + return routes + .get(SUCCESS_RESP_PATH, (req, resp) -> + sendResource(resp, "/sample-mr-subscribe-response.json")) + .get(FAILING_WITH_401_RESP_PATH, (req, resp) -> + sendError(resp, 401, ERROR_MESSAGE)) + .get(FAILING_WITH_403_RESP_PATH, (req, resp) -> + sendError(resp, 403, ERROR_MESSAGE)) + .get(FAILING_WITH_409_RESP_PATH, (req, resp) -> + sendError(resp, 409, ERROR_MESSAGE)) + .get(FAILING_WITH_429_RESP_PATH, (req, resp) -> + sendError(resp, 429, ERROR_MESSAGE)) + .get(FAILING_WITH_500_RESP_PATH, (req, resp) -> + sendError(resp, 500, ERROR_MESSAGE)); + } + + private static MessageRouterSource createMessageRouterSource(DummyHttpServer server){ + return ImmutableMessageRouterSource.builder() + .name("the topic") + .topicUrl(String.format("http://%s:%d/events/TOPIC", server.host(), server.port())) + .build(); + } + + private static MessageRouterSubscribeRequest createSuccessRequest(){ + return ImmutableMessageRouterSubscribeRequest.builder() + .sourceDefinition(sourceDefinition) + .consumerGroup(CONSUMER_GROUP) + .consumerId(SUCCESS_CONSUMER_ID) + .build(); + } + + private static MessageRouterSubscribeRequest createFailingRequest(String consumerId){ + return ImmutableMessageRouterSubscribeRequest + .builder() + .sourceDefinition(sourceDefinition) + .consumerGroup(CONSUMER_GROUP) + .consumerId(consumerId) + .build(); + } + + private static MessageRouterSubscribeResponse createErrorResponse(String failReason){ + return ImmutableMessageRouterSubscribeResponse + .builder() + .failReason(failReason) + .build(); } } diff --git a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java index 2a1ba7a6..4795b00f 100644 --- a/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java +++ b/rest-services/http-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java @@ -67,6 +67,10 @@ public class DummyHttpServer { return sendString(httpServerResponse, Mono.fromCallable(() -> readResource(resourcePath))); } + public static Publisher<Void> sendError(HttpServerResponse httpServerResponse, int statusCode, String message){ + return sendString(httpServerResponse.status(statusCode), Mono.just(message)); + } + public static Publisher<Void> sendString(HttpServerResponse httpServerResponse, Publisher<String> content) { return httpServerResponse.sendString(content); } |