summaryrefslogtreecommitdiffstats
path: root/models-interactions/model-impl/cds/src/test/java
diff options
context:
space:
mode:
authorRashmi Pujar <rashmi.pujar@bell.ca>2019-06-04 16:31:54 -0400
committerRashmi Pujar <rashmi.pujar@bell.ca>2019-06-11 10:20:27 -0400
commitc92e24f7de074b177cc8b57ef4fcddd133b25093 (patch)
treec17fabb8b22dd0d4edbc4680a9431391f7102c91 /models-interactions/model-impl/cds/src/test/java
parent8676a38be6877235fe857a5f0bc289b2e036d4bb (diff)
GRPC Client impl to send process message to CDS blueprint-processor endpoint
Issue-ID: POLICY-1762 Signed-off-by: Rashmi Pujar <rashmi.pujar@bell.ca> Change-Id: Iecef458b1f25db8e2989cc40ccd399be15867497
Diffstat (limited to 'models-interactions/model-impl/cds/src/test/java')
-rw-r--r--models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/api/TestCdsProcessorListenerImpl.java49
-rw-r--r--models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptorTest.java138
-rw-r--r--models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/CdsProcessorGrpcClientTest.java182
3 files changed, 369 insertions, 0 deletions
diff --git a/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/api/TestCdsProcessorListenerImpl.java b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/api/TestCdsProcessorListenerImpl.java
new file mode 100644
index 000000000..6dfd70dd6
--- /dev/null
+++ b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/api/TestCdsProcessorListenerImpl.java
@@ -0,0 +1,49 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.cds.api;
+
+import io.grpc.Status;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used as a helper for the gRPC client unit test.
+ */
+public class TestCdsProcessorListenerImpl implements CdsProcessorListener {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TestCdsProcessorListenerImpl.class);
+
+ /**
+ * Used to verify/inspect message received from server.
+ */
+ @Override
+ public void onMessage(final ExecutionServiceOutput message) {
+ LOGGER.info("Received notification from CDS: {}", message);
+ }
+
+ /**
+ * Used to verify/inspect error received from server.
+ */
+ @Override
+ public void onError(final Throwable throwable) {
+ Status status = Status.fromThrowable(throwable);
+ LOGGER.error("Failed processing blueprint {}", status, throwable);
+ }
+}
diff --git a/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptorTest.java b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptorTest.java
new file mode 100644
index 000000000..3b6ad7da1
--- /dev/null
+++ b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptorTest.java
@@ -0,0 +1,138 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.cds.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.AdditionalAnswers.delegatesTo;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import io.grpc.ClientInterceptors;
+import io.grpc.ManagedChannel;
+import io.grpc.Metadata;
+import io.grpc.Metadata.Key;
+import io.grpc.ServerCall;
+import io.grpc.ServerCall.Listener;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerInterceptor;
+import io.grpc.ServerInterceptors;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcCleanupRule;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatchers;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceStub;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
+import org.onap.policy.cds.properties.CdsServerProperties;
+
+public class BasicAuthClientHeaderInterceptorTest {
+
+ // Generate a unique in-process server name.
+ private static final String SERVER_NAME = InProcessServerBuilder.generateName();
+ private static final String CREDS = "test";
+
+ // Manages automatic graceful shutdown for the registered server and client channels at the end of test.
+ @Rule
+ public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+
+ private final ServerInterceptor mockCdsGrpcServerInterceptor = mock(ServerInterceptor.class,
+ delegatesTo(new TestServerInterceptor()));
+
+ private final CdsServerProperties props = new CdsServerProperties();
+
+ private ManagedChannel channel;
+
+ /**
+ * Setup the test.
+ *
+ * @throws IOException on failure to register the test grpc server for graceful shutdown
+ */
+ @Before
+ public void setUp() throws IOException {
+ // Setup the CDS properties
+ props.setHost(SERVER_NAME);
+ props.setPort(2000);
+ props.setUsername(CREDS);
+ props.setPassword(CREDS);
+ props.setTimeout(60);
+
+ // Implement the test gRPC server
+ BluePrintProcessingServiceImplBase testCdsBlueprintServerImpl = new BluePrintProcessingServiceImplBase() {};
+
+ // Create a server, add service, start, and register for automatic graceful shutdown.
+ grpcCleanup.register(InProcessServerBuilder.forName(SERVER_NAME).directExecutor()
+ .addService(ServerInterceptors.intercept(testCdsBlueprintServerImpl, mockCdsGrpcServerInterceptor)).build()
+ .start());
+
+ // Create a client channel and register for automatic graceful shutdown
+ channel = grpcCleanup.register(InProcessChannelBuilder.forName(SERVER_NAME).directExecutor().build());
+ }
+
+ @Test
+ public void testIfBasicAuthHeaderIsDeliveredToCdsServer() {
+ BluePrintProcessingServiceStub bpProcessingSvcStub = BluePrintProcessingServiceGrpc
+ .newStub(ClientInterceptors.intercept(channel, new BasicAuthClientHeaderInterceptor(props)));
+ ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class);
+ bpProcessingSvcStub.process(new StreamObserver<ExecutionServiceOutput>() {
+ @Override
+ public void onNext(final ExecutionServiceOutput executionServiceOutput) {
+ // Test purpose only
+ }
+
+ @Override
+ public void onError(final Throwable throwable) {
+ // Test purpose only
+ }
+
+ @Override
+ public void onCompleted() {
+ // Test purpose only
+ }
+ });
+ verify(mockCdsGrpcServerInterceptor).interceptCall(ArgumentMatchers.any(), metadataCaptor.capture(),
+ ArgumentMatchers.any());
+
+ Key<String> authHeader = Key
+ .of(BasicAuthClientHeaderInterceptor.BASIC_AUTH_HEADER_KEY, Metadata.ASCII_STRING_MARSHALLER);
+ String expectedBaseAuth = Base64.getEncoder().encodeToString(String.format("%s:%s", CREDS, CREDS)
+ .getBytes(StandardCharsets.UTF_8));
+ assertEquals(expectedBaseAuth, metadataCaptor.getValue().get(authHeader));
+ }
+
+ private static class TestServerInterceptor implements ServerInterceptor {
+
+ @Override
+ public <ReqT, RespT> Listener<ReqT> interceptCall(final ServerCall<ReqT, RespT> serverCall,
+ final Metadata metadata,
+ final ServerCallHandler<ReqT, RespT> serverCallHandler) {
+ return serverCallHandler.startCall(serverCall, metadata);
+ }
+ }
+}
+
+
diff --git a/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/CdsProcessorGrpcClientTest.java b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/CdsProcessorGrpcClientTest.java
new file mode 100644
index 000000000..b9a9a84cd
--- /dev/null
+++ b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/CdsProcessorGrpcClientTest.java
@@ -0,0 +1,182 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.cds.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import io.grpc.ManagedChannel;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcCleanupRule;
+import io.grpc.util.MutableHandlerRegistry;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
+import org.onap.policy.cds.api.CdsProcessorListener;
+import org.onap.policy.cds.api.TestCdsProcessorListenerImpl;
+import org.onap.policy.cds.properties.CdsServerProperties;
+
+public class CdsProcessorGrpcClientTest {
+
+ // Generate a unique in-process server name.
+ private static final String SERVER_NAME = InProcessServerBuilder.generateName();
+
+ // Manages automatic graceful shutdown for the registered server and client channels at the end of test.
+ @Rule
+ public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+
+ private final CdsProcessorListener listener = spy(new TestCdsProcessorListenerImpl());
+ private final CdsServerProperties props = new CdsServerProperties();
+ private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
+ private final AtomicReference<StreamObserver<ExecutionServiceOutput>> responseObserverRef = new AtomicReference<>();
+ private final List<String> messagesDelivered = new ArrayList<>();
+ private final CountDownLatch allRequestsDelivered = new CountDownLatch(1);
+
+ private CdsProcessorGrpcClient client;
+
+ /**
+ * Setup the test.
+ *
+ * @throws IOException on failure to register the test grpc server for graceful shutdown
+ */
+ @Before
+ public void setUp() throws IOException {
+ // Setup the CDS properties
+ props.setHost(SERVER_NAME);
+ props.setPort(2000);
+ props.setUsername("testUser");
+ props.setPassword("testPassword");
+ props.setTimeout(60);
+
+ // Create a server, add service, start, and register for automatic graceful shutdown.
+ grpcCleanup.register(InProcessServerBuilder.forName(SERVER_NAME)
+ .fallbackHandlerRegistry(serviceRegistry).directExecutor().build().start());
+
+ // Create a client channel and register for automatic graceful shutdown
+ ManagedChannel channel = grpcCleanup
+ .register(InProcessChannelBuilder.forName(SERVER_NAME).directExecutor().build());
+
+ // Create an instance of the gRPC client
+ client = new CdsProcessorGrpcClient(channel, new CdsProcessorHandler(listener));
+
+ // Implement the test gRPC server
+ BluePrintProcessingServiceImplBase testCdsBlueprintServerImpl = new BluePrintProcessingServiceImplBase() {
+ @Override
+ public StreamObserver<ExecutionServiceInput> process(
+ final StreamObserver<ExecutionServiceOutput> responseObserver) {
+ responseObserverRef.set(responseObserver);
+
+ return new StreamObserver<ExecutionServiceInput>() {
+ @Override
+ public void onNext(final ExecutionServiceInput executionServiceInput) {
+ messagesDelivered.add(executionServiceInput.getActionIdentifiers().getActionName());
+ }
+
+ @Override
+ public void onError(final Throwable throwable) {
+ // Test method
+ }
+
+ @Override
+ public void onCompleted() {
+ allRequestsDelivered.countDown();
+ }
+ };
+ }
+ };
+ serviceRegistry.addService(testCdsBlueprintServerImpl);
+ }
+
+ @After
+ public void tearDown() {
+ client.close();
+ }
+
+ @Test
+ public void testCdsProcessorGrpcClientConstructor() {
+ new CdsProcessorGrpcClient(listener, props);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testCdsProcessorGrpcClientConstructorFailure() {
+ props.setHost(null);
+ new CdsProcessorGrpcClient(listener, props);
+ }
+
+ @Test
+ public void testSendRequestFail() throws InterruptedException {
+ // Setup
+ ExecutionServiceInput testReq = ExecutionServiceInput.newBuilder()
+ .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds").build())
+ .build();
+
+ // Act
+ CountDownLatch finishLatch = client.sendRequest(testReq);
+ responseObserverRef.get().onError(new Throwable("failed to send testReq."));
+
+ verify(listener).onError(any(Throwable.class));
+ assertTrue(finishLatch.await(0, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testSendRequestSuccess() throws InterruptedException {
+ // Setup request
+ ExecutionServiceInput testReq1 = ExecutionServiceInput.newBuilder()
+ .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-req1").build()).build();
+
+ // Act
+ final CountDownLatch finishLatch = client.sendRequest(testReq1);
+
+ // Assert that request message was sent and delivered once to the server
+ assertTrue(allRequestsDelivered.await(1, TimeUnit.SECONDS));
+ assertEquals(Collections.singletonList("policy-to-cds-req1"), messagesDelivered);
+
+ // Setup the server to send out two simple response messages and verify that the client receives them.
+ ExecutionServiceOutput testResp1 = ExecutionServiceOutput.newBuilder()
+ .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-resp1").build()).build();
+ ExecutionServiceOutput testResp2 = ExecutionServiceOutput.newBuilder()
+ .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-resp2").build()).build();
+ responseObserverRef.get().onNext(testResp1);
+ verify(listener).onMessage(testResp1);
+ responseObserverRef.get().onNext(testResp2);
+ verify(listener).onMessage(testResp2);
+
+ // let server complete.
+ responseObserverRef.get().onCompleted();
+ assertTrue(finishLatch.await(0, TimeUnit.SECONDS));
+ }
+}