summaryrefslogtreecommitdiffstats
path: root/models-interactions/model-impl/cds/src/main
diff options
context:
space:
mode:
authorRashmi Pujar <rashmi.pujar@bell.ca>2019-06-04 16:31:54 -0400
committerRashmi Pujar <rashmi.pujar@bell.ca>2019-06-11 10:20:27 -0400
commitc92e24f7de074b177cc8b57ef4fcddd133b25093 (patch)
treec17fabb8b22dd0d4edbc4680a9431391f7102c91 /models-interactions/model-impl/cds/src/main
parent8676a38be6877235fe857a5f0bc289b2e036d4bb (diff)
GRPC Client impl to send process message to CDS blueprint-processor endpoint
Issue-ID: POLICY-1762 Signed-off-by: Rashmi Pujar <rashmi.pujar@bell.ca> Change-Id: Iecef458b1f25db8e2989cc40ccd399be15867497
Diffstat (limited to 'models-interactions/model-impl/cds/src/main')
-rw-r--r--models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/api/CdsProcessorListener.java69
-rw-r--r--models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptor.java64
-rw-r--r--models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorGrpcClient.java94
-rw-r--r--models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorHandler.java81
-rw-r--r--models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/properties/CdsServerProperties.java87
5 files changed, 395 insertions, 0 deletions
diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/api/CdsProcessorListener.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/api/CdsProcessorListener.java
new file mode 100644
index 000000000..c07c559c4
--- /dev/null
+++ b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/api/CdsProcessorListener.java
@@ -0,0 +1,69 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.cds.api;
+
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
+
+/**
+ * <p>
+ * In order for the caller of {@link org.onap.policy.cds.client.CdsProcessorGrpcClient} to manage the callback to handle
+ * the received messages appropriately, it needs to implement {@link CdsProcessorListener}.
+ * </p>
+ *
+ * <p>Here is a sample implementation of a listener:
+ * <pre>
+ * new CdsProcessorListener {
+ *
+ * &#64;Override
+ * public void onMessage(ExecutionServiceOutput message) {
+ * log.info("Received notification from CDS: {}", message);
+ * }
+ *
+ * &#64;Override
+ * public void onError(Throwable throwable) {
+ * Status status = Status.fromThrowable(throwable);
+ * log.error("Failed processing blueprint {}", status, throwable);
+ * }
+ * }
+ * </pre>
+ * </p>
+ */
+public interface CdsProcessorListener {
+
+ /**
+ * Implements the workflow upon receiving the message from the server side.
+ *
+ * <p>Note that the CDS client-server communication is configured to use a streaming approach, which means when
+ * client
+ * sends an event, the server can reply with multiple sub-responses until full completion of the processing. Hence,
+ * it is up to the implementation of this method to process the received message using {@link
+ * ExecutionServiceOutput#getStatus()#getEventType()}</p>
+ *
+ * @param message ExecutionServiceOutput received by the CDS grpc server
+ */
+ void onMessage(ExecutionServiceOutput message);
+
+ /**
+ * Implements the workflow when an error is received from the server side.
+ *
+ * @param throwable Throwable object received from CDS grpc server upon error
+ */
+ void onError(Throwable throwable);
+
+}
diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptor.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptor.java
new file mode 100644
index 000000000..3957fe5e4
--- /dev/null
+++ b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptor.java
@@ -0,0 +1,64 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.cds.client;
+
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ForwardingClientCall;
+import io.grpc.Metadata;
+import io.grpc.Metadata.Key;
+import io.grpc.MethodDescriptor;
+import org.onap.policy.cds.properties.CdsServerProperties;
+
+/**
+ * An interceptor to insert the client authHeader.
+ *
+ * <p>The {@link BasicAuthClientHeaderInterceptor} implements {@link ClientInterceptor} to insert authorization
+ * header data provided by {@link CdsServerProperties#getBasicAuth()} to all the outgoing calls.</p>
+ *
+ * <p>On the client context, we add metadata with "Authorization" as the key and "Basic" followed by base64 encoded
+ * username:password as its value.
+ * On the server side, CDS BasicAuthServerInterceptor (1) gets the client metadata from the server context, (2) extract
+ * the "Authorization" header key and finally (3) decodes the username and password from the authHeader.</p>
+ */
+public class BasicAuthClientHeaderInterceptor implements ClientInterceptor {
+
+ static final String BASIC_AUTH_HEADER_KEY = "Authorization";
+ private CdsServerProperties props;
+
+ BasicAuthClientHeaderInterceptor(CdsServerProperties props) {
+ this.props = props;
+ }
+
+ @Override
+ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
+ CallOptions callOptions, Channel channel) {
+ Key<String> authHeader = Key.of(BASIC_AUTH_HEADER_KEY, Metadata.ASCII_STRING_MARSHALLER);
+ return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(method, callOptions)) {
+ @Override
+ public void start(Listener<RespT> responseListener, Metadata headers) {
+ headers.put(authHeader, props.getBasicAuth());
+ super.start(responseListener, headers);
+ }
+ };
+ }
+}
+
diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorGrpcClient.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorGrpcClient.java
new file mode 100644
index 000000000..b8ec7acf5
--- /dev/null
+++ b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorGrpcClient.java
@@ -0,0 +1,94 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.cds.client;
+
+import com.google.common.base.Preconditions;
+import io.grpc.ManagedChannel;
+import io.grpc.internal.DnsNameResolverProvider;
+import io.grpc.internal.PickFirstLoadBalancerProvider;
+import io.grpc.netty.NettyChannelBuilder;
+import java.util.concurrent.CountDownLatch;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
+import org.onap.policy.cds.api.CdsProcessorListener;
+import org.onap.policy.cds.properties.CdsServerProperties;
+import org.onap.policy.common.parameters.GroupValidationResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * The CDS processor client uses gRPC for communication between Policy and CDS. This communication is configured to use
+ * a streaming approach, which means the client sends an event to which the server can reply with multiple
+ * sub-responses, until full completion of the processing.
+ * </p>
+ */
+public class CdsProcessorGrpcClient implements AutoCloseable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CdsProcessorGrpcClient.class);
+
+ private ManagedChannel channel;
+ private CdsProcessorHandler handler;
+
+ /**
+ * Constructor, create a CDS processor gRPC client.
+ *
+ * @param listener the listener to listen on
+ */
+ public CdsProcessorGrpcClient(final CdsProcessorListener listener, CdsServerProperties props) {
+ final GroupValidationResult validationResult = props.validate();
+ Preconditions.checkState(validationResult.getStatus().isValid(), "Error validating CDS server "
+ + "properties: " + validationResult.getResult());
+
+ this.channel = NettyChannelBuilder.forAddress(props.getHost(), props.getPort())
+ .nameResolverFactory(new DnsNameResolverProvider())
+ .loadBalancerFactory(new PickFirstLoadBalancerProvider())
+ .intercept(new BasicAuthClientHeaderInterceptor(props)).usePlaintext().build();
+ this.handler = new CdsProcessorHandler(listener);
+ LOGGER.info("CdsProcessorListener started");
+ }
+
+ CdsProcessorGrpcClient(final ManagedChannel channel, final CdsProcessorHandler handler) {
+ this.channel = channel;
+ this.handler = handler;
+ }
+
+ /**
+ * Sends a request to the CDS backend micro-service.
+ *
+ * <p>The caller will be returned a CountDownLatch that can be used to define how long the processing can wait. The
+ * CountDownLatch is initiated with just 1 count. When the client receives an #onCompleted callback, the counter
+ * will decrement.</p>
+ *
+ * <p>It is the user responsibility to close the client.</p>
+ *
+ * @param input request to send
+ * @return CountDownLatch instance that can be use to #await for completeness of processing
+ */
+ public CountDownLatch sendRequest(ExecutionServiceInput input) {
+ return handler.process(input, channel);
+ }
+
+ @Override
+ public void close() {
+ if (channel != null) {
+ channel.shutdown();
+ }
+ LOGGER.info("CdsProcessorListener stopped");
+ }
+}
diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorHandler.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorHandler.java
new file mode 100644
index 000000000..9dd249ceb
--- /dev/null
+++ b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorHandler.java
@@ -0,0 +1,81 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.cds.client;
+
+import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.CountDownLatch;
+import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceStub;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
+import org.onap.policy.cds.api.CdsProcessorListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class CdsProcessorHandler {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CdsProcessorHandler.class);
+
+ private CdsProcessorListener listener;
+
+ CdsProcessorHandler(final CdsProcessorListener listener) {
+ this.listener = listener;
+ }
+
+ CountDownLatch process(ExecutionServiceInput request, ManagedChannel channel) {
+ final ActionIdentifiers header = request.getActionIdentifiers();
+ LOGGER.info("Processing blueprint({}:{}) for action({})", header.getBlueprintVersion(),
+ header.getBlueprintName(), header.getBlueprintVersion());
+
+ final CountDownLatch finishLatch = new CountDownLatch(1);
+ final BluePrintProcessingServiceStub asyncStub = BluePrintProcessingServiceGrpc.newStub(channel);
+ final StreamObserver<ExecutionServiceOutput> responseObserver = new StreamObserver<ExecutionServiceOutput>() {
+ @Override
+ public void onNext(ExecutionServiceOutput output) {
+ listener.onMessage(output);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ listener.onError(throwable);
+ finishLatch.countDown();
+ }
+
+ @Override
+ public void onCompleted() {
+ LOGGER.info("Completed blueprint({}:{}) for action({})", header.getBlueprintVersion(),
+ header.getBlueprintName(), header.getBlueprintVersion());
+ finishLatch.countDown();
+ }
+ };
+
+ final StreamObserver<ExecutionServiceInput> requestObserver = asyncStub.process(responseObserver);
+ try {
+ // Send the message to CDS backend for processing
+ requestObserver.onNext(request);
+ // Mark the end of requests
+ requestObserver.onCompleted();
+ } catch (RuntimeException e) {
+ requestObserver.onError(e);
+ }
+ return finishLatch;
+ }
+}
diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/properties/CdsServerProperties.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/properties/CdsServerProperties.java
new file mode 100644
index 000000000..94a336b6d
--- /dev/null
+++ b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/properties/CdsServerProperties.java
@@ -0,0 +1,87 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.cds.properties;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.onap.policy.common.parameters.GroupValidationResult;
+import org.onap.policy.common.parameters.ParameterGroup;
+import org.onap.policy.common.parameters.ParameterRuntimeException;
+import org.onap.policy.common.parameters.annotations.Max;
+import org.onap.policy.common.parameters.annotations.Min;
+import org.onap.policy.common.parameters.annotations.NotNull;
+
+@Getter
+@Setter
+@ToString
+public class CdsServerProperties implements ParameterGroup {
+
+ // Port range constants
+ private static final int MIN_USER_PORT = 1024;
+ private static final int MAX_USER_PORT = 65535;
+
+ private static final String INVALID_PROP = "Invalid CDS property: ";
+ private static final String SERVER_PROPERTIES_TYPE = "CDS gRPC Server Properties";
+
+ // CDS carrier properties
+ @Min(value = 1)
+ private int timeout;
+
+ @Min(value = MIN_USER_PORT)
+ @Max(value = MAX_USER_PORT)
+ private int port;
+
+ @NotNull
+ private String host;
+
+ @NotNull
+ private String username;
+
+ @NotNull
+ private String password;
+
+
+ @Override
+ public String getName() {
+ return SERVER_PROPERTIES_TYPE;
+ }
+
+ @Override
+ public void setName(final String name) {
+ throw new ParameterRuntimeException("The name of this ParameterGroup implementation is always " + getName());
+ }
+
+ @Override
+ public GroupValidationResult validate() {
+ return new GroupValidationResult(this);
+ }
+
+ /**
+ * Generate base64-encoded Authorization header from username and password.
+ *
+ * @return Base64 encoded string
+ */
+ public String getBasicAuth() {
+ return Base64.getEncoder().encodeToString(String.format("%s:%s", getUsername(), getPassword())
+ .getBytes(StandardCharsets.UTF_8));
+ }
+}