diff options
author | Rashmi Pujar <rashmi.pujar@bell.ca> | 2019-06-04 16:31:54 -0400 |
---|---|---|
committer | Rashmi Pujar <rashmi.pujar@bell.ca> | 2019-06-11 10:20:27 -0400 |
commit | c92e24f7de074b177cc8b57ef4fcddd133b25093 (patch) | |
tree | c17fabb8b22dd0d4edbc4680a9431391f7102c91 /models-interactions/model-impl/cds/src/test/java | |
parent | 8676a38be6877235fe857a5f0bc289b2e036d4bb (diff) |
GRPC Client impl to send process message to CDS blueprint-processor endpoint
Issue-ID: POLICY-1762
Signed-off-by: Rashmi Pujar <rashmi.pujar@bell.ca>
Change-Id: Iecef458b1f25db8e2989cc40ccd399be15867497
Diffstat (limited to 'models-interactions/model-impl/cds/src/test/java')
3 files changed, 369 insertions, 0 deletions
diff --git a/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/api/TestCdsProcessorListenerImpl.java b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/api/TestCdsProcessorListenerImpl.java new file mode 100644 index 000000000..6dfd70dd6 --- /dev/null +++ b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/api/TestCdsProcessorListenerImpl.java @@ -0,0 +1,49 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.cds.api; + +import io.grpc.Status; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Used as a helper for the gRPC client unit test. + */ +public class TestCdsProcessorListenerImpl implements CdsProcessorListener { + + private static final Logger LOGGER = LoggerFactory.getLogger(TestCdsProcessorListenerImpl.class); + + /** + * Used to verify/inspect message received from server. + */ + @Override + public void onMessage(final ExecutionServiceOutput message) { + LOGGER.info("Received notification from CDS: {}", message); + } + + /** + * Used to verify/inspect error received from server. + */ + @Override + public void onError(final Throwable throwable) { + Status status = Status.fromThrowable(throwable); + LOGGER.error("Failed processing blueprint {}", status, throwable); + } +} diff --git a/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptorTest.java b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptorTest.java new file mode 100644 index 000000000..3b6ad7da1 --- /dev/null +++ b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptorTest.java @@ -0,0 +1,138 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.cds.client; + +import static org.junit.Assert.assertEquals; +import static org.mockito.AdditionalAnswers.delegatesTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import io.grpc.ClientInterceptors; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.Metadata.Key; +import io.grpc.ServerCall; +import io.grpc.ServerCall.Listener; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.ServerInterceptors; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceStub; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput; +import org.onap.policy.cds.properties.CdsServerProperties; + +public class BasicAuthClientHeaderInterceptorTest { + + // Generate a unique in-process server name. + private static final String SERVER_NAME = InProcessServerBuilder.generateName(); + private static final String CREDS = "test"; + + // Manages automatic graceful shutdown for the registered server and client channels at the end of test. + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + + private final ServerInterceptor mockCdsGrpcServerInterceptor = mock(ServerInterceptor.class, + delegatesTo(new TestServerInterceptor())); + + private final CdsServerProperties props = new CdsServerProperties(); + + private ManagedChannel channel; + + /** + * Setup the test. + * + * @throws IOException on failure to register the test grpc server for graceful shutdown + */ + @Before + public void setUp() throws IOException { + // Setup the CDS properties + props.setHost(SERVER_NAME); + props.setPort(2000); + props.setUsername(CREDS); + props.setPassword(CREDS); + props.setTimeout(60); + + // Implement the test gRPC server + BluePrintProcessingServiceImplBase testCdsBlueprintServerImpl = new BluePrintProcessingServiceImplBase() {}; + + // Create a server, add service, start, and register for automatic graceful shutdown. + grpcCleanup.register(InProcessServerBuilder.forName(SERVER_NAME).directExecutor() + .addService(ServerInterceptors.intercept(testCdsBlueprintServerImpl, mockCdsGrpcServerInterceptor)).build() + .start()); + + // Create a client channel and register for automatic graceful shutdown + channel = grpcCleanup.register(InProcessChannelBuilder.forName(SERVER_NAME).directExecutor().build()); + } + + @Test + public void testIfBasicAuthHeaderIsDeliveredToCdsServer() { + BluePrintProcessingServiceStub bpProcessingSvcStub = BluePrintProcessingServiceGrpc + .newStub(ClientInterceptors.intercept(channel, new BasicAuthClientHeaderInterceptor(props))); + ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class); + bpProcessingSvcStub.process(new StreamObserver<ExecutionServiceOutput>() { + @Override + public void onNext(final ExecutionServiceOutput executionServiceOutput) { + // Test purpose only + } + + @Override + public void onError(final Throwable throwable) { + // Test purpose only + } + + @Override + public void onCompleted() { + // Test purpose only + } + }); + verify(mockCdsGrpcServerInterceptor).interceptCall(ArgumentMatchers.any(), metadataCaptor.capture(), + ArgumentMatchers.any()); + + Key<String> authHeader = Key + .of(BasicAuthClientHeaderInterceptor.BASIC_AUTH_HEADER_KEY, Metadata.ASCII_STRING_MARSHALLER); + String expectedBaseAuth = Base64.getEncoder().encodeToString(String.format("%s:%s", CREDS, CREDS) + .getBytes(StandardCharsets.UTF_8)); + assertEquals(expectedBaseAuth, metadataCaptor.getValue().get(authHeader)); + } + + private static class TestServerInterceptor implements ServerInterceptor { + + @Override + public <ReqT, RespT> Listener<ReqT> interceptCall(final ServerCall<ReqT, RespT> serverCall, + final Metadata metadata, + final ServerCallHandler<ReqT, RespT> serverCallHandler) { + return serverCallHandler.startCall(serverCall, metadata); + } + } +} + + diff --git a/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/CdsProcessorGrpcClientTest.java b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/CdsProcessorGrpcClientTest.java new file mode 100644 index 000000000..b9a9a84cd --- /dev/null +++ b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/CdsProcessorGrpcClientTest.java @@ -0,0 +1,182 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2019 Bell Canada. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.cds.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +import io.grpc.ManagedChannel; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; +import io.grpc.util.MutableHandlerRegistry; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput; +import org.onap.policy.cds.api.CdsProcessorListener; +import org.onap.policy.cds.api.TestCdsProcessorListenerImpl; +import org.onap.policy.cds.properties.CdsServerProperties; + +public class CdsProcessorGrpcClientTest { + + // Generate a unique in-process server name. + private static final String SERVER_NAME = InProcessServerBuilder.generateName(); + + // Manages automatic graceful shutdown for the registered server and client channels at the end of test. + @Rule + public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + + private final CdsProcessorListener listener = spy(new TestCdsProcessorListenerImpl()); + private final CdsServerProperties props = new CdsServerProperties(); + private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry(); + private final AtomicReference<StreamObserver<ExecutionServiceOutput>> responseObserverRef = new AtomicReference<>(); + private final List<String> messagesDelivered = new ArrayList<>(); + private final CountDownLatch allRequestsDelivered = new CountDownLatch(1); + + private CdsProcessorGrpcClient client; + + /** + * Setup the test. + * + * @throws IOException on failure to register the test grpc server for graceful shutdown + */ + @Before + public void setUp() throws IOException { + // Setup the CDS properties + props.setHost(SERVER_NAME); + props.setPort(2000); + props.setUsername("testUser"); + props.setPassword("testPassword"); + props.setTimeout(60); + + // Create a server, add service, start, and register for automatic graceful shutdown. + grpcCleanup.register(InProcessServerBuilder.forName(SERVER_NAME) + .fallbackHandlerRegistry(serviceRegistry).directExecutor().build().start()); + + // Create a client channel and register for automatic graceful shutdown + ManagedChannel channel = grpcCleanup + .register(InProcessChannelBuilder.forName(SERVER_NAME).directExecutor().build()); + + // Create an instance of the gRPC client + client = new CdsProcessorGrpcClient(channel, new CdsProcessorHandler(listener)); + + // Implement the test gRPC server + BluePrintProcessingServiceImplBase testCdsBlueprintServerImpl = new BluePrintProcessingServiceImplBase() { + @Override + public StreamObserver<ExecutionServiceInput> process( + final StreamObserver<ExecutionServiceOutput> responseObserver) { + responseObserverRef.set(responseObserver); + + return new StreamObserver<ExecutionServiceInput>() { + @Override + public void onNext(final ExecutionServiceInput executionServiceInput) { + messagesDelivered.add(executionServiceInput.getActionIdentifiers().getActionName()); + } + + @Override + public void onError(final Throwable throwable) { + // Test method + } + + @Override + public void onCompleted() { + allRequestsDelivered.countDown(); + } + }; + } + }; + serviceRegistry.addService(testCdsBlueprintServerImpl); + } + + @After + public void tearDown() { + client.close(); + } + + @Test + public void testCdsProcessorGrpcClientConstructor() { + new CdsProcessorGrpcClient(listener, props); + } + + @Test(expected = IllegalStateException.class) + public void testCdsProcessorGrpcClientConstructorFailure() { + props.setHost(null); + new CdsProcessorGrpcClient(listener, props); + } + + @Test + public void testSendRequestFail() throws InterruptedException { + // Setup + ExecutionServiceInput testReq = ExecutionServiceInput.newBuilder() + .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds").build()) + .build(); + + // Act + CountDownLatch finishLatch = client.sendRequest(testReq); + responseObserverRef.get().onError(new Throwable("failed to send testReq.")); + + verify(listener).onError(any(Throwable.class)); + assertTrue(finishLatch.await(0, TimeUnit.SECONDS)); + } + + @Test + public void testSendRequestSuccess() throws InterruptedException { + // Setup request + ExecutionServiceInput testReq1 = ExecutionServiceInput.newBuilder() + .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-req1").build()).build(); + + // Act + final CountDownLatch finishLatch = client.sendRequest(testReq1); + + // Assert that request message was sent and delivered once to the server + assertTrue(allRequestsDelivered.await(1, TimeUnit.SECONDS)); + assertEquals(Collections.singletonList("policy-to-cds-req1"), messagesDelivered); + + // Setup the server to send out two simple response messages and verify that the client receives them. + ExecutionServiceOutput testResp1 = ExecutionServiceOutput.newBuilder() + .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-resp1").build()).build(); + ExecutionServiceOutput testResp2 = ExecutionServiceOutput.newBuilder() + .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-resp2").build()).build(); + responseObserverRef.get().onNext(testResp1); + verify(listener).onMessage(testResp1); + responseObserverRef.get().onNext(testResp2); + verify(listener).onMessage(testResp2); + + // let server complete. + responseObserverRef.get().onCompleted(); + assertTrue(finishLatch.await(0, TimeUnit.SECONDS)); + } +} |