diff options
34 files changed, 677 insertions, 82 deletions
diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClient.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClient.java index 3ee12eed..d6a5700a 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClient.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClient.java @@ -21,12 +21,10 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api; import com.google.gson.JsonObject; import java.time.Duration; -import java.util.UUID; import java.util.function.BiPredicate; import java.util.function.Function; import org.jetbrains.annotations.NotNull; -import org.onap.dcaegen2.services.sdk.rest.services.model.logging.ImmutableRequestDiagnosticContext; -import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -46,28 +44,27 @@ public interface CbsClient { * <p> * Returns a {@link Mono} that publishes new configuration after CBS client retrieves one. * + * @param request the CBS Request to be performed (can be obtained from {@link CbsRequests}) * @return reactive stream of configuration - * @param diagnosticContext diagnostic context as defined in Logging Guideline - * @since 1.1.2 */ - @NotNull Mono<JsonObject> get(RequestDiagnosticContext diagnosticContext); + @NotNull Mono<JsonObject> get(CbsRequest request); /** * <p> * Poll for configuration. * * <p> - * Will call {@link #get(RequestDiagnosticContext)} after {@code initialDelay} every {@code period}. Resulting entries may or may not be + * Will call {@link #get(CbsRequest)} after {@code initialDelay} every {@code period}. Resulting entries may or may not be * changed, ie. items in the stream might be the same until change is made in CBS. * - * @param diagnosticContext diagnostic context as defined in Logging Guideline + * @param request the CBS Request to be performed (can be obtained from {@link CbsRequests}) * @param initialDelay delay after first request attempt * @param period frequency of update checks * @return stream of configuration states */ - default Flux<JsonObject> get(RequestDiagnosticContext diagnosticContext, Duration initialDelay, Duration period) { + default Flux<JsonObject> get(CbsRequest request, Duration initialDelay, Duration period) { return Flux.interval(initialDelay, period) - .map(i -> ImmutableRequestDiagnosticContext.copyOf(diagnosticContext).withInvocationId(UUID.randomUUID())) + .map(i -> request.withNewInvocationId()) .flatMap(this::get); } @@ -76,7 +73,7 @@ public interface CbsClient { * Poll for configuration updates. * * <p> - * Will call {@link #get(RequestDiagnosticContext)} after {@code initialDelay} every {@code period}. Will emit an item + * Will call {@link #get(CbsRequest)} after {@code initialDelay} every {@code period}. Will emit an item * only when an update was detected, ie. when new item is different then last emitted item. * * <p> @@ -87,17 +84,17 @@ public interface CbsClient { * (<b>experimental API</b>) if you want to react differently to changes in subsets of the configuration. * </li> * <li> - * Use {@link #get(RequestDiagnosticContext, Duration, Duration)} with + * Use {@link #get(CbsRequest, Duration, Duration)} with * {@link Flux#distinctUntilChanged(Function, BiPredicate)} if you want to specify custom comparison logic. * </li> * </ul> * - * @param diagnosticContext diagnostic context as defined in Logging Guideline + * @param request the CBS Request to be performed (can be obtained from {@link CbsRequests}) * @param initialDelay delay after first request attempt * @param period frequency of update checks * @return stream of configuration updates */ - default Flux<JsonObject> updates(RequestDiagnosticContext diagnosticContext, Duration initialDelay, Duration period) { - return get(diagnosticContext, initialDelay, period).distinctUntilChanged(); + default Flux<JsonObject> updates(CbsRequest request, Duration initialDelay, Duration period) { + return get(request, initialDelay, period).distinctUntilChanged(); } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java index 379daf97..c11ed533 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java @@ -56,7 +56,7 @@ public class CbsClientFactory { final RxHttpClient httpClient = RxHttpClient.create(); final CbsLookup lookup = new CbsLookup(httpClient); return lookup.lookup(env) - .map(addr -> CbsClientImpl.create(httpClient, addr, env.appName())); + .map(addr -> new CbsClientImpl(httpClient, env.appName(), addr)); }); } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsRequests.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsRequests.java new file mode 100644 index 00000000..3724338d --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsRequests.java @@ -0,0 +1,85 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api; + +import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableCbsRequest; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; + +/** + * A factory to various of requests supported by Config Binding Service. + * + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.4 + */ +public final class CbsRequests { + + /** + * <p>A get-configuration request.</p> + * + * <p>Will bind the configuration for given service and return the bound configuration.</p> + * + * @param diagnosticContext logging diagnostic context (MDC) + * @return the CbsRequest ready to be used when calling {@link CbsClient} + */ + public static @NotNull CbsRequest getConfiguration(RequestDiagnosticContext diagnosticContext) { + return ImmutableCbsRequest.builder() + .diagnosticContext(diagnosticContext) + .requestPath(serviceName -> "/service_component/" + serviceName) + .build(); + } + + /** + * <p>A get-by-key request.</p> + * + * <p>This will call an endpoint that fetches a generic service_component_name:key out of Consul</p> + * + * @param diagnosticContext logging diagnostic context (MDC) + * @return the CbsRequest ready to be used when calling {@link CbsClient} + */ + public static @NotNull CbsRequest getByKey( + RequestDiagnosticContext diagnosticContext, + String key) { + return ImmutableCbsRequest.builder() + .diagnosticContext(diagnosticContext) + .requestPath(serviceName -> "/" + key + "/" + serviceName) + .build(); + } + + /** + * <p>A get-all request.</p> + * + * <p>Will bind the configuration for given service and return the bound configuration, policies, and any other + * keys that are in Consul</p> + * + * @param diagnosticContext logging diagnostic context (MDC) + * @return the CbsRequest ready to be used when calling {@link CbsClient} + */ + public static @NotNull CbsRequest getAll(RequestDiagnosticContext diagnosticContext) { + return ImmutableCbsRequest.builder() + .diagnosticContext(diagnosticContext) + .requestPath(serviceName -> "/service_component_all/" + serviceName) + .build(); + } + +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParsingException.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParsingException.java index aa8c2ed7..ca531e82 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParsingException.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/exceptions/StreamParsingException.java @@ -36,6 +36,10 @@ public class StreamParsingException extends CbsClientException { this.cause = cause; } + public StreamParsingException(String message) { + this(new StreamParserError(message)); + } + public StreamParserError cause() { return cause; } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java index 4fdb31b1..648b7a61 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/DataStreams.java @@ -29,6 +29,38 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Dat import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; /** + * Extract streams from the application configuration represented as GSON JsonObject. + * + * Example input: + * <pre> + * { + * "application_config_1": "value_1, + * ... + * "streams_publishes": { + * "stream1": { + * "type": "message_router", + * "dmaap_info": { + * ... + * } + * }, + * "stream2": { + * "type": "data_router", + * "dmaap_info": { + * ... + * } + * } + * }, + * "streams_subscribes": { + * "stream3": { + * "type": "message_router", + * "dmaap_info": { + * ... + * } + * }, + * } + * } + * </pre> + * * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since 1.1.4 */ @@ -38,10 +70,78 @@ public final class DataStreams { private DataStreams() { } + /** + * <p> + * Extracts sources from application configuration. Parses <em>streams_subscribes</em> subtree. + * </p> + * + * <p> + * For sample input it will yield: + * </p> + * + * <pre> + * [ + * RawDataStream{ + * name="stream3" + * type="message_router" + * direction=SOURCE + * descriptor=JsonObject{ + * type: "message_router", + * dmaap_info: { + * ... + * } + * } + * } + * ] + * </pre> + * + * @param rootJson - the full application configuration + * @return io.vavr.collection.Stream of data streams + */ public static Stream<RawDataStream<JsonObject>> namedSources(JsonObject rootJson) { return createCollectionOfStreams(rootJson, DataStreamDirection.SOURCE); } + + /** + * <p> + * Extracts sinks from application configuration. Parses <em>streams_publishes</em> subtree. + * </p> + * + * <p> + * For sample input it will yield: + * </p> + * + * <pre> + * [ + * RawDataStream{ + * name="stream1" + * type="message_router" + * direction=SINK + * descriptor=JsonObject{ + * type: "message_router", + * dmaap_info: { + * ... + * } + * } + * }, + * RawDataStream{ + * name="stream2" + * type="data_router" + * direction=SINK + * descriptor=JsonObject{ + * type: "data_router" + * dmaap_info: { + * ... + * } + * } + * } + * ] + * </pre> + * + * @param rootJson - the full application configuration + * @return io.vavr.collection.Stream of data streams + */ public static Stream<RawDataStream<JsonObject>> namedSinks(JsonObject rootJson) { return createCollectionOfStreams(rootJson, DataStreamDirection.SINK); } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java index 460d7100..a8ce3644 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParser.java @@ -25,6 +25,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStream; /** + * Represents parser taking GSON JsonObject as an input + * * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since 1.1.4 */ diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java index 7ae92baf..7476e976 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/streams/StreamFromGsonParsers.java @@ -29,6 +29,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.*; /** + * Factory methods for GSON-based {@code StreamParser}s + * * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> * @since 1.1.4 */ @@ -37,26 +39,56 @@ public final class StreamFromGsonParsers { private StreamFromGsonParsers() { } + /** + * Creates a stream parser capable of parsing Kafka sinks. + * + * @return a stream parser + */ public static StreamFromGsonParser<KafkaSink> kafkaSinkParser() { return KafkaSinkParser.create(); } + /** + * Creates a stream parser capable of parsing Kafka sources. + * + * @return a stream parser + */ public static StreamFromGsonParser<KafkaSource> kafkaSourceParser() { return KafkaSourceParser.create(); } + /** + * Creates a stream parser capable of parsing DMaaP Message Router sinks. + * + * @return a stream parser + */ public static StreamFromGsonParser<MessageRouterSink> messageRouterSinkParser() { return MessageRouterSinkParser.create(); } + /** + * Creates a stream parser capable of parsing DMaaP Message Router sources. + * + * @return a stream parser + */ public static StreamFromGsonParser<MessageRouterSource> messageRouterSourceParser() { return MessageRouterSourceParser.create(); } + /** + * Creates a stream parser capable of parsing DMaaP Data Router sinks. + * + * @return a stream parser + */ public static StreamFromGsonParser<DataRouterSink> dataRouterSinkParser() { return DataRouterSinkParser.create(); } + /** + * Creates a stream parser capable of parsing DMaaP Data Router sources. + * + * @return a stream parser + */ public static StreamFromGsonParser<DataRouterSource> dataRouterSourceParser() { return DataRouterSourceParser.create(); } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java index 72c1b267..98f3cc97 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java @@ -28,45 +28,57 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; -import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; public class CbsClientImpl implements CbsClient { + private static final Logger LOGGER = LoggerFactory.getLogger(CbsClientImpl.class); private final RxHttpClient httpClient; - private final String fetchUrl; + private final String serviceName; + private final InetSocketAddress cbsAddress; - CbsClientImpl(RxHttpClient httpClient, URL fetchUrl) { + public CbsClientImpl(RxHttpClient httpClient, String serviceName, InetSocketAddress cbsAddress) { this.httpClient = httpClient; - this.fetchUrl = fetchUrl.toString(); + this.serviceName = serviceName; + this.cbsAddress = cbsAddress; } - public static CbsClientImpl create(RxHttpClient httpClient, InetSocketAddress cbsAddress, String serviceName) { - return new CbsClientImpl(httpClient, constructUrl(cbsAddress, serviceName)); + @Override + public @NotNull Mono<JsonObject> get(CbsRequest request) { + return Mono.fromCallable(() -> constructUrl(request).toString()) + .doOnNext(this::logRequestUrl) + .map(url -> ImmutableHttpRequest.builder() + .method(HttpMethod.GET) + .url(url) + .diagnosticContext(request.diagnosticContext()) + .build()) + .flatMap(httpClient::call) + .map(resp -> resp.bodyAsJson(JsonObject.class)) + .doOnNext(this::logCbsResponse); } - private static URL constructUrl(InetSocketAddress cbsAddress, String serviceName) { + + private URL constructUrl(CbsRequest request) { try { return new URL( "http", cbsAddress.getHostString(), cbsAddress.getPort(), - "/service_component/" + serviceName); + request.requestPath().getForService(serviceName)); } catch (MalformedURLException e) { throw new IllegalArgumentException("Invalid CBS URL", e); } } - @Override - public @NotNull Mono<JsonObject> get(RequestDiagnosticContext diagnosticContext) { - return Mono.defer(() -> { - final ImmutableHttpRequest request = ImmutableHttpRequest.builder() - .method(HttpMethod.GET) - .url(fetchUrl) - .diagnosticContext(diagnosticContext) - .build(); - return httpClient.call(request) - .map(resp -> resp.bodyAsJson(JsonObject.class)); - }); + private void logRequestUrl(String url) { + LOGGER.debug("Calling {} for configuration", url); + } + + private void logCbsResponse(JsonObject json) { + LOGGER.info("Got successful response from Config Binding Service"); + LOGGER.debug("CBS response: {}", json); } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java index 3d528c33..99058772 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java @@ -21,6 +21,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl; import com.google.gson.JsonArray; +import com.google.gson.JsonElement; import com.google.gson.JsonObject; import java.net.InetSocketAddress; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; @@ -29,6 +30,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpR import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.ServiceLookupException; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; /** @@ -37,6 +40,7 @@ import reactor.core.publisher.Mono; */ public class CbsLookup { + private static final Logger LOGGER = LoggerFactory.getLogger(CbsLookup.class); private static final String CONSUL_JSON_SERVICE_ADDRESS = "ServiceAddress"; private static final String CONSUL_JSON_SERVICE_PORT = "ServicePort"; private final RxHttpClient httpClient; @@ -47,15 +51,22 @@ public class CbsLookup { public Mono<InetSocketAddress> lookup(EnvProperties env) { return Mono.fromCallable(() -> createConsulUrl(env)) + .doOnNext(this::logConsulRequestUrl) .flatMap(this::fetchHttpData) + .doOnNext(this::logConsulResponse) .flatMap(this::firstService) - .map(this::parseServiceEntry); + .map(this::parseServiceEntry) + .doOnNext(this::logCbsServiceAddress); } private String createConsulUrl(EnvProperties env) { return String.format("http://%s:%s/v1/catalog/service/%s", env.consulHost(), env.consulPort(), env.cbsName()); } + private void logConsulRequestUrl(String consulUrl) { + LOGGER.debug("Calling Consul for CBS address. consulUrl={}", consulUrl); + } + private Mono<JsonArray> fetchHttpData(String consulUrl) { return httpClient.call( ImmutableHttpRequest.builder() @@ -66,6 +77,10 @@ public class CbsLookup { .map(resp -> resp.bodyAsJson(JsonArray.class)); } + private void logConsulResponse(JsonArray consulResponse) { + LOGGER.debug("Consul response with CBS service list. Will use 1st one. response={}", consulResponse); + } + private Mono<JsonObject> firstService(JsonArray services) { return services.size() == 0 ? Mono.error(new ServiceLookupException("Consul server did not return any service with given name")) @@ -78,4 +93,8 @@ public class CbsLookup { service.get(CONSUL_JSON_SERVICE_PORT).getAsInt()); } + private void logCbsServiceAddress(InetSocketAddress address) { + LOGGER.info("Config Binding Service address: {}", address); + } + } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java index d34b1440..1148574e 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/DataStreamUtils.java @@ -24,6 +24,7 @@ import com.google.gson.JsonElement; import com.google.gson.JsonObject; import io.vavr.collection.Stream; import java.io.IOException; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.DataStreamDirection; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableRawDataStream; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; @@ -48,11 +49,11 @@ public final class DataStreamUtils { String expectedType, DataStreamDirection expectedDirection) { if (!json.type().equals(expectedType)) { - throw new IllegalArgumentException( + throw new StreamParsingException( "Invalid stream type. Expected '" + expectedType + "', but was '" + json.type() + "'"); } if (json.direction() != expectedDirection) { - throw new IllegalArgumentException( + throw new StreamParsingException( "Invalid stream direction. Expected '" + expectedDirection + "', but was '" + json.direction() + "'"); } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java index 0b662286..0fdec5d8 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/streams/gson/GsonUtils.java @@ -32,6 +32,7 @@ import java.io.InputStreamReader; import java.io.Reader; import java.util.Map.Entry; import java.util.stream.Collectors; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.dmaap.mr.GsonAdaptersMessageRouterDmaapInfo; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.streams.gson.kafka.GsonAdaptersKafkaInfo; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.GsonAdaptersAafCredentials; @@ -71,7 +72,7 @@ public final class GsonUtils { public static JsonElement requiredChild(JsonObject parent, String childName) { return optionalChild(parent, childName) - .getOrElseThrow(() -> new IllegalArgumentException( + .getOrElseThrow(() -> new StreamParsingException( "Could not find sub-node '" + childName + "'. Actual sub-nodes: " + stringifyChildrenNames(parent))); diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/CbsRequest.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/CbsRequest.java new file mode 100644 index 00000000..0a319666 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/CbsRequest.java @@ -0,0 +1,57 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model; + +import java.util.UUID; +import org.immutables.value.Value; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.ImmutableRequestDiagnosticContext; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; + +/** + * A recipe on which CBS endpoint to call. Usually you should use {@link org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests} + * which is a factory to each request type. + * + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since 1.1.4 + */ +@Value.Immutable +public interface CbsRequest { + + /** + * The CBS request path. It will be created by the library. + */ + RequestPath requestPath(); + + /** + * Diagnostic context as defined in Logging Guideline + */ + RequestDiagnosticContext diagnosticContext(); + + /** + * Return a view on this CbsRequest with updated InvocationID. + */ + default CbsRequest withNewInvocationId() { + final RequestDiagnosticContext newDiagnosticCtx = ImmutableRequestDiagnosticContext + .copyOf(diagnosticContext()) + .withInvocationId(UUID.randomUUID()); + return ImmutableCbsRequest.copyOf(this).withDiagnosticContext(newDiagnosticCtx); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/RequestPath.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/RequestPath.java new file mode 100644 index 00000000..97d4b4ea --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/RequestPath.java @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model; + +import io.vavr.Function1; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since March 2019 + */ +@FunctionalInterface +public interface RequestPath extends Function1<String, String> { + String getForService(String serviceName); + + @Override + default String apply(String serviceName) { + return getForService(serviceName); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java index c3c70b78..9fa83bcb 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/AafCredentials.java @@ -28,8 +28,10 @@ import org.jetbrains.annotations.Nullable; import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; /** + * Represents the AAF Credentials. Currently it contains only user name and password. + * * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> - * @version 1.2.1 + * @since 1.1.4 */ @ExperimentalApi @Value.Immutable diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java index 37bf7e57..1950a304 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStream.java @@ -24,8 +24,10 @@ import org.immutables.value.Value; import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; /** + * Represents a named data stream. + * * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> - * @version 1.2.1 + * @since 1.1.4 */ @ExperimentalApi public interface DataStream { diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java index f3cac547..3d05c9a9 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/DataStreamDirection.java @@ -21,8 +21,10 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams; /** + * The direction of the stream, ie. whether it's input ({@code SOURCE}) or output ({@code SINK}) stream. + * * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> - * @since March 2019 + * @since 1.1.4 */ public enum DataStreamDirection { @@ -35,6 +37,11 @@ public enum DataStreamDirection { this.configurationKey = configurationKey; } + /** + * The configuration key under which the single stream definitions should reside. + * + * @return the configuration key + */ public String configurationKey() { return configurationKey; } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java index 7a39ede5..d6bc8000 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/RawDataStream.java @@ -23,8 +23,11 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams; import org.immutables.value.Value; /** + * Represents a raw/uninterpreted data stream. + * * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> - * @since March 2019 + * @since 1.1.4 + * @param <T> type of raw data, eg. JsonObject */ @Value.Immutable public interface RawDataStream<T> { diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SinkStream.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SinkStream.java index e3389207..7002fd68 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SinkStream.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SinkStream.java @@ -23,10 +23,11 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams; import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; /** - * AKA PublishStream + * Represents an output stream, ie. one of objects in <em>streams_publishes</em> array from application configuration. + * Application can put data to this stream. * * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> - * @version 1.2.1 + * @since 1.1.4 */ @ExperimentalApi public interface SinkStream extends DataStream { diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SourceStream.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SourceStream.java index 2bea143b..c5ab8a34 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SourceStream.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/SourceStream.java @@ -23,10 +23,11 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams; import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; /** - * AKA SubscribeStream + * Represents an input stream, ie. one of objects in <em>streams_subscribes</em> array from application configuration. + * Application can read data from this stream. * * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> - * @version 1.2.1 + * @since 1.1.4 */ @ExperimentalApi public interface SourceStream extends DataStream { diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouter.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouter.java index b4b5549e..072d4b0b 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouter.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouter.java @@ -17,7 +17,6 @@ * limitations under the License. * ============LICENSE_END===================================== */ - package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap; @@ -27,17 +26,34 @@ import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> - * @version 1.2.1 + * @since 1.1.4 */ @ExperimentalApi public interface DataRouter { + /** + * DCAE location for the publisher, used to set up routing. + */ @SerializedName("location") @Nullable String location(); + /** + * Username + * <ul> + * <li>Data Router uses to authenticate to the subscriber when delivering files OR</li> + * <li>the publisher uses to authenticate to Data Router.</li> + * </ul> + */ @SerializedName("username") @Nullable String username(); + /** + * Password + * <ul> + * <li>Data Router uses to authenticate to the subscriber when delivering files OR</li> + * <li>the publisher uses to authenticate to Data Router.</li> + * </ul> + */ @SerializedName("password") @Nullable String password(); } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSink.java index 60d91009..baddb91e 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSink.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSink.java @@ -17,7 +17,6 @@ * limitations under the License. * ============LICENSE_END===================================== */ - package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap; @@ -30,19 +29,28 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Sin /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> - * @version 1.2.1 + * @since 1.1.4 */ @Gson.TypeAdapters @ExperimentalApi @Value.Immutable public interface DataRouterSink extends DataRouter, SinkStream { + /** + * URL to which the publisher makes Data Router publish requests. + */ @SerializedName("publish_url") String publishUrl(); + /** + * Publisher id in Data Router + */ @SerializedName("publisher_id") @Nullable String publisherId(); + /** + * URL from which log data for the feed can be obtained. + */ @SerializedName("log_url") @Nullable String logUrl(); diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSource.java index b6dedb7a..d089a403 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSource.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/DataRouterSource.java @@ -17,7 +17,6 @@ * limitations under the License. * ============LICENSE_END===================================== */ - package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap; @@ -30,17 +29,23 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Sou /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> - * @version 1.2.1 + * @since 1.1.4 */ @Gson.TypeAdapters @ExperimentalApi @Value.Immutable public interface DataRouterSource extends DataRouter, SourceStream { - // in future, since crucial need to be verified if could be nullable + /** + * URL to which the Data Router should deliver files. + */ + // TODO: since crucial, we need to verify if it should be non-null @SerializedName("delivery_url") @Nullable String deliveryUrl(); + /** + * Subscriber id in Data Router. + */ @SerializedName("subscriber_id") @Nullable String subscriberId(); } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java index 1810fc6c..42558cbf 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/Kafka.java @@ -17,7 +17,6 @@ * limitations under the License. * ============LICENSE_END===================================== */ - package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap; import static io.vavr.Predicates.not; @@ -35,21 +34,44 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Aaf @ExperimentalApi public interface Kafka { + /** + * Kafka bootstrap servers as defined in Kafka client documentation under <em>bootstrap.servers</em> configuration + * key. + */ String bootstrapServers(); + /** + * The name of the topic where application should publish or subscribe for the messages. + */ String topicName(); + /** + * The credentials to use when authenticating to Kafka cluster or null when connection should be unauthenticated. + */ @Nullable AafCredentials aafCredentials(); + /** + * AAF client role that’s requesting publish or subscribe access to the topic. + */ @Nullable String clientRole(); + /** + * Client id for given AAF client. + */ @Nullable String clientId(); + /** + * The limit on the size of message published to/subscribed from the topic. Can be used to set Kafka client + * <em>max.request.size</em> configuration property. + */ @Value.Default default int maxPayloadSizeBytes() { return 1024 * 1024; } + /** + * The {@code bootstrapServers} converted to the list of servers' addresses. + */ @Value.Derived default List<String> bootstrapServerList() { return List.of(bootstrapServers().split(",")).filter(not(String::isEmpty)); diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java index 09322e4a..bd6ab1ca 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSink.java @@ -17,7 +17,6 @@ * limitations under the License. * ============LICENSE_END===================================== */ - package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap; import org.immutables.value.Value; diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java index 65280a98..78f5c3af 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/KafkaSource.java @@ -17,7 +17,6 @@ * limitations under the License. * ============LICENSE_END===================================== */ - package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap; import org.immutables.value.Value; @@ -33,5 +32,9 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Sou @Value.Immutable public interface KafkaSource extends Kafka, SourceStream { + /** + * A unique string that identifies the consumer group this consumer belongs to as defined in Kafka consumer + * configuration key <em>group.id</em>. + */ @Nullable String consumerGroupId(); } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouter.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouter.java index 7ed720d0..3cca5134 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouter.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouter.java @@ -17,10 +17,8 @@ * limitations under the License. * ============LICENSE_END===================================== */ - package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap; - import com.google.gson.annotations.SerializedName; import org.jetbrains.annotations.Nullable; import org.onap.dcaegen2.services.sdk.rest.services.annotations.ExperimentalApi; @@ -28,23 +26,38 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Aaf /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> - * @version 1.2.1 + * @since 1.1.4 */ @ExperimentalApi public interface MessageRouter { + /** + * URL for accessing the topic to publish or receive events. + */ @SerializedName("topic_url") String topicUrl(); + /** + * AAF client role that’s requesting publish or subscribe access to the topic. + */ @SerializedName("client_role") @Nullable String clientRole(); + /** + * Client id for given AAF client. + */ @SerializedName("client_id") @Nullable String clientId(); + /** + * DCAE location for the publisher or subscriber, used to set up routing. + */ @SerializedName("location") @Nullable String location(); + /** + * The AAF credentials. + */ @SerializedName("aaf_credentials") @Nullable AafCredentials aafCredentials(); } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSink.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSink.java index 8f4a5339..3af79638 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSink.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSink.java @@ -17,7 +17,6 @@ * limitations under the License. * ============LICENSE_END===================================== */ - package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap; import org.immutables.value.Value; @@ -26,7 +25,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Sin /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> - * @version 1.2.1 + * @since 1.1.4 */ @ExperimentalApi @Value.Immutable diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSource.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSource.java index ab802034..c7159f26 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSource.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/streams/dmaap/MessageRouterSource.java @@ -17,7 +17,6 @@ * limitations under the License. * ============LICENSE_END===================================== */ - package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap; import org.immutables.value.Value; @@ -26,7 +25,7 @@ import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.Sou /** * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> - * @version 1.2.1 + * @since 1.1.4 */ @ExperimentalApi @Value.Immutable diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsRequestsTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsRequestsTest.java new file mode 100644 index 00000000..50233d3c --- /dev/null +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsRequestsTest.java @@ -0,0 +1,75 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.jupiter.api.Test; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; + +/** + * @author <a href="mailto:piotr.jaszczyk@nokia.com">Piotr Jaszczyk</a> + * @since March 2019 + */ +class CbsRequestsTest { + + private final RequestDiagnosticContext diagCtx = RequestDiagnosticContext.create(); + private final String serviceName = "srv-name"; + + @Test + void getConfiguration() { + // given + final CbsRequest cut = CbsRequests.getConfiguration(diagCtx); + + // when + final String result = cut.requestPath().getForService(serviceName); + + // then + assertThat(result).isEqualTo("/service_component/srv-name"); + } + + @Test + void getByKey() { + // given + final CbsRequest cut = CbsRequests.getByKey(diagCtx, "configKey"); + + // when + final String result = cut.requestPath().getForService(serviceName); + + // then + assertThat(result).isEqualTo("/configKey/srv-name"); + } + + @Test + void getAll() { + // given + final CbsRequest cut = CbsRequests.getAll(diagCtx); + + // when + final String result = cut.requestPath().getForService(serviceName); + + // then + assertThat(result).isEqualTo("/service_component_all/srv-name"); + } +}
\ No newline at end of file diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java index 58e1e6cb..a51b87aa 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplIT.java @@ -34,9 +34,12 @@ import org.junit.jupiter.api.Test; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.test.DummyHttpServer; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.StreamParsingException; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.RawDataStream; @@ -61,7 +64,9 @@ class CbsClientImplIT { + " \"ServicePort\": PORT\n" + " }\n" + "]\n"; - private static final String SAMPLE_CONFIG = "/sample_config.json"; + private static final String SAMPLE_CONFIG = "/sample_service_config.json"; + private static final String SAMPLE_ALL = "/sample_all.json"; + private static final String SAMPLE_KEY = "/sample_key.json"; private static final String SAMPLE_CONFIG_KEY = "keystore.path"; private static final String EXPECTED_CONFIG_VALUE = "/var/run/security/keystore.p12"; private static EnvProperties sampleEnvironment; @@ -71,7 +76,10 @@ class CbsClientImplIT { static void setUp() { server = DummyHttpServer.start(routes -> routes.get("/v1/catalog/service/the_cbs", (req, resp) -> sendString(resp, lazyConsulResponse())) - .get("/service_component/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_CONFIG))); + .get("/service_component/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_CONFIG)) + .get("/service_component_all/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_ALL)) + .get("/sampleKey/dcae-component", (req, resp) -> sendResource(resp, SAMPLE_KEY)) + ); sampleEnvironment = ImmutableEnvProperties.builder() .appName("dcae-component") .cbsName("the_cbs") @@ -89,10 +97,10 @@ class CbsClientImplIT { void testCbsClientWithSingleCall() { // given final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment); - final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); + final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create()); // when - final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext)); + final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request)); // then StepVerifier.create(result.map(this::sampleConfigValue)) @@ -105,11 +113,11 @@ class CbsClientImplIT { void testCbsClientWithPeriodicCall() { // given final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment); - final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); + final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create()); // when final Flux<JsonObject> result = sut - .flatMapMany(cbsClient -> cbsClient.get(diagnosticContext, Duration.ZERO, Duration.ofMillis(10))); + .flatMapMany(cbsClient -> cbsClient.get(request, Duration.ZERO, Duration.ofMillis(10))); // then final int itemsToTake = 5; @@ -123,12 +131,12 @@ class CbsClientImplIT { void testCbsClientWithUpdatesCall() { // given final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment); - final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); + final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create()); final Duration period = Duration.ofMillis(10); // when final Flux<JsonObject> result = sut - .flatMapMany(cbsClient -> cbsClient.updates(diagnosticContext, Duration.ZERO, period)); + .flatMapMany(cbsClient -> cbsClient.updates(request, Duration.ZERO, period)); // then final Duration timeToCollectItemsFor = period.multipliedBy(50); @@ -143,10 +151,10 @@ class CbsClientImplIT { // given final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment); final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser(); - final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); + final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create()); // when - final Mono<KafkaSink> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext)) + final Mono<KafkaSink> result = sut.flatMap(cbsClient -> cbsClient.get(request)) .map(json -> DataStreams.namedSinks(json).map(kafkaSinkParser::unsafeParse).head() ); @@ -166,13 +174,13 @@ class CbsClientImplIT { void testCbsClientWithStreamsParsingUsingSwitch() { // given final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment); - final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); + final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create()); // TODO: Use these parsers below final StreamFromGsonParser<KafkaSink> kafkaSinkParser = StreamFromGsonParsers.kafkaSinkParser(); final StreamFromGsonParser<MessageRouterSink> mrSinkParser = StreamFromGsonParsers.messageRouterSinkParser(); // when - final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext)) + final Mono<Void> result = sut.flatMap(cbsClient -> cbsClient.get(request)) .map(json -> { final Map<String, Stream<RawDataStream<JsonObject>>> sinks = DataStreams.namedSinks(json) .groupBy(RawDataStream::type); @@ -204,10 +212,10 @@ class CbsClientImplIT { // given final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment); final StreamFromGsonParser<KafkaSource> kafkaSourceParser = StreamFromGsonParsers.kafkaSourceParser(); - final RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); + final CbsRequest request = CbsRequests.getConfiguration(RequestDiagnosticContext.create()); // when - final Mono<KafkaSource> result = sut.flatMap(cbsClient -> cbsClient.get(diagnosticContext)) + final Mono<KafkaSource> result = sut.flatMap(cbsClient -> cbsClient.get(request)) .map(json -> DataStreams.namedSources(json).map(kafkaSourceParser::unsafeParse).head() ); @@ -215,7 +223,7 @@ class CbsClientImplIT { // then StepVerifier.create(result) .expectErrorSatisfies(ex -> { - assertThat(ex).isInstanceOf(IllegalArgumentException.class); + assertThat(ex).isInstanceOf(StreamParsingException.class); assertThat(ex).hasMessageContaining("Invalid stream type"); assertThat(ex).hasMessageContaining("message_router"); assertThat(ex).hasMessageContaining("kafka"); @@ -223,6 +231,46 @@ class CbsClientImplIT { .verify(Duration.ofSeconds(5)); } + @Test + void testCbsClientWithSingleAllRequest() { + // given + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment); + final CbsRequest request = CbsRequests.getAll(RequestDiagnosticContext.create()); + + // when + final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request)); + + // then + StepVerifier.create(result) + .assertNext(json -> { + assertThat(json.get("config")).isNotNull(); + assertThat(json.get("policies")).isNotNull(); + assertThat(json.get("sampleKey")).isNotNull(); + }) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + + + @Test + void testCbsClientWithSingleKeyRequest() { + // given + final Mono<CbsClient> sut = CbsClientFactory.createCbsClient(sampleEnvironment); + final CbsRequest request = CbsRequests.getByKey(RequestDiagnosticContext.create(), "sampleKey"); + + // when + final Mono<JsonObject> result = sut.flatMap(cbsClient -> cbsClient.get(request)); + + // then + StepVerifier.create(result) + .assertNext(json -> { + assertThat(json.get("key")).isNotNull(); + assertThat(json.get("key").getAsString()).isEqualTo("value"); + }) + .expectComplete() + .verify(Duration.ofSeconds(5)); + } + private String sampleConfigValue(JsonObject obj) { return obj.get(SAMPLE_CONFIG_KEY).getAsString(); } diff --git a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java index 339b1efa..78b79f9d 100644 --- a/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java +++ b/rest-services/cbs-client/src/test/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImplTest.java @@ -29,13 +29,14 @@ import static org.mockito.Mockito.verify; import com.google.gson.JsonObject; import java.net.InetSocketAddress; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpRequest; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpResponse; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; import reactor.core.publisher.Mono; @@ -51,7 +52,7 @@ class CbsClientImplTest { // given InetSocketAddress cbsAddress = InetSocketAddress.createUnresolved("cbshost", 6969); String serviceName = "dcaegen2-ves-collector"; - final CbsClientImpl cut = CbsClientImpl.create(httpClient, cbsAddress, serviceName); + final CbsClient cut = new CbsClientImpl(httpClient, serviceName, cbsAddress); final HttpResponse httpResponse = ImmutableHttpResponse.builder() .url("http://xxx") .statusCode(200) @@ -61,7 +62,7 @@ class CbsClientImplTest { RequestDiagnosticContext diagnosticContext = RequestDiagnosticContext.create(); // when - final JsonObject result = cut.get(diagnosticContext).block(); + final JsonObject result = cut.get(CbsRequests.getConfiguration(diagnosticContext)).block(); // then final String expectedUrl = "http://cbshost:6969/service_component/dcaegen2-ves-collector"; diff --git a/rest-services/cbs-client/src/test/resources/sample_all.json b/rest-services/cbs-client/src/test/resources/sample_all.json new file mode 100644 index 00000000..ac4ebf29 --- /dev/null +++ b/rest-services/cbs-client/src/test/resources/sample_all.json @@ -0,0 +1,41 @@ +{ + "config": { + "keystore.path": "/var/run/security/keystore.p12", + "streams_publishes": { + "perf3gpp": { + "type": "kafka", + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka:6060", + "topic_name": "HVVES_PERF3GPP" + } + }, + "pnf_ready": { + "type": "message_router", + "dmaap_info": { + "topic_url": "http://message-router:3904/events/VES_PNF_READY" + } + }, + "call_trace": { + "type": "kafka", + "kafka_info": { + "bootstrap_servers": "dmaap-mr-kafka:6060", + "topic_name": "HVVES_TRACE" + } + } + }, + "streams_subscribes": { + "measurements": { + "type": "message_router", + "dmaap_info": { + "topic_url": "http://message-router:3904/events/VES_MEASUREMENT" + } + } + } + }, + "policies": { + "samplePolicy": "sample value" + }, + "sampleKey": { + "key": "value" + } +}
\ No newline at end of file diff --git a/rest-services/cbs-client/src/test/resources/sample_key.json b/rest-services/cbs-client/src/test/resources/sample_key.json new file mode 100644 index 00000000..21da3b26 --- /dev/null +++ b/rest-services/cbs-client/src/test/resources/sample_key.json @@ -0,0 +1,3 @@ +{ + "key": "value" +} diff --git a/rest-services/cbs-client/src/test/resources/sample_config.json b/rest-services/cbs-client/src/test/resources/sample_service_config.json index 266326f4..266326f4 100644 --- a/rest-services/cbs-client/src/test/resources/sample_config.json +++ b/rest-services/cbs-client/src/test/resources/sample_service_config.json |