From c92e24f7de074b177cc8b57ef4fcddd133b25093 Mon Sep 17 00:00:00 2001 From: Rashmi Pujar Date: Tue, 4 Jun 2019 16:31:54 -0400 Subject: GRPC Client impl to send process message to CDS blueprint-processor endpoint Issue-ID: POLICY-1762 Signed-off-by: Rashmi Pujar Change-Id: Iecef458b1f25db8e2989cc40ccd399be15867497 --- models-interactions/model-impl/cds/pom.xml | 100 +++++++++++ .../onap/policy/cds/api/CdsProcessorListener.java | 69 ++++++++ .../client/BasicAuthClientHeaderInterceptor.java | 64 ++++++++ .../policy/cds/client/CdsProcessorGrpcClient.java | 94 +++++++++++ .../policy/cds/client/CdsProcessorHandler.java | 81 +++++++++ .../policy/cds/properties/CdsServerProperties.java | 87 ++++++++++ .../cds/api/TestCdsProcessorListenerImpl.java | 49 ++++++ .../BasicAuthClientHeaderInterceptorTest.java | 138 ++++++++++++++++ .../cds/client/CdsProcessorGrpcClientTest.java | 182 +++++++++++++++++++++ 9 files changed, 864 insertions(+) create mode 100644 models-interactions/model-impl/cds/pom.xml create mode 100644 models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/api/CdsProcessorListener.java create mode 100644 models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptor.java create mode 100644 models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorGrpcClient.java create mode 100644 models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorHandler.java create mode 100644 models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/properties/CdsServerProperties.java create mode 100644 models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/api/TestCdsProcessorListenerImpl.java create mode 100644 models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptorTest.java create mode 100644 models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/CdsProcessorGrpcClientTest.java (limited to 'models-interactions/model-impl/cds') diff --git a/models-interactions/model-impl/cds/pom.xml b/models-interactions/model-impl/cds/pom.xml new file mode 100644 index 000000000..9e3b55beb --- /dev/null +++ b/models-interactions/model-impl/cds/pom.xml @@ -0,0 +1,100 @@ + + + + + 4.0.0 + + model-impl + org.onap.policy.models.policy-models-interactions.model-impl + 2.1.0-SNAPSHOT + + + cds + ${project.artifactId} + gRPC client implementation to send process message to CDS blueprint processor gRPC endpoint. + + + 1.17.1 + 3.6.1 + 4.1.30.Final + 0.4.4 + + + + + + org.onap.ccsdk.cds.components + proto-definition + ${ccsdk.version} + + + + + com.google.protobuf + protobuf-java + ${protobuf.version} + + + + + io.grpc + grpc-protobuf + ${grpc.version} + + + com.google.code.findbugs + jsr305 + + + + + io.grpc + grpc-stub + ${grpc.version} + + + io.grpc + grpc-netty + ${grpc.version} + + + io.grpc + grpc-testing + ${grpc.version} + test + + + + + org.onap.policy.common + common-parameters + ${policy.common.version} + + + + + org.mockito + mockito-core + 2.13.0 + test + + + diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/api/CdsProcessorListener.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/api/CdsProcessorListener.java new file mode 100644 index 000000000..c07c559c4 --- /dev/null +++ b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/api/CdsProcessorListener.java @@ -0,0 +1,69 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.cds.api; + +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput; + +/** + *

+ * In order for the caller of {@link org.onap.policy.cds.client.CdsProcessorGrpcClient} to manage the callback to handle + * the received messages appropriately, it needs to implement {@link CdsProcessorListener}. + *

+ * + *

Here is a sample implementation of a listener: + *

+ * new CdsProcessorListener {
+ *
+ *     @Override
+ *     public void onMessage(ExecutionServiceOutput message) {
+ *         log.info("Received notification from CDS: {}", message);
+ *     }
+ *
+ *     @Override
+ *     public void onError(Throwable throwable) {
+ *         Status status = Status.fromThrowable(throwable);
+ *         log.error("Failed processing blueprint {}", status, throwable);
+ *     }
+ * }
+ * 
+ *

+ */ +public interface CdsProcessorListener { + + /** + * Implements the workflow upon receiving the message from the server side. + * + *

Note that the CDS client-server communication is configured to use a streaming approach, which means when + * client + * sends an event, the server can reply with multiple sub-responses until full completion of the processing. Hence, + * it is up to the implementation of this method to process the received message using {@link + * ExecutionServiceOutput#getStatus()#getEventType()}

+ * + * @param message ExecutionServiceOutput received by the CDS grpc server + */ + void onMessage(ExecutionServiceOutput message); + + /** + * Implements the workflow when an error is received from the server side. + * + * @param throwable Throwable object received from CDS grpc server upon error + */ + void onError(Throwable throwable); + +} diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptor.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptor.java new file mode 100644 index 000000000..3957fe5e4 --- /dev/null +++ b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptor.java @@ -0,0 +1,64 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.cds.client; + +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall; +import io.grpc.Metadata; +import io.grpc.Metadata.Key; +import io.grpc.MethodDescriptor; +import org.onap.policy.cds.properties.CdsServerProperties; + +/** + * An interceptor to insert the client authHeader. + * + *

The {@link BasicAuthClientHeaderInterceptor} implements {@link ClientInterceptor} to insert authorization + * header data provided by {@link CdsServerProperties#getBasicAuth()} to all the outgoing calls.

+ * + *

On the client context, we add metadata with "Authorization" as the key and "Basic" followed by base64 encoded + * username:password as its value. + * On the server side, CDS BasicAuthServerInterceptor (1) gets the client metadata from the server context, (2) extract + * the "Authorization" header key and finally (3) decodes the username and password from the authHeader.

+ */ +public class BasicAuthClientHeaderInterceptor implements ClientInterceptor { + + static final String BASIC_AUTH_HEADER_KEY = "Authorization"; + private CdsServerProperties props; + + BasicAuthClientHeaderInterceptor(CdsServerProperties props) { + this.props = props; + } + + @Override + public ClientCall interceptCall(MethodDescriptor method, + CallOptions callOptions, Channel channel) { + Key authHeader = Key.of(BASIC_AUTH_HEADER_KEY, Metadata.ASCII_STRING_MARSHALLER); + return new ForwardingClientCall.SimpleForwardingClientCall(channel.newCall(method, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + headers.put(authHeader, props.getBasicAuth()); + super.start(responseListener, headers); + } + }; + } +} + diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorGrpcClient.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorGrpcClient.java new file mode 100644 index 000000000..b8ec7acf5 --- /dev/null +++ b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorGrpcClient.java @@ -0,0 +1,94 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.cds.client; + +import com.google.common.base.Preconditions; +import io.grpc.ManagedChannel; +import io.grpc.internal.DnsNameResolverProvider; +import io.grpc.internal.PickFirstLoadBalancerProvider; +import io.grpc.netty.NettyChannelBuilder; +import java.util.concurrent.CountDownLatch; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput; +import org.onap.policy.cds.api.CdsProcessorListener; +import org.onap.policy.cds.properties.CdsServerProperties; +import org.onap.policy.common.parameters.GroupValidationResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + *

+ * The CDS processor client uses gRPC for communication between Policy and CDS. This communication is configured to use + * a streaming approach, which means the client sends an event to which the server can reply with multiple + * sub-responses, until full completion of the processing. + *

+ */ +public class CdsProcessorGrpcClient implements AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(CdsProcessorGrpcClient.class); + + private ManagedChannel channel; + private CdsProcessorHandler handler; + + /** + * Constructor, create a CDS processor gRPC client. + * + * @param listener the listener to listen on + */ + public CdsProcessorGrpcClient(final CdsProcessorListener listener, CdsServerProperties props) { + final GroupValidationResult validationResult = props.validate(); + Preconditions.checkState(validationResult.getStatus().isValid(), "Error validating CDS server " + + "properties: " + validationResult.getResult()); + + this.channel = NettyChannelBuilder.forAddress(props.getHost(), props.getPort()) + .nameResolverFactory(new DnsNameResolverProvider()) + .loadBalancerFactory(new PickFirstLoadBalancerProvider()) + .intercept(new BasicAuthClientHeaderInterceptor(props)).usePlaintext().build(); + this.handler = new CdsProcessorHandler(listener); + LOGGER.info("CdsProcessorListener started"); + } + + CdsProcessorGrpcClient(final ManagedChannel channel, final CdsProcessorHandler handler) { + this.channel = channel; + this.handler = handler; + } + + /** + * Sends a request to the CDS backend micro-service. + * + *

The caller will be returned a CountDownLatch that can be used to define how long the processing can wait. The + * CountDownLatch is initiated with just 1 count. When the client receives an #onCompleted callback, the counter + * will decrement.

+ * + *

It is the user responsibility to close the client.

+ * + * @param input request to send + * @return CountDownLatch instance that can be use to #await for completeness of processing + */ + public CountDownLatch sendRequest(ExecutionServiceInput input) { + return handler.process(input, channel); + } + + @Override + public void close() { + if (channel != null) { + channel.shutdown(); + } + LOGGER.info("CdsProcessorListener stopped"); + } +} diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorHandler.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorHandler.java new file mode 100644 index 000000000..9dd249ceb --- /dev/null +++ b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorHandler.java @@ -0,0 +1,81 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.cds.client; + +import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.CountDownLatch; +import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceStub; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput; +import org.onap.policy.cds.api.CdsProcessorListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class CdsProcessorHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(CdsProcessorHandler.class); + + private CdsProcessorListener listener; + + CdsProcessorHandler(final CdsProcessorListener listener) { + this.listener = listener; + } + + CountDownLatch process(ExecutionServiceInput request, ManagedChannel channel) { + final ActionIdentifiers header = request.getActionIdentifiers(); + LOGGER.info("Processing blueprint({}:{}) for action({})", header.getBlueprintVersion(), + header.getBlueprintName(), header.getBlueprintVersion()); + + final CountDownLatch finishLatch = new CountDownLatch(1); + final BluePrintProcessingServiceStub asyncStub = BluePrintProcessingServiceGrpc.newStub(channel); + final StreamObserver responseObserver = new StreamObserver() { + @Override + public void onNext(ExecutionServiceOutput output) { + listener.onMessage(output); + } + + @Override + public void onError(Throwable throwable) { + listener.onError(throwable); + finishLatch.countDown(); + } + + @Override + public void onCompleted() { + LOGGER.info("Completed blueprint({}:{}) for action({})", header.getBlueprintVersion(), + header.getBlueprintName(), header.getBlueprintVersion()); + finishLatch.countDown(); + } + }; + + final StreamObserver requestObserver = asyncStub.process(responseObserver); + try { + // Send the message to CDS backend for processing + requestObserver.onNext(request); + // Mark the end of requests + requestObserver.onCompleted(); + } catch (RuntimeException e) { + requestObserver.onError(e); + } + return finishLatch; + } +} diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/properties/CdsServerProperties.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/properties/CdsServerProperties.java new file mode 100644 index 000000000..94a336b6d --- /dev/null +++ b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/properties/CdsServerProperties.java @@ -0,0 +1,87 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.cds.properties; + +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import org.onap.policy.common.parameters.GroupValidationResult; +import org.onap.policy.common.parameters.ParameterGroup; +import org.onap.policy.common.parameters.ParameterRuntimeException; +import org.onap.policy.common.parameters.annotations.Max; +import org.onap.policy.common.parameters.annotations.Min; +import org.onap.policy.common.parameters.annotations.NotNull; + +@Getter +@Setter +@ToString +public class CdsServerProperties implements ParameterGroup { + + // Port range constants + private static final int MIN_USER_PORT = 1024; + private static final int MAX_USER_PORT = 65535; + + private static final String INVALID_PROP = "Invalid CDS property: "; + private static final String SERVER_PROPERTIES_TYPE = "CDS gRPC Server Properties"; + + // CDS carrier properties + @Min(value = 1) + private int timeout; + + @Min(value = MIN_USER_PORT) + @Max(value = MAX_USER_PORT) + private int port; + + @NotNull + private String host; + + @NotNull + private String username; + + @NotNull + private String password; + + + @Override + public String getName() { + return SERVER_PROPERTIES_TYPE; + } + + @Override + public void setName(final String name) { + throw new ParameterRuntimeException("The name of this ParameterGroup implementation is always " + getName()); + } + + @Override + public GroupValidationResult validate() { + return new GroupValidationResult(this); + } + + /** + * Generate base64-encoded Authorization header from username and password. + * + * @return Base64 encoded string + */ + public String getBasicAuth() { + return Base64.getEncoder().encodeToString(String.format("%s:%s", getUsername(), getPassword()) + .getBytes(StandardCharsets.UTF_8)); + } +} diff --git a/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/api/TestCdsProcessorListenerImpl.java b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/api/TestCdsProcessorListenerImpl.java new file mode 100644 index 000000000..6dfd70dd6 --- /dev/null +++ b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/api/TestCdsProcessorListenerImpl.java @@ -0,0 +1,49 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.cds.api; + +import io.grpc.Status; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Used as a helper for the gRPC client unit test. + */ +public class TestCdsProcessorListenerImpl implements CdsProcessorListener { + + private static final Logger LOGGER = LoggerFactory.getLogger(TestCdsProcessorListenerImpl.class); + + /** + * Used to verify/inspect message received from server. + */ + @Override + public void onMessage(final ExecutionServiceOutput message) { + LOGGER.info("Received notification from CDS: {}", message); + } + + /** + * Used to verify/inspect error received from server. + */ + @Override + public void onError(final Throwable throwable) { + Status status = Status.fromThrowable(throwable); + LOGGER.error("Failed processing blueprint {}", status, throwable); + } +} diff --git a/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptorTest.java b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptorTest.java new file mode 100644 index 000000000..3b6ad7da1 --- /dev/null +++ b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptorTest.java @@ -0,0 +1,138 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.cds.client; + +import static org.junit.Assert.assertEquals; +import static org.mockito.AdditionalAnswers.delegatesTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import io.grpc.ClientInterceptors; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.Metadata.Key; +import io.grpc.ServerCall; +import io.grpc.ServerCall.Listener; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.ServerInterceptors; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceStub; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput; +import org.onap.policy.cds.properties.CdsServerProperties; + +public class BasicAuthClientHeaderInterceptorTest { + + // Generate a unique in-process server name. + private static final String SERVER_NAME = InProcessServerBuilder.generateName(); + private static final String CREDS = "test"; + + // Manages automatic graceful shutdown for the registered server and client channels at the end of test. + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + + private final ServerInterceptor mockCdsGrpcServerInterceptor = mock(ServerInterceptor.class, + delegatesTo(new TestServerInterceptor())); + + private final CdsServerProperties props = new CdsServerProperties(); + + private ManagedChannel channel; + + /** + * Setup the test. + * + * @throws IOException on failure to register the test grpc server for graceful shutdown + */ + @Before + public void setUp() throws IOException { + // Setup the CDS properties + props.setHost(SERVER_NAME); + props.setPort(2000); + props.setUsername(CREDS); + props.setPassword(CREDS); + props.setTimeout(60); + + // Implement the test gRPC server + BluePrintProcessingServiceImplBase testCdsBlueprintServerImpl = new BluePrintProcessingServiceImplBase() {}; + + // Create a server, add service, start, and register for automatic graceful shutdown. + grpcCleanup.register(InProcessServerBuilder.forName(SERVER_NAME).directExecutor() + .addService(ServerInterceptors.intercept(testCdsBlueprintServerImpl, mockCdsGrpcServerInterceptor)).build() + .start()); + + // Create a client channel and register for automatic graceful shutdown + channel = grpcCleanup.register(InProcessChannelBuilder.forName(SERVER_NAME).directExecutor().build()); + } + + @Test + public void testIfBasicAuthHeaderIsDeliveredToCdsServer() { + BluePrintProcessingServiceStub bpProcessingSvcStub = BluePrintProcessingServiceGrpc + .newStub(ClientInterceptors.intercept(channel, new BasicAuthClientHeaderInterceptor(props))); + ArgumentCaptor metadataCaptor = ArgumentCaptor.forClass(Metadata.class); + bpProcessingSvcStub.process(new StreamObserver() { + @Override + public void onNext(final ExecutionServiceOutput executionServiceOutput) { + // Test purpose only + } + + @Override + public void onError(final Throwable throwable) { + // Test purpose only + } + + @Override + public void onCompleted() { + // Test purpose only + } + }); + verify(mockCdsGrpcServerInterceptor).interceptCall(ArgumentMatchers.any(), metadataCaptor.capture(), + ArgumentMatchers.any()); + + Key authHeader = Key + .of(BasicAuthClientHeaderInterceptor.BASIC_AUTH_HEADER_KEY, Metadata.ASCII_STRING_MARSHALLER); + String expectedBaseAuth = Base64.getEncoder().encodeToString(String.format("%s:%s", CREDS, CREDS) + .getBytes(StandardCharsets.UTF_8)); + assertEquals(expectedBaseAuth, metadataCaptor.getValue().get(authHeader)); + } + + private static class TestServerInterceptor implements ServerInterceptor { + + @Override + public Listener interceptCall(final ServerCall serverCall, + final Metadata metadata, + final ServerCallHandler serverCallHandler) { + return serverCallHandler.startCall(serverCall, metadata); + } + } +} + + diff --git a/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/CdsProcessorGrpcClientTest.java b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/CdsProcessorGrpcClientTest.java new file mode 100644 index 000000000..b9a9a84cd --- /dev/null +++ b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/CdsProcessorGrpcClientTest.java @@ -0,0 +1,182 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.cds.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +import io.grpc.ManagedChannel; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; +import io.grpc.util.MutableHandlerRegistry; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput; +import org.onap.policy.cds.api.CdsProcessorListener; +import org.onap.policy.cds.api.TestCdsProcessorListenerImpl; +import org.onap.policy.cds.properties.CdsServerProperties; + +public class CdsProcessorGrpcClientTest { + + // Generate a unique in-process server name. + private static final String SERVER_NAME = InProcessServerBuilder.generateName(); + + // Manages automatic graceful shutdown for the registered server and client channels at the end of test. + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + + private final CdsProcessorListener listener = spy(new TestCdsProcessorListenerImpl()); + private final CdsServerProperties props = new CdsServerProperties(); + private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry(); + private final AtomicReference> responseObserverRef = new AtomicReference<>(); + private final List messagesDelivered = new ArrayList<>(); + private final CountDownLatch allRequestsDelivered = new CountDownLatch(1); + + private CdsProcessorGrpcClient client; + + /** + * Setup the test. + * + * @throws IOException on failure to register the test grpc server for graceful shutdown + */ + @Before + public void setUp() throws IOException { + // Setup the CDS properties + props.setHost(SERVER_NAME); + props.setPort(2000); + props.setUsername("testUser"); + props.setPassword("testPassword"); + props.setTimeout(60); + + // Create a server, add service, start, and register for automatic graceful shutdown. + grpcCleanup.register(InProcessServerBuilder.forName(SERVER_NAME) + .fallbackHandlerRegistry(serviceRegistry).directExecutor().build().start()); + + // Create a client channel and register for automatic graceful shutdown + ManagedChannel channel = grpcCleanup + .register(InProcessChannelBuilder.forName(SERVER_NAME).directExecutor().build()); + + // Create an instance of the gRPC client + client = new CdsProcessorGrpcClient(channel, new CdsProcessorHandler(listener)); + + // Implement the test gRPC server + BluePrintProcessingServiceImplBase testCdsBlueprintServerImpl = new BluePrintProcessingServiceImplBase() { + @Override + public StreamObserver process( + final StreamObserver responseObserver) { + responseObserverRef.set(responseObserver); + + return new StreamObserver() { + @Override + public void onNext(final ExecutionServiceInput executionServiceInput) { + messagesDelivered.add(executionServiceInput.getActionIdentifiers().getActionName()); + } + + @Override + public void onError(final Throwable throwable) { + // Test method + } + + @Override + public void onCompleted() { + allRequestsDelivered.countDown(); + } + }; + } + }; + serviceRegistry.addService(testCdsBlueprintServerImpl); + } + + @After + public void tearDown() { + client.close(); + } + + @Test + public void testCdsProcessorGrpcClientConstructor() { + new CdsProcessorGrpcClient(listener, props); + } + + @Test(expected = IllegalStateException.class) + public void testCdsProcessorGrpcClientConstructorFailure() { + props.setHost(null); + new CdsProcessorGrpcClient(listener, props); + } + + @Test + public void testSendRequestFail() throws InterruptedException { + // Setup + ExecutionServiceInput testReq = ExecutionServiceInput.newBuilder() + .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds").build()) + .build(); + + // Act + CountDownLatch finishLatch = client.sendRequest(testReq); + responseObserverRef.get().onError(new Throwable("failed to send testReq.")); + + verify(listener).onError(any(Throwable.class)); + assertTrue(finishLatch.await(0, TimeUnit.SECONDS)); + } + + @Test + public void testSendRequestSuccess() throws InterruptedException { + // Setup request + ExecutionServiceInput testReq1 = ExecutionServiceInput.newBuilder() + .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-req1").build()).build(); + + // Act + final CountDownLatch finishLatch = client.sendRequest(testReq1); + + // Assert that request message was sent and delivered once to the server + assertTrue(allRequestsDelivered.await(1, TimeUnit.SECONDS)); + assertEquals(Collections.singletonList("policy-to-cds-req1"), messagesDelivered); + + // Setup the server to send out two simple response messages and verify that the client receives them. + ExecutionServiceOutput testResp1 = ExecutionServiceOutput.newBuilder() + .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-resp1").build()).build(); + ExecutionServiceOutput testResp2 = ExecutionServiceOutput.newBuilder() + .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-resp2").build()).build(); + responseObserverRef.get().onNext(testResp1); + verify(listener).onMessage(testResp1); + responseObserverRef.get().onNext(testResp2); + verify(listener).onMessage(testResp2); + + // let server complete. + responseObserverRef.get().onCompleted(); + assertTrue(finishLatch.await(0, TimeUnit.SECONDS)); + } +} -- cgit 1.2.3-korg