diff options
Diffstat (limited to 'models-interactions/model-impl/cds')
9 files changed, 864 insertions, 0 deletions
diff --git a/models-interactions/model-impl/cds/pom.xml b/models-interactions/model-impl/cds/pom.xml new file mode 100644 index 000000000..9e3b55beb --- /dev/null +++ b/models-interactions/model-impl/cds/pom.xml @@ -0,0 +1,100 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ============LICENSE_START======================================================= + Copyright (C) 2019 Bell Canada. + ================================================================================ + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + ============LICENSE_END========================================================= +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>model-impl</artifactId> + <groupId>org.onap.policy.models.policy-models-interactions.model-impl</groupId> + <version>2.1.0-SNAPSHOT</version> + </parent> + + <artifactId>cds</artifactId> + <name>${project.artifactId}</name> + <description>gRPC client implementation to send process message to CDS blueprint processor gRPC endpoint.</description> + + <properties> + <grpc.version>1.17.1</grpc.version> + <protobuf.version>3.6.1</protobuf.version> + <grpc.netty.version>4.1.30.Final</grpc.netty.version> + <ccsdk.version>0.4.4</ccsdk.version> + </properties> + + <dependencies> + <!-- CDS dependencies --> + <dependency> + <groupId>org.onap.ccsdk.cds.components</groupId> + <artifactId>proto-definition</artifactId> + <version>${ccsdk.version}</version> + </dependency> + + <!-- protobuf dependencies --> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + <version>${protobuf.version}</version> + </dependency> + + <!-- gRPC dependencies --> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-protobuf</artifactId> + <version>${grpc.version}</version> + <exclusions> + <exclusion> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-stub</artifactId> + <version>${grpc.version}</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-netty</artifactId> + <version>${grpc.version}</version> + </dependency> + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-testing</artifactId> + <version>${grpc.version}</version> + <scope>test</scope> + </dependency> + + <!-- Policy dependencies --> + <dependency> + <groupId>org.onap.policy.common</groupId> + <artifactId>common-parameters</artifactId> + <version>${policy.common.version}</version> + </dependency> + + <!-- junit dependencies --> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>2.13.0</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/api/CdsProcessorListener.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/api/CdsProcessorListener.java new file mode 100644 index 000000000..c07c559c4 --- /dev/null +++ b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/api/CdsProcessorListener.java @@ -0,0 +1,69 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.cds.api; + +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput; + +/** + * <p> + * In order for the caller of {@link org.onap.policy.cds.client.CdsProcessorGrpcClient} to manage the callback to handle + * the received messages appropriately, it needs to implement {@link CdsProcessorListener}. + * </p> + * + * <p>Here is a sample implementation of a listener: + * <pre> + * new CdsProcessorListener { + * + * @Override + * public void onMessage(ExecutionServiceOutput message) { + * log.info("Received notification from CDS: {}", message); + * } + * + * @Override + * public void onError(Throwable throwable) { + * Status status = Status.fromThrowable(throwable); + * log.error("Failed processing blueprint {}", status, throwable); + * } + * } + * </pre> + * </p> + */ +public interface CdsProcessorListener { + + /** + * Implements the workflow upon receiving the message from the server side. + * + * <p>Note that the CDS client-server communication is configured to use a streaming approach, which means when + * client + * sends an event, the server can reply with multiple sub-responses until full completion of the processing. Hence, + * it is up to the implementation of this method to process the received message using {@link + * ExecutionServiceOutput#getStatus()#getEventType()}</p> + * + * @param message ExecutionServiceOutput received by the CDS grpc server + */ + void onMessage(ExecutionServiceOutput message); + + /** + * Implements the workflow when an error is received from the server side. + * + * @param throwable Throwable object received from CDS grpc server upon error + */ + void onError(Throwable throwable); + +} diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptor.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptor.java new file mode 100644 index 000000000..3957fe5e4 --- /dev/null +++ b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptor.java @@ -0,0 +1,64 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.cds.client; + +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall; +import io.grpc.Metadata; +import io.grpc.Metadata.Key; +import io.grpc.MethodDescriptor; +import org.onap.policy.cds.properties.CdsServerProperties; + +/** + * An interceptor to insert the client authHeader. + * + * <p>The {@link BasicAuthClientHeaderInterceptor} implements {@link ClientInterceptor} to insert authorization + * header data provided by {@link CdsServerProperties#getBasicAuth()} to all the outgoing calls.</p> + * + * <p>On the client context, we add metadata with "Authorization" as the key and "Basic" followed by base64 encoded + * username:password as its value. + * On the server side, CDS BasicAuthServerInterceptor (1) gets the client metadata from the server context, (2) extract + * the "Authorization" header key and finally (3) decodes the username and password from the authHeader.</p> + */ +public class BasicAuthClientHeaderInterceptor implements ClientInterceptor { + + static final String BASIC_AUTH_HEADER_KEY = "Authorization"; + private CdsServerProperties props; + + BasicAuthClientHeaderInterceptor(CdsServerProperties props) { + this.props = props; + } + + @Override + public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, + CallOptions callOptions, Channel channel) { + Key<String> authHeader = Key.of(BASIC_AUTH_HEADER_KEY, Metadata.ASCII_STRING_MARSHALLER); + return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(method, callOptions)) { + @Override + public void start(Listener<RespT> responseListener, Metadata headers) { + headers.put(authHeader, props.getBasicAuth()); + super.start(responseListener, headers); + } + }; + } +} + diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorGrpcClient.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorGrpcClient.java new file mode 100644 index 000000000..b8ec7acf5 --- /dev/null +++ b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorGrpcClient.java @@ -0,0 +1,94 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.cds.client; + +import com.google.common.base.Preconditions; +import io.grpc.ManagedChannel; +import io.grpc.internal.DnsNameResolverProvider; +import io.grpc.internal.PickFirstLoadBalancerProvider; +import io.grpc.netty.NettyChannelBuilder; +import java.util.concurrent.CountDownLatch; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput; +import org.onap.policy.cds.api.CdsProcessorListener; +import org.onap.policy.cds.properties.CdsServerProperties; +import org.onap.policy.common.parameters.GroupValidationResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * <p> + * The CDS processor client uses gRPC for communication between Policy and CDS. This communication is configured to use + * a streaming approach, which means the client sends an event to which the server can reply with multiple + * sub-responses, until full completion of the processing. + * </p> + */ +public class CdsProcessorGrpcClient implements AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(CdsProcessorGrpcClient.class); + + private ManagedChannel channel; + private CdsProcessorHandler handler; + + /** + * Constructor, create a CDS processor gRPC client. + * + * @param listener the listener to listen on + */ + public CdsProcessorGrpcClient(final CdsProcessorListener listener, CdsServerProperties props) { + final GroupValidationResult validationResult = props.validate(); + Preconditions.checkState(validationResult.getStatus().isValid(), "Error validating CDS server " + + "properties: " + validationResult.getResult()); + + this.channel = NettyChannelBuilder.forAddress(props.getHost(), props.getPort()) + .nameResolverFactory(new DnsNameResolverProvider()) + .loadBalancerFactory(new PickFirstLoadBalancerProvider()) + .intercept(new BasicAuthClientHeaderInterceptor(props)).usePlaintext().build(); + this.handler = new CdsProcessorHandler(listener); + LOGGER.info("CdsProcessorListener started"); + } + + CdsProcessorGrpcClient(final ManagedChannel channel, final CdsProcessorHandler handler) { + this.channel = channel; + this.handler = handler; + } + + /** + * Sends a request to the CDS backend micro-service. + * + * <p>The caller will be returned a CountDownLatch that can be used to define how long the processing can wait. The + * CountDownLatch is initiated with just 1 count. When the client receives an #onCompleted callback, the counter + * will decrement.</p> + * + * <p>It is the user responsibility to close the client.</p> + * + * @param input request to send + * @return CountDownLatch instance that can be use to #await for completeness of processing + */ + public CountDownLatch sendRequest(ExecutionServiceInput input) { + return handler.process(input, channel); + } + + @Override + public void close() { + if (channel != null) { + channel.shutdown(); + } + LOGGER.info("CdsProcessorListener stopped"); + } +} diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorHandler.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorHandler.java new file mode 100644 index 000000000..9dd249ceb --- /dev/null +++ b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorHandler.java @@ -0,0 +1,81 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.cds.client; + +import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.CountDownLatch; +import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceStub; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput; +import org.onap.policy.cds.api.CdsProcessorListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class CdsProcessorHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(CdsProcessorHandler.class); + + private CdsProcessorListener listener; + + CdsProcessorHandler(final CdsProcessorListener listener) { + this.listener = listener; + } + + CountDownLatch process(ExecutionServiceInput request, ManagedChannel channel) { + final ActionIdentifiers header = request.getActionIdentifiers(); + LOGGER.info("Processing blueprint({}:{}) for action({})", header.getBlueprintVersion(), + header.getBlueprintName(), header.getBlueprintVersion()); + + final CountDownLatch finishLatch = new CountDownLatch(1); + final BluePrintProcessingServiceStub asyncStub = BluePrintProcessingServiceGrpc.newStub(channel); + final StreamObserver<ExecutionServiceOutput> responseObserver = new StreamObserver<ExecutionServiceOutput>() { + @Override + public void onNext(ExecutionServiceOutput output) { + listener.onMessage(output); + } + + @Override + public void onError(Throwable throwable) { + listener.onError(throwable); + finishLatch.countDown(); + } + + @Override + public void onCompleted() { + LOGGER.info("Completed blueprint({}:{}) for action({})", header.getBlueprintVersion(), + header.getBlueprintName(), header.getBlueprintVersion()); + finishLatch.countDown(); + } + }; + + final StreamObserver<ExecutionServiceInput> requestObserver = asyncStub.process(responseObserver); + try { + // Send the message to CDS backend for processing + requestObserver.onNext(request); + // Mark the end of requests + requestObserver.onCompleted(); + } catch (RuntimeException e) { + requestObserver.onError(e); + } + return finishLatch; + } +} diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/properties/CdsServerProperties.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/properties/CdsServerProperties.java new file mode 100644 index 000000000..94a336b6d --- /dev/null +++ b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/properties/CdsServerProperties.java @@ -0,0 +1,87 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.cds.properties; + +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import org.onap.policy.common.parameters.GroupValidationResult; +import org.onap.policy.common.parameters.ParameterGroup; +import org.onap.policy.common.parameters.ParameterRuntimeException; +import org.onap.policy.common.parameters.annotations.Max; +import org.onap.policy.common.parameters.annotations.Min; +import org.onap.policy.common.parameters.annotations.NotNull; + +@Getter +@Setter +@ToString +public class CdsServerProperties implements ParameterGroup { + + // Port range constants + private static final int MIN_USER_PORT = 1024; + private static final int MAX_USER_PORT = 65535; + + private static final String INVALID_PROP = "Invalid CDS property: "; + private static final String SERVER_PROPERTIES_TYPE = "CDS gRPC Server Properties"; + + // CDS carrier properties + @Min(value = 1) + private int timeout; + + @Min(value = MIN_USER_PORT) + @Max(value = MAX_USER_PORT) + private int port; + + @NotNull + private String host; + + @NotNull + private String username; + + @NotNull + private String password; + + + @Override + public String getName() { + return SERVER_PROPERTIES_TYPE; + } + + @Override + public void setName(final String name) { + throw new ParameterRuntimeException("The name of this ParameterGroup implementation is always " + getName()); + } + + @Override + public GroupValidationResult validate() { + return new GroupValidationResult(this); + } + + /** + * Generate base64-encoded Authorization header from username and password. + * + * @return Base64 encoded string + */ + public String getBasicAuth() { + return Base64.getEncoder().encodeToString(String.format("%s:%s", getUsername(), getPassword()) + .getBytes(StandardCharsets.UTF_8)); + } +} diff --git a/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/api/TestCdsProcessorListenerImpl.java b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/api/TestCdsProcessorListenerImpl.java new file mode 100644 index 000000000..6dfd70dd6 --- /dev/null +++ b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/api/TestCdsProcessorListenerImpl.java @@ -0,0 +1,49 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.cds.api; + +import io.grpc.Status; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Used as a helper for the gRPC client unit test. + */ +public class TestCdsProcessorListenerImpl implements CdsProcessorListener { + + private static final Logger LOGGER = LoggerFactory.getLogger(TestCdsProcessorListenerImpl.class); + + /** + * Used to verify/inspect message received from server. + */ + @Override + public void onMessage(final ExecutionServiceOutput message) { + LOGGER.info("Received notification from CDS: {}", message); + } + + /** + * Used to verify/inspect error received from server. + */ + @Override + public void onError(final Throwable throwable) { + Status status = Status.fromThrowable(throwable); + LOGGER.error("Failed processing blueprint {}", status, throwable); + } +} diff --git a/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptorTest.java b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptorTest.java new file mode 100644 index 000000000..3b6ad7da1 --- /dev/null +++ b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptorTest.java @@ -0,0 +1,138 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.cds.client; + +import static org.junit.Assert.assertEquals; +import static org.mockito.AdditionalAnswers.delegatesTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import io.grpc.ClientInterceptors; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.Metadata.Key; +import io.grpc.ServerCall; +import io.grpc.ServerCall.Listener; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.ServerInterceptors; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceStub; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput; +import org.onap.policy.cds.properties.CdsServerProperties; + +public class BasicAuthClientHeaderInterceptorTest { + + // Generate a unique in-process server name. + private static final String SERVER_NAME = InProcessServerBuilder.generateName(); + private static final String CREDS = "test"; + + // Manages automatic graceful shutdown for the registered server and client channels at the end of test. + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + + private final ServerInterceptor mockCdsGrpcServerInterceptor = mock(ServerInterceptor.class, + delegatesTo(new TestServerInterceptor())); + + private final CdsServerProperties props = new CdsServerProperties(); + + private ManagedChannel channel; + + /** + * Setup the test. + * + * @throws IOException on failure to register the test grpc server for graceful shutdown + */ + @Before + public void setUp() throws IOException { + // Setup the CDS properties + props.setHost(SERVER_NAME); + props.setPort(2000); + props.setUsername(CREDS); + props.setPassword(CREDS); + props.setTimeout(60); + + // Implement the test gRPC server + BluePrintProcessingServiceImplBase testCdsBlueprintServerImpl = new BluePrintProcessingServiceImplBase() {}; + + // Create a server, add service, start, and register for automatic graceful shutdown. + grpcCleanup.register(InProcessServerBuilder.forName(SERVER_NAME).directExecutor() + .addService(ServerInterceptors.intercept(testCdsBlueprintServerImpl, mockCdsGrpcServerInterceptor)).build() + .start()); + + // Create a client channel and register for automatic graceful shutdown + channel = grpcCleanup.register(InProcessChannelBuilder.forName(SERVER_NAME).directExecutor().build()); + } + + @Test + public void testIfBasicAuthHeaderIsDeliveredToCdsServer() { + BluePrintProcessingServiceStub bpProcessingSvcStub = BluePrintProcessingServiceGrpc + .newStub(ClientInterceptors.intercept(channel, new BasicAuthClientHeaderInterceptor(props))); + ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class); + bpProcessingSvcStub.process(new StreamObserver<ExecutionServiceOutput>() { + @Override + public void onNext(final ExecutionServiceOutput executionServiceOutput) { + // Test purpose only + } + + @Override + public void onError(final Throwable throwable) { + // Test purpose only + } + + @Override + public void onCompleted() { + // Test purpose only + } + }); + verify(mockCdsGrpcServerInterceptor).interceptCall(ArgumentMatchers.any(), metadataCaptor.capture(), + ArgumentMatchers.any()); + + Key<String> authHeader = Key + .of(BasicAuthClientHeaderInterceptor.BASIC_AUTH_HEADER_KEY, Metadata.ASCII_STRING_MARSHALLER); + String expectedBaseAuth = Base64.getEncoder().encodeToString(String.format("%s:%s", CREDS, CREDS) + .getBytes(StandardCharsets.UTF_8)); + assertEquals(expectedBaseAuth, metadataCaptor.getValue().get(authHeader)); + } + + private static class TestServerInterceptor implements ServerInterceptor { + + @Override + public <ReqT, RespT> Listener<ReqT> interceptCall(final ServerCall<ReqT, RespT> serverCall, + final Metadata metadata, + final ServerCallHandler<ReqT, RespT> serverCallHandler) { + return serverCallHandler.startCall(serverCall, metadata); + } + } +} + + diff --git a/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/CdsProcessorGrpcClientTest.java b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/CdsProcessorGrpcClientTest.java new file mode 100644 index 000000000..b9a9a84cd --- /dev/null +++ b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/CdsProcessorGrpcClientTest.java @@ -0,0 +1,182 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.cds.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +import io.grpc.ManagedChannel; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; +import io.grpc.util.MutableHandlerRegistry; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput; +import org.onap.policy.cds.api.CdsProcessorListener; +import org.onap.policy.cds.api.TestCdsProcessorListenerImpl; +import org.onap.policy.cds.properties.CdsServerProperties; + +public class CdsProcessorGrpcClientTest { + + // Generate a unique in-process server name. + private static final String SERVER_NAME = InProcessServerBuilder.generateName(); + + // Manages automatic graceful shutdown for the registered server and client channels at the end of test. + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + + private final CdsProcessorListener listener = spy(new TestCdsProcessorListenerImpl()); + private final CdsServerProperties props = new CdsServerProperties(); + private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry(); + private final AtomicReference<StreamObserver<ExecutionServiceOutput>> responseObserverRef = new AtomicReference<>(); + private final List<String> messagesDelivered = new ArrayList<>(); + private final CountDownLatch allRequestsDelivered = new CountDownLatch(1); + + private CdsProcessorGrpcClient client; + + /** + * Setup the test. + * + * @throws IOException on failure to register the test grpc server for graceful shutdown + */ + @Before + public void setUp() throws IOException { + // Setup the CDS properties + props.setHost(SERVER_NAME); + props.setPort(2000); + props.setUsername("testUser"); + props.setPassword("testPassword"); + props.setTimeout(60); + + // Create a server, add service, start, and register for automatic graceful shutdown. + grpcCleanup.register(InProcessServerBuilder.forName(SERVER_NAME) + .fallbackHandlerRegistry(serviceRegistry).directExecutor().build().start()); + + // Create a client channel and register for automatic graceful shutdown + ManagedChannel channel = grpcCleanup + .register(InProcessChannelBuilder.forName(SERVER_NAME).directExecutor().build()); + + // Create an instance of the gRPC client + client = new CdsProcessorGrpcClient(channel, new CdsProcessorHandler(listener)); + + // Implement the test gRPC server + BluePrintProcessingServiceImplBase testCdsBlueprintServerImpl = new BluePrintProcessingServiceImplBase() { + @Override + public StreamObserver<ExecutionServiceInput> process( + final StreamObserver<ExecutionServiceOutput> responseObserver) { + responseObserverRef.set(responseObserver); + + return new StreamObserver<ExecutionServiceInput>() { + @Override + public void onNext(final ExecutionServiceInput executionServiceInput) { + messagesDelivered.add(executionServiceInput.getActionIdentifiers().getActionName()); + } + + @Override + public void onError(final Throwable throwable) { + // Test method + } + + @Override + public void onCompleted() { + allRequestsDelivered.countDown(); + } + }; + } + }; + serviceRegistry.addService(testCdsBlueprintServerImpl); + } + + @After + public void tearDown() { + client.close(); + } + + @Test + public void testCdsProcessorGrpcClientConstructor() { + new CdsProcessorGrpcClient(listener, props); + } + + @Test(expected = IllegalStateException.class) + public void testCdsProcessorGrpcClientConstructorFailure() { + props.setHost(null); + new CdsProcessorGrpcClient(listener, props); + } + + @Test + public void testSendRequestFail() throws InterruptedException { + // Setup + ExecutionServiceInput testReq = ExecutionServiceInput.newBuilder() + .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds").build()) + .build(); + + // Act + CountDownLatch finishLatch = client.sendRequest(testReq); + responseObserverRef.get().onError(new Throwable("failed to send testReq.")); + + verify(listener).onError(any(Throwable.class)); + assertTrue(finishLatch.await(0, TimeUnit.SECONDS)); + } + + @Test + public void testSendRequestSuccess() throws InterruptedException { + // Setup request + ExecutionServiceInput testReq1 = ExecutionServiceInput.newBuilder() + .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-req1").build()).build(); + + // Act + final CountDownLatch finishLatch = client.sendRequest(testReq1); + + // Assert that request message was sent and delivered once to the server + assertTrue(allRequestsDelivered.await(1, TimeUnit.SECONDS)); + assertEquals(Collections.singletonList("policy-to-cds-req1"), messagesDelivered); + + // Setup the server to send out two simple response messages and verify that the client receives them. + ExecutionServiceOutput testResp1 = ExecutionServiceOutput.newBuilder() + .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-resp1").build()).build(); + ExecutionServiceOutput testResp2 = ExecutionServiceOutput.newBuilder() + .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-resp2").build()).build(); + responseObserverRef.get().onNext(testResp1); + verify(listener).onMessage(testResp1); + responseObserverRef.get().onNext(testResp2); + verify(listener).onMessage(testResp2); + + // let server complete. + responseObserverRef.get().onCompleted(); + assertTrue(finishLatch.await(0, TimeUnit.SECONDS)); + } +} |