From 9b5df83176a2e2bf20442285b01ba4cd710fb7b8 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Thu, 21 Mar 2019 14:45:50 +0100 Subject: Support other CBS endpoints Change-Id: I227a8edf6da8398ca58c47e864985dac47c5dfcd Issue-ID: DCAEGEN2-1363 Signed-off-by: Piotr Jaszczyk --- .../rest/services/cbs/client/api/CbsClient.java | 27 +++---- .../services/cbs/client/api/CbsClientFactory.java | 2 +- .../rest/services/cbs/client/api/CbsRequests.java | 85 ++++++++++++++++++++++ .../services/cbs/client/impl/CbsClientImpl.java | 50 ++++++++----- .../rest/services/cbs/client/impl/CbsLookup.java | 21 +++++- .../rest/services/cbs/client/model/CbsRequest.java | 57 +++++++++++++++ .../services/cbs/client/model/RequestPath.java | 37 ++++++++++ 7 files changed, 243 insertions(+), 36 deletions(-) create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsRequests.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/CbsRequest.java create mode 100644 rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/RequestPath.java (limited to 'rest-services/cbs-client/src/main/java') diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClient.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClient.java index 3ee12eed..d6a5700a 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClient.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClient.java @@ -21,12 +21,10 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api; import com.google.gson.JsonObject; import java.time.Duration; -import java.util.UUID; import java.util.function.BiPredicate; import java.util.function.Function; import org.jetbrains.annotations.NotNull; -import org.onap.dcaegen2.services.sdk.rest.services.model.logging.ImmutableRequestDiagnosticContext; -import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -46,28 +44,27 @@ public interface CbsClient { *

* Returns a {@link Mono} that publishes new configuration after CBS client retrieves one. * + * @param request the CBS Request to be performed (can be obtained from {@link CbsRequests}) * @return reactive stream of configuration - * @param diagnosticContext diagnostic context as defined in Logging Guideline - * @since 1.1.2 */ - @NotNull Mono get(RequestDiagnosticContext diagnosticContext); + @NotNull Mono get(CbsRequest request); /** *

* Poll for configuration. * *

- * Will call {@link #get(RequestDiagnosticContext)} after {@code initialDelay} every {@code period}. Resulting entries may or may not be + * Will call {@link #get(CbsRequest)} after {@code initialDelay} every {@code period}. Resulting entries may or may not be * changed, ie. items in the stream might be the same until change is made in CBS. * - * @param diagnosticContext diagnostic context as defined in Logging Guideline + * @param request the CBS Request to be performed (can be obtained from {@link CbsRequests}) * @param initialDelay delay after first request attempt * @param period frequency of update checks * @return stream of configuration states */ - default Flux get(RequestDiagnosticContext diagnosticContext, Duration initialDelay, Duration period) { + default Flux get(CbsRequest request, Duration initialDelay, Duration period) { return Flux.interval(initialDelay, period) - .map(i -> ImmutableRequestDiagnosticContext.copyOf(diagnosticContext).withInvocationId(UUID.randomUUID())) + .map(i -> request.withNewInvocationId()) .flatMap(this::get); } @@ -76,7 +73,7 @@ public interface CbsClient { * Poll for configuration updates. * *

- * Will call {@link #get(RequestDiagnosticContext)} after {@code initialDelay} every {@code period}. Will emit an item + * Will call {@link #get(CbsRequest)} after {@code initialDelay} every {@code period}. Will emit an item * only when an update was detected, ie. when new item is different then last emitted item. * *

@@ -87,17 +84,17 @@ public interface CbsClient { * (experimental API) if you want to react differently to changes in subsets of the configuration. * *

  • - * Use {@link #get(RequestDiagnosticContext, Duration, Duration)} with + * Use {@link #get(CbsRequest, Duration, Duration)} with * {@link Flux#distinctUntilChanged(Function, BiPredicate)} if you want to specify custom comparison logic. *
  • * * - * @param diagnosticContext diagnostic context as defined in Logging Guideline + * @param request the CBS Request to be performed (can be obtained from {@link CbsRequests}) * @param initialDelay delay after first request attempt * @param period frequency of update checks * @return stream of configuration updates */ - default Flux updates(RequestDiagnosticContext diagnosticContext, Duration initialDelay, Duration period) { - return get(diagnosticContext, initialDelay, period).distinctUntilChanged(); + default Flux updates(CbsRequest request, Duration initialDelay, Duration period) { + return get(request, initialDelay, period).distinctUntilChanged(); } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java index 379daf97..c11ed533 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsClientFactory.java @@ -56,7 +56,7 @@ public class CbsClientFactory { final RxHttpClient httpClient = RxHttpClient.create(); final CbsLookup lookup = new CbsLookup(httpClient); return lookup.lookup(env) - .map(addr -> CbsClientImpl.create(httpClient, addr, env.appName())); + .map(addr -> new CbsClientImpl(httpClient, env.appName(), addr)); }); } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsRequests.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsRequests.java new file mode 100644 index 00000000..3724338d --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/api/CbsRequests.java @@ -0,0 +1,85 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api; + +import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableCbsRequest; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; + +/** + * A factory to various of requests supported by Config Binding Service. + * + * @author Piotr Jaszczyk + * @since 1.1.4 + */ +public final class CbsRequests { + + /** + *

    A get-configuration request.

    + * + *

    Will bind the configuration for given service and return the bound configuration.

    + * + * @param diagnosticContext logging diagnostic context (MDC) + * @return the CbsRequest ready to be used when calling {@link CbsClient} + */ + public static @NotNull CbsRequest getConfiguration(RequestDiagnosticContext diagnosticContext) { + return ImmutableCbsRequest.builder() + .diagnosticContext(diagnosticContext) + .requestPath(serviceName -> "/service_component/" + serviceName) + .build(); + } + + /** + *

    A get-by-key request.

    + * + *

    This will call an endpoint that fetches a generic service_component_name:key out of Consul

    + * + * @param diagnosticContext logging diagnostic context (MDC) + * @return the CbsRequest ready to be used when calling {@link CbsClient} + */ + public static @NotNull CbsRequest getByKey( + RequestDiagnosticContext diagnosticContext, + String key) { + return ImmutableCbsRequest.builder() + .diagnosticContext(diagnosticContext) + .requestPath(serviceName -> "/" + key + "/" + serviceName) + .build(); + } + + /** + *

    A get-all request.

    + * + *

    Will bind the configuration for given service and return the bound configuration, policies, and any other + * keys that are in Consul

    + * + * @param diagnosticContext logging diagnostic context (MDC) + * @return the CbsRequest ready to be used when calling {@link CbsClient} + */ + public static @NotNull CbsRequest getAll(RequestDiagnosticContext diagnosticContext) { + return ImmutableCbsRequest.builder() + .diagnosticContext(diagnosticContext) + .requestPath(serviceName -> "/service_component_all/" + serviceName) + .build(); + } + +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java index 72c1b267..98f3cc97 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsClientImpl.java @@ -28,45 +28,57 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; -import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; public class CbsClientImpl implements CbsClient { + private static final Logger LOGGER = LoggerFactory.getLogger(CbsClientImpl.class); private final RxHttpClient httpClient; - private final String fetchUrl; + private final String serviceName; + private final InetSocketAddress cbsAddress; - CbsClientImpl(RxHttpClient httpClient, URL fetchUrl) { + public CbsClientImpl(RxHttpClient httpClient, String serviceName, InetSocketAddress cbsAddress) { this.httpClient = httpClient; - this.fetchUrl = fetchUrl.toString(); + this.serviceName = serviceName; + this.cbsAddress = cbsAddress; } - public static CbsClientImpl create(RxHttpClient httpClient, InetSocketAddress cbsAddress, String serviceName) { - return new CbsClientImpl(httpClient, constructUrl(cbsAddress, serviceName)); + @Override + public @NotNull Mono get(CbsRequest request) { + return Mono.fromCallable(() -> constructUrl(request).toString()) + .doOnNext(this::logRequestUrl) + .map(url -> ImmutableHttpRequest.builder() + .method(HttpMethod.GET) + .url(url) + .diagnosticContext(request.diagnosticContext()) + .build()) + .flatMap(httpClient::call) + .map(resp -> resp.bodyAsJson(JsonObject.class)) + .doOnNext(this::logCbsResponse); } - private static URL constructUrl(InetSocketAddress cbsAddress, String serviceName) { + + private URL constructUrl(CbsRequest request) { try { return new URL( "http", cbsAddress.getHostString(), cbsAddress.getPort(), - "/service_component/" + serviceName); + request.requestPath().getForService(serviceName)); } catch (MalformedURLException e) { throw new IllegalArgumentException("Invalid CBS URL", e); } } - @Override - public @NotNull Mono get(RequestDiagnosticContext diagnosticContext) { - return Mono.defer(() -> { - final ImmutableHttpRequest request = ImmutableHttpRequest.builder() - .method(HttpMethod.GET) - .url(fetchUrl) - .diagnosticContext(diagnosticContext) - .build(); - return httpClient.call(request) - .map(resp -> resp.bodyAsJson(JsonObject.class)); - }); + private void logRequestUrl(String url) { + LOGGER.debug("Calling {} for configuration", url); + } + + private void logCbsResponse(JsonObject json) { + LOGGER.info("Got successful response from Config Binding Service"); + LOGGER.debug("CBS response: {}", json); } } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java index 3d528c33..99058772 100644 --- a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/impl/CbsLookup.java @@ -21,6 +21,7 @@ package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl; import com.google.gson.JsonArray; +import com.google.gson.JsonElement; import com.google.gson.JsonObject; import java.net.InetSocketAddress; import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod; @@ -29,6 +30,8 @@ import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpR import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.exceptions.ServiceLookupException; import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; /** @@ -37,6 +40,7 @@ import reactor.core.publisher.Mono; */ public class CbsLookup { + private static final Logger LOGGER = LoggerFactory.getLogger(CbsLookup.class); private static final String CONSUL_JSON_SERVICE_ADDRESS = "ServiceAddress"; private static final String CONSUL_JSON_SERVICE_PORT = "ServicePort"; private final RxHttpClient httpClient; @@ -47,15 +51,22 @@ public class CbsLookup { public Mono lookup(EnvProperties env) { return Mono.fromCallable(() -> createConsulUrl(env)) + .doOnNext(this::logConsulRequestUrl) .flatMap(this::fetchHttpData) + .doOnNext(this::logConsulResponse) .flatMap(this::firstService) - .map(this::parseServiceEntry); + .map(this::parseServiceEntry) + .doOnNext(this::logCbsServiceAddress); } private String createConsulUrl(EnvProperties env) { return String.format("http://%s:%s/v1/catalog/service/%s", env.consulHost(), env.consulPort(), env.cbsName()); } + private void logConsulRequestUrl(String consulUrl) { + LOGGER.debug("Calling Consul for CBS address. consulUrl={}", consulUrl); + } + private Mono fetchHttpData(String consulUrl) { return httpClient.call( ImmutableHttpRequest.builder() @@ -66,6 +77,10 @@ public class CbsLookup { .map(resp -> resp.bodyAsJson(JsonArray.class)); } + private void logConsulResponse(JsonArray consulResponse) { + LOGGER.debug("Consul response with CBS service list. Will use 1st one. response={}", consulResponse); + } + private Mono firstService(JsonArray services) { return services.size() == 0 ? Mono.error(new ServiceLookupException("Consul server did not return any service with given name")) @@ -78,4 +93,8 @@ public class CbsLookup { service.get(CONSUL_JSON_SERVICE_PORT).getAsInt()); } + private void logCbsServiceAddress(InetSocketAddress address) { + LOGGER.info("Config Binding Service address: {}", address); + } + } diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/CbsRequest.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/CbsRequest.java new file mode 100644 index 00000000..0a319666 --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/CbsRequest.java @@ -0,0 +1,57 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model; + +import java.util.UUID; +import org.immutables.value.Value; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.ImmutableRequestDiagnosticContext; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; + +/** + * A recipe on which CBS endpoint to call. Usually you should use {@link org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests} + * which is a factory to each request type. + * + * @author Piotr Jaszczyk + * @since 1.1.4 + */ +@Value.Immutable +public interface CbsRequest { + + /** + * The CBS request path. It will be created by the library. + */ + RequestPath requestPath(); + + /** + * Diagnostic context as defined in Logging Guideline + */ + RequestDiagnosticContext diagnosticContext(); + + /** + * Return a view on this CbsRequest with updated InvocationID. + */ + default CbsRequest withNewInvocationId() { + final RequestDiagnosticContext newDiagnosticCtx = ImmutableRequestDiagnosticContext + .copyOf(diagnosticContext()) + .withInvocationId(UUID.randomUUID()); + return ImmutableCbsRequest.copyOf(this).withDiagnosticContext(newDiagnosticCtx); + } +} diff --git a/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/RequestPath.java b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/RequestPath.java new file mode 100644 index 00000000..97d4b4ea --- /dev/null +++ b/rest-services/cbs-client/src/main/java/org/onap/dcaegen2/services/sdk/rest/services/cbs/client/model/RequestPath.java @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START==================================== + * DCAEGEN2-SERVICES-SDK + * ========================================================= + * Copyright (C) 2019 Nokia. All rights reserved. + * ========================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END===================================== + */ + +package org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model; + +import io.vavr.Function1; + +/** + * @author Piotr Jaszczyk + * @since March 2019 + */ +@FunctionalInterface +public interface RequestPath extends Function1 { + String getForService(String serviceName); + + @Override + default String apply(String serviceName) { + return getForService(serviceName); + } +} -- cgit 1.2.3-korg