diff options
author | Rashmi Pujar <rashmi.pujar@bell.ca> | 2019-06-04 16:31:54 -0400 |
---|---|---|
committer | Rashmi Pujar <rashmi.pujar@bell.ca> | 2019-06-11 10:20:27 -0400 |
commit | c92e24f7de074b177cc8b57ef4fcddd133b25093 (patch) | |
tree | c17fabb8b22dd0d4edbc4680a9431391f7102c91 /models-interactions/model-impl/cds/src/main/java | |
parent | 8676a38be6877235fe857a5f0bc289b2e036d4bb (diff) |
GRPC Client impl to send process message to CDS blueprint-processor endpoint
Issue-ID: POLICY-1762
Signed-off-by: Rashmi Pujar <rashmi.pujar@bell.ca>
Change-Id: Iecef458b1f25db8e2989cc40ccd399be15867497
Diffstat (limited to 'models-interactions/model-impl/cds/src/main/java')
5 files changed, 395 insertions, 0 deletions
diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/api/CdsProcessorListener.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/api/CdsProcessorListener.java new file mode 100644 index 000000000..c07c559c4 --- /dev/null +++ b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/api/CdsProcessorListener.java @@ -0,0 +1,69 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.cds.api; + +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput; + +/** + * <p> + * In order for the caller of {@link org.onap.policy.cds.client.CdsProcessorGrpcClient} to manage the callback to handle + * the received messages appropriately, it needs to implement {@link CdsProcessorListener}. + * </p> + * + * <p>Here is a sample implementation of a listener: + * <pre> + * new CdsProcessorListener { + * + * @Override + * public void onMessage(ExecutionServiceOutput message) { + * log.info("Received notification from CDS: {}", message); + * } + * + * @Override + * public void onError(Throwable throwable) { + * Status status = Status.fromThrowable(throwable); + * log.error("Failed processing blueprint {}", status, throwable); + * } + * } + * </pre> + * </p> + */ +public interface CdsProcessorListener { + + /** + * Implements the workflow upon receiving the message from the server side. + * + * <p>Note that the CDS client-server communication is configured to use a streaming approach, which means when + * client + * sends an event, the server can reply with multiple sub-responses until full completion of the processing. Hence, + * it is up to the implementation of this method to process the received message using {@link + * ExecutionServiceOutput#getStatus()#getEventType()}</p> + * + * @param message ExecutionServiceOutput received by the CDS grpc server + */ + void onMessage(ExecutionServiceOutput message); + + /** + * Implements the workflow when an error is received from the server side. + * + * @param throwable Throwable object received from CDS grpc server upon error + */ + void onError(Throwable throwable); + +} diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptor.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptor.java new file mode 100644 index 000000000..3957fe5e4 --- /dev/null +++ b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptor.java @@ -0,0 +1,64 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.cds.client; + +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall; +import io.grpc.Metadata; +import io.grpc.Metadata.Key; +import io.grpc.MethodDescriptor; +import org.onap.policy.cds.properties.CdsServerProperties; + +/** + * An interceptor to insert the client authHeader. + * + * <p>The {@link BasicAuthClientHeaderInterceptor} implements {@link ClientInterceptor} to insert authorization + * header data provided by {@link CdsServerProperties#getBasicAuth()} to all the outgoing calls.</p> + * + * <p>On the client context, we add metadata with "Authorization" as the key and "Basic" followed by base64 encoded + * username:password as its value. + * On the server side, CDS BasicAuthServerInterceptor (1) gets the client metadata from the server context, (2) extract + * the "Authorization" header key and finally (3) decodes the username and password from the authHeader.</p> + */ +public class BasicAuthClientHeaderInterceptor implements ClientInterceptor { + + static final String BASIC_AUTH_HEADER_KEY = "Authorization"; + private CdsServerProperties props; + + BasicAuthClientHeaderInterceptor(CdsServerProperties props) { + this.props = props; + } + + @Override + public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, + CallOptions callOptions, Channel channel) { + Key<String> authHeader = Key.of(BASIC_AUTH_HEADER_KEY, Metadata.ASCII_STRING_MARSHALLER); + return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(method, callOptions)) { + @Override + public void start(Listener<RespT> responseListener, Metadata headers) { + headers.put(authHeader, props.getBasicAuth()); + super.start(responseListener, headers); + } + }; + } +} + diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorGrpcClient.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorGrpcClient.java new file mode 100644 index 000000000..b8ec7acf5 --- /dev/null +++ b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorGrpcClient.java @@ -0,0 +1,94 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.cds.client; + +import com.google.common.base.Preconditions; +import io.grpc.ManagedChannel; +import io.grpc.internal.DnsNameResolverProvider; +import io.grpc.internal.PickFirstLoadBalancerProvider; +import io.grpc.netty.NettyChannelBuilder; +import java.util.concurrent.CountDownLatch; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput; +import org.onap.policy.cds.api.CdsProcessorListener; +import org.onap.policy.cds.properties.CdsServerProperties; +import org.onap.policy.common.parameters.GroupValidationResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * <p> + * The CDS processor client uses gRPC for communication between Policy and CDS. This communication is configured to use + * a streaming approach, which means the client sends an event to which the server can reply with multiple + * sub-responses, until full completion of the processing. + * </p> + */ +public class CdsProcessorGrpcClient implements AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(CdsProcessorGrpcClient.class); + + private ManagedChannel channel; + private CdsProcessorHandler handler; + + /** + * Constructor, create a CDS processor gRPC client. + * + * @param listener the listener to listen on + */ + public CdsProcessorGrpcClient(final CdsProcessorListener listener, CdsServerProperties props) { + final GroupValidationResult validationResult = props.validate(); + Preconditions.checkState(validationResult.getStatus().isValid(), "Error validating CDS server " + + "properties: " + validationResult.getResult()); + + this.channel = NettyChannelBuilder.forAddress(props.getHost(), props.getPort()) + .nameResolverFactory(new DnsNameResolverProvider()) + .loadBalancerFactory(new PickFirstLoadBalancerProvider()) + .intercept(new BasicAuthClientHeaderInterceptor(props)).usePlaintext().build(); + this.handler = new CdsProcessorHandler(listener); + LOGGER.info("CdsProcessorListener started"); + } + + CdsProcessorGrpcClient(final ManagedChannel channel, final CdsProcessorHandler handler) { + this.channel = channel; + this.handler = handler; + } + + /** + * Sends a request to the CDS backend micro-service. + * + * <p>The caller will be returned a CountDownLatch that can be used to define how long the processing can wait. The + * CountDownLatch is initiated with just 1 count. When the client receives an #onCompleted callback, the counter + * will decrement.</p> + * + * <p>It is the user responsibility to close the client.</p> + * + * @param input request to send + * @return CountDownLatch instance that can be use to #await for completeness of processing + */ + public CountDownLatch sendRequest(ExecutionServiceInput input) { + return handler.process(input, channel); + } + + @Override + public void close() { + if (channel != null) { + channel.shutdown(); + } + LOGGER.info("CdsProcessorListener stopped"); + } +} diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorHandler.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorHandler.java new file mode 100644 index 000000000..9dd249ceb --- /dev/null +++ b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorHandler.java @@ -0,0 +1,81 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.cds.client; + +import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.CountDownLatch; +import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceStub; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput; +import org.onap.policy.cds.api.CdsProcessorListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class CdsProcessorHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(CdsProcessorHandler.class); + + private CdsProcessorListener listener; + + CdsProcessorHandler(final CdsProcessorListener listener) { + this.listener = listener; + } + + CountDownLatch process(ExecutionServiceInput request, ManagedChannel channel) { + final ActionIdentifiers header = request.getActionIdentifiers(); + LOGGER.info("Processing blueprint({}:{}) for action({})", header.getBlueprintVersion(), + header.getBlueprintName(), header.getBlueprintVersion()); + + final CountDownLatch finishLatch = new CountDownLatch(1); + final BluePrintProcessingServiceStub asyncStub = BluePrintProcessingServiceGrpc.newStub(channel); + final StreamObserver<ExecutionServiceOutput> responseObserver = new StreamObserver<ExecutionServiceOutput>() { + @Override + public void onNext(ExecutionServiceOutput output) { + listener.onMessage(output); + } + + @Override + public void onError(Throwable throwable) { + listener.onError(throwable); + finishLatch.countDown(); + } + + @Override + public void onCompleted() { + LOGGER.info("Completed blueprint({}:{}) for action({})", header.getBlueprintVersion(), + header.getBlueprintName(), header.getBlueprintVersion()); + finishLatch.countDown(); + } + }; + + final StreamObserver<ExecutionServiceInput> requestObserver = asyncStub.process(responseObserver); + try { + // Send the message to CDS backend for processing + requestObserver.onNext(request); + // Mark the end of requests + requestObserver.onCompleted(); + } catch (RuntimeException e) { + requestObserver.onError(e); + } + return finishLatch; + } +} diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/properties/CdsServerProperties.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/properties/CdsServerProperties.java new file mode 100644 index 000000000..94a336b6d --- /dev/null +++ b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/properties/CdsServerProperties.java @@ -0,0 +1,87 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.cds.properties; + +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import org.onap.policy.common.parameters.GroupValidationResult; +import org.onap.policy.common.parameters.ParameterGroup; +import org.onap.policy.common.parameters.ParameterRuntimeException; +import org.onap.policy.common.parameters.annotations.Max; +import org.onap.policy.common.parameters.annotations.Min; +import org.onap.policy.common.parameters.annotations.NotNull; + +@Getter +@Setter +@ToString +public class CdsServerProperties implements ParameterGroup { + + // Port range constants + private static final int MIN_USER_PORT = 1024; + private static final int MAX_USER_PORT = 65535; + + private static final String INVALID_PROP = "Invalid CDS property: "; + private static final String SERVER_PROPERTIES_TYPE = "CDS gRPC Server Properties"; + + // CDS carrier properties + @Min(value = 1) + private int timeout; + + @Min(value = MIN_USER_PORT) + @Max(value = MAX_USER_PORT) + private int port; + + @NotNull + private String host; + + @NotNull + private String username; + + @NotNull + private String password; + + + @Override + public String getName() { + return SERVER_PROPERTIES_TYPE; + } + + @Override + public void setName(final String name) { + throw new ParameterRuntimeException("The name of this ParameterGroup implementation is always " + getName()); + } + + @Override + public GroupValidationResult validate() { + return new GroupValidationResult(this); + } + + /** + * Generate base64-encoded Authorization header from username and password. + * + * @return Base64 encoded string + */ + public String getBasicAuth() { + return Base64.getEncoder().encodeToString(String.format("%s:%s", getUsername(), getPassword()) + .getBytes(StandardCharsets.UTF_8)); + } +} |