diff options
113 files changed, 4625 insertions, 490 deletions
@@ -28,7 +28,7 @@ <junit-jupiter.version>5.3.1</junit-jupiter.version> <junit-vintage.version>5.3.1</junit-vintage.version> <junit-platform.version>1.3.1</junit-platform.version> - <immutables.version>2.7.3</immutables.version> + <immutables.version>2.7.4</immutables.version> <assertj-core.version>3.11.1</assertj-core.version> <reactor.bom.version>Californium-SR4</reactor.bom.version> <slf4j.version>1.7.25</slf4j.version> @@ -79,6 +79,11 @@ <plugin> <artifactId>maven-javadoc-plugin</artifactId> <version>3.0.1</version> + <configuration> + <additionalJOptions> + <additionalJOption>-Xdoclint:none</additionalJOption> + </additionalJOptions> + </configuration> </plugin> <plugin> <artifactId>maven-project-info-reports-plugin</artifactId> diff --git a/rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/AaiHttpClientFactory.java b/rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/AaiHttpClientFactory.java index 5e117456..d2e109ee 100644 --- a/rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/AaiHttpClientFactory.java +++ b/rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/AaiHttpClientFactory.java @@ -73,10 +73,6 @@ public class AaiHttpClientFactory { return Try.of(() -> Paths.get(Passwords.class.getResource(resource).toURI())); } - public static String performBasicAuthentication(String userName, String password) { - return Base64.getEncoder().encodeToString((userName + ":" + password).getBytes()); - } - public static RequestDiagnosticContext createRequestDiagnosticContext() { return ImmutableRequestDiagnosticContext.builder() .invocationId(UUID.randomUUID()).requestId(UUID.randomUUID()).build(); diff --git a/rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/http/get/AaiHttpGetClient.java b/rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/http/get/AaiHttpGetClient.java index 07987d2e..dad1c1f1 100644 --- a/rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/http/get/AaiHttpGetClient.java +++ b/rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/http/get/AaiHttpGetClient.java @@ -28,8 +28,6 @@ import org.onap.dcaegen2.services.sdk.rest.services.uri.URI; import reactor.core.publisher.Mono; import static org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.AaiHttpClientFactory.createRequestDiagnosticContext; -import static org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.AaiHttpClientFactory.performBasicAuthentication; - public final class AaiHttpGetClient implements AaiHttpClient<String> { @@ -37,9 +35,9 @@ public final class AaiHttpGetClient implements AaiHttpClient<String> { private final AaiClientConfiguration configuration; - public AaiHttpGetClient(AaiClientConfiguration configuration) { + public AaiHttpGetClient(AaiClientConfiguration configuration, CloudHttpClient httpGetClient) { this.configuration = configuration; - addAuthorizationBasicHeader(); + this.httpGetClient = httpGetClient; } @Override @@ -47,11 +45,6 @@ public final class AaiHttpGetClient implements AaiHttpClient<String> { return httpGetClient.get(getUri(aaiModel.getCorrelationId()), createRequestDiagnosticContext(), configuration.aaiHeaders(), String.class); } - public AaiHttpGetClient createAaiHttpClient(CloudHttpClient httpGetClient) { - this.httpGetClient = httpGetClient; - return this; - } - private String getUri(String pnfName) { return new URI.URIBuilder() .scheme(configuration.aaiProtocol()) @@ -60,8 +53,4 @@ public final class AaiHttpGetClient implements AaiHttpClient<String> { .path(configuration.aaiBasePath() + configuration.aaiPnfPath() + "/" + pnfName).build().toString(); } - private void addAuthorizationBasicHeader() { - configuration.aaiHeaders().put("Authorization", - "Basic " + performBasicAuthentication(configuration.aaiUserName(), configuration.aaiUserPassword())); - } } diff --git a/rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/http/patch/AaiHttpPatchClient.java b/rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/http/patch/AaiHttpPatchClient.java index ef601550..18511008 100644 --- a/rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/http/patch/AaiHttpPatchClient.java +++ b/rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/http/patch/AaiHttpPatchClient.java @@ -30,8 +30,6 @@ import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClientResponse; import static org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.AaiHttpClientFactory.createRequestDiagnosticContext; -import static org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.AaiHttpClientFactory.performBasicAuthentication; - public final class AaiHttpPatchClient implements AaiHttpClient<HttpClientResponse> { @@ -40,10 +38,10 @@ public final class AaiHttpPatchClient implements AaiHttpClient<HttpClientRespons private final JsonBodyBuilder jsonBodyBuilder; - public AaiHttpPatchClient(final AaiClientConfiguration configuration, JsonBodyBuilder jsonBodyBuilder) { + public AaiHttpPatchClient(final AaiClientConfiguration configuration, JsonBodyBuilder jsonBodyBuilder, CloudHttpClient httpPatchClient) { this.configuration = configuration; this.jsonBodyBuilder = jsonBodyBuilder; - addAuthorizationBasicHeader(); + this.httpPatchClient = httpPatchClient; } public Mono<HttpClientResponse> getAaiResponse(AaiModel aaiModel) { @@ -51,11 +49,6 @@ public final class AaiHttpPatchClient implements AaiHttpClient<HttpClientRespons .patch(getUri(aaiModel.getCorrelationId()), createRequestDiagnosticContext(), configuration.aaiHeaders(), jsonBodyBuilder, aaiModel); } - public AaiHttpPatchClient createAaiHttpClient(CloudHttpClient httpPatchClient) { - this.httpPatchClient = httpPatchClient; - return this; - } - private String getUri(String pnfName) { return new URI.URIBuilder() .scheme(configuration.aaiProtocol()) @@ -64,8 +57,4 @@ public final class AaiHttpPatchClient implements AaiHttpClient<HttpClientRespons .path(configuration.aaiBasePath() + configuration.aaiPnfPath() + "/" + pnfName).build().toString(); } - private void addAuthorizationBasicHeader() { - configuration.aaiHeaders().put("Authorization", - "Basic " + performBasicAuthentication(configuration.aaiUserName(), configuration.aaiUserPassword())); - } } diff --git a/rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/http/put/AaiHttpPutClient.java b/rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/http/put/AaiHttpPutClient.java new file mode 100644 index 00000000..33fcfcce --- /dev/null +++ b/rest-services/aai-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/aai/client/service/http/put/AaiHttpPutClient.java @@ -0,0 +1,53 @@ +/*- + * ============LICENSE_START======================================================= + * DCAEGEN2-SERVICES-SDK + * ================================================================================ + * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + + +package org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.put; + +import static org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.AaiHttpClientFactory.createRequestDiagnosticContext; + +import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.AaiHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.model.AaiModel; +import org.onap.dcaegen2.services.sdk.rest.services.model.JsonBodyBuilder; +import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClientResponse; + +public class AaiHttpPutClient implements AaiHttpClient<HttpClientResponse> { + + private CloudHttpClient httpPutClient; + private final AaiClientConfiguration configuration; + private final JsonBodyBuilder jsonBodyBuilder; + private final String uri; + + public AaiHttpPutClient(final AaiClientConfiguration configuration, JsonBodyBuilder jsonBodyBuilder, String uri, CloudHttpClient httpPutClient) { + this.configuration = configuration; + this.jsonBodyBuilder = jsonBodyBuilder; + this.uri = uri; + this.httpPutClient = httpPutClient; + } + + @Override + public Mono<HttpClientResponse> getAaiResponse(AaiModel aaiModel) { + return httpPutClient + .put(uri, createRequestDiagnosticContext(), configuration.aaiHeaders(), jsonBodyBuilder, aaiModel); + } +} diff --git a/rest-services/cbs-client/pom.xml b/rest-services/cbs-client/pom.xml index 9544a7fe..34804038 100644 --- a/rest-services/cbs-client/pom.xml +++ b/rest-services/cbs-client/pom.xml @@ -24,12 +24,9 @@ <version>${project.version}</version> </dependency> <dependency> - <groupId>io.vavr</groupId> - <artifactId>vavr</artifactId> - </dependency> - <dependency> - <groupId>org.jetbrains</groupId> - <artifactId>annotations</artifactId> + <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId> + <artifactId>model</artifactId> + <version>${project.version}</version> </dependency> <dependency> diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClient.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClient.java index 3ee12eed..d6a5700a 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClient.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClient.java @@ -21,12 +21,10 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api; import com.google.gson.JsonObject; import java.time.Duration; -import java.util.UUID; import java.util.function.BiPredicate; import java.util.function.Function; import org.jetbrains.annotations.NotNull; -import org.onap.dcaegen2.services.sdk.rest.services.model.logging.ImmutableRequestDiagnosticContext; -import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -46,28 +44,27 @@ public interface CbsClient { * <p> * Returns a {@link Mono} that publishes new configuration after CBS client retrieves one. * + * @param request the CBS Request to be performed (can be obtained from {@link CbsRequests}) * @return reactive stream of configuration - * @param diagnosticContext diagnostic context as defined in Logging Guideline - * @since 1.1.2 */ - @NotNull Mono<JsonObject> get(RequestDiagnosticContext diagnosticContext); + @NotNull Mono<JsonObject> get(CbsRequest request); /** * <p> * Poll for configuration. * * <p> - * Will call {@link #get(RequestDiagnosticContext)} after {@code initialDelay} every {@code period}. Resulting entries may or may not be + * Will call {@link #get(CbsRequest)} after {@code initialDelay} every {@code period}. Resulting entries may or may not be * changed, ie. items in the stream might be the same until change is made in CBS. * - * @param diagnosticContext diagnostic context as defined in Logging Guideline + * @param request the CBS Request to be performed (can be obtained from {@link CbsRequests}) * @param initialDelay delay after first request attempt * @param period frequency of update checks * @return stream of configuration states */ - default Flux<JsonObject> get(RequestDiagnosticContext diagnosticContext, Duration initialDelay, Duration period) { + default Flux<JsonObject> get(CbsRequest request, Duration initialDelay, Duration period) { return Flux.interval(initialDelay, period) - .map(i -> ImmutableRequestDiagnosticContext.copyOf(diagnosticContext).withInvocationId(UUID.randomUUID())) + .map(i -> request.withNewInvocationId()) .flatMap(this::get); } @@ -76,7 +73,7 @@ public interface CbsClient { * Poll for configuration updates. * * <p> - * Will call {@link #get(RequestDiagnosticContext)} after {@code initialDelay} every {@code period}. Will emit an item + * Will call {@link #get(CbsRequest)} after {@code initialDelay} every {@code period}. Will emit an item * only when an update was detected, ie. when new item is different then last emitted item. * * <p> @@ -87,17 +84,17 @@ public interface CbsClient { * (<b>experimental API</b>) if you want to react differently to changes in subsets of the configuration. * </li> * <li> - * Use {@link #get(RequestDiagnosticContext, Duration, Duration)} with + * Use {@link #get(CbsRequest, Duration, Duration)} with * {@link Flux#distinctUntilChanged(Function, BiPredicate)} if you want to specify custom comparison logic. * </li> * </ul> * - * @param diagnosticContext diagnostic context as defined in Logging Guideline + * @param request the CBS Request to be performed (can be obtained from {@link CbsRequests}) * @param initialDelay delay after first request attempt * @param period frequency of update checks * @return stream of configuration updates */ - default Flux<JsonObject> updates(RequestDiagnosticContext diagnosticContext, Duration initialDelay, Duration period) { - return get(diagnosticContext, initialDelay, period).distinctUntilChanged(); + default Flux<JsonObject> updates(CbsRequest request, Duration initialDelay, Duration period) { + return get(request, initialDelay, period).distinctUntilChanged(); } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java index 36589dad..c11ed533 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java @@ -20,9 +20,9 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api; import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.CbsClientImpl; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.CbsLookup; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; import reactor.core.publisher.Mono; @@ -53,10 +53,10 @@ public class CbsClientFactory { */ public static @NotNull Mono<CbsClient> createCbsClient(EnvProperties env) { return Mono.defer(() -> { - final CloudHttpClient httpClient = new CloudHttpClient(); + final RxHttpClient httpClient = RxHttpClient.create(); final CbsLookup lookup = new CbsLookup(httpClient); return lookup.lookup(env) - .map(addr -> CbsClientImpl.create(httpClient, addr, env.appName())); + .map(addr -> new CbsClientImpl(httpClient, env.appName(), addr)); }); } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsRequests.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsRequests.java new file mode 100644 index 00000000..3724338d --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsRequests.java @@ -0,0 +1,85 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api; + +import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableCbsRequest; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; + +/** + * A factory to various of requests supported by Config Binding Service. + * + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.4 + */ +public final class CbsRequests { + + /** + * <p>A get-configuration request.</p> + * + * <p>Will bind the configuration for given service and return the bound configuration.</p> + * + * @param diagnosticContext logging diagnostic context (MDC) + * @return the CbsRequest ready to be used when calling {@link CbsClient} + */ + public static @NotNull CbsRequest getConfiguration(RequestDiagnosticContext diagnosticContext) { + return ImmutableCbsRequest.builder() + .diagnosticContext(diagnosticContext) + .requestPath(serviceName -> "/service_component/" + serviceName) + .build(); + } + + /** + * <p>A get-by-key request.</p> + * + * <p>This will call an endpoint that fetches a generic service_component_name:key out of Consul</p> + * + * @param diagnosticContext logging diagnostic context (MDC) + * @return the CbsRequest ready to be used when calling {@link CbsClient} + */ + public static @NotNull CbsRequest getByKey( + RequestDiagnosticContext diagnosticContext, + String key) { + return ImmutableCbsRequest.builder() + .diagnosticContext(diagnosticContext) + .requestPath(serviceName -> "/" + key + "/" + serviceName) + .build(); + } + + /** + * <p>A get-all request.</p> + * + * <p>Will bind the configuration for given service and return the bound configuration, policies, and any other + * keys that are in Consul</p> + * + * @param diagnosticContext logging diagnostic context (MDC) + * @return the CbsRequest ready to be used when calling {@link CbsClient} + */ + public static @NotNull CbsRequest getAll(RequestDiagnosticContext diagnosticContext) { + return ImmutableCbsRequest.builder() + .diagnosticContext(diagnosticContext) + .requestPath(serviceName -> "/service_component_all/" + serviceName) + .build(); + } + +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParserError.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParserError.java index cbdea005..3e295a0f 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParserError.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParserError.java @@ -21,13 +21,11 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions; import io.vavr.control.Either; -import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since 1.1.2 */ -@ExperimentalApi public class StreamParserError { private final String message; diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParsingException.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParsingException.java index aa8c2ed7..4fca3d9a 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParsingException.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParsingException.java @@ -20,13 +20,10 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions; -import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; - /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since 1.1.2 */ -@ExperimentalApi public class StreamParsingException extends CbsClientException { private final StreamParserError cause; @@ -36,6 +33,10 @@ public class StreamParsingException extends CbsClientException { this.cause = cause; } + public StreamParsingException(String message) { + this(new StreamParserError(message)); + } + public StreamParserError cause() { return cause; } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParser.java index 15c4eea2..dfd0e2f7 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParser.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParser.java @@ -19,17 +19,16 @@ */ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener; +import static java.lang.String.valueOf; + import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import io.vavr.collection.List; -import org.jetbrains.annotations.NotNull; - import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; - -import static java.lang.String.valueOf; +import org.jetbrains.annotations.NotNull; /** diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java new file mode 100644 index 00000000..e9263f4f --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java @@ -0,0 +1,155 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import io.vavr.collection.Stream; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils; +import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream; + +/** + * Extract streams from the application configuration represented as GSON JsonObject. + * + * Example input: + * <pre> + * { + * "application_config_1": "value_1, + * ... + * "streams_publishes": { + * "stream1": { + * "type": "message_router", + * "dmaap_info": { + * ... + * } + * }, + * "stream2": { + * "type": "data_router", + * "dmaap_info": { + * ... + * } + * } + * }, + * "streams_subscribes": { + * "stream3": { + * "type": "message_router", + * "dmaap_info": { + * ... + * } + * }, + * } + * } + * </pre> + * + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.4 + */ +public final class DataStreams { + + private DataStreams() { + } + + /** + * <p> + * Extracts sources from application configuration. Parses <em>streams_subscribes</em> subtree. + * </p> + * + * <p> + * For sample input it will yield: + * </p> + * + * <pre> + * [ + * RawDataStream{ + * name="stream3" + * type="message_router" + * direction=SOURCE + * descriptor=JsonObject{ + * type: "message_router", + * dmaap_info: { + * ... + * } + * } + * } + * ] + * </pre> + * + * @param rootJson - the full application configuration + * @return io.vavr.collection.Stream of data streams + */ + public static Stream<RawDataStream<JsonObject>> namedSources(JsonObject rootJson) { + return createCollectionOfStreams(rootJson, DataStreamDirection.SOURCE); + } + + + /** + * <p> + * Extracts sinks from application configuration. Parses <em>streams_publishes</em> subtree. + * </p> + * + * <p> + * For sample input it will yield: + * </p> + * + * <pre> + * [ + * RawDataStream{ + * name="stream1" + * type="message_router" + * direction=SINK + * descriptor=JsonObject{ + * type: "message_router", + * dmaap_info: { + * ... + * } + * } + * }, + * RawDataStream{ + * name="stream2" + * type="data_router" + * direction=SINK + * descriptor=JsonObject{ + * type: "data_router" + * dmaap_info: { + * ... + * } + * } + * } + * ] + * </pre> + * + * @param rootJson - the full application configuration + * @return io.vavr.collection.Stream of data streams + */ + public static Stream<RawDataStream<JsonObject>> namedSinks(JsonObject rootJson) { + return createCollectionOfStreams(rootJson, DataStreamDirection.SINK); + } + + private static Stream<RawDataStream<JsonObject>> createCollectionOfStreams(JsonObject rootJson, DataStreamDirection direction) { + final JsonElement streamsJson = rootJson.get(direction.configurationKey()); + return streamsJson == null + ? Stream.empty() + : DataStreamUtils.mapJsonToStreams(streamsJson, direction); + } + + +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java index f18f2175..2fd1a49d 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java @@ -21,17 +21,14 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams; import com.google.gson.JsonObject; -import io.vavr.control.Either; -import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream; +import org.onap.dcaegen2.services.sdk.model.streams.DataStream; /** + * Represents parser taking GSON JsonObject as an input + * * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since 1.1.4 */ -@ExperimentalApi public interface StreamFromGsonParser<S extends DataStream> extends StreamParser<JsonObject, S> { } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java index 4b0223fd..e117a3c1 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java @@ -20,25 +20,76 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.KafkaSinkParser; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.KafkaSourceParser; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr.DataRouterSinkParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr.DataRouterSourceParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.MessageRouterSinkParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.MessageRouterSourceParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSinkParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaSourceParser; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.*; /** + * Factory methods for GSON-based {@code StreamParser}s + * * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> - * @since March 2019 + * @since 1.1.4 */ public final class StreamFromGsonParsers { private StreamFromGsonParsers() { } + /** + * Creates a stream parser capable of parsing Kafka sinks. + * + * @return a stream parser + */ public static StreamFromGsonParser<KafkaSink> kafkaSinkParser() { return KafkaSinkParser.create(); } + /** + * Creates a stream parser capable of parsing Kafka sources. + * + * @return a stream parser + */ public static StreamFromGsonParser<KafkaSource> kafkaSourceParser() { return KafkaSourceParser.create(); } + + /** + * Creates a stream parser capable of parsing DMaaP Message Router sinks. + * + * @return a stream parser + */ + public static StreamFromGsonParser<MessageRouterSink> messageRouterSinkParser() { + return MessageRouterSinkParser.create(); + } + + /** + * Creates a stream parser capable of parsing DMaaP Message Router sources. + * + * @return a stream parser + */ + public static StreamFromGsonParser<MessageRouterSource> messageRouterSourceParser() { + return MessageRouterSourceParser.create(); + } + + /** + * Creates a stream parser capable of parsing DMaaP Data Router sinks. + * + * @return a stream parser + */ + public static StreamFromGsonParser<DataRouterSink> dataRouterSinkParser() { + return DataRouterSinkParser.create(); + } + + /** + * Creates a stream parser capable of parsing DMaaP Data Router sources. + * + * @return a stream parser + */ + public static StreamFromGsonParser<DataRouterSource> dataRouterSourceParser() { + return DataRouterSourceParser.create(); + } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java index 3467c809..61afbe4f 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamParser.java @@ -22,10 +22,10 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams; import io.vavr.control.Either; import io.vavr.control.Try; -import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream; +import org.onap.dcaegen2.services.sdk.model.streams.DataStream; +import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream; /** * A generic data stream parser which parses {@code T} to data stream {@code S}. @@ -33,9 +33,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Dat * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @param <T> input data type, eg. Gson Object * @param <S> output data type - * @since 1.1.3 + * @since 1.1.4 */ -@ExperimentalApi public interface StreamParser<T, S extends DataStream> { /** @@ -44,7 +43,7 @@ public interface StreamParser<T, S extends DataStream> { * @param input - the input data * @return Right(parsing result) or Left(parsing error) */ - default Either<StreamParserError, S> parse(T input) { + default Either<StreamParserError, S> parse(RawDataStream<T> input) { return Try.of(() -> unsafeParse(input)) .toEither() .mapLeft(StreamParserError::fromThrowable); @@ -58,7 +57,7 @@ public interface StreamParser<T, S extends DataStream> { * @return parsing result * @throws StreamParsingException when parsing was unsuccessful */ - default S unsafeParse(T input) { + default S unsafeParse(RawDataStream<T> input) { return parse(input).getOrElseThrow(StreamParsingException::new); } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamPredicates.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamPredicates.java new file mode 100644 index 00000000..dfc6344a --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamPredicates.java @@ -0,0 +1,60 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams; + +import java.util.Objects; +import java.util.function.Predicate; +import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.model.streams.StreamType; + +/** + * A small collection of predicates usable when filtering {@link RawDataStream}s. + * + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.4 + */ +public final class StreamPredicates { + + public StreamPredicates() { + } + + /** + * Predicate for matching {@link RawDataStream} by name. + * + * @param name data stream name + * @param <T> type of data stream + * @return a predicate which returns true only when a stream name is equal to the given name + */ + public static <T> Predicate<RawDataStream<T>> streamWithName(String name) { + return stream -> Objects.equals(stream.name(), name); + } + + /** + * Predicate for matching {@link RawDataStream} by type. + * + * @param type data stream type + * @param <T> type of data stream + * @return a predicate which returns true only when a stream type is equal to the given type + */ + public static <T> Predicate<RawDataStream<T>> streamOfType(StreamType type) { + return stream -> stream.type() == type; + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java index 05bfc9be..98f3cc97 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java @@ -24,39 +24,61 @@ import java.net.InetSocketAddress; import java.net.MalformedURLException; import java.net.URL; import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; -import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; public class CbsClientImpl implements CbsClient { - private final CloudHttpClient httpClient; - private final String fetchUrl; + private static final Logger LOGGER = LoggerFactory.getLogger(CbsClientImpl.class); + private final RxHttpClient httpClient; + private final String serviceName; + private final InetSocketAddress cbsAddress; - CbsClientImpl(CloudHttpClient httpClient, URL fetchUrl) { + public CbsClientImpl(RxHttpClient httpClient, String serviceName, InetSocketAddress cbsAddress) { this.httpClient = httpClient; - this.fetchUrl = fetchUrl.toString(); + this.serviceName = serviceName; + this.cbsAddress = cbsAddress; } - public static CbsClientImpl create(CloudHttpClient httpClient, InetSocketAddress cbsAddress, String serviceName) { - return new CbsClientImpl(httpClient, constructUrl(cbsAddress, serviceName)); + @Override + public @NotNull Mono<JsonObject> get(CbsRequest request) { + return Mono.fromCallable(() -> constructUrl(request).toString()) + .doOnNext(this::logRequestUrl) + .map(url -> ImmutableHttpRequest.builder() + .method(HttpMethod.GET) + .url(url) + .diagnosticContext(request.diagnosticContext()) + .build()) + .flatMap(httpClient::call) + .map(resp -> resp.bodyAsJson(JsonObject.class)) + .doOnNext(this::logCbsResponse); } - private static URL constructUrl(InetSocketAddress cbsAddress, String serviceName) { + + private URL constructUrl(CbsRequest request) { try { return new URL( "http", cbsAddress.getHostString(), cbsAddress.getPort(), - "/service_component/" + serviceName); + request.requestPath().getForService(serviceName)); } catch (MalformedURLException e) { throw new IllegalArgumentException("Invalid CBS URL", e); } } - @Override - public @NotNull Mono<JsonObject> get(RequestDiagnosticContext diagnosticContext) { - return Mono.defer(() -> httpClient.get(fetchUrl, diagnosticContext, JsonObject.class)); + private void logRequestUrl(String url) { + LOGGER.debug("Calling {} for configuration", url); + } + + private void logCbsResponse(JsonObject json) { + LOGGER.info("Got successful response from Config Binding Service"); + LOGGER.debug("CBS response: {}", json); } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java index 53d0bd34..99058772 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java @@ -21,13 +21,17 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl; import com.google.gson.JsonArray; +import com.google.gson.JsonElement; import com.google.gson.JsonObject; - import java.net.InetSocketAddress; - +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.ServiceLookupException; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; /** @@ -36,27 +40,45 @@ import reactor.core.publisher.Mono; */ public class CbsLookup { + private static final Logger LOGGER = LoggerFactory.getLogger(CbsLookup.class); private static final String CONSUL_JSON_SERVICE_ADDRESS = "ServiceAddress"; private static final String CONSUL_JSON_SERVICE_PORT = "ServicePort"; - private final CloudHttpClient httpClient; + private final RxHttpClient httpClient; - public CbsLookup(CloudHttpClient httpClient) { + public CbsLookup(RxHttpClient httpClient) { this.httpClient = httpClient; } public Mono<InetSocketAddress> lookup(EnvProperties env) { return Mono.fromCallable(() -> createConsulUrl(env)) + .doOnNext(this::logConsulRequestUrl) .flatMap(this::fetchHttpData) + .doOnNext(this::logConsulResponse) .flatMap(this::firstService) - .map(this::parseServiceEntry); + .map(this::parseServiceEntry) + .doOnNext(this::logCbsServiceAddress); } private String createConsulUrl(EnvProperties env) { return String.format("http://%s:%s/v1/catalog/service/%s", env.consulHost(), env.consulPort(), env.cbsName()); } + private void logConsulRequestUrl(String consulUrl) { + LOGGER.debug("Calling Consul for CBS address. consulUrl={}", consulUrl); + } + private Mono<JsonArray> fetchHttpData(String consulUrl) { - return httpClient.get(consulUrl, JsonArray.class); + return httpClient.call( + ImmutableHttpRequest.builder() + .method(HttpMethod.GET) + .url(consulUrl) + .build()) + .doOnNext(HttpResponse::throwIfUnsuccessful) + .map(resp -> resp.bodyAsJson(JsonArray.class)); + } + + private void logConsulResponse(JsonArray consulResponse) { + LOGGER.debug("Consul response with CBS service list. Will use 1st one. response={}", consulResponse); } private Mono<JsonObject> firstService(JsonArray services) { @@ -71,4 +93,8 @@ public class CbsLookup { service.get(CONSUL_JSON_SERVICE_PORT).getAsInt()); } + private void logCbsServiceAddress(InetSocketAddress address) { + LOGGER.info("Config Binding Service address: {}", address); + } + } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java new file mode 100644 index 00000000..7f3ccf35 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java @@ -0,0 +1,79 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; + +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import io.vavr.collection.Stream; +import java.io.IOException; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException; +import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.model.streams.ImmutableRawDataStream; +import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.model.streams.StreamType; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since March 2019 + */ +public final class DataStreamUtils { + + public static Stream<RawDataStream<JsonObject>> mapJsonToStreams(JsonElement streamsJson, + DataStreamDirection direction) { + return Stream.ofAll(streamsJson.getAsJsonObject().entrySet()) + .map(namedSinkJson -> { + final JsonObject jsonObject = namedSinkJson.getValue().getAsJsonObject(); + return rawDataStream(namedSinkJson.getKey(), direction, jsonObject); + }); + } + + public static void assertStreamType( + RawDataStream<JsonObject> json, + StreamType expectedType, + DataStreamDirection expectedDirection) { + if (json.type() != expectedType) { + throw new StreamParsingException( + "Invalid stream type. Expected '" + expectedType + "', but was '" + json.type() + "'"); + } + if (json.direction() != expectedDirection) { + throw new StreamParsingException( + "Invalid stream direction. Expected '" + expectedDirection + "', but was '" + json.direction() + + "'"); + } + } + + public static RawDataStream<JsonObject> readSourceFromResource(String resource) throws IOException { + return rawDataStream(resource, DataStreamDirection.SOURCE, GsonUtils.readObjectFromResource(resource)); + } + + public static RawDataStream<JsonObject> readSinkFromResource(String resource) throws IOException { + return rawDataStream(resource, DataStreamDirection.SINK, GsonUtils.readObjectFromResource(resource)); + } + + private static RawDataStream<JsonObject> rawDataStream(String name, DataStreamDirection direction, JsonObject json) { + return ImmutableRawDataStream.<JsonObject>builder() + .name(name) + .direction(direction) + .type(StreamType.parse(GsonUtils.requiredString(json, "type"))) + .descriptor(json) + .build(); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java index a813607e..7776a1ef 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java @@ -26,58 +26,67 @@ import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import io.vavr.Lazy; +import io.vavr.control.Option; import java.io.IOException; import java.io.InputStreamReader; import java.io.Reader; import java.util.Map.Entry; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.GsonAdaptersAafCredentials; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.GsonAdaptersMessageRouterDmaapInfo; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.GsonAdaptersKafkaInfo; +import org.onap.dcaegen2.services.sdk.model.streams.GsonAdaptersAafCredentials; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.GsonAdaptersDataRouterSink; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.GsonAdaptersDataRouterSource; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since March 2019 */ -final class GsonUtils { +public final class GsonUtils { + private static final Lazy<Gson> GSON = Lazy.of(() -> { GsonBuilder gsonBuilder = new GsonBuilder(); gsonBuilder.registerTypeAdapterFactory(new GsonAdaptersKafkaInfo()); gsonBuilder.registerTypeAdapterFactory(new GsonAdaptersAafCredentials()); + gsonBuilder.registerTypeAdapterFactory(new GsonAdaptersMessageRouterDmaapInfo()); + gsonBuilder.registerTypeAdapterFactory(new GsonAdaptersDataRouterSink()); + gsonBuilder.registerTypeAdapterFactory(new GsonAdaptersDataRouterSource()); return gsonBuilder.create(); }); private GsonUtils() { } - static Gson gsonInstance() { + public static Gson gsonInstance() { return GSON.get(); } - static void assertStreamType(JsonObject json, String expectedType) { - final String actualType = requiredString(json, "type"); - if (!actualType.equals(expectedType)) { - throw new IllegalArgumentException("Invalid stream type. Expected '" + expectedType + "', but was '" + actualType + "'"); - } + public static String requiredString(JsonObject parent, String childName) { + return requiredChild(parent, childName).getAsString(); } - static String requiredString(JsonObject parent, String childName) { - return requiredChild(parent, childName).getAsString(); + public static Option<String> optionalString(JsonObject parent, String childName) { + return Option.of(parent.get(childName).getAsString()); } - static JsonElement requiredChild(JsonObject parent, String childName) { - if (parent.has(childName)) { - return parent.get(childName); - } else { - throw new IllegalArgumentException( - "Could not find sub-node '" + childName + "'. Actual sub-nodes: " + stringifyChildrenNames(parent)); - } + public static JsonElement requiredChild(JsonObject parent, String childName) { + return optionalChild(parent, childName) + .getOrElseThrow(() -> new StreamParsingException( + "Could not find sub-node '" + childName + "'. Actual sub-nodes: " + + stringifyChildrenNames(parent))); + + } + + public static Option<JsonElement> optionalChild(JsonObject parent, String childName) { + return Option.of(parent.get(childName)); } - static JsonObject readObjectFromResource(String resource) throws IOException { + public static JsonObject readObjectFromResource(String resource) throws IOException { return readFromResource(resource).getAsJsonObject(); } - static JsonElement readFromResource(String resource) throws IOException { + public static JsonElement readFromResource(String resource) throws IOException { try (Reader reader = new InputStreamReader(GsonUtils.class.getResourceAsStream(resource))) { return new JsonParser().parse(reader); } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/StreamsConstants.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/StreamsConstants.java new file mode 100644 index 00000000..68304cae --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/StreamsConstants.java @@ -0,0 +1,32 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; + +/** + * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a> + */ + +public final class StreamsConstants { + + public static final String DMAAP_INFO_CHILD_NAME = "dmaap_info"; + + public static final String KAFKA_INFO_CHILD_NAME = "kafka_info"; + +}
\ No newline at end of file diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtils.java new file mode 100644 index 00000000..858fd73b --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtils.java @@ -0,0 +1,44 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import org.jetbrains.annotations.Nullable; +import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since March 2019 + */ +public class DmaapUtils { + + public static final ImmutableAafCredentials EMPTY_CREDENTIALS = ImmutableAafCredentials.builder().build(); + + private DmaapUtils() { + } + + public static @Nullable AafCredentials extractAafCredentials(Gson gson, JsonObject input) { + final AafCredentials aafCredentials = gson.fromJson(input, ImmutableAafCredentials.class); + return EMPTY_CREDENTIALS.equals(aafCredentials) ? null : aafCredentials; + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParser.java new file mode 100644 index 00000000..4cf7cbec --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParser.java @@ -0,0 +1,61 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.model.streams.StreamType; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.DataRouterSink; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableDataRouterSink; + +/** + * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a> + */ + +public final class DataRouterSinkParser implements StreamFromGsonParser<DataRouterSink> { + private final Gson gson; + + private DataRouterSinkParser(Gson gson) { + this.gson = gson; + } + + public static DataRouterSinkParser create() { + return new DataRouterSinkParser(gsonInstance()); + } + + @Override + public DataRouterSink unsafeParse(RawDataStream<JsonObject> input) { + assertStreamType(input, StreamType.DATA_ROUTER, DataStreamDirection.SINK); + + final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME); + return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSink.class).withName(input.name()); + + } + +}
\ No newline at end of file diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParser.java new file mode 100644 index 00000000..a8800711 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParser.java @@ -0,0 +1,61 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.model.streams.StreamType; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.DataRouterSource; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableDataRouterSource; + +/** + * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a> + */ + +public final class DataRouterSourceParser implements StreamFromGsonParser<DataRouterSource> { + private final Gson gson; + + private DataRouterSourceParser(Gson gson) { + this.gson = gson; + } + + public static DataRouterSourceParser create() { + return new DataRouterSourceParser(gsonInstance()); + } + + @Override + public DataRouterSource unsafeParse(RawDataStream<JsonObject> input) { + assertStreamType(input, StreamType.DATA_ROUTER, DataStreamDirection.SOURCE); + + final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME); + return gson.fromJson(dmaapInfoJson, ImmutableDataRouterSource.class).withName(input.name()); + + } + +}
\ No newline at end of file diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouter.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouter.java new file mode 100644 index 00000000..40b8f383 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouter.java @@ -0,0 +1,73 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouter; + +import java.util.Objects; + +/** + * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a> + */ + +abstract class GsonMessageRouter implements MessageRouter { + private final String name; + private final MessageRouterDmaapInfo dmaapInfo; + private final AafCredentials aafCredentials; + + GsonMessageRouter(String name, @NotNull MessageRouterDmaapInfo dmaapInfo, + @Nullable AafCredentials aafCredentials) { + this.name = name; + this.dmaapInfo = Objects.requireNonNull(dmaapInfo, "dmaapInfo"); + this.aafCredentials = aafCredentials; + } + + public String name() { + return name; + } + + @Override + public String topicUrl() { + return dmaapInfo.topicUrl(); + } + + @Override + public @Nullable String clientRole() { + return dmaapInfo.clientRole(); + } + + @Override + public @Nullable String clientId() { + return dmaapInfo.clientId(); + } + + @Override + public @Nullable String location() { + return dmaapInfo.location(); + } + + @Override + public @Nullable AafCredentials aafCredentials() { + return aafCredentials; + } +}
\ No newline at end of file diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSink.java new file mode 100644 index 00000000..650161f7 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSink.java @@ -0,0 +1,38 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; + +/** + * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a> + */ + +public class GsonMessageRouterSink extends GsonMessageRouter implements MessageRouterSink { + GsonMessageRouterSink( + String name, + @NotNull MessageRouterDmaapInfo dmaapInfo, + @Nullable AafCredentials aafCredentials) { + super(name, dmaapInfo, aafCredentials); + } +}
\ No newline at end of file diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSource.java new file mode 100644 index 00000000..286c4494 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/GsonMessageRouterSource.java @@ -0,0 +1,38 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource; + +/** + * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a> + */ + +public class GsonMessageRouterSource extends GsonMessageRouter implements MessageRouterSource { + GsonMessageRouterSource( + String name, + @NotNull MessageRouterDmaapInfo dmaapInfo, + @Nullable AafCredentials aafCredentials) { + super(name, dmaapInfo, aafCredentials); + } +}
\ No newline at end of file diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouter.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterDmaapInfo.java index 76c6f6db..d887f964 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouter.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterDmaapInfo.java @@ -17,28 +17,25 @@ * limitations under the License. * ============LICENSE_END===================================== */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap; - - +import com.google.gson.annotations.SerializedName; +import org.immutables.gson.Gson; +import org.immutables.value.Value; import org.jetbrains.annotations.Nullable; -import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; - -/** - * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> - * @version 1.2.1 - */ -@ExperimentalApi -public interface MessageRouter { +@Gson.TypeAdapters +@Value.Immutable +public interface MessageRouterDmaapInfo { + @SerializedName("topic_url") String topicUrl(); + @SerializedName("client_role") @Nullable String clientRole(); + @SerializedName("client_id") @Nullable String clientId(); + @SerializedName("location") @Nullable String location(); - - @Nullable AafCredentials aafCredentials(); } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParser.java new file mode 100644 index 00000000..dc2c2e2d --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParser.java @@ -0,0 +1,62 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.DmaapUtils; +import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.model.streams.StreamType; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; + +public final class MessageRouterSinkParser implements StreamFromGsonParser<MessageRouterSink> { + + private final Gson gson; + + private MessageRouterSinkParser(Gson gson) { + this.gson = gson; + } + + public static MessageRouterSinkParser create() { + return new MessageRouterSinkParser(gsonInstance()); + } + + @Override + public MessageRouterSink unsafeParse(RawDataStream<JsonObject> input) { + assertStreamType(input, StreamType.MESSAGE_ROUTER, DataStreamDirection.SINK); + + final AafCredentials aafCredentials = DmaapUtils.extractAafCredentials(gson, input.descriptor()); + + final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME); + final MessageRouterDmaapInfo dmaapInfo = gson.fromJson(dmaapInfoJson, ImmutableMessageRouterDmaapInfo.class); + + return new GsonMessageRouterSink(input.name(), dmaapInfo, aafCredentials); + + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParser.java new file mode 100644 index 00000000..148584a6 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParser.java @@ -0,0 +1,66 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.StreamsConstants.DMAAP_INFO_CHILD_NAME; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.DmaapUtils; +import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.model.streams.StreamType; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource; + +/** + * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a> + */ + +public final class MessageRouterSourceParser implements StreamFromGsonParser<MessageRouterSource> { + private final Gson gson; + + private MessageRouterSourceParser(Gson gson) { + this.gson = gson; + } + + public static MessageRouterSourceParser create() { + return new MessageRouterSourceParser(gsonInstance()); + } + + @Override + public MessageRouterSource unsafeParse(RawDataStream<JsonObject> input) { + assertStreamType(input, StreamType.MESSAGE_ROUTER, DataStreamDirection.SOURCE); + + final AafCredentials aafCredentials = DmaapUtils.extractAafCredentials(gson, input.descriptor()); + + final JsonElement dmaapInfoJson = requiredChild(input.descriptor(), DMAAP_INFO_CHILD_NAME); + final MessageRouterDmaapInfo dmaapInfo = gson.fromJson(dmaapInfoJson, ImmutableMessageRouterDmaapInfo.class); + + return new GsonMessageRouterSource(input.name(), dmaapInfo, aafCredentials); + + } + +}
\ No newline at end of file diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafka.java index ecafd30c..a746fac6 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafka.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafka.java @@ -18,14 +18,13 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; import java.util.Objects; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.Kafka; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; +import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.Kafka; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> @@ -33,15 +32,23 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma */ abstract class GsonKafka implements Kafka { - protected final KafkaInfo kafkaInfo; + private final String name; + final KafkaInfo kafkaInfo; private final AafCredentials aafCredentials; - GsonKafka(@NotNull KafkaInfo kafkaInfo, + GsonKafka( + @NotNull String name, + @NotNull KafkaInfo kafkaInfo, @Nullable AafCredentials aafCredentials) { + this.name = Objects.requireNonNull(name, "name"); this.kafkaInfo = Objects.requireNonNull(kafkaInfo, "kafkaInfo"); this.aafCredentials = aafCredentials; } + public String name() { + return name; + } + @Override public String bootstrapServers() { return kafkaInfo.bootstrapServers(); diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSink.java index c45f8470..4cc28b37 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSink.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSink.java @@ -18,12 +18,12 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink; +import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> @@ -32,8 +32,10 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma class GsonKafkaSink extends GsonKafka implements KafkaSink { GsonKafkaSink( + @NotNull String name, @NotNull KafkaInfo kafkaInfo, @Nullable AafCredentials aafCredentials) { - super(kafkaInfo, aafCredentials); + super(name, kafkaInfo, aafCredentials); } + } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSource.java index 1509d9d7..19108286 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonKafkaSource.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/GsonKafkaSource.java @@ -18,12 +18,12 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; +import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> @@ -32,9 +32,10 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dma class GsonKafkaSource extends GsonKafka implements KafkaSource { GsonKafkaSource( + @NotNull String name, @NotNull KafkaInfo kafkaInfo, @Nullable AafCredentials aafCredentials) { - super(kafkaInfo, aafCredentials); + super(name, kafkaInfo, aafCredentials); } @Override diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaInfo.java index 8b17a8d4..fd5602e6 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaInfo.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaInfo.java @@ -18,7 +18,7 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; import com.google.gson.annotations.SerializedName; import org.immutables.gson.Gson; diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParser.java new file mode 100644 index 00000000..1cd3b487 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParser.java @@ -0,0 +1,62 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractAafCredentials; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractKafkaInfo; + +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.model.streams.StreamType; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.4 + */ +public final class KafkaSinkParser implements StreamFromGsonParser<KafkaSink> { + private final Gson gson; + + private KafkaSinkParser(Gson gson) { + this.gson = gson; + } + + public static KafkaSinkParser create() { + return new KafkaSinkParser(gsonInstance()); + } + + @Override + public KafkaSink unsafeParse(RawDataStream<JsonObject> input) { + assertStreamType(input, StreamType.KAFKA, DataStreamDirection.SINK); + final JsonObject json = input.descriptor(); + + final KafkaInfo kafkaInfo = extractKafkaInfo(gson, json); + final AafCredentials aafCreds = extractAafCredentials(gson, json).getOrNull(); + + return new GsonKafkaSink(input.name(), kafkaInfo, aafCreds); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParser.java index 8b48a2a4..7bdc12c6 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParser.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParser.java @@ -18,40 +18,45 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.assertStreamType; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils.assertStreamType; import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractAafCredentials; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.KafkaUtils.extractKafkaInfo; import com.google.gson.Gson; -import com.google.gson.JsonElement; import com.google.gson.JsonObject; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; +import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.model.streams.StreamType; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since 1.1.4 */ -public class KafkaSourceParser implements StreamFromGsonParser<KafkaSource> { +public final class KafkaSourceParser implements StreamFromGsonParser<KafkaSource> { private final Gson gson; - public static KafkaSourceParser create() { - return new KafkaSourceParser(gsonInstance()); + private KafkaSourceParser(Gson gson) { + this.gson = gson; } - KafkaSourceParser(Gson gson) { - this.gson = gson; + public static KafkaSourceParser create() { + return new KafkaSourceParser(gsonInstance()); } @Override - public KafkaSource unsafeParse(JsonObject input) { - assertStreamType(input, "kafka"); + public KafkaSource unsafeParse(RawDataStream<JsonObject> input) { + assertStreamType(input, StreamType.KAFKA, DataStreamDirection.SOURCE); + final JsonObject json = input.descriptor(); - final JsonElement kafkaInfoJson = requiredChild(input, "kafka_info"); - final KafkaInfo kafkaInfo = gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class); + final KafkaInfo kafkaInfo = extractKafkaInfo(gson, json); + final AafCredentials aafCreds = extractAafCredentials(gson, json).getOrNull(); - return new GsonKafkaSource(kafkaInfo, null); + return new GsonKafkaSource(input.name(), kafkaInfo, aafCreds); } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java index 393fe40f..50a004c6 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParser.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaUtils.java @@ -18,41 +18,34 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.assertStreamType; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.gsonInstance; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.optionalChild; import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils.requiredChild; import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonObject; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink; +import io.vavr.control.Option; +import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> - * @since 1.1.4 + * @since March 2019 */ -public class KafkaSinkParser implements StreamFromGsonParser<KafkaSink> { +final class KafkaUtils { - private final Gson gson; - - public static KafkaSinkParser create() { - return new KafkaSinkParser(gsonInstance()); - } - - KafkaSinkParser(Gson gson) { - this.gson = gson; + private KafkaUtils() { } - @Override - public KafkaSink unsafeParse(JsonObject input) { - assertStreamType(input, "kafka"); - + static KafkaInfo extractKafkaInfo(Gson gson, JsonObject input) { final JsonElement kafkaInfoJson = requiredChild(input, "kafka_info"); - final KafkaInfo kafkaInfo = gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class); + return gson.fromJson(kafkaInfoJson, ImmutableKafkaInfo.class); + } - return new GsonKafkaSink(kafkaInfo, null); + static Option<AafCredentials> extractAafCredentials(Gson gson, JsonObject input) { + return optionalChild(input, "aaf_credentials") + .map(aafCredsJson -> gson.fromJson(aafCredsJson, ImmutableAafCredentials.class)); } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/CbsRequest.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/CbsRequest.java new file mode 100644 index 00000000..0a319666 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/CbsRequest.java @@ -0,0 +1,57 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model; + +import java.util.UUID; +import org.immutables.value.Value; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.ImmutableRequestDiagnosticContext; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; + +/** + * A recipe on which CBS endpoint to call. Usually you should use {@link org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests} + * which is a factory to each request type. + * + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.4 + */ +@Value.Immutable +public interface CbsRequest { + + /** + * The CBS request path. It will be created by the library. + */ + RequestPath requestPath(); + + /** + * Diagnostic context as defined in Logging Guideline + */ + RequestDiagnosticContext diagnosticContext(); + + /** + * Return a view on this CbsRequest with updated InvocationID. + */ + default CbsRequest withNewInvocationId() { + final RequestDiagnosticContext newDiagnosticCtx = ImmutableRequestDiagnosticContext + .copyOf(diagnosticContext()) + .withInvocationId(UUID.randomUUID()); + return ImmutableCbsRequest.copyOf(this).withDiagnosticContext(newDiagnosticCtx); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouter.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/RequestPath.java index ba26b103..97d4b4ea 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouter.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/RequestPath.java @@ -18,22 +18,20 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model; - -import org.jetbrains.annotations.Nullable; -import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; +import io.vavr.Function1; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> - * @version 1.2.1 + * @since March 2019 */ -@ExperimentalApi -public interface DataRouter { - - String location(); - - @Nullable String username(); +@FunctionalInterface +public interface RequestPath extends Function1<String, String> { + String getForService(String serviceName); - @Nullable String password(); + @Override + default String apply(String serviceName) { + return getForService(serviceName); + } } diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsRequestsTest.java index 87131285..50233d3c 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSourceParserTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsRequestsTest.java @@ -18,43 +18,58 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api; import static org.assertj.core.api.Assertions.assertThat; -import com.google.gson.JsonObject; -import java.io.IOException; import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSource; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since March 2019 */ -class KafkaSourceParserTest { +class CbsRequestsTest { - private final StreamFromGsonParser<KafkaSource> cut = StreamFromGsonParsers.kafkaSourceParser(); + private final RequestDiagnosticContext diagCtx = RequestDiagnosticContext.create(); + private final String serviceName = "srv-name"; @Test - void precondition_assureInstanceOf() { - assertThat(cut).isInstanceOf(KafkaSourceParser.class); + void getConfiguration() { + // given + final CbsRequest cut = CbsRequests.getConfiguration(diagCtx); + + // when + final String result = cut.requestPath().getForService(serviceName); + + // then + assertThat(result).isEqualTo("/service_component/srv-name"); + } + + @Test + void getByKey() { + // given + final CbsRequest cut = CbsRequests.getByKey(diagCtx, "configKey"); + + // when + final String result = cut.requestPath().getForService(serviceName); + + // then + assertThat(result).isEqualTo("/configKey/srv-name"); } @Test - void shouldParseMinimalKafkaSourceDefinition() throws IOException { + void getAll() { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_source_minimal.json"); + final CbsRequest cut = CbsRequests.getAll(diagCtx); // when - final KafkaSource result = cut.unsafeParse(input); + final String result = cut.requestPath().getForService(serviceName); // then - assertThat(result.aafCredentials()).isNull(); - assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060"); - assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP"); - assertThat(result.clientId()).isNull(); - assertThat(result.clientRole()).isNull(); + assertThat(result).isEqualTo("/service_component_all/srv-name"); } }
\ No newline at end of file diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParserTest.java index c9ceeaf1..8a5edcc9 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParserTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/listener/MerkleTreeParserTest.java @@ -20,17 +20,15 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.listener; +import static org.assertj.core.api.Assertions.assertThat; + import com.google.gson.JsonArray; import com.google.gson.JsonObject; import io.vavr.collection.List; +import java.math.BigInteger; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Test; -import java.math.BigInteger; - - -import static org.assertj.core.api.Assertions.assertThat; - class MerkleTreeParserTest { private final MerkleTreeParser cut = new MerkleTreeParser(); diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MessageRouterSinksIT.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MessageRouterSinksIT.java new file mode 100644 index 00000000..c57ce027 --- /dev/null +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MessageRouterSinksIT.java @@ -0,0 +1,151 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamWithName; + +import com.google.gson.JsonObject; +import java.io.IOException; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils; +import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.model.streams.StreamType; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since March 2019 + */ +class MessageRouterSinksIT { + + final JsonObject json = GsonUtils.readObjectFromResource("/streams/integration_message_router.json"); + + MessageRouterSinksIT() throws IOException { + } + + @Test + void thereShouldBeNoDataSources() { + assertThat(DataStreams.namedSources(json)).isEmpty(); + } + + @Test + void thereShouldBeSomeSinksDefined() { + assertThat(DataStreams.namedSinks(json)).isNotEmpty(); + assertThat(DataStreams.namedSinks(json)).hasSize(4); + } + + @Test + void allSinksShouldBeOfMessageRouterType() { + assertThat(DataStreams.namedSinks(json).map(RawDataStream::type).distinct()) + .containsExactly(StreamType.MESSAGE_ROUTER); + } + + @Test + void sinksShouldHaveProperDirection() { + assertThat(DataStreams.namedSinks(json).map(RawDataStream::direction).distinct()) + .containsExactly(DataStreamDirection.SINK); + } + + @Test + void verifySecMeasurementSink() { + // given + final String streamName = "sec_measurement"; + final RawDataStream<JsonObject> sink = DataStreams.namedSinks(json).find(streamWithName(streamName)) + .get(); + + // when + final MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink); + + // then + assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName); + assertThat(parsedSink.aafCredentials()).describedAs("aaf credentials").isNotNull(); + assertThat(parsedSink.aafCredentials().username()).describedAs("aaf user name").isEqualTo("aaf_username"); + assertThat(parsedSink.aafCredentials().password()).describedAs("aaf password").isEqualTo("aaf_password"); + assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtl5"); + assertThat(parsedSink.clientId()).describedAs("client id").isEqualTo("111111"); + assertThat(parsedSink.clientRole()).describedAs("client role").isEqualTo("com.att.dcae.member"); + assertThat(parsedSink.topicUrl()).describedAs("topic url") + .isEqualTo("https://mrlocal:3905/events/com.att.dcae.dmaap.FTL2.SEC-MEASUREMENT-OUTPUT"); + } + + @Test + void verifySecFaultUnsecureSink() { + // given + final String streamName = "sec_fault_unsecure"; + final RawDataStream<JsonObject> sink = DataStreams.namedSinks(json).find(streamWithName(streamName)) + .get(); + + // when + final MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink); + + // then + assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName); + assertThat(parsedSink.aafCredentials()).describedAs("aaf credentials").isNull(); + assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtl5"); + assertThat(parsedSink.clientId()).describedAs("client id").isNull(); + assertThat(parsedSink.clientRole()).describedAs("client role").isNull(); + assertThat(parsedSink.topicUrl()).describedAs("topic url") + .isEqualTo("http://ueb.global:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"); + } + + @Test + void verifySecMeasurementUnsecureSink() { + // given + final String streamName = "sec_measurement_unsecure"; + final RawDataStream<JsonObject> sink = DataStreams.namedSinks(json).find(streamWithName(streamName)) + .get(); + + // when + final MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink); + + // then + assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName); + assertThat(parsedSink.aafCredentials()).describedAs("aaf credentials").isNull(); + assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtl5"); + assertThat(parsedSink.clientId()).describedAs("client id").isNull(); + assertThat(parsedSink.clientRole()).describedAs("client role").isNull(); + assertThat(parsedSink.topicUrl()).describedAs("topic url") + .isEqualTo("http://ueb.global:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV"); + } + + @Test + void verifySecFaultSink() { + // given + final String streamName = "sec_fault"; + final RawDataStream<JsonObject> sink = DataStreams.namedSinks(json).find(streamWithName(streamName)) + .get(); + + // when + final MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink); + + // then + assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName); + assertThat(parsedSink.aafCredentials()).describedAs("aaf credentials").isNotNull(); + assertThat(parsedSink.aafCredentials().username()).describedAs("aaf user name").isEqualTo("aaf_username"); + assertThat(parsedSink.aafCredentials().password()).describedAs("aaf password").isEqualTo("aaf_password"); + assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtl5"); + assertThat(parsedSink.clientId()).describedAs("client id").isEqualTo("222222"); + assertThat(parsedSink.clientRole()).describedAs("client role").isEqualTo("com.att.dcae.member"); + assertThat(parsedSink.topicUrl()).describedAs("topic url") + .isEqualTo("https://mrlocal:3905/events/com.att.dcae.dmaap.FTL2.SEC-FAULT-OUTPUT"); + } +}
\ No newline at end of file diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MixedDmaapStreamsIT.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MixedDmaapStreamsIT.java new file mode 100644 index 00000000..4508939a --- /dev/null +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/MixedDmaapStreamsIT.java @@ -0,0 +1,204 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamWithName; + +import com.google.gson.JsonObject; +import io.vavr.collection.List; +import java.io.IOException; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.GsonUtils; +import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.model.streams.StreamType; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.DataRouterSink; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.DataRouterSource; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since March 2019 + */ +class MixedDmaapStreamsIT { + + final JsonObject json = GsonUtils.readObjectFromResource("/streams/integration_mixed_dmaap.json"); + final List<RawDataStream<JsonObject>> sources = DataStreams.namedSources(json).toList(); + final List<RawDataStream<JsonObject>> sinks = DataStreams.namedSinks(json).toList(); + + MixedDmaapStreamsIT() throws IOException { + } + + @Test + void thereShouldBeSomeSinksDefined() { + assertThat(sinks).isNotEmpty(); + assertThat(sinks).hasSize(3); + } + + @Test + void thereShouldBeSomeSourcesDefined() { + assertThat(sources).isNotEmpty(); + assertThat(sources).hasSize(3); + } + + @Test + void allStreamsShouldBeOfProperType() { + assertThat(sources.map(RawDataStream::type).distinct()).containsExactly(StreamType.DATA_ROUTER, StreamType.MESSAGE_ROUTER); + assertThat(sinks.map(RawDataStream::type).distinct()).containsExactly(StreamType.DATA_ROUTER); + } + + @Test + void sinksShouldHaveProperDirection() { + assertThat(sinks.map(RawDataStream::direction).distinct()) + .containsExactly(DataStreamDirection.SINK); + } + + @Test + void sourcesShouldHaveProperDirection() { + assertThat(sources.map(RawDataStream::direction).distinct()) + .containsExactly(DataStreamDirection.SOURCE); + } + + @Test + void verifyDcaeGuestOsSource() { + // given + final String streamName = "DCAE_GUEST_OS"; + final RawDataStream<JsonObject> source = sources.find(streamWithName(streamName)).get(); + + // when + final DataRouterSource parsedSource = StreamFromGsonParsers.dataRouterSourceParser().unsafeParse(source); + + // then + assertThat(parsedSource.name()).describedAs("name").isEqualTo(streamName); + assertThat(parsedSource.location()).describedAs("location").isEqualTo("mtn23"); + assertThat(parsedSource.username()).describedAs("user name").isEqualTo("xyz"); + assertThat(parsedSource.password()).describedAs("password").isEqualTo("abc"); + assertThat(parsedSource.deliveryUrl()).describedAs("delivery url") + .isEqualTo("https://dr.global:8666/DCAE_SAM_GUEST_OS"); + assertThat(parsedSource.subscriberId()).describedAs("subscriber id").isEqualTo("811"); + } + + @Test + void verifyDcaeRawDataSource() { + // given + final String streamName = "DCAE_RAW_DATA"; + final RawDataStream<JsonObject> source = sources.find(streamWithName(streamName)).get(); + + // when + final DataRouterSource parsedSource = StreamFromGsonParsers.dataRouterSourceParser().unsafeParse(source); + + // then + assertThat(parsedSource.name()).describedAs("name").isEqualTo(streamName); + assertThat(parsedSource.location()).describedAs("location").isEqualTo("mtn23"); + assertThat(parsedSource.username()).describedAs("user name").isEqualTo("abc"); + assertThat(parsedSource.password()).describedAs("password").isEqualTo("xyz"); + assertThat(parsedSource.deliveryUrl()).describedAs("delivery url") + .isEqualTo("https://dr.global:8666/DCAE_CEILOMETER_RAW_DATA"); + assertThat(parsedSource.subscriberId()).describedAs("subscriber id").isEqualTo("812"); + } + + @Test + void verifySecMeasurementOutputSource() { + // given + final String streamName = "sec-measurement-output"; + final RawDataStream<JsonObject> source = sources.find(streamWithName(streamName)) + .get(); + + // when + final MessageRouterSource parsedSource = StreamFromGsonParsers.messageRouterSourceParser().unsafeParse(source); + + // then + assertThat(parsedSource.name()).describedAs("name").isEqualTo(streamName); + assertThat(parsedSource.aafCredentials()).describedAs("aaf credentials").isNotNull(); + assertThat(parsedSource.aafCredentials().username()).describedAs("aaf user name").isEqualTo("aaf_username"); + assertThat(parsedSource.aafCredentials().password()).describedAs("aaf password").isEqualTo("aaf_password"); + assertThat(parsedSource.location()).describedAs("location").isEqualTo("mtn23"); + assertThat(parsedSource.clientId()).describedAs("client id").isEqualTo("1111"); + assertThat(parsedSource.clientRole()).describedAs("client role").isEqualTo("com.att.dcae.member"); + assertThat(parsedSource.topicUrl()).describedAs("topic url") + .isEqualTo("https://mr.hostname:3905/events/com.att.dcae.dmaap.SEC-MEASUREMENT-OUTPUT-v1"); + } + + @Test + void verifyDcaeVoipPmDataSink() { + // given + final String streamName = "DCAE_VOIP_PM_DATA"; + final RawDataStream<JsonObject> sink = sinks.find(streamWithName(streamName)).get(); + + // when + final DataRouterSink parsedSink = StreamFromGsonParsers.dataRouterSinkParser().unsafeParse(sink); + + // then + assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName); + assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtn23"); + assertThat(parsedSink.username()).describedAs("user name").isEqualTo("abc"); + assertThat(parsedSink.password()).describedAs("password").isEqualTo("xyz"); + assertThat(parsedSink.logUrl()).describedAs("log url") + .isEqualTo("https://dcae-drps/feedlog/206"); + assertThat(parsedSink.publishUrl()).describedAs("publish url") + .isEqualTo("https://dcae-drps/publish/206"); + assertThat(parsedSink.publisherId()).describedAs("publisher id").isEqualTo("206.518hu"); + } + + @Test + void verifyDcaeGuestOsOSink() { + // given + final String streamName = "DCAE_GUEST_OS_O"; + final RawDataStream<JsonObject> sink = sinks.find(streamWithName(streamName)).get(); + + // when + final DataRouterSink parsedSink = StreamFromGsonParsers.dataRouterSinkParser().unsafeParse(sink); + + // then + assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName); + assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtn23"); + assertThat(parsedSink.username()).describedAs("user name").isEqualTo("axyz"); + assertThat(parsedSink.password()).describedAs("password").isEqualTo("abc"); + assertThat(parsedSink.logUrl()).describedAs("log url") + .isEqualTo("https://dcae-drps/feedlog/203"); + assertThat(parsedSink.publishUrl()).describedAs("publish url") + .isEqualTo("https://dcae-drps/publish/203"); + assertThat(parsedSink.publisherId()).describedAs("publisher id").isEqualTo("203.2od8s"); + } + + + @Test + void verifyDcaePmDataSink() { + // given + final String streamName = "DCAE_PM_DATA"; + final RawDataStream<JsonObject> sink = sinks.find(streamWithName(streamName)).get(); + + // when + final DataRouterSink parsedSink = StreamFromGsonParsers.dataRouterSinkParser().unsafeParse(sink); + + // then + assertThat(parsedSink.name()).describedAs("name").isEqualTo(streamName); + assertThat(parsedSink.location()).describedAs("location").isEqualTo("mtn23bdce2"); + assertThat(parsedSink.username()).describedAs("user name").isEqualTo("xyz"); + assertThat(parsedSink.password()).describedAs("password").isEqualTo("abc"); + assertThat(parsedSink.logUrl()).describedAs("log url") + .isEqualTo("https://dcae-drps/feedlog/493"); + assertThat(parsedSink.publishUrl()).describedAs("publish url") + .isEqualTo("https://dcae-drps/publish/493"); + assertThat(parsedSink.publisherId()).describedAs("publisher id").isEqualTo("493.eacqs"); + } + +}
\ No newline at end of file diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java index e862d849..a296c920 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java @@ -20,8 +20,12 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.DummyHttpServer.sendResource; -import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.DummyHttpServer.sendString; +import static org.assertj.core.api.Assertions.assertThat; +import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendResource; +import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString; +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType; +import static org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA; +import static org.onap.dcaegen2.services.sdk.model.streams.StreamType.MESSAGE_ROUTER; import com.google.gson.JsonObject; import io.vavr.collection.Stream; @@ -29,11 +33,22 @@ import java.time.Duration; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties; +import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -51,7 +66,9 @@ class CbsClientImplIT { + " \"ServicePort\": PORT\n" + " }\n" + "]\n"; - private static final String SAMPLE_CONFIG = "/sample_config.json"; + private static final String SAMPLE_CONFIG = "/sample_service_config.json"; + private static final String SAMPLE_ALL = "/sample_all.json"; + private static final String SAMPLE_KEY = "/sample_key.json"; private static final String SAMPLE_CONFIG_KEY = "keystore.path"; private static final String EXPECTED_CONFIG_VALUE = "/var/run/security/keystore.p12"; private static EnvProperties sampleEnvironment; @@ -61,7 +78,10 @@ class CbsClientImplIT { static void setUp() { server = DummyHttpServer.start(routes -> routes.get("/v1/catalog/service/the_cbs", (req, resp) -> sendString(resp, lazyConsulResponse())) - .get("/service_component/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_CONFIG))); + .get("/service_component/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_CONFIG)) + .get("/service_component_all/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_ALL)) + .get("/sampleKey/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_KEY)) + ); sampleEnvironment = ImmutableEnvProperties.builder() .appName("dcae-component") .cbsName("the_cbs") @@ -79,10 +99,10 @@ class CbsClientImplIT { void testCbsClientWithSingleCall() { // given final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment); - final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); + final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create()); // when - final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext)); + final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request)); // then StepVerifier.create(result.map(this::sampleConfigValue)) @@ -95,11 +115,11 @@ class CbsClientImplIT { void testCbsClientWithPeriodicCall() { // given final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment); - final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); + final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create()); // when final Flux<JsonObject> result = sut - .flatMapMany(cbsClient -> cbsClient.get(diagnosticContext, Duration.ZERO, Duration.ofMillis(10))); + .flatMapMany(cbsClient -> cbsClient.get(request, Duration.ZERO, Duration.ofMillis(10))); // then final int itemsToTake = 5; @@ -113,12 +133,12 @@ class CbsClientImplIT { void testCbsClientWithUpdatesCall() { // given final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment); - final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); + final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create()); final Duration period = Duration.ofMillis(10); // when final Flux<JsonObject> result = sut - .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, Duration.ZERO, period)); + .flatMapMany(cbsClient -> cbsClient.updates(request, Duration.ZERO, period)); // then final Duration timeToCollectItemsFor = period.multipliedBy(50); @@ -128,6 +148,130 @@ class CbsClientImplIT { .verify(Duration.ofSeconds(5)); } + @Test + void testCbsClientWithStreamsParsing() { + // given + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment); + final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser(); + final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create()); + + // when + final Mono<KafkaSink> result = sut.flatMap(cbsClient -> cbsClient.get(request)) + .map(json -> + DataStreams.namedSinks(json).map(kafkaSinkParser::unsafeParse).head() + ); + + // then + StepVerifier.create(result) + .consumeNextWith(kafkaSink -> { + assertThat(kafkaSink.name()).isEqualTo("perf3gpp"); + assertThat(kafkaSink.bootstrapServers()).isEqualTo("dmaap-mr-kafka:6060"); + assertThat(kafkaSink.topicName()).isEqualTo("HVVES_PERF3GPP"); + }) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + + @Test + void testCbsClientWithStreamsParsingUsingSwitch() { + // given + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment); + final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create()); + // TODO: Use these parsers below + final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser(); + final StreamFromGsonParser<MessageRouterSink> mrSinkParser = StreamFromGsonParsers.messageRouterSinkParser(); + + // when + final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(request)) + .map(json -> { + final Stream<RawDataStream<JsonObject>> sinks = DataStreams.namedSinks(json); + + final Stream<KafkaSink> allKafkaSinks = sinks.filter(streamOfType(KAFKA)) + .map(kafkaSinkParser::unsafeParse); + final Stream<MessageRouterSink> allMrSinks = sinks.filter(streamOfType(MESSAGE_ROUTER)) + .map(mrSinkParser::unsafeParse); + + assertThat(allKafkaSinks.size()) + .describedAs("Number of kafka sinks") + .isEqualTo(2); + assertThat(allMrSinks.size()) + .describedAs("Number of DMAAP-MR sinks") + .isEqualTo(1); + + return true; + }) + .then(); + + // then + StepVerifier.create(result) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + + @Test + void testCbsClientWithStreamsParsingWhenUsingInvalidParser() { + // given + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment); + final StreamFromGsonParser<KafkaSource> kafkaSourceParser = StreamFromGsonParsers.kafkaSourceParser(); + final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create()); + + // when + final Mono<KafkaSource> result = sut.flatMap(cbsClient -> cbsClient.get(request)) + .map(json -> + DataStreams.namedSources(json).map(kafkaSourceParser::unsafeParse).head() + ); + + // then + StepVerifier.create(result) + .expectErrorSatisfies(ex -> { + assertThat(ex).isInstanceOf(StreamParsingException.class); + assertThat(ex).hasMessageContaining("Invalid stream type"); + assertThat(ex).hasMessageContaining(MESSAGE_ROUTER.toString()); + assertThat(ex).hasMessageContaining(KAFKA.toString()); + }) + .verify(Duration.ofSeconds(5)); + } + + @Test + void testCbsClientWithSingleAllRequest() { + // given + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment); + final CbsRequest request = CbsRequests.getAll(RequestDiagnosticContext.create()); + + // when + final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request)); + + // then + StepVerifier.create(result) + .assertNext(json -> { + assertThat(json.get("config")).isNotNull(); + assertThat(json.get("policies")).isNotNull(); + assertThat(json.get("sampleKey")).isNotNull(); + }) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + + + @Test + void testCbsClientWithSingleKeyRequest() { + // given + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment); + final CbsRequest request = CbsRequests.getByKey(RequestDiagnosticContext.create(), "sampleKey"); + + // when + final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request)); + + // then + StepVerifier.create(result) + .assertNext(json -> { + assertThat(json.get("key")).isNotNull(); + assertThat(json.get("key").getAsString()).isEqualTo("value"); + }) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + private String sampleConfigValue(JsonObject obj) { return obj.get(SAMPLE_CONFIG_KEY).getAsString(); } diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java index 9fd7cc88..78b79f9d 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java @@ -22,7 +22,6 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -30,8 +29,15 @@ import static org.mockito.Mockito.verify; import com.google.gson.JsonObject; import java.net.InetSocketAddress; import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import reactor.core.publisher.Mono; /** @@ -39,24 +45,32 @@ import reactor.core.publisher.Mono; * @since February 2019 */ class CbsClientImplTest { - private final CloudHttpClient httpClient = mock(CloudHttpClient.class); + private final RxHttpClient httpClient = mock(RxHttpClient.class); @Test void shouldFetchUsingProperUrl() { // given InetSocketAddress cbsAddress = InetSocketAddress.createUnresolved("cbshost", 6969); String serviceName = "dcaegen2-ves-collector"; - final CbsClientImpl cut = CbsClientImpl.create(httpClient, cbsAddress, serviceName); - final JsonObject httpResponse = new JsonObject(); - given(httpClient.get(anyString(), any(RequestDiagnosticContext.class), any(Class.class))).willReturn(Mono.just(httpResponse)); + final CbsClient cut = new CbsClientImpl(httpClient, serviceName, cbsAddress); + final HttpResponse httpResponse = ImmutableHttpResponse.builder() + .url("http://xxx") + .statusCode(200) + .rawBody("{}".getBytes()) + .build(); + given(httpClient.call(any(HttpRequest.class))).willReturn(Mono.just(httpResponse)); RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); // when - final JsonObject result = cut.get(diagnosticContext).block(); + final JsonObject result = cut.get(CbsRequests.getConfiguration(diagnosticContext)).block(); // then final String expectedUrl = "http://cbshost:6969/service_component/dcaegen2-ves-collector"; - verify(httpClient).get(expectedUrl, diagnosticContext, JsonObject.class); - assertThat(result).isSameAs(httpResponse); + verify(httpClient).call(ImmutableHttpRequest.builder() + .method(HttpMethod.GET) + .url(expectedUrl) + .diagnosticContext(diagnosticContext) + .build()); + assertThat(result.toString()).isEqualTo(httpResponse.bodyAsString()); } }
\ No newline at end of file diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookupTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookupTest.java index 6843e0e3..e16605de 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookupTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookupTest.java @@ -22,8 +22,11 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.isA; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import com.google.gson.JsonArray; import com.google.gson.JsonElement; @@ -31,9 +34,13 @@ import com.google.gson.JsonParser; import java.io.InputStreamReader; import java.net.InetSocketAddress; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.ServiceLookupException; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -49,7 +56,7 @@ class CbsLookupTest { .consulHost("consul.local") .consulPort(8050) .appName("whatever").build(); - private final CloudHttpClient httpClient = mock(CloudHttpClient.class); + private final RxHttpClient httpClient = mock(RxHttpClient.class); private final CbsLookup cut = new CbsLookup(httpClient); @Test @@ -63,6 +70,14 @@ class CbsLookupTest { // then assertThat(result.getHostString()).isEqualTo("config-binding-service"); assertThat(result.getPort()).isEqualTo(10000); + + final String url = "http://" + + env.consulHost() + + ":" + + env.consulPort() + + "/v1/catalog/service/" + + env.cbsName(); + verifyHttpGetHasBeenCalled(url); } @Test @@ -82,14 +97,24 @@ class CbsLookupTest { } private void givenConsulResponse(JsonArray jsonArray) { - final String url = "http://" - + env.consulHost() - + ":" - + env.consulPort() - + "/v1/catalog/service/" - + env.cbsName(); - given(httpClient.get(url, JsonArray.class)) - .willReturn(Mono.just(jsonArray)); + given(httpClient.call(any(HttpRequest.class))) + .willReturn(Mono.just(ImmutableHttpResponse.builder() + .url("http://xxx") + .statusCode(200) + .rawBody(jsonArray.toString().getBytes()) + .build())); + } + + private void verifyHttpGetHasBeenCalled(String url) { + final ArgumentCaptor<HttpRequest> httpRequestArgumentCaptor = ArgumentCaptor.forClass(HttpRequest.class); + verify(httpClient).call(httpRequestArgumentCaptor.capture()); + assertThat(httpRequestArgumentCaptor.getValue().url()) + .describedAs("HTTP request URL") + .isEqualTo(url); + assertThat(httpRequestArgumentCaptor.getValue().method()) + .describedAs("HTTP request method") + .isEqualTo(HttpMethod.GET); } + }
\ No newline at end of file diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtilsTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtilsTest.java new file mode 100644 index 00000000..a26af446 --- /dev/null +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/DmaapUtilsTest.java @@ -0,0 +1,103 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonObject; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.model.streams.GsonAdaptersAafCredentials; +import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since March 2019 + */ +class DmaapUtilsTest { + + @Test + void extractAafCredentials_shouldReturnNull_whenAllFieldsAreNull() { + // given + Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonAdaptersAafCredentials()).create(); + JsonObject json = gson.fromJson("{\"aaf_username\":null,\"aaf_password\":null}", JsonObject.class); + + // when + final AafCredentials result = DmaapUtils.extractAafCredentials(gson, json); + + // then + assertThat(result).isNull(); + } + + @Test + void extractAafCredentials_shouldReturnNull_whenAllFieldsAreAbsent() { + // given + Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonAdaptersAafCredentials()).create(); + JsonObject json = gson.fromJson("{\"whatever\":\"else\"}", JsonObject.class); + + // when + final AafCredentials result = DmaapUtils.extractAafCredentials(gson, json); + + // then + assertThat(result).isNull(); + } + + @Test + void extractAafCredentials_shouldReturnValue_whenBothFieldsAreSet() { + // given + Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonAdaptersAafCredentials()).create(); + JsonObject json = gson.fromJson("{\"aaf_username\":\"uname\",\"aaf_password\":\"passwd\"}", JsonObject.class); + + // when + final AafCredentials result = DmaapUtils.extractAafCredentials(gson, json); + + // then + assertThat(result).isEqualTo(ImmutableAafCredentials.builder().username("uname").password("passwd").build()); + } + + @Test + void extractAafCredentials_shouldReturnValueWithUser_whenOnlyUserIsSet() { + // given + Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonAdaptersAafCredentials()).create(); + JsonObject json = gson.fromJson("{\"aaf_username\":\"uname\"}", JsonObject.class); + + // when + final AafCredentials result = DmaapUtils.extractAafCredentials(gson, json); + + // then + assertThat(result).isEqualTo(ImmutableAafCredentials.builder().username("uname").build()); + } + + @Test + void extractAafCredentials_shouldReturnValueWithUser_whenPasswordIsNull() { + // given + Gson gson = new GsonBuilder().registerTypeAdapterFactory(new GsonAdaptersAafCredentials()).create(); + JsonObject json = gson.fromJson("{\"aaf_username\":\"uname\",\"aaf_password\":null}", JsonObject.class); + + // when + final AafCredentials result = DmaapUtils.extractAafCredentials(gson, json); + + // then + assertThat(result).isEqualTo(ImmutableAafCredentials.builder().username("uname").build()); + } +}
\ No newline at end of file diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java new file mode 100644 index 00000000..90c69942 --- /dev/null +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSinkParserTest.java @@ -0,0 +1,125 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.gson.JsonObject; +import io.vavr.control.Either; +import java.io.IOException; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils; +import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.model.streams.ImmutableRawDataStream; +import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.model.streams.StreamType; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.DataRouterSink; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableDataRouterSink; + + +class DataRouterSinkParserTest { + + private static final String SAMPLE_LOCATION = "mtc00"; + private static final String SAMPLE_PUBLISH_URL = "https://we-are-data-router.us/feed/xyz"; + private static final String SAMPLE_LOG_URL = "https://we-are-data-router.us/feed/xyz/logs"; + private static final String SAMPLE_USER = "some-user"; + private static final String SAMPLE_PASSWORD = "some-password"; + private static final String SAMPLE_PUBLISHER_ID = "123456"; + + private final StreamFromGsonParser<DataRouterSink> streamParser = StreamFromGsonParsers.dataRouterSinkParser(); + + @Test + void fullConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { + // given + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/data_router_sink_full.json"); + + // when + DataRouterSink result = streamParser.unsafeParse(input); + + // then + final DataRouterSink fullConfigurationStream = ImmutableDataRouterSink.builder() + .name(input.name()) + .location(SAMPLE_LOCATION) + .publishUrl(SAMPLE_PUBLISH_URL) + .logUrl(SAMPLE_LOG_URL) + .username(SAMPLE_USER) + .password(SAMPLE_PASSWORD) + .publisherId(SAMPLE_PUBLISHER_ID) + .build(); + assertThat(result).isEqualTo(fullConfigurationStream); + } + + @Test + void minimalConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { + //given + RawDataStream<JsonObject> input = DataStreamUtils + .readSinkFromResource("/streams/data_router_sink_minimal.json"); + + // when + DataRouterSink result = streamParser.unsafeParse(input); + + // then + final DataRouterSink minimalConfigurationStream = ImmutableDataRouterSink.builder() + .name(input.name()) + .publishUrl(SAMPLE_PUBLISH_URL) + .build(); + assertThat(result).isEqualTo(minimalConfigurationStream); + } + + @Test + void incorrectConfiguration_shouldParseToStreamParserError() throws IOException { + // given + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/message_router_full.json"); + + // when + Either<StreamParserError, DataRouterSink> result = streamParser.parse(input); + + // then + assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); + result.peekLeft(error -> { + assertThat(error.message()).contains("Invalid stream type"); + assertThat(error.message()).contains("Expected '" + StreamType.DATA_ROUTER + "', but was '" + + StreamType.MESSAGE_ROUTER + "'"); + } + ); + } + + @Test + void emptyConfiguration_shouldParseToStreamParserError() { + // given + JsonObject json = new JsonObject(); + final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder() + .name("empty") + .type(StreamType.DATA_ROUTER) + .descriptor(json) + .direction(DataStreamDirection.SINK) + .build(); + + // when + Either<StreamParserError, DataRouterSink> result = streamParser.parse(input); + + // then + assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); + } + +}
\ No newline at end of file diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java new file mode 100644 index 00000000..f704e523 --- /dev/null +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/dr/DataRouterSourceParserTest.java @@ -0,0 +1,121 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.dr; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.gson.JsonObject; +import io.vavr.control.Either; +import java.io.IOException; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils; +import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.model.streams.ImmutableRawDataStream; +import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.model.streams.StreamType; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.DataRouterSource; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableDataRouterSource; + +public class DataRouterSourceParserTest { + + private static final String SAMPLE_LOCATION = "mtc00"; + private static final String SAMPLE_DELIVERY_URL = "https://my-subscriber-app.dcae:8080/target-path"; + private static final String SAMPLE_USER = "some-user"; + private static final String SAMPLE_PASSWORD = "some-password"; + private static final String SAMPLE_SUBSCRIBER_ID = "789012"; + + private final StreamFromGsonParser<DataRouterSource> streamParser = StreamFromGsonParsers.dataRouterSourceParser(); + + @Test + void fullConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { + // given + RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/data_router_source_full.json"); + + // when + DataRouterSource result = streamParser.unsafeParse(input); + + // then + + final DataRouterSource fullConfigurationStream = ImmutableDataRouterSource.builder() + .name(input.name()) + .location(SAMPLE_LOCATION) + .deliveryUrl(SAMPLE_DELIVERY_URL) + .username(SAMPLE_USER) + .password(SAMPLE_PASSWORD) + .subscriberId(SAMPLE_SUBSCRIBER_ID) + .build(); + assertThat(result).isEqualTo(fullConfigurationStream); + } + + @Test + void minimalConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { + // given + RawDataStream<JsonObject> input = DataStreamUtils + .readSourceFromResource("/streams/data_router_source_minimal.json"); + + // when + DataRouterSource result = streamParser.unsafeParse(input); + + // then + final DataRouterSource minimalConfigurationStream = ImmutableDataRouterSource.builder() + .name(input.name()) + .build(); + assertThat(result).isEqualTo(minimalConfigurationStream); + } + + @Test + void incorrectConfiguration_shouldParseToStreamParserError() throws IOException { + // given + RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/message_router_full.json"); + + // when + Either<StreamParserError, DataRouterSource> result = streamParser.parse(input); + + // then + assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); + result.peekLeft(error -> { + assertThat(error.message()).contains("Invalid stream type"); + assertThat(error.message()).contains("Expected '" + StreamType.DATA_ROUTER + "', but was '" + + StreamType.MESSAGE_ROUTER + "'"); + } + ); + } + + @Test + void emptyConfiguration_shouldBeParsedToStreamParserError() { + // given + JsonObject json = new JsonObject(); + final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder() + .name("empty") + .type(StreamType.DATA_ROUTER) + .descriptor(json) + .direction(DataStreamDirection.SOURCE) + .build(); + + // when + Either<StreamParserError, DataRouterSource> result = streamParser.parse(input); + + // then + assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); + } +} diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java new file mode 100644 index 00000000..e3182c5c --- /dev/null +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSinkParserTest.java @@ -0,0 +1,123 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.gson.JsonObject; +import io.vavr.control.Either; +import java.io.IOException; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils; +import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.model.streams.ImmutableRawDataStream; +import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.model.streams.StreamType; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; + +/** + * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a> + */ + +public class MessageRouterSinkParserTest { + + private static final String SAMPLE_AAF_USERNAME = "some-user"; + private static final String SAMPLE_AAF_PASSWORD = "some-password"; + private static final String SAMPLE_LOCATION = "mtc00"; + private static final String SAMPLE_CLIENT_ROLE = "com.dcae.member"; + private static final String SAMPLE_CLIENT_ID = "1500462518108"; + private static final String SAMPLE_TOPIC_URL = "https://we-are-message-router.us:3905/events/some-topic"; + + private final StreamFromGsonParser<MessageRouterSink> streamParser = StreamFromGsonParsers.messageRouterSinkParser(); + + @Test + void fullConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { + // given + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/message_router_full.json"); + + // when + MessageRouterSink result = streamParser.unsafeParse(input); + + // then + assertThat(result).isInstanceOf(MessageRouterSink.class); + assertThat(result.aafCredentials().username()).isEqualTo(SAMPLE_AAF_USERNAME); + assertThat(result.aafCredentials().password()).isEqualTo(SAMPLE_AAF_PASSWORD); + assertThat(result.location()).isEqualTo(SAMPLE_LOCATION); + assertThat(result.clientRole()).isEqualTo(SAMPLE_CLIENT_ROLE); + assertThat(result.clientId()).isEqualTo(SAMPLE_CLIENT_ID); + assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL); + } + + @Test + void minimalConfiguration_shouldGenerateDataRouterSinkObject() throws IOException { + // given + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/message_router_minimal.json"); + + // when + MessageRouterSink result = streamParser.unsafeParse(input); + + // then + assertThat(result).isInstanceOf(MessageRouterSink.class); + assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL); + assertThat(result.aafCredentials()).isNull(); + assertThat(result.clientId()).isNull(); + } + + @Test + void incorrectConfiguration_shouldParseToStreamParserError() throws IOException { + // given + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/data_router_sink_full.json"); + + // when + Either<StreamParserError, MessageRouterSink> result = streamParser.parse(input); + + // then + assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); + result.peekLeft(error -> { + assertThat(error.message()).contains("Invalid stream type"); + assertThat(error.message()).contains("Expected '" + StreamType.MESSAGE_ROUTER + "', but was '" + + StreamType.DATA_ROUTER + "'"); + } + ); + } + + @Test + void emptyConfiguration_shouldParseToStreamParserError() { + // given + JsonObject json = new JsonObject(); + final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder() + .name("empty") + .type(StreamType.MESSAGE_ROUTER) + .descriptor(json) + .direction(DataStreamDirection.SINK) + .build(); + + // when + Either<StreamParserError, MessageRouterSink> result = streamParser.parse(input); + + // then + assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); + } + + +}
\ No newline at end of file diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java new file mode 100644 index 00000000..51e56764 --- /dev/null +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/dmaap/mr/MessageRouterSourceParserTest.java @@ -0,0 +1,119 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.gson.JsonObject; +import io.vavr.control.Either; +import java.io.IOException; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils; +import org.onap.dcaegen2.services.sdk.model.streams.DataStreamDirection; +import org.onap.dcaegen2.services.sdk.model.streams.ImmutableRawDataStream; +import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.model.streams.StreamType; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource; + +/** + * @author <a href="mailto:kornel.janiak@nokia.com">Kornel Janiak</a> + */ + +public class MessageRouterSourceParserTest { + + private static final String SAMPLE_AAF_USERNAME = "some-user"; + private static final String SAMPLE_AAF_PASSWORD = "some-password"; + private static final String SAMPLE_LOCATION = "mtc00"; + private static final String SAMPLE_CLIENT_ROLE = "com.dcae.member"; + private static final String SAMPLE_CLIENT_ID = "1500462518108"; + private static final String SAMPLE_TOPIC_URL = "https://we-are-message-router.us:3905/events/some-topic"; + + private final StreamFromGsonParser<MessageRouterSource> streamParser = StreamFromGsonParsers.messageRouterSourceParser(); + + @Test + void fullConfiguration_shouldGenerateDataRouterSourceObject() throws IOException { + // given + RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/message_router_full.json"); + + // when + MessageRouterSource result = streamParser.unsafeParse(input); + + // then + assertThat(result.aafCredentials().username()).isEqualTo(SAMPLE_AAF_USERNAME); + assertThat(result.aafCredentials().password()).isEqualTo(SAMPLE_AAF_PASSWORD); + assertThat(result.location()).isEqualTo(SAMPLE_LOCATION); + assertThat(result.clientRole()).isEqualTo(SAMPLE_CLIENT_ROLE); + assertThat(result.clientId()).isEqualTo(SAMPLE_CLIENT_ID); + assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL); + } + + @Test + void minimalConfiguration_shouldGenerateDataRouterSourceObject() throws IOException { + // given + RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/message_router_minimal.json"); + + // when + MessageRouterSource result = streamParser.unsafeParse(input); + + // then + assertThat(result.topicUrl()).isEqualTo(SAMPLE_TOPIC_URL); + assertThat(result.aafCredentials()).isNull(); + assertThat(result.clientId()).isNull(); + } + + @Test + void incorrectConfiguration_shouldParseToStreamParserError() throws IOException { + // given + RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/data_router_sink_full.json"); + + // when + Either<StreamParserError, MessageRouterSource> result = streamParser.parse(input); + + // then + assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); + result.peekLeft(error -> { + assertThat(error.message()).contains("Invalid stream type"); + assertThat(error.message()).contains("Expected '" + StreamType.MESSAGE_ROUTER + "', but was '" + + StreamType.DATA_ROUTER + "'"); + } + ); + } + + @Test + void emptyConfiguration_shouldParseToStreamParserError() { + // given + JsonObject json = new JsonObject(); + final ImmutableRawDataStream<JsonObject> input = ImmutableRawDataStream.<JsonObject>builder() + .name("empty") + .type(StreamType.MESSAGE_ROUTER) + .descriptor(json) + .direction(DataStreamDirection.SOURCE) + .build(); + // when + Either<StreamParserError, MessageRouterSource> result = streamParser.parse(input); + // then + assertThat(result.getLeft()).isInstanceOf(StreamParserError.class); + } + + +}
\ No newline at end of file diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java index b5481203..2e4f71b3 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/KafkaSinkParserTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSinkParserTest.java @@ -18,23 +18,21 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson; +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.in; -import static org.junit.jupiter.api.Assertions.*; import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import io.vavr.Function1; import io.vavr.control.Either; import java.io.IOException; -import java.io.InputStreamReader; import org.junit.jupiter.api.Test; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils; +import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials; +import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> @@ -52,7 +50,7 @@ class KafkaSinkParserTest { @Test void shouldParseMinimalKafkaSinkDefinition() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_minimal.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_sink_minimal.json"); // when final KafkaSink result = cut.unsafeParse(input); @@ -66,15 +64,19 @@ class KafkaSinkParserTest { } @Test - void shouldParseBasicKafkaSinkDefinition() throws IOException { + void shouldParseFullKafkaSinkDefinition() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_sink.json"); // when final KafkaSink result = cut.unsafeParse(input); // then - assertThat(result.aafCredentials()).isNull(); + final ImmutableAafCredentials expectedCredentials = ImmutableAafCredentials.builder() + .username("the user") + .password("the passwd") + .build(); + assertThat(result.aafCredentials()).isEqualTo(expectedCredentials); assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060"); assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP"); assertThat(result.clientId()).isEqualTo("1500462518108"); @@ -84,7 +86,7 @@ class KafkaSinkParserTest { @Test void shouldReturnErrorWhenStructureIsWrong() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_missing_child.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_sink_missing_child.json"); // when final Either<StreamParserError, KafkaSink> result = cut.parse(input); @@ -99,7 +101,7 @@ class KafkaSinkParserTest { @Test void shouldReturnErrorWhenTypeIsWrong() throws IOException { // given - JsonObject input = GsonUtils.readObjectFromResource("/streams/kafka_sink_invalid_type.json"); + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_invalid_type.json"); // when final Either<StreamParserError, KafkaSink> result = cut.parse(input); @@ -112,4 +114,19 @@ class KafkaSinkParserTest { assertThat(error.message()).containsIgnoringCase("message_router"); }); } + + @Test + void shouldReturnErrorWhenDirectionIsWrong() throws IOException { + // given + RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/kafka_sink.json"); + + // when + final Either<StreamParserError, KafkaSink> result = cut.parse(input); + + // then + assertThat(result.isRight()).describedAs("should not be right").isFalse(); + result.peekLeft(error -> { + assertThat(error.message()).containsIgnoringCase("invalid stream direction"); + }); + } }
\ No newline at end of file diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java new file mode 100644 index 00000000..1e8e3f52 --- /dev/null +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/kafka/KafkaSourceParserTest.java @@ -0,0 +1,119 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.gson.JsonObject; +import io.vavr.collection.List; +import io.vavr.control.Either; +import java.io.IOException; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParserError; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.DataStreamUtils; +import org.onap.dcaegen2.services.sdk.model.streams.ImmutableAafCredentials; +import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSource; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since March 2019 + */ +class KafkaSourceParserTest { + + private final StreamFromGsonParser<KafkaSource> cut = StreamFromGsonParsers.kafkaSourceParser(); + + @Test + void precondition_assureInstanceOf() { + assertThat(cut).isInstanceOf(KafkaSourceParser.class); + } + + @Test + void shouldParseMinimalKafkaSourceDefinition() throws IOException { + // given + RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/kafka_source_minimal.json"); + + // when + final KafkaSource result = cut.unsafeParse(input); + + // then + assertThat(result.aafCredentials()).isNull(); + assertThat(result.bootstrapServers()).isEqualTo("dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060"); + assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP"); + assertThat(result.clientId()).isNull(); + assertThat(result.clientRole()).isNull(); + } + + @Test + void shouldParseFullKafkaSourceDefinition() throws IOException { + // given + RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/kafka_source.json"); + + // when + final KafkaSource result = cut.unsafeParse(input); + + // then + final ImmutableAafCredentials expectedCredentials = ImmutableAafCredentials.builder() + .username("the user") + .password("the passwd") + .build(); + assertThat(result.aafCredentials()).isEqualTo(expectedCredentials); + assertThat(result.bootstrapServerList()).isEqualTo(List.of("dmaap-mr-kafka-0:6060", "dmaap-mr-kafka-1:6060")); + assertThat(result.topicName()).isEqualTo("HVVES_PERF3GPP"); + assertThat(result.consumerGroupId()).isEqualTo("nokia-perf3gpp-processor"); + assertThat(result.clientId()).isEqualTo("1500462518108"); + assertThat(result.clientRole()).isEqualTo("com.dcae.member"); + } + + @Test + void shouldReturnErrorWhenTypeIsWrong() throws IOException { + // given + RawDataStream<JsonObject> input = DataStreamUtils.readSourceFromResource("/streams/kafka_invalid_type.json"); + + // when + final Either<StreamParserError, KafkaSource> result = cut.parse(input); + + // then + assertThat(result.isRight()).describedAs("should not be right").isFalse(); + result.peekLeft(error -> { + assertThat(error.message()).containsIgnoringCase("invalid stream type"); + assertThat(error.message()).containsIgnoringCase("kafka"); + assertThat(error.message()).containsIgnoringCase("message_router"); + }); + } + + @Test + void shouldReturnErrorWhenDirectionIsWrong() throws IOException { + // given + RawDataStream<JsonObject> input = DataStreamUtils.readSinkFromResource("/streams/kafka_source.json"); + + // when + final Either<StreamParserError, KafkaSource> result = cut.parse(input); + + // then + assertThat(result.isRight()).describedAs("should not be right").isFalse(); + result.peekLeft(error -> { + assertThat(error.message()).containsIgnoringCase("invalid stream direction"); + }); + } +}
\ No newline at end of file diff --git a/rest-services/cbs-client/src/test/resources/sample_all.json b/rest-services/cbs-client/src/test/resources/sample_all.json new file mode 100644 index 00000000..ac4ebf29 --- /dev/null +++ b/rest-services/cbs-client/src/test/resources/sample_all.json @@ -0,0 +1,41 @@ +{ + "config": { + "keystore.path": "/var/run/security/keystore.p12", + "streams_publishes": { + "perf3gpp": { + "type": "kafka", + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka:6060", + "topic_name": "HVVES_PERF3GPP" + } + }, + "pnf_ready": { + "type": "message_router", + "dmaap_info": { + "topic_url": "http://message-router:3904/events/VES_PNF_READY" + } + }, + "call_trace": { + "type": "kafka", + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka:6060", + "topic_name": "HVVES_TRACE" + } + } + }, + "streams_subscribes": { + "measurements": { + "type": "message_router", + "dmaap_info": { + "topic_url": "http://message-router:3904/events/VES_MEASUREMENT" + } + } + } + }, + "policies": { + "samplePolicy": "sample value" + }, + "sampleKey": { + "key": "value" + } +}
\ No newline at end of file diff --git a/rest-services/cbs-client/src/test/resources/sample_config.json b/rest-services/cbs-client/src/test/resources/sample_config.json deleted file mode 100644 index a95b723f..00000000 --- a/rest-services/cbs-client/src/test/resources/sample_config.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "keystore.path": "/var/run/security/keystore.p12" -} diff --git a/rest-services/cbs-client/src/test/resources/sample_key.json b/rest-services/cbs-client/src/test/resources/sample_key.json new file mode 100644 index 00000000..21da3b26 --- /dev/null +++ b/rest-services/cbs-client/src/test/resources/sample_key.json @@ -0,0 +1,3 @@ +{ + "key": "value" +} diff --git a/rest-services/cbs-client/src/test/resources/sample_service_config.json b/rest-services/cbs-client/src/test/resources/sample_service_config.json new file mode 100644 index 00000000..266326f4 --- /dev/null +++ b/rest-services/cbs-client/src/test/resources/sample_service_config.json @@ -0,0 +1,33 @@ +{ + "keystore.path": "/var/run/security/keystore.p12", + "streams_publishes": { + "perf3gpp": { + "type": "kafka", + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka:6060", + "topic_name": "HVVES_PERF3GPP" + } + }, + "pnf_ready": { + "type": "message_router", + "dmaap_info": { + "topic_url": "http://message-router:3904/events/VES_PNF_READY" + } + }, + "call_trace": { + "type": "kafka", + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka:6060", + "topic_name": "HVVES_TRACE" + } + } + }, + "streams_subscribes": { + "measurements": { + "type": "message_router", + "dmaap_info": { + "topic_url": "http://message-router:3904/events/VES_MEASUREMENT" + } + } + } +} diff --git a/rest-services/cbs-client/src/test/resources/streams/data_router_sink_full.json b/rest-services/cbs-client/src/test/resources/streams/data_router_sink_full.json new file mode 100644 index 00000000..fc43a827 --- /dev/null +++ b/rest-services/cbs-client/src/test/resources/streams/data_router_sink_full.json @@ -0,0 +1,11 @@ +{ + "type": "data_router", + "dmaap_info": { + "location": "mtc00", + "publish_url": "https://we-are-data-router.us/feed/xyz", + "log_url": "https://we-are-data-router.us/feed/xyz/logs", + "username": "some-user", + "password": "some-password", + "publisher_id": "123456" + } +}
\ No newline at end of file diff --git a/rest-services/cbs-client/src/test/resources/streams/data_router_sink_minimal.json b/rest-services/cbs-client/src/test/resources/streams/data_router_sink_minimal.json new file mode 100644 index 00000000..8f76ecae --- /dev/null +++ b/rest-services/cbs-client/src/test/resources/streams/data_router_sink_minimal.json @@ -0,0 +1,6 @@ +{ + "type": "data_router", + "dmaap_info": { + "publish_url": "https://we-are-data-router.us/feed/xyz" + } +}
\ No newline at end of file diff --git a/rest-services/cbs-client/src/test/resources/streams/data_router_source_full.json b/rest-services/cbs-client/src/test/resources/streams/data_router_source_full.json new file mode 100644 index 00000000..56d269cd --- /dev/null +++ b/rest-services/cbs-client/src/test/resources/streams/data_router_source_full.json @@ -0,0 +1,10 @@ +{ + "type": "data_router", + "dmaap_info": { + "location": "mtc00", + "delivery_url": "https://my-subscriber-app.dcae:8080/target-path", + "username": "some-user", + "password": "some-password", + "subscriber_id": "789012" + } +}
\ No newline at end of file diff --git a/rest-services/cbs-client/src/test/resources/streams/data_router_source_minimal.json b/rest-services/cbs-client/src/test/resources/streams/data_router_source_minimal.json new file mode 100644 index 00000000..8e522ba7 --- /dev/null +++ b/rest-services/cbs-client/src/test/resources/streams/data_router_source_minimal.json @@ -0,0 +1,5 @@ +{ + "type": "data_router", + "dmaap_info": { + } +}
\ No newline at end of file diff --git a/rest-services/cbs-client/src/test/resources/streams/integration_message_router.json b/rest-services/cbs-client/src/test/resources/streams/integration_message_router.json new file mode 100644 index 00000000..d38b0cce --- /dev/null +++ b/rest-services/cbs-client/src/test/resources/streams/integration_message_router.json @@ -0,0 +1,62 @@ +{ + "collector.schema.file": "./etc/CommonEventFormat_27.2.json", + "collector.service.port": 8080, + "collector.dmaap.streamid": "fault=sec_fault,roadm-sec-to-hp|syslog=sec_syslog|heartbeat=sec_heartbeat|measurementsForVfScaling=sec_measurement|mobileFlow=sec_mobileflow|other=sec_other|stateChange=sec_statechange|thresholdCrossingAlert=sec_thresholdCrossingAlert", + "collector.schema.checkflag": 1, + "tomcat.maxthreads": "200", + "collector.keystore.passwordfile": "/opt/app/dcae-certificate/.password", + "streams_subscribes": {}, + "services_calls": {}, + "collector.inputQueue.maxPending": 8096, + "header.authflag": 0, + "collector.keystore.file.location": "/opt/app/dcae-certificate/keystore.jks", + "collector.service.secure.port": -1, + "header.authlist": "userid1,base64encodepwd1|userid2,base64encodepwd2", + "collector.keystore.alias": "dynamically generated", + "streams_publishes": { + "sec_measurement": { + "type": "message_router", + "aaf_password": "aaf_password", + "dmaap_info": { + "location": "mtl5", + "client_id": "111111", + "client_role": "com.att.dcae.member", + "topic_url": "https://mrlocal:3905/events/com.att.dcae.dmaap.FTL2.SEC-MEASUREMENT-OUTPUT" + }, + "aaf_username": "aaf_username" + }, + "sec_fault_unsecure": { + "type": "message_router", + "aaf_password": null, + "dmaap_info": { + "location": "mtl5", + "client_id": null, + "client_role": null, + "topic_url": "http://ueb.global:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV" + }, + "aaf_username": null + }, + "sec_measurement_unsecure": { + "type": "message_router", + "aaf_password": null, + "dmaap_info": { + "location": "mtl5", + "client_id": null, + "client_role": null, + "topic_url": "http://ueb.global:3904/events/DCAE-SE-COLLECTOR-EVENTS-DEV" + }, + "aaf_username": null + }, + "sec_fault": { + "type": "message_router", + "aaf_password": "aaf_password", + "dmaap_info": { + "location": "mtl5", + "client_id": "222222", + "client_role": "com.att.dcae.member", + "topic_url": "https://mrlocal:3905/events/com.att.dcae.dmaap.FTL2.SEC-FAULT-OUTPUT" + }, + "aaf_username": "aaf_username" + } + } +} diff --git a/rest-services/cbs-client/src/test/resources/streams/integration_mixed_dmaap.json b/rest-services/cbs-client/src/test/resources/streams/integration_mixed_dmaap.json new file mode 100644 index 00000000..acc7b987 --- /dev/null +++ b/rest-services/cbs-client/src/test/resources/streams/integration_mixed_dmaap.json @@ -0,0 +1,80 @@ +{ + + "streams_subscribes": { + "DCAE_GUEST_OS": { + "type": "data_router", + "dmaap_info": { + "username": "xyz", + "password": "abc", + "location": "mtn23", + "delivery_url": "https://dr.global:8666/DCAE_SAM_GUEST_OS", + "subscriber_id": "811" + } + }, + "DCAE_RAW_DATA": { + "type": "data_router", + "dmaap_info": { + "username": "abc", + "password": "xyz", + "location": "mtn23", + "delivery_url": "https://dr.global:8666/DCAE_CEILOMETER_RAW_DATA", + "subscriber_id": "812" + } + }, + "sec-measurement-output": { + "type": "message_router", + "aaf_password": "aaf_password", + "dmaap_info": { + "topic_url": "https://mr.hostname:3905/events/com.att.dcae.dmaap.SEC-MEASUREMENT-OUTPUT-v1", + "client_role": "com.att.dcae.member", + "location": "mtn23", + "client_id": "1111" + }, + "aaf_username": "aaf_username" + + } + + }, + + "streams_publishes": { + + "DCAE_VOIP_PM_DATA": { + "type": "data_router", + "dmaap_info": { + "username": "abc", + "log_url": "https://dcae-drps/feedlog/206", + "publish_url": "https://dcae-drps/publish/206", + "location": "mtn23", + "password": "xyz", + "publisher_id": "206.518hu" + + } + }, + + "DCAE_GUEST_OS_O": { + "type": "data_router", + "dmaap_info": { + "username": "axyz", + "log_url": "https://dcae-drps/feedlog/203", + "publish_url": "https://dcae-drps/publish/203", + "location": "mtn23", + "password": "abc", + + "publisher_id": "203.2od8s" + } + }, + + "DCAE_PM_DATA": { + "type": "data_router", + "dmaap_info": { + "username": "xyz", + "log_url": "https://dcae-drps/feedlog/493", + "publish_url": "https://dcae-drps/publish/493", + "location": "mtn23bdce2", + "password": "abc", + "publisher_id": "493.eacqs" + } + } + } + +}
\ No newline at end of file diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_sink_invalid_type.json b/rest-services/cbs-client/src/test/resources/streams/kafka_invalid_type.json index 0ee88adb..0ee88adb 100644 --- a/rest-services/cbs-client/src/test/resources/streams/kafka_sink_invalid_type.json +++ b/rest-services/cbs-client/src/test/resources/streams/kafka_invalid_type.json diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_sink.json b/rest-services/cbs-client/src/test/resources/streams/kafka_sink.json index b60388d5..e7b45508 100644 --- a/rest-services/cbs-client/src/test/resources/streams/kafka_sink.json +++ b/rest-services/cbs-client/src/test/resources/streams/kafka_sink.json @@ -1,5 +1,9 @@ { "type": "kafka", + "aaf_credentials": { + "username": "the user", + "password": "the passwd" + }, "kafka_info": { "client_role": "com.dcae.member", "client_id": "1500462518108", diff --git a/rest-services/cbs-client/src/test/resources/streams/kafka_source.json b/rest-services/cbs-client/src/test/resources/streams/kafka_source.json new file mode 100644 index 00000000..379dbef1 --- /dev/null +++ b/rest-services/cbs-client/src/test/resources/streams/kafka_source.json @@ -0,0 +1,14 @@ +{ + "type": "kafka", + "aaf_credentials": { + "username": "the user", + "password": "the passwd" + }, + "kafka_info": { + "client_role": "com.dcae.member", + "client_id": "1500462518108", + "bootstrap_servers": "dmaap-mr-kafka-0:6060,,dmaap-mr-kafka-1:6060,", + "topic_name": "HVVES_PERF3GPP", + "consumer_group_id": "nokia-perf3gpp-processor" + } +} diff --git a/rest-services/cbs-client/src/test/resources/streams/message_router_full.json b/rest-services/cbs-client/src/test/resources/streams/message_router_full.json new file mode 100644 index 00000000..be1725ec --- /dev/null +++ b/rest-services/cbs-client/src/test/resources/streams/message_router_full.json @@ -0,0 +1,11 @@ +{ + "type": "message_router", + "aaf_username": "some-user", + "aaf_password": "some-password", + "dmaap_info": { + "client_role": "com.dcae.member", + "client_id": "1500462518108", + "location": "mtc00", + "topic_url": "https://we-are-message-router.us:3905/events/some-topic" + } +}
\ No newline at end of file diff --git a/rest-services/cbs-client/src/test/resources/streams/message_router_minimal.json b/rest-services/cbs-client/src/test/resources/streams/message_router_minimal.json new file mode 100644 index 00000000..bd504b85 --- /dev/null +++ b/rest-services/cbs-client/src/test/resources/streams/message_router_minimal.json @@ -0,0 +1,6 @@ +{ + "type": "message_router", + "dmaap_info": { + "topic_url": "https://we-are-message-router.us:3905/events/some-topic" + } +}
\ No newline at end of file diff --git a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/CloudHttpClient.java b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/CloudHttpClient.java index e142081e..132d3d83 100644 --- a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/CloudHttpClient.java +++ b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/CloudHttpClient.java @@ -21,133 +21,119 @@ package org.onap.dcaegen2.services.sdk.rest.services.adapters.http; import com.google.gson.Gson; -import io.netty.handler.codec.http.HttpStatusClass; import io.netty.handler.ssl.SslContext; -import io.vavr.collection.Stream; -import java.io.IOException; +import io.vavr.collection.HashMap; import java.util.Collections; import java.util.Map; -import java.util.function.BiConsumer; -import java.util.stream.Collectors; import org.onap.dcaegen2.services.sdk.rest.services.model.ClientModel; import org.onap.dcaegen2.services.sdk.rest.services.model.JsonBodyBuilder; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; -import reactor.netty.ByteBufFlux; -import reactor.netty.http.client.HttpClient; -import reactor.netty.http.client.HttpClientRequest; import reactor.netty.http.client.HttpClientResponse; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 11/15/18 + * @deprecated use {@link RxHttpClient} instead */ - +@Deprecated public class CloudHttpClient { private static final Logger LOGGER = LoggerFactory.getLogger(CloudHttpClient.class); private final Gson gson = new Gson(); - private final HttpClient httpClient; - - public CloudHttpClient() { - this(HttpClient.create()); - } + private final RxHttpClient httpClient; - public CloudHttpClient(SslContext sslContext) { - this(HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext))); - } - - CloudHttpClient(HttpClient httpClient) { + CloudHttpClient(RxHttpClient httpClient) { this.httpClient = httpClient; - } - public <T> Mono<T> get(String url, RequestDiagnosticContext context, Class<T> bodyClass) { - return get(url, context, Collections.EMPTY_MAP, bodyClass); + public CloudHttpClient() { + this(RxHttpClient.create()); } - public <T> Mono<T> get(String url, RequestDiagnosticContext context, Map<String, String> customHeaders, - Class<T> bodyClass) { - final HttpClient clientWithHeaders = getHttpClientWithHeaders(context, customHeaders); - return callHttpGet(clientWithHeaders, url, bodyClass); + public CloudHttpClient(SslContext sslContext) { + this(RxHttpClient.create(sslContext)); } public <T> Mono<T> get(String url, Class<T> bodyClass) { - return callHttpGet(httpClient, url, bodyClass); + return get(url, RequestDiagnosticContext.create(), bodyClass); } - public Mono<HttpClientResponse> post(String url, RequestDiagnosticContext context, Map<String, String> customHeaders, - JsonBodyBuilder jsonBodyBuilder, ClientModel clientModel) { - final HttpClient clientWithHeaders = getHttpClientWithHeaders(context, customHeaders); - return callHttpPost(clientWithHeaders, url, jsonBodyBuilder, clientModel); - } - - public Mono<HttpClientResponse> patch(String url, RequestDiagnosticContext context, Map<String, String> customHeaders, - JsonBodyBuilder jsonBodyBuilder, ClientModel clientModel) { - final HttpClient clientWithHeaders = getHttpClientWithHeaders(context, customHeaders); - return callHttpPatch(clientWithHeaders, url, jsonBodyBuilder, clientModel); - } - - private HttpClient getHttpClientWithHeaders(RequestDiagnosticContext context, Map<String, String> customHeaders) { - final HttpClient clientWithHeaders = httpClient - .doOnRequest((req, conn) -> logRequest(context, req)) - .doOnResponse((rsp, conn) -> logResponse(context, rsp)) - .headers(hdrs -> context.remoteCallHttpHeaders().forEach((BiConsumer<String, String>) hdrs::set)) - .headers(hdrs -> customHeaders.forEach(hdrs::set)); - return clientWithHeaders; - } - - private <T> Mono<T> callHttpGet(HttpClient client, String url, Class<T> bodyClass) { - return client.get() - .uri(url) - .responseSingle((resp, content) -> HttpStatusClass.SUCCESS.contains(resp.status().code()) - ? content.asString() - : Mono.error(createException(url, resp))) - .map(body -> parseJson(body, bodyClass)); - } - - private <T extends ClientModel> Mono<HttpClientResponse> callHttpPost(HttpClient client, String url, - JsonBodyBuilder<T> jsonBodyBuilder, T clientModel) { - return client.baseUrl(url).post() - .send(ByteBufFlux.fromString(Mono.just(jsonBodyBuilder.createJsonBody(clientModel)))) - .responseSingle((httpClientResponse, byteBufMono) -> Mono.just(httpClientResponse)); + public <T> Mono<T> get(String url, RequestDiagnosticContext context, Class<T> bodyClass) { + return get(url, context, Collections.emptyMap(), bodyClass); } - private <T extends ClientModel> Mono<HttpClientResponse> callHttpPatch(HttpClient client, String url, - JsonBodyBuilder<T> jsonBodyBuilder, T clientModel) { - return client.baseUrl(url).patch() - .send(ByteBufFlux.fromString(Mono.just(jsonBodyBuilder.createJsonBody(clientModel)))) + public <T> Mono<T> get( + String url, + RequestDiagnosticContext context, + Map<String, String> customHeaders, + Class<T> bodyClass) { + return httpClient.call( + ImmutableHttpRequest.builder() + .method(HttpMethod.GET) + .url(url) + .customHeaders(HashMap.ofAll(customHeaders)) + .diagnosticContext(context) + .build()) + .doOnNext(HttpResponse::throwIfUnsuccessful) + .map(HttpResponse::bodyAsString) + .map(body -> gson.fromJson(body, bodyClass)); + } + + + public Mono<HttpClientResponse> post( + String url, + RequestDiagnosticContext context, + Map<String, String> customHeaders, + JsonBodyBuilder jsonBodyBuilder, + ClientModel clientModel) { + return callForRawResponse(url, context, customHeaders, jsonBodyBuilder, clientModel, HttpMethod.POST); + } + + public Mono<HttpClientResponse> patch( + String url, + RequestDiagnosticContext context, + Map<String, String> customHeaders, + JsonBodyBuilder jsonBodyBuilder, + ClientModel clientModel) { + return callForRawResponse(url, context, customHeaders, jsonBodyBuilder, clientModel, HttpMethod.PATCH); + } + + public Mono<HttpClientResponse> put( + String url, + RequestDiagnosticContext context, + Map<String, String> customHeaders, + JsonBodyBuilder jsonBodyBuilder, + ClientModel clientModel) { + return callForRawResponse(url, context, customHeaders, jsonBodyBuilder, clientModel, HttpMethod.PUT); + } + + private Mono<HttpClientResponse> callForRawResponse( + String url, + RequestDiagnosticContext context, + Map<String, String> customHeaders, + JsonBodyBuilder jsonBodyBuilder, + ClientModel clientModel, + HttpMethod method) { + + String jsonBody = jsonBodyBuilder.createJsonBody(clientModel); + LOGGER.debug("CloudHttpClient JSon body:: {}", jsonBody); + LOGGER.debug("CloudHttpClient url: {}", url); + LOGGER.debug("CloudHttpClient customHeaders: {}", customHeaders); + + return httpClient.prepareRequest( + ImmutableHttpRequest.builder() + .url(url) + .customHeaders(HashMap.ofAll(customHeaders)) + .diagnosticContext(context) + .body(RequestBody.fromString(jsonBody)) + .method(method) + .build()) .responseSingle((httpClientResponse, byteBufMono) -> Mono.just(httpClientResponse)); } - private Exception createException(String url, HttpClientResponse response) { - return new IOException(String.format("Request failed for URL '%s'. Response code: %s", - url, - response.status())); - } - - private <T> T parseJson(String body, Class<T> bodyClass) { - return gson.fromJson(body, bodyClass); - } - private void logRequest(RequestDiagnosticContext context, HttpClientRequest httpClientRequest) { - context.withSlf4jMdc(LOGGER.isDebugEnabled(), () -> { - LOGGER.debug("Request: {} {}", httpClientRequest.method(), httpClientRequest.uri()); - if (LOGGER.isTraceEnabled()) { - final String headers = Stream.ofAll(httpClientRequest.requestHeaders()) - .map(entry -> entry.getKey() + "=" + entry.getValue()) - .collect(Collectors.joining("\n")); - LOGGER.trace(headers); - } - }); - } - - private void logResponse(RequestDiagnosticContext context, HttpClientResponse httpClientResponse) { - context.withSlf4jMdc(LOGGER.isDebugEnabled(), () -> { - LOGGER.debug("Response status: {}", httpClientResponse.status()); - }); - } } diff --git a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpMethod.java b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpMethod.java new file mode 100644 index 00000000..78e6789e --- /dev/null +++ b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpMethod.java @@ -0,0 +1,48 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.adapters.http; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since March 2019 + */ +public enum HttpMethod { + + CONNECT(io.netty.handler.codec.http.HttpMethod.CONNECT), + DELETE(io.netty.handler.codec.http.HttpMethod.DELETE), + GET(io.netty.handler.codec.http.HttpMethod.GET), + HEAD(io.netty.handler.codec.http.HttpMethod.HEAD), + OPTIONS(io.netty.handler.codec.http.HttpMethod.OPTIONS), + POST(io.netty.handler.codec.http.HttpMethod.POST), + PATCH(io.netty.handler.codec.http.HttpMethod.PATCH), + PUT(io.netty.handler.codec.http.HttpMethod.PUT), + TRACE(io.netty.handler.codec.http.HttpMethod.TRACE); + + private final io.netty.handler.codec.http.HttpMethod nettyMethod; + + HttpMethod(io.netty.handler.codec.http.HttpMethod nettyMethod) { + this.nettyMethod = nettyMethod; + } + + io.netty.handler.codec.http.HttpMethod asNetty() { + return nettyMethod; + } +} diff --git a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpRequest.java b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpRequest.java new file mode 100644 index 00000000..78660833 --- /dev/null +++ b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpRequest.java @@ -0,0 +1,68 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.adapters.http; + +import io.netty.buffer.ByteBuf; +import io.vavr.collection.HashMap; +import io.vavr.collection.Map; +import java.util.function.BiFunction; +import org.immutables.value.Value; +import org.jetbrains.annotations.Nullable; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; +import reactor.netty.NettyOutbound; +import reactor.netty.http.client.HttpClientRequest; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since March 2019 + */ +@Value.Immutable +public interface HttpRequest { + + String url(); + + HttpMethod method(); + + @Value.Default + default RequestDiagnosticContext diagnosticContext() { + return RequestDiagnosticContext.create(); + } + + @Value.Default + default Map<String, String> customHeaders() { + return HashMap.empty(); + } + + @Value.Default + default Publisher<ByteBuf> body() { + return Mono.empty(); + } + + @Value.Derived + default Map<String, String> headers() { + final RequestDiagnosticContext ctx = diagnosticContext(); + return ctx == null + ? customHeaders() + : customHeaders().merge(ctx.remoteCallHttpHeaders()); + } +} diff --git a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpResponse.java b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpResponse.java new file mode 100644 index 00000000..ce100478 --- /dev/null +++ b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/HttpResponse.java @@ -0,0 +1,77 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.adapters.http; + +import com.google.gson.Gson; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import org.immutables.value.Value; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.HttpException; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since March 2019 + */ +@Value.Immutable +public interface HttpResponse { + + String url(); + + int statusCode(); + + byte[] rawBody(); + + @Value.Default + default String statusReason() { + return ""; + } + + @Value.Derived + default boolean successful() { + return statusCode() >= 200 && statusCode() < 300; + } + + @Value.Derived + default String bodyAsString() { + return bodyAsString(StandardCharsets.UTF_8); + } + + @Value.Derived + default String bodyAsString(Charset charset) { + return new String(rawBody(), charset); + } + + @Value.Derived + default <T> T bodyAsJson(Class<T> clazz) { + return bodyAsJson(StandardCharsets.UTF_8, new Gson(), clazz); + } + + @Value.Derived + default <T> T bodyAsJson(Charset charset, Gson gson, Class<T> clazz) { + return gson.fromJson(bodyAsString(charset), clazz); + } + + default void throwIfUnsuccessful() { + if (!successful()) { + throw new HttpException(url(), statusCode(), statusReason()); + } + } +} diff --git a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/NettyHttpResponse.java b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/NettyHttpResponse.java new file mode 100644 index 00000000..3dcd7098 --- /dev/null +++ b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/NettyHttpResponse.java @@ -0,0 +1,73 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.adapters.http; + +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpStatusClass; +import java.nio.charset.Charset; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since March 2019 + */ +class NettyHttpResponse implements HttpResponse { + + private final String url; + private final HttpResponseStatus status; + private final byte[] body; + + NettyHttpResponse(String url, HttpResponseStatus status, byte[] body) { + this.url = url; + this.status = status; + this.body = body; + } + + @Override + public String url() { + return url; + } + + @Override + public boolean successful() { + return status.codeClass() == HttpStatusClass.SUCCESS; + } + + @Override + public int statusCode() { + return status.code(); + } + + @Override + public String statusReason() { + return status.reasonPhrase(); + } + + @Override + public byte[] rawBody() { + return new byte[0]; + } + + @Override + public String bodyAsString(Charset charset) { + return new String(body, charset); + } + +} diff --git a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RequestBody.java b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RequestBody.java new file mode 100644 index 00000000..514ea0bf --- /dev/null +++ b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RequestBody.java @@ -0,0 +1,53 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.adapters.http; + +import com.google.gson.JsonElement; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; +import reactor.netty.ByteBufFlux; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since March 2019 + */ +public final class RequestBody { + + private RequestBody() { + } + + public static Publisher<ByteBuf> fromString(String contents) { + return fromString(contents, StandardCharsets.UTF_8); + } + + public static Publisher<ByteBuf> fromString(String contents, Charset charset) { + return ByteBufFlux.fromString(Mono.just(contents), charset, ByteBufAllocator.DEFAULT); + } + + public static Publisher<ByteBuf> fromJson(JsonElement contents) { + return fromString(contents.toString()); + } + +} diff --git a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java new file mode 100644 index 00000000..f384c1c1 --- /dev/null +++ b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClient.java @@ -0,0 +1,91 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.adapters.http; + +import io.netty.handler.ssl.SslContext; +import io.vavr.collection.Stream; +import java.util.stream.Collectors; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClient; +import reactor.netty.http.client.HttpClient.ResponseReceiver; +import reactor.netty.http.client.HttpClientRequest; +import reactor.netty.http.client.HttpClientResponse; + +/** + * @since 1.1.4 + */ +public class RxHttpClient { + + private static final Logger LOGGER = LoggerFactory.getLogger(RxHttpClient.class); + private final HttpClient httpClient; + + public static RxHttpClient create() { + return new RxHttpClient(HttpClient.create()); + } + + // TODO: hide netty from public api (io.netty.handler.ssl.SslContext) + public static RxHttpClient create(SslContext sslContext) { + return new RxHttpClient(HttpClient.create().secure(sslContextSpec -> sslContextSpec.sslContext(sslContext))); + } + + RxHttpClient(HttpClient httpClient) { + this.httpClient = httpClient; + } + + public Mono<HttpResponse> call(HttpRequest request) { + return prepareRequest(request) + .responseSingle((resp, content) -> + content.asByteArray() + .defaultIfEmpty(new byte[0]) + .map(bytes -> new NettyHttpResponse(request.url(), resp.status(), bytes))); + } + + ResponseReceiver<?> prepareRequest(HttpRequest request) { + return httpClient + .doOnRequest((req, conn) -> logRequest(request.diagnosticContext(), req)) + .doOnResponse((rsp, conn) -> logResponse(request.diagnosticContext(), rsp)) + .headers(hdrs -> request.headers().forEach(hdr -> hdrs.set(hdr._1, hdr._2))) + .request(request.method().asNetty()) + .send(request.body()) + .uri(request.url()); + + } + + private void logRequest(RequestDiagnosticContext context, HttpClientRequest httpClientRequest) { + context.withSlf4jMdc(LOGGER.isDebugEnabled(), () -> { + LOGGER.debug("Request: {} {} {}", httpClientRequest.method(), httpClientRequest.uri(), + httpClientRequest.requestHeaders()); + if (LOGGER.isTraceEnabled()) { + final String headers = Stream.ofAll(httpClientRequest.requestHeaders()) + .map(entry -> entry.getKey() + "=" + entry.getValue()) + .collect(Collectors.joining("\n")); + LOGGER.trace(headers); + } + }); + } + + private void logResponse(RequestDiagnosticContext context, HttpClientResponse httpClientResponse) { + context.withSlf4jMdc(LOGGER.isDebugEnabled(), + () -> LOGGER.debug("Response status: {}", httpClientResponse.status())); + } +} diff --git a/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/exceptions/HttpException.java b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/exceptions/HttpException.java new file mode 100644 index 00000000..9631f4c5 --- /dev/null +++ b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/exceptions/HttpException.java @@ -0,0 +1,45 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since March 2019 + */ +public class HttpException extends RuntimeException { + + private final String url; + private final int responseCode; + private final String reason; + + public HttpException(String url, int responseCode, String reason) { + this.url = url; + this.responseCode = responseCode; + this.reason = reason; + } + + @Override + public String getMessage() { + return String.format("Request failed for URL '%s'. Response code: %d %s", + url, + responseCode, + reason); + } +} diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/DummyHttpServer.java b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java index d0485f57..e565c786 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/DummyHttpServer.java +++ b/rest-services/common-dependency/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/test/DummyHttpServer.java @@ -18,12 +18,9 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl; +package org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test; import io.vavr.CheckedFunction0; -import io.vavr.Function0; -import java.io.IOException; -import java.net.URISyntaxException; import java.net.URL; import java.nio.file.Files; import java.nio.file.Paths; diff --git a/rest-services/common-dependency/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/CloudHttpClientIT.java b/rest-services/common-dependency/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/CloudHttpClientIT.java index 936ef0c5..9844ef1d 100644 --- a/rest-services/common-dependency/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/CloudHttpClientIT.java +++ b/rest-services/common-dependency/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/CloudHttpClientIT.java @@ -42,26 +42,28 @@ import reactor.test.StepVerifier; import reactor.netty.http.client.HttpClientResponse; class CloudHttpClientIT { + private static final int MAX_CONNECTIONS = 1; private static final String SAMPLE_STRING = "sampleString"; private static final String SAMPLE_URL = "/sampleURL"; private static final String JSON_BODY = "{\"correlationId\":\"NOKnhfsadhff\"," - + "\"ipaddress-v4\":\"256.22.33.155\", " - + "\"ipaddress-v6\":\"200J:0db8:85a3:0000:0000:8a2e:0370:7334\"}"; + + "\"ipaddress-v4\":\"256.22.33.155\", " + + "\"ipaddress-v6\":\"200J:0db8:85a3:0000:0000:8a2e:0370:7334\"}"; private static final ConnectionProvider connectionProvider = ConnectionProvider.fixed("test", MAX_CONNECTIONS); - private DmaapModel dmaapModel = mock(DmaapModel.class); - private JsonBodyBuilder<DmaapModel> jsonBodyBuilder = mock(JsonBodyBuilder.class); + private final DmaapModel dmaapModel = mock(DmaapModel.class); + private final JsonBodyBuilder<DmaapModel> jsonBodyBuilder = mock(JsonBodyBuilder.class); @Test void successfulPatchResponse() { DisposableServer server = createValidServer(); - HttpClient httpClient = createHttpClientForContextWithAddress(server, connectionProvider); + RxHttpClient httpClient = createHttpClientForContextWithAddress(server); CloudHttpClient cloudHttpClient = new CloudHttpClient(httpClient); when(jsonBodyBuilder.createJsonBody(dmaapModel)).thenReturn(JSON_BODY); - Mono<HttpClientResponse> content = cloudHttpClient.patch(SAMPLE_URL, createRequestDiagnosticContext(), createCustomHeaders(), - jsonBodyBuilder, dmaapModel); + Mono<HttpClientResponse> content = cloudHttpClient + .patch(SAMPLE_URL, createRequestDiagnosticContext(), createCustomHeaders(), + jsonBodyBuilder, dmaapModel); HttpClientResponse httpClientResponse = content.block(); assertEquals(HttpResponseStatus.OK, httpClientResponse.status()); @@ -71,12 +73,13 @@ class CloudHttpClientIT { @Test void errorPatchRequest() { DisposableServer server = createInvalidServer(); - HttpClient httpClient = createHttpClientForContextWithAddress(server, connectionProvider); + RxHttpClient httpClient = createHttpClientForContextWithAddress(server); CloudHttpClient cloudHttpClient = new CloudHttpClient(httpClient); when(jsonBodyBuilder.createJsonBody(dmaapModel)).thenReturn(JSON_BODY); - Mono<HttpClientResponse> content = cloudHttpClient.patch(SAMPLE_URL, createRequestDiagnosticContext(), createCustomHeaders(), - jsonBodyBuilder, dmaapModel); + Mono<HttpClientResponse> content = cloudHttpClient + .patch(SAMPLE_URL, createRequestDiagnosticContext(), createCustomHeaders(), + jsonBodyBuilder, dmaapModel); HttpClientResponse httpClientResponse = content.block(); assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, httpClientResponse.status()); @@ -86,12 +89,13 @@ class CloudHttpClientIT { @Test void successfulPostResponse() { DisposableServer server = createValidServer(); - HttpClient httpClient = createHttpClientForContextWithAddress(server, connectionProvider); + RxHttpClient httpClient = createHttpClientForContextWithAddress(server); CloudHttpClient cloudHttpClient = new CloudHttpClient(httpClient); when(jsonBodyBuilder.createJsonBody(dmaapModel)).thenReturn(JSON_BODY); - Mono<HttpClientResponse> content = cloudHttpClient.post(SAMPLE_URL, createRequestDiagnosticContext(), createCustomHeaders(), - jsonBodyBuilder, dmaapModel); + Mono<HttpClientResponse> content = cloudHttpClient + .post(SAMPLE_URL, createRequestDiagnosticContext(), createCustomHeaders(), + jsonBodyBuilder, dmaapModel); HttpClientResponse httpClientResponse = content.block(); assertEquals(HttpResponseStatus.OK, httpClientResponse.status()); @@ -101,12 +105,13 @@ class CloudHttpClientIT { @Test void errorPostRequest() { DisposableServer server = createInvalidServer(); - HttpClient httpClient = createHttpClientForContextWithAddress(server, connectionProvider); + RxHttpClient httpClient = createHttpClientForContextWithAddress(server); CloudHttpClient cloudHttpClient = new CloudHttpClient(httpClient); when(jsonBodyBuilder.createJsonBody(dmaapModel)).thenReturn(JSON_BODY); - Mono<HttpClientResponse> content = cloudHttpClient.post(SAMPLE_URL, createRequestDiagnosticContext(), createCustomHeaders(), - jsonBodyBuilder, dmaapModel); + Mono<HttpClientResponse> content = cloudHttpClient + .post(SAMPLE_URL, createRequestDiagnosticContext(), createCustomHeaders(), + jsonBodyBuilder, dmaapModel); HttpClientResponse httpClientResponse = content.block(); assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, httpClientResponse.status()); @@ -116,36 +121,68 @@ class CloudHttpClientIT { @Test void successfulGetResponse() { DisposableServer server = createValidServer(); - HttpClient httpClient = createHttpClientForContextWithAddress(server, connectionProvider); + RxHttpClient httpClient = createHttpClientForContextWithAddress(server); CloudHttpClient cloudHttpClient = new CloudHttpClient(httpClient); when(jsonBodyBuilder.createJsonBody(dmaapModel)).thenReturn(JSON_BODY); Mono<String> content = cloudHttpClient.get(SAMPLE_URL, String.class); Mono<String> contentWithHeaders = cloudHttpClient.get(SAMPLE_URL, createRequestDiagnosticContext(), - createCustomHeaders(), String.class); + createCustomHeaders(), String.class); StepVerifier.create(content) - .expectNext(SAMPLE_STRING) - .expectComplete() - .verify(); + .expectNext(SAMPLE_STRING) + .expectComplete() + .verify(); StepVerifier.create(contentWithHeaders) - .expectNext(SAMPLE_STRING) - .expectComplete() - .verify(); + .expectNext(SAMPLE_STRING) + .expectComplete() + .verify(); server.disposeNow(); } @Test void errorGetRequest() { DisposableServer server = createInvalidServer(); - HttpClient httpClient = createHttpClientForContextWithAddress(server, connectionProvider); + RxHttpClient httpClient = createHttpClientForContextWithAddress(server); CloudHttpClient cloudHttpClient = new CloudHttpClient(httpClient); Mono<String> content = cloudHttpClient.get(SAMPLE_URL, String.class); StepVerifier.create(content) - .expectError() - .verify(); + .expectError() + .verify(); + server.disposeNow(); + } + + @Test + void successfulPutResponse() { + DisposableServer server = createValidServer(); + RxHttpClient httpClient = createHttpClientForContextWithAddress(server); + CloudHttpClient cloudHttpClient = new CloudHttpClient(httpClient); + + when(jsonBodyBuilder.createJsonBody(dmaapModel)).thenReturn(JSON_BODY); + Mono<HttpClientResponse> content = cloudHttpClient + .put(SAMPLE_URL, createRequestDiagnosticContext(), createCustomHeaders(), + jsonBodyBuilder, dmaapModel); + HttpClientResponse httpClientResponse = content.block(); + + assertEquals(HttpResponseStatus.OK, httpClientResponse.status()); + server.disposeNow(); + } + + @Test + void errorPutRequest() { + DisposableServer server = createInvalidServer(); + RxHttpClient httpClient = createHttpClientForContextWithAddress(server); + CloudHttpClient cloudHttpClient = new CloudHttpClient(httpClient); + + when(jsonBodyBuilder.createJsonBody(dmaapModel)).thenReturn(JSON_BODY); + Mono<HttpClientResponse> content = cloudHttpClient + .put(SAMPLE_URL, createRequestDiagnosticContext(), createCustomHeaders(), + jsonBodyBuilder, dmaapModel); + HttpClientResponse httpClientResponse = content.block(); + + assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, httpClientResponse.status()); server.disposeNow(); } @@ -158,26 +195,27 @@ class CloudHttpClientIT { private DisposableServer createValidServer() { Mono<String> response = Mono.just(SAMPLE_STRING); return HttpServer.create() - .handle((req, resp) -> resp.sendString(response)) - .wiretap(true) - .bindNow(); + .handle((req, resp) -> resp.sendString(response)) + .wiretap(true) + .bindNow(); } private DisposableServer createInvalidServer() { return HttpServer.create() - .handle((req, resp) -> Mono.error(new Exception("returnError"))) - .wiretap(true) - .bindNow(); + .handle((req, resp) -> Mono.error(new Exception("returnError"))) + .wiretap(true) + .bindNow(); } private RequestDiagnosticContext createRequestDiagnosticContext() { return ImmutableRequestDiagnosticContext.builder() - .invocationId(UUID.randomUUID()).requestId(UUID.randomUUID()).build(); + .invocationId(UUID.randomUUID()).requestId(UUID.randomUUID()).build(); } - private HttpClient createHttpClientForContextWithAddress(DisposableServer disposableServer, - ConnectionProvider connectionProvider) { - HttpClient client = connectionProvider == null? HttpClient.create() : HttpClient.create(connectionProvider); - return client.addressSupplier(disposableServer::address).wiretap(true); + private RxHttpClient createHttpClientForContextWithAddress(DisposableServer disposableServer) { + HttpClient client = HttpClient.create(connectionProvider) + .addressSupplier(disposableServer::address) + .wiretap(true); + return new RxHttpClient(client); } }
\ No newline at end of file diff --git a/rest-services/common-dependency/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java b/rest-services/common-dependency/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java new file mode 100644 index 00000000..5ae62c87 --- /dev/null +++ b/rest-services/common-dependency/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/adapters/http/RxHttpClientIT.java @@ -0,0 +1,110 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.adapters.http; + +import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer.sendString; + +import io.netty.handler.codec.http.HttpResponseStatus; +import java.net.MalformedURLException; +import java.net.URL; +import java.time.Duration; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.exceptions.HttpException; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +class RxHttpClientIT { + + private static final Duration TIMEOUT = Duration.ofHours(5); + private final RxHttpClient cut = RxHttpClient.create(); + private static DummyHttpServer httpServer; + + @BeforeAll + static void setUpClass() { + httpServer = DummyHttpServer.start(routes -> + routes.get("/sample-get", (req, resp) -> sendString(resp, Mono.just("OK"))) + .get("/sample-get-500", (req, resp) -> resp.status(HttpResponseStatus.INTERNAL_SERVER_ERROR).send()) + .post("/echo-post", (req, resp) -> resp.send(req.receive().retain())) + ); + } + + @AfterAll + static void tearDownClass() { + httpServer.close(); + } + + private ImmutableHttpRequest.Builder requestFor(String path) throws MalformedURLException { + return ImmutableHttpRequest.builder() + .url(new URL("http", httpServer.host(), httpServer.port(), path).toString()); + } + + @Test + void simpleGet() throws Exception { + // given + final HttpRequest httpRequest = requestFor("/sample-get").method(HttpMethod.GET).build(); + + // when + final Mono<String> bodyAsString = cut.call(httpRequest) + .doOnNext(HttpResponse::throwIfUnsuccessful) + .map(HttpResponse::bodyAsString); + + // then + StepVerifier.create(bodyAsString).expectNext("OK").expectComplete().verify(TIMEOUT); + } + + @Test + void getWithError() throws Exception { + // given + final HttpRequest httpRequest = requestFor("/sample-get-500").method(HttpMethod.GET).build(); + + // when + final Mono<String> bodyAsString = cut.call(httpRequest) + .doOnNext(HttpResponse::throwIfUnsuccessful) + .map(HttpResponse::bodyAsString); + + // then + StepVerifier.create(bodyAsString).expectError(HttpException.class).verify(TIMEOUT); + } + + @Test + void simplePost() throws Exception { + // given + final String requestBody = "hello world"; + final HttpRequest httpRequest = requestFor("/echo-post") + .method(HttpMethod.POST) + .body(RequestBody.fromString(requestBody)) + .build(); + + // when + final Mono<String> bodyAsString = cut.call(httpRequest) + .doOnNext(HttpResponse::throwIfUnsuccessful) + .map(HttpResponse::bodyAsString); + + // then + StepVerifier.create(bodyAsString) + .expectNext(requestBody) + .expectComplete() + .verify(TIMEOUT); + } +}
\ No newline at end of file diff --git a/rest-services/dmaap-client/pom.xml b/rest-services/dmaap-client/pom.xml index 52e9f0ac..bad855bd 100644 --- a/rest-services/dmaap-client/pom.xml +++ b/rest-services/dmaap-client/pom.xml @@ -24,6 +24,12 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId> + <artifactId>model + </artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> </dependency> diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java new file mode 100644 index 00000000..48e6f5d1 --- /dev/null +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/DmaapClientFactory.java @@ -0,0 +1,42 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; + +import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.4 + */ +@ExperimentalApi +public final class DmaapClientFactory { + + private DmaapClientFactory() { + } + + public static MessageRouterPublisher createMessageRouterPublisher() { + throw new UnsupportedOperationException("not implemented yet"); + } + + public static MessageRouterSubscriber createMessageRouterSubscriber() { + throw new UnsupportedOperationException("not implemented yet"); + } +} diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisher.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisher.java new file mode 100644 index 00000000..c205f472 --- /dev/null +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisher.java @@ -0,0 +1,36 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; + +import com.google.gson.JsonElement; +import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import reactor.core.publisher.Flux; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.4 + */ +@ExperimentalApi +public interface MessageRouterPublisher { + Flux<MessageRouterPublishResponse> put(MessageRouterPublishRequest request, Flux<? extends JsonElement> items); +} diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriber.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriber.java new file mode 100644 index 00000000..a063f66c --- /dev/null +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriber.java @@ -0,0 +1,53 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; + +import com.google.gson.JsonElement; +import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.4 + */ +@ExperimentalApi +public interface MessageRouterSubscriber { + + Mono<MessageRouterSubscribeResponse> get(MessageRouterSubscribeRequest request); + + default Flux<JsonElement> getElements(MessageRouterSubscribeRequest request) { + return get(request) + .doOnNext(response -> { + if (response.failed()) { + throw new IllegalStateException(response.failReason()); + } + }) + .filter(MessageRouterSubscribeResponse::hasElements) + .flatMapMany(response -> Flux.fromIterable(response.items())); + } + + default Flux<JsonElement> subscribeForElements(MessageRouterSubscribeRequest request) { + return getElements(request).repeat(); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSource.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapRequest.java index e9599a5e..f2a7193e 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSource.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapRequest.java @@ -18,19 +18,21 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap; - +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model; import org.immutables.value.Value; import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SourceStream; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> - * @version 1.2.1 + * @since 1.1.4 */ @ExperimentalApi -@Value.Immutable -public abstract class MessageRouterSource implements MessageRouter, SourceStream { +public interface DmaapRequest { + @Value.Default + default RequestDiagnosticContext diagnosticContext() { + return RequestDiagnosticContext.create(); + } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSink.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapResponse.java index a8aa5cbb..8b4d41e2 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSink.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/DmaapResponse.java @@ -18,26 +18,28 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap; - +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model; import org.immutables.value.Value; import org.jetbrains.annotations.Nullable; import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SinkStream; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> - * @version 1.2.1 + * @since 1.1.4 */ @ExperimentalApi -@Value.Immutable -public abstract class DataRouterSink implements DataRouter, SinkStream { - - abstract String publisherUrl(); +public interface DmaapResponse { - abstract String publisherId(); + @Nullable String failReason(); - abstract @Nullable String logUrl(); + @Value.Derived + default boolean successful() { + return failReason() == null; + } + @Value.Derived + default boolean failed() { + return !successful(); + } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSource.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java index ac8362db..0d507865 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSource.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishRequest.java @@ -18,22 +18,20 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap; - +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model; import org.immutables.value.Value; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SourceStream; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> - * @version 1.2.1 + * @since 1.1.4 */ @ExperimentalApi @Value.Immutable -public abstract class DataRouterSource implements DataRouter, SourceStream { - - abstract String deliveryUrl(); +public interface MessageRouterPublishRequest extends DmaapRequest { - abstract String subscriberId(); + MessageRouterSink sinkDefinition(); } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSink.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishResponse.java index 7584bac4..62175e09 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSink.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterPublishResponse.java @@ -18,19 +18,18 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap; +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model; import org.immutables.value.Value; import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SinkStream; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> - * @version 1.2.1 + * @since 1.1.4 */ @ExperimentalApi @Value.Immutable -public abstract class MessageRouterSink implements MessageRouter, SinkStream { +public interface MessageRouterPublishResponse extends DmaapResponse { } diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterSubscribeRequest.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterSubscribeRequest.java new file mode 100644 index 00000000..49dca603 --- /dev/null +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterSubscribeRequest.java @@ -0,0 +1,38 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model; + +import org.immutables.value.Value; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource; +import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; + + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.4 + */ +@ExperimentalApi +@Value.Immutable +public interface MessageRouterSubscribeRequest extends DmaapRequest { + + MessageRouterSource sourceDefinition(); +} diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterSubscribeResponse.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterSubscribeResponse.java new file mode 100644 index 00000000..2e826784 --- /dev/null +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/model/MessageRouterSubscribeResponse.java @@ -0,0 +1,47 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model; + + +import com.google.gson.JsonArray; +import org.immutables.value.Value; +import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.4 + */ +@ExperimentalApi +@Value.Immutable +public interface MessageRouterSubscribeResponse extends DmaapResponse { + + JsonArray items(); + + @Value.Derived + default boolean hasElements() { + return items().size() > 0; + } + + @Value.Derived + default boolean isEmpty() { + return !hasElements(); + } +} diff --git a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPConsumerReactiveHttpClient.java b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPConsumerReactiveHttpClient.java index d0f95f6e..81a62eba 100644 --- a/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPConsumerReactiveHttpClient.java +++ b/rest-services/dmaap-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPConsumerReactiveHttpClient.java @@ -20,6 +20,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer; +import com.google.gson.JsonElement; import java.net.URI; import java.util.Map; import java.util.Optional; @@ -59,15 +60,16 @@ public class DMaaPConsumerReactiveHttpClient extends DMaaPAbstractReactiveHttpCl * * @return reactive response from DMaaP in string format */ - public Mono<String> getDMaaPConsumerResponse(Optional<RequestDiagnosticContext> requestDiagnosticContextOptional) { - Map<String,String> headers = DMaaPClientServiceUtils.getHeaders(consumerConfiguration.dmaapContentType()); + public Mono<JsonElement> getDMaaPConsumerResponse( + Optional<RequestDiagnosticContext> requestDiagnosticContextOptional) { + Map<String, String> headers = DMaaPClientServiceUtils.getHeaders(consumerConfiguration.dmaapContentType()); if (requestDiagnosticContextOptional.isPresent()) { return cloudHttpClient - .get(getUri().toString(), requestDiagnosticContextOptional.get(), headers, String.class); + .get(getUri().toString(), requestDiagnosticContextOptional.get(), headers, JsonElement.class); } RequestDiagnosticContext requestDiagnosticContext = ImmutableRequestDiagnosticContext.builder() .invocationId(UUID.randomUUID()).requestId(UUID.randomUUID()).build(); - return cloudHttpClient.get(getUri().toString(), requestDiagnosticContext, headers, String.class); + return cloudHttpClient.get(getUri().toString(), requestDiagnosticContext, headers, JsonElement.class); } URI getUri() { diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java new file mode 100644 index 00000000..9656ae86 --- /dev/null +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterPublisherTest.java @@ -0,0 +1,57 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; + +import static org.mockito.Mockito.mock; + +import com.google.gson.JsonPrimitive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Disabled; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import reactor.core.publisher.Flux; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since March 2019 + */ +@Disabled +class MessageRouterPublisherTest { + + private final MessageRouterPublisher cut = mock(MessageRouterPublisher.class); + private final MessageRouterSink sinkDefinition = mock(MessageRouterSink.class); + private final MessageRouterPublishRequest request = ImmutableMessageRouterPublishRequest.builder() + .sinkDefinition(sinkDefinition).build(); + + @Test + void apiShouldBeUsableWithTransform() { + Flux.just(1, 2, 3) + .map(JsonPrimitive::new) + .transform(input -> cut.put(request, input)); + } + + @Test + void apiShouldBeUsableWithSingleCall() { + final Flux<JsonPrimitive> input = Flux.just(1, 2, 3).map(JsonPrimitive::new); + cut.put(request, input); + } +}
\ No newline at end of file diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java new file mode 100644 index 00000000..b8bcde92 --- /dev/null +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/api/MessageRouterSubscriberTest.java @@ -0,0 +1,65 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api; + +import static org.mockito.Mockito.mock; + +import com.google.gson.JsonPrimitive; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.DmaapResponse; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; +import reactor.core.publisher.Flux; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since March 2019 + */ +@Disabled +class MessageRouterSubscriberTest { + + private final MessageRouterSubscriber cut = mock(MessageRouterSubscriber.class); + private final MessageRouterSource sinkDefinition = mock(MessageRouterSource.class); + private final MessageRouterSubscribeRequest request = ImmutableMessageRouterSubscribeRequest.builder() + .sourceDefinition(sinkDefinition) + .build(); + + @Test + void getShouldBeUsable() { + cut.get(request) + .filter(DmaapResponse::successful) + .map(MessageRouterSubscribeResponse::items) + .subscribe(System.out::println); + } + + @Test + void getElementsShouldBeUsable() { + cut.getElements(request) + .collectList() + .subscribe(System.out::println); + } +}
\ No newline at end of file diff --git a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPConsumerReactiveHttpClientTest.java b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPConsumerReactiveHttpClientTest.java index d2ca5d12..5a29fff0 100644 --- a/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPConsumerReactiveHttpClientTest.java +++ b/rest-services/dmaap-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/dmaap/client/service/consumer/DMaaPConsumerReactiveHttpClientTest.java @@ -23,6 +23,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consum import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.google.gson.JsonElement; import java.net.URI; import java.util.Optional; import org.apache.http.entity.ContentType; @@ -44,7 +45,7 @@ class DMaaPConsumerReactiveHttpClientTest { private static final String JSON_MESSAGE = "{ \"responseFromDmaap\": \"Success\"}"; private DMaaPConsumerReactiveHttpClient dmaapConsumerReactiveHttpClient; private DmaapConsumerConfiguration consumerConfigurationMock = mock(DmaapConsumerConfiguration.class); - private Mono<String> expectedResult; + private Mono<JsonElement> expectedResult; private CloudHttpClient httpClient = mock(CloudHttpClient.class); private URI exampleTestUri = URI .create("https://54.45.33.2:1234/unauthenticated.SEC_OTHER_OUTPUT/OpenDCAE-c12/c12"); @@ -67,11 +68,12 @@ class DMaaPConsumerReactiveHttpClientTest { @Test void getHttpResponse_Success() { //given - expectedResult = Mono.just(JSON_MESSAGE); - when(httpClient.get(exampleTestUri.toString(), requestDiagnosticContext, DMaaPClientServiceUtils.getHeaders(ContentType.APPLICATION_JSON.getMimeType()), String.class)) + expectedResult = Mono.just(mock(JsonElement.class)); + when(httpClient.get(exampleTestUri.toString(), requestDiagnosticContext, + DMaaPClientServiceUtils.getHeaders(ContentType.APPLICATION_JSON.getMimeType()), JsonElement.class)) .thenReturn(expectedResult); //when - Mono<String> response = dmaapConsumerReactiveHttpClient + Mono<JsonElement> response = dmaapConsumerReactiveHttpClient .getDMaaPConsumerResponse(Optional.of(requestDiagnosticContext)); //then StepVerifier.create(response).expectSubscription() diff --git a/rest-services/dmaap-client/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/rest-services/dmaap-client/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 00000000..ca6ee9ce --- /dev/null +++ b/rest-services/dmaap-client/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline
\ No newline at end of file diff --git a/rest-services/model/pom.xml b/rest-services/model/pom.xml new file mode 100644 index 00000000..51f8ffcb --- /dev/null +++ b/rest-services/model/pom.xml @@ -0,0 +1,80 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ ============LICENSE_START==================================== + ~ DCAEGEN2-SERVICES-SDK + ~ ========================================================= + ~ Copyright (C) 2019 Nokia. All rights reserved. + ~ ========================================================= + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ ============LICENSE_END===================================== + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.onap.dcaegen2.services.sdk</groupId> + <artifactId>dcaegen2-services-sdk-rest-services</artifactId> + <version>1.1.4-SNAPSHOT</version> + </parent> + + <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId> + <artifactId>model</artifactId> + + <name>dcaegen2-services-sdk-rest-services-model</name> + <description>Rest Services Model</description> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.immutables</groupId> + <artifactId>gson</artifactId> + </dependency> + <dependency> + <groupId>org.immutables</groupId> + <artifactId>value</artifactId> + </dependency> + <dependency> + <groupId>io.vavr</groupId> + <artifactId>vavr</artifactId> + </dependency> + <dependency> + <groupId>org.jetbrains</groupId> + <artifactId>annotations</artifactId> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-engine</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-test</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + +</project> diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/AafCredentials.java index ecb0b553..565efa10 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java +++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/AafCredentials.java @@ -18,24 +18,27 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams; +package org.onap.dcaegen2.services.sdk.model.streams; +import com.google.gson.annotations.SerializedName; import org.immutables.gson.Gson; import org.immutables.value.Value; import org.jetbrains.annotations.Nullable; -import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; /** + * Represents the AAF Credentials. Currently it contains only user name and password. + * * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> - * @version 1.2.1 + * @since 1.1.4 */ -@ExperimentalApi @Value.Immutable @Gson.TypeAdapters public interface AafCredentials { + @SerializedName(value = "username", alternate = "aaf_username") @Nullable String username(); + @SerializedName(value = "password", alternate = "aaf_password") @Nullable String password(); } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/DataStream.java index 43d9d726..06fabccd 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java +++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/DataStream.java @@ -18,15 +18,19 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams; +package org.onap.dcaegen2.services.sdk.model.streams; -import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; +import org.immutables.value.Value; /** + * Represents a named data stream. + * * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> - * @version 1.2.1 + * @since 1.1.4 */ -@ExperimentalApi public interface DataStream { - + @Value.Default + default String name() { + return ""; + } } diff --git a/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/DataStreamDirection.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/DataStreamDirection.java new file mode 100644 index 00000000..240a4c82 --- /dev/null +++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/DataStreamDirection.java @@ -0,0 +1,48 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.model.streams; + +/** + * The direction of the stream, ie. whether it's input ({@code SOURCE}) or output ({@code SINK}) stream. + * + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.4 + */ +public enum DataStreamDirection { + + SINK("streams_publishes"), + SOURCE("streams_subscribes"); + + private final String configurationKey; + + DataStreamDirection(String configurationKey) { + this.configurationKey = configurationKey; + } + + /** + * The configuration key under which the single stream definitions should reside. + * + * @return the configuration key + */ + public String configurationKey() { + return configurationKey; + } +} diff --git a/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/RawDataStream.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/RawDataStream.java new file mode 100644 index 00000000..7f6040ee --- /dev/null +++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/RawDataStream.java @@ -0,0 +1,38 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.model.streams; + +import org.immutables.value.Value; + +/** + * Represents a raw/uninterpreted data stream. + * + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.4 + * @param <T> type of raw data, eg. JsonObject + */ +@Value.Immutable +public interface RawDataStream<T> { + String name(); + StreamType type(); + DataStreamDirection direction(); + T descriptor(); +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SinkStream.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/SinkStream.java index e3389207..5d1d5873 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SinkStream.java +++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/SinkStream.java @@ -18,17 +18,15 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams; - -import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; +package org.onap.dcaegen2.services.sdk.model.streams; /** - * AKA PublishStream + * Represents an output stream, ie. one of objects in <em>streams_publishes</em> array from application configuration. + * Application can put data to this stream. * * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> - * @version 1.2.1 + * @since 1.1.4 */ -@ExperimentalApi public interface SinkStream extends DataStream { } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SourceStream.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/SourceStream.java index 2bea143b..9b68c785 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SourceStream.java +++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/SourceStream.java @@ -18,17 +18,15 @@ * ============LICENSE_END===================================== */ -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams; - -import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; +package org.onap.dcaegen2.services.sdk.model.streams; /** - * AKA SubscribeStream + * Represents an input stream, ie. one of objects in <em>streams_subscribes</em> array from application configuration. + * Application can read data from this stream. * * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> - * @version 1.2.1 + * @since 1.1.4 */ -@ExperimentalApi public interface SourceStream extends DataStream { } diff --git a/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/StreamType.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/StreamType.java new file mode 100644 index 00000000..2e08c82b --- /dev/null +++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/StreamType.java @@ -0,0 +1,52 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.model.streams; + +import io.vavr.collection.Stream; +import org.jetbrains.annotations.NotNull; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.4 + */ +public enum StreamType { + MESSAGE_ROUTER("message_router"), + DATA_ROUTER("data_router"), + KAFKA("kafka"), + UNKNOWN("unknown"); + + private final String rawType; + + StreamType(String rawType) { + this.rawType = rawType; + } + + public static StreamType parse(@NotNull String rawType) { + return Stream.of(StreamType.values()) + .find(type -> type.rawType.equals(rawType)) + .getOrElse(UNKNOWN); + } + + @Override + public String toString() { + return rawType; + } +} diff --git a/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/DataRouter.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/DataRouter.java new file mode 100644 index 00000000..38adb197 --- /dev/null +++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/DataRouter.java @@ -0,0 +1,57 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.model.streams.dmaap; + + +import com.google.gson.annotations.SerializedName; +import org.jetbrains.annotations.Nullable; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.4 + */ +public interface DataRouter { + + /** + * DCAE location for the publisher, used to set up routing. + */ + @SerializedName("location") + @Nullable String location(); + + /** + * Username + * <ul> + * <li>Data Router uses to authenticate to the subscriber when delivering files OR</li> + * <li>the publisher uses to authenticate to Data Router.</li> + * </ul> + */ + @SerializedName("username") + @Nullable String username(); + + /** + * Password + * <ul> + * <li>Data Router uses to authenticate to the subscriber when delivering files OR</li> + * <li>the publisher uses to authenticate to Data Router.</li> + * </ul> + */ + @SerializedName("password") + @Nullable String password(); +} diff --git a/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/DataRouterSink.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/DataRouterSink.java new file mode 100644 index 00000000..bfe31182 --- /dev/null +++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/DataRouterSink.java @@ -0,0 +1,55 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.model.streams.dmaap; + + +import com.google.gson.annotations.SerializedName; +import org.immutables.gson.Gson; +import org.immutables.value.Value; +import org.jetbrains.annotations.Nullable; +import org.onap.dcaegen2.services.sdk.model.streams.SinkStream; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.4 + */ +@Gson.TypeAdapters +@Value.Immutable +public interface DataRouterSink extends DataRouter, SinkStream { + + /** + * URL to which the publisher makes Data Router publish requests. + */ + @SerializedName("publish_url") + String publishUrl(); + + /** + * Publisher id in Data Router + */ + @SerializedName("publisher_id") + @Nullable String publisherId(); + + /** + * URL from which log data for the feed can be obtained. + */ + @SerializedName("log_url") + @Nullable String logUrl(); + +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/DataRouterSource.java index 97f07a29..4ba81acb 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java +++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/DataRouterSource.java @@ -17,33 +17,33 @@ * limitations under the License. * ============LICENSE_END===================================== */ +package org.onap.dcaegen2.services.sdk.model.streams.dmaap; -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap; +import com.google.gson.annotations.SerializedName; +import org.immutables.gson.Gson; import org.immutables.value.Value; import org.jetbrains.annotations.Nullable; -import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.AafCredentials; +import org.onap.dcaegen2.services.sdk.model.streams.SourceStream; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since 1.1.4 */ -@ExperimentalApi -public interface Kafka { - - String bootstrapServers(); - - String topicName(); - - @Nullable AafCredentials aafCredentials(); - - @Nullable String clientRole(); - - @Nullable String clientId(); - - @Value.Default - default int maxPayloadSizeBytes() { - return 1024 * 1024; - } +@Gson.TypeAdapters +@Value.Immutable +public interface DataRouterSource extends DataRouter, SourceStream { + + /** + * URL to which the Data Router should deliver files. + */ + // TODO: since crucial, we need to verify if it should be non-null + @SerializedName("delivery_url") + @Nullable String deliveryUrl(); + + /** + * Subscriber id in Data Router. + */ + @SerializedName("subscriber_id") + @Nullable String subscriberId(); } diff --git a/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/Kafka.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/Kafka.java new file mode 100644 index 00000000..df2cee6d --- /dev/null +++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/Kafka.java @@ -0,0 +1,77 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.model.streams.dmaap; + +import static io.vavr.Predicates.not; + +import io.vavr.collection.List; +import org.immutables.value.Value; +import org.jetbrains.annotations.Nullable; +import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.4 + */ +public interface Kafka { + + /** + * Kafka bootstrap servers as defined in Kafka client documentation under <em>bootstrap.servers</em> configuration + * key. + */ + String bootstrapServers(); + + /** + * The name of the topic where application should publish or subscribe for the messages. + */ + String topicName(); + + /** + * The credentials to use when authenticating to Kafka cluster or null when connection should be unauthenticated. + */ + @Nullable AafCredentials aafCredentials(); + + /** + * AAF client role that’s requesting publish or subscribe access to the topic. + */ + @Nullable String clientRole(); + + /** + * Client id for given AAF client. + */ + @Nullable String clientId(); + + /** + * The limit on the size of message published to/subscribed from the topic. Can be used to set Kafka client + * <em>max.request.size</em> configuration property. + */ + @Value.Default + default int maxPayloadSizeBytes() { + return 1024 * 1024; + } + + /** + * The {@code bootstrapServers} converted to the list of servers' addresses. + */ + @Value.Derived + default List<String> bootstrapServerList() { + return List.of(bootstrapServers().split(",")).filter(not(String::isEmpty)); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/KafkaSink.java index 514881fe..2c397615 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java +++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/KafkaSink.java @@ -17,19 +17,16 @@ * limitations under the License. * ============LICENSE_END===================================== */ - -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap; +package org.onap.dcaegen2.services.sdk.model.streams.dmaap; import org.immutables.value.Value; -import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SinkStream; +import org.onap.dcaegen2.services.sdk.model.streams.SinkStream; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since 1.1.4 */ -@ExperimentalApi @Value.Immutable -public abstract interface KafkaSink extends Kafka, SinkStream { +public interface KafkaSink extends Kafka, SinkStream { } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/KafkaSource.java index 65280a98..799d3af5 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java +++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/KafkaSource.java @@ -17,21 +17,22 @@ * limitations under the License. * ============LICENSE_END===================================== */ - -package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap; +package org.onap.dcaegen2.services.sdk.model.streams.dmaap; import org.immutables.value.Value; import org.jetbrains.annotations.Nullable; -import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.SourceStream; +import org.onap.dcaegen2.services.sdk.model.streams.SourceStream; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since 1.1.4 */ -@ExperimentalApi @Value.Immutable public interface KafkaSource extends Kafka, SourceStream { + /** + * A unique string that identifies the consumer group this consumer belongs to as defined in Kafka consumer + * configuration key <em>group.id</em>. + */ @Nullable String consumerGroupId(); } diff --git a/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/MessageRouter.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/MessageRouter.java new file mode 100644 index 00000000..3a6ba0f6 --- /dev/null +++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/MessageRouter.java @@ -0,0 +1,61 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.model.streams.dmaap; + +import com.google.gson.annotations.SerializedName; +import org.jetbrains.annotations.Nullable; +import org.onap.dcaegen2.services.sdk.model.streams.AafCredentials; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.4 + */ +public interface MessageRouter { + + /** + * URL for accessing the topic to publish or receive events. + */ + @SerializedName("topic_url") + String topicUrl(); + + /** + * AAF client role that’s requesting publish or subscribe access to the topic. + */ + @SerializedName("client_role") + @Nullable String clientRole(); + + /** + * Client id for given AAF client. + */ + @SerializedName("client_id") + @Nullable String clientId(); + + /** + * DCAE location for the publisher or subscriber, used to set up routing. + */ + @SerializedName("location") + @Nullable String location(); + + /** + * The AAF credentials. + */ + @SerializedName("aaf_credentials") + @Nullable AafCredentials aafCredentials(); +} diff --git a/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/MessageRouterSink.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/MessageRouterSink.java new file mode 100644 index 00000000..1820775b --- /dev/null +++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/MessageRouterSink.java @@ -0,0 +1,32 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.model.streams.dmaap; + +import org.immutables.value.Value; +import org.onap.dcaegen2.services.sdk.model.streams.SinkStream; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.4 + */ +@Value.Immutable +public interface MessageRouterSink extends MessageRouter, SinkStream { + +} diff --git a/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/MessageRouterSource.java b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/MessageRouterSource.java new file mode 100644 index 00000000..b92dff1f --- /dev/null +++ b/rest-services/model/src/main/java/org/onap/dcaegen2/services/sdk/model/streams/dmaap/MessageRouterSource.java @@ -0,0 +1,32 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ +package org.onap.dcaegen2.services.sdk.model.streams.dmaap; + +import org.immutables.value.Value; +import org.onap.dcaegen2.services.sdk.model.streams.SourceStream; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.4 + */ +@Value.Immutable +public interface MessageRouterSource extends MessageRouter, SourceStream { + +} diff --git a/rest-services/pom.xml b/rest-services/pom.xml index f54ea772..aa8caf27 100644 --- a/rest-services/pom.xml +++ b/rest-services/pom.xml @@ -18,6 +18,7 @@ <packaging>pom</packaging> <modules> + <module>model</module> <module>common-dependency</module> <module>aai-client</module> <module>cbs-client</module> |