summaryrefslogtreecommitdiffstats
path: root/models-interactions/model-impl/cds/src
diff options
context:
space:
mode:
authorRashmi Pujar <rashmi.pujar@bell.ca>2019-06-04 16:31:54 -0400
committerRashmi Pujar <rashmi.pujar@bell.ca>2019-06-11 10:20:27 -0400
commitc92e24f7de074b177cc8b57ef4fcddd133b25093 (patch)
treec17fabb8b22dd0d4edbc4680a9431391f7102c91 /models-interactions/model-impl/cds/src
parent8676a38be6877235fe857a5f0bc289b2e036d4bb (diff)
GRPC Client impl to send process message to CDS blueprint-processor endpoint
Issue-ID: POLICY-1762 Signed-off-by: Rashmi Pujar <rashmi.pujar@bell.ca> Change-Id: Iecef458b1f25db8e2989cc40ccd399be15867497
Diffstat (limited to 'models-interactions/model-impl/cds/src')
-rw-r--r--models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/api/CdsProcessorListener.java69
-rw-r--r--models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptor.java64
-rw-r--r--models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorGrpcClient.java94
-rw-r--r--models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorHandler.java81
-rw-r--r--models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/properties/CdsServerProperties.java87
-rw-r--r--models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/api/TestCdsProcessorListenerImpl.java49
-rw-r--r--models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptorTest.java138
-rw-r--r--models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/CdsProcessorGrpcClientTest.java182
8 files changed, 764 insertions, 0 deletions
diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/api/CdsProcessorListener.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/api/CdsProcessorListener.java
new file mode 100644
index 000000000..c07c559c4
--- /dev/null
+++ b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/api/CdsProcessorListener.java
@@ -0,0 +1,69 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.cds.api;
+
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
+
+/**
+ * <p>
+ * In order for the caller of {@link org.onap.policy.cds.client.CdsProcessorGrpcClient} to manage the callback to handle
+ * the received messages appropriately, it needs to implement {@link CdsProcessorListener}.
+ * </p>
+ *
+ * <p>Here is a sample implementation of a listener:
+ * <pre>
+ * new CdsProcessorListener {
+ *
+ * &#64;Override
+ * public void onMessage(ExecutionServiceOutput message) {
+ * log.info("Received notification from CDS: {}", message);
+ * }
+ *
+ * &#64;Override
+ * public void onError(Throwable throwable) {
+ * Status status = Status.fromThrowable(throwable);
+ * log.error("Failed processing blueprint {}", status, throwable);
+ * }
+ * }
+ * </pre>
+ * </p>
+ */
+public interface CdsProcessorListener {
+
+ /**
+ * Implements the workflow upon receiving the message from the server side.
+ *
+ * <p>Note that the CDS client-server communication is configured to use a streaming approach, which means when
+ * client
+ * sends an event, the server can reply with multiple sub-responses until full completion of the processing. Hence,
+ * it is up to the implementation of this method to process the received message using {@link
+ * ExecutionServiceOutput#getStatus()#getEventType()}</p>
+ *
+ * @param message ExecutionServiceOutput received by the CDS grpc server
+ */
+ void onMessage(ExecutionServiceOutput message);
+
+ /**
+ * Implements the workflow when an error is received from the server side.
+ *
+ * @param throwable Throwable object received from CDS grpc server upon error
+ */
+ void onError(Throwable throwable);
+
+}
diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptor.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptor.java
new file mode 100644
index 000000000..3957fe5e4
--- /dev/null
+++ b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptor.java
@@ -0,0 +1,64 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.cds.client;
+
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ForwardingClientCall;
+import io.grpc.Metadata;
+import io.grpc.Metadata.Key;
+import io.grpc.MethodDescriptor;
+import org.onap.policy.cds.properties.CdsServerProperties;
+
+/**
+ * An interceptor to insert the client authHeader.
+ *
+ * <p>The {@link BasicAuthClientHeaderInterceptor} implements {@link ClientInterceptor} to insert authorization
+ * header data provided by {@link CdsServerProperties#getBasicAuth()} to all the outgoing calls.</p>
+ *
+ * <p>On the client context, we add metadata with "Authorization" as the key and "Basic" followed by base64 encoded
+ * username:password as its value.
+ * On the server side, CDS BasicAuthServerInterceptor (1) gets the client metadata from the server context, (2) extract
+ * the "Authorization" header key and finally (3) decodes the username and password from the authHeader.</p>
+ */
+public class BasicAuthClientHeaderInterceptor implements ClientInterceptor {
+
+ static final String BASIC_AUTH_HEADER_KEY = "Authorization";
+ private CdsServerProperties props;
+
+ BasicAuthClientHeaderInterceptor(CdsServerProperties props) {
+ this.props = props;
+ }
+
+ @Override
+ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
+ CallOptions callOptions, Channel channel) {
+ Key<String> authHeader = Key.of(BASIC_AUTH_HEADER_KEY, Metadata.ASCII_STRING_MARSHALLER);
+ return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(method, callOptions)) {
+ @Override
+ public void start(Listener<RespT> responseListener, Metadata headers) {
+ headers.put(authHeader, props.getBasicAuth());
+ super.start(responseListener, headers);
+ }
+ };
+ }
+}
+
diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorGrpcClient.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorGrpcClient.java
new file mode 100644
index 000000000..b8ec7acf5
--- /dev/null
+++ b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorGrpcClient.java
@@ -0,0 +1,94 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.cds.client;
+
+import com.google.common.base.Preconditions;
+import io.grpc.ManagedChannel;
+import io.grpc.internal.DnsNameResolverProvider;
+import io.grpc.internal.PickFirstLoadBalancerProvider;
+import io.grpc.netty.NettyChannelBuilder;
+import java.util.concurrent.CountDownLatch;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
+import org.onap.policy.cds.api.CdsProcessorListener;
+import org.onap.policy.cds.properties.CdsServerProperties;
+import org.onap.policy.common.parameters.GroupValidationResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * The CDS processor client uses gRPC for communication between Policy and CDS. This communication is configured to use
+ * a streaming approach, which means the client sends an event to which the server can reply with multiple
+ * sub-responses, until full completion of the processing.
+ * </p>
+ */
+public class CdsProcessorGrpcClient implements AutoCloseable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CdsProcessorGrpcClient.class);
+
+ private ManagedChannel channel;
+ private CdsProcessorHandler handler;
+
+ /**
+ * Constructor, create a CDS processor gRPC client.
+ *
+ * @param listener the listener to listen on
+ */
+ public CdsProcessorGrpcClient(final CdsProcessorListener listener, CdsServerProperties props) {
+ final GroupValidationResult validationResult = props.validate();
+ Preconditions.checkState(validationResult.getStatus().isValid(), "Error validating CDS server "
+ + "properties: " + validationResult.getResult());
+
+ this.channel = NettyChannelBuilder.forAddress(props.getHost(), props.getPort())
+ .nameResolverFactory(new DnsNameResolverProvider())
+ .loadBalancerFactory(new PickFirstLoadBalancerProvider())
+ .intercept(new BasicAuthClientHeaderInterceptor(props)).usePlaintext().build();
+ this.handler = new CdsProcessorHandler(listener);
+ LOGGER.info("CdsProcessorListener started");
+ }
+
+ CdsProcessorGrpcClient(final ManagedChannel channel, final CdsProcessorHandler handler) {
+ this.channel = channel;
+ this.handler = handler;
+ }
+
+ /**
+ * Sends a request to the CDS backend micro-service.
+ *
+ * <p>The caller will be returned a CountDownLatch that can be used to define how long the processing can wait. The
+ * CountDownLatch is initiated with just 1 count. When the client receives an #onCompleted callback, the counter
+ * will decrement.</p>
+ *
+ * <p>It is the user responsibility to close the client.</p>
+ *
+ * @param input request to send
+ * @return CountDownLatch instance that can be use to #await for completeness of processing
+ */
+ public CountDownLatch sendRequest(ExecutionServiceInput input) {
+ return handler.process(input, channel);
+ }
+
+ @Override
+ public void close() {
+ if (channel != null) {
+ channel.shutdown();
+ }
+ LOGGER.info("CdsProcessorListener stopped");
+ }
+}
diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorHandler.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorHandler.java
new file mode 100644
index 000000000..9dd249ceb
--- /dev/null
+++ b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/client/CdsProcessorHandler.java
@@ -0,0 +1,81 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.cds.client;
+
+import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.CountDownLatch;
+import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceStub;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
+import org.onap.policy.cds.api.CdsProcessorListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class CdsProcessorHandler {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CdsProcessorHandler.class);
+
+ private CdsProcessorListener listener;
+
+ CdsProcessorHandler(final CdsProcessorListener listener) {
+ this.listener = listener;
+ }
+
+ CountDownLatch process(ExecutionServiceInput request, ManagedChannel channel) {
+ final ActionIdentifiers header = request.getActionIdentifiers();
+ LOGGER.info("Processing blueprint({}:{}) for action({})", header.getBlueprintVersion(),
+ header.getBlueprintName(), header.getBlueprintVersion());
+
+ final CountDownLatch finishLatch = new CountDownLatch(1);
+ final BluePrintProcessingServiceStub asyncStub = BluePrintProcessingServiceGrpc.newStub(channel);
+ final StreamObserver<ExecutionServiceOutput> responseObserver = new StreamObserver<ExecutionServiceOutput>() {
+ @Override
+ public void onNext(ExecutionServiceOutput output) {
+ listener.onMessage(output);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ listener.onError(throwable);
+ finishLatch.countDown();
+ }
+
+ @Override
+ public void onCompleted() {
+ LOGGER.info("Completed blueprint({}:{}) for action({})", header.getBlueprintVersion(),
+ header.getBlueprintName(), header.getBlueprintVersion());
+ finishLatch.countDown();
+ }
+ };
+
+ final StreamObserver<ExecutionServiceInput> requestObserver = asyncStub.process(responseObserver);
+ try {
+ // Send the message to CDS backend for processing
+ requestObserver.onNext(request);
+ // Mark the end of requests
+ requestObserver.onCompleted();
+ } catch (RuntimeException e) {
+ requestObserver.onError(e);
+ }
+ return finishLatch;
+ }
+}
diff --git a/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/properties/CdsServerProperties.java b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/properties/CdsServerProperties.java
new file mode 100644
index 000000000..94a336b6d
--- /dev/null
+++ b/models-interactions/model-impl/cds/src/main/java/org/onap/policy/cds/properties/CdsServerProperties.java
@@ -0,0 +1,87 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.cds.properties;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.onap.policy.common.parameters.GroupValidationResult;
+import org.onap.policy.common.parameters.ParameterGroup;
+import org.onap.policy.common.parameters.ParameterRuntimeException;
+import org.onap.policy.common.parameters.annotations.Max;
+import org.onap.policy.common.parameters.annotations.Min;
+import org.onap.policy.common.parameters.annotations.NotNull;
+
+@Getter
+@Setter
+@ToString
+public class CdsServerProperties implements ParameterGroup {
+
+ // Port range constants
+ private static final int MIN_USER_PORT = 1024;
+ private static final int MAX_USER_PORT = 65535;
+
+ private static final String INVALID_PROP = "Invalid CDS property: ";
+ private static final String SERVER_PROPERTIES_TYPE = "CDS gRPC Server Properties";
+
+ // CDS carrier properties
+ @Min(value = 1)
+ private int timeout;
+
+ @Min(value = MIN_USER_PORT)
+ @Max(value = MAX_USER_PORT)
+ private int port;
+
+ @NotNull
+ private String host;
+
+ @NotNull
+ private String username;
+
+ @NotNull
+ private String password;
+
+
+ @Override
+ public String getName() {
+ return SERVER_PROPERTIES_TYPE;
+ }
+
+ @Override
+ public void setName(final String name) {
+ throw new ParameterRuntimeException("The name of this ParameterGroup implementation is always " + getName());
+ }
+
+ @Override
+ public GroupValidationResult validate() {
+ return new GroupValidationResult(this);
+ }
+
+ /**
+ * Generate base64-encoded Authorization header from username and password.
+ *
+ * @return Base64 encoded string
+ */
+ public String getBasicAuth() {
+ return Base64.getEncoder().encodeToString(String.format("%s:%s", getUsername(), getPassword())
+ .getBytes(StandardCharsets.UTF_8));
+ }
+}
diff --git a/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/api/TestCdsProcessorListenerImpl.java b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/api/TestCdsProcessorListenerImpl.java
new file mode 100644
index 000000000..6dfd70dd6
--- /dev/null
+++ b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/api/TestCdsProcessorListenerImpl.java
@@ -0,0 +1,49 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.cds.api;
+
+import io.grpc.Status;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used as a helper for the gRPC client unit test.
+ */
+public class TestCdsProcessorListenerImpl implements CdsProcessorListener {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TestCdsProcessorListenerImpl.class);
+
+ /**
+ * Used to verify/inspect message received from server.
+ */
+ @Override
+ public void onMessage(final ExecutionServiceOutput message) {
+ LOGGER.info("Received notification from CDS: {}", message);
+ }
+
+ /**
+ * Used to verify/inspect error received from server.
+ */
+ @Override
+ public void onError(final Throwable throwable) {
+ Status status = Status.fromThrowable(throwable);
+ LOGGER.error("Failed processing blueprint {}", status, throwable);
+ }
+}
diff --git a/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptorTest.java b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptorTest.java
new file mode 100644
index 000000000..3b6ad7da1
--- /dev/null
+++ b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/BasicAuthClientHeaderInterceptorTest.java
@@ -0,0 +1,138 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.cds.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.AdditionalAnswers.delegatesTo;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import io.grpc.ClientInterceptors;
+import io.grpc.ManagedChannel;
+import io.grpc.Metadata;
+import io.grpc.Metadata.Key;
+import io.grpc.ServerCall;
+import io.grpc.ServerCall.Listener;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerInterceptor;
+import io.grpc.ServerInterceptors;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcCleanupRule;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatchers;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceStub;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
+import org.onap.policy.cds.properties.CdsServerProperties;
+
+public class BasicAuthClientHeaderInterceptorTest {
+
+ // Generate a unique in-process server name.
+ private static final String SERVER_NAME = InProcessServerBuilder.generateName();
+ private static final String CREDS = "test";
+
+ // Manages automatic graceful shutdown for the registered server and client channels at the end of test.
+ @Rule
+ public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+
+ private final ServerInterceptor mockCdsGrpcServerInterceptor = mock(ServerInterceptor.class,
+ delegatesTo(new TestServerInterceptor()));
+
+ private final CdsServerProperties props = new CdsServerProperties();
+
+ private ManagedChannel channel;
+
+ /**
+ * Setup the test.
+ *
+ * @throws IOException on failure to register the test grpc server for graceful shutdown
+ */
+ @Before
+ public void setUp() throws IOException {
+ // Setup the CDS properties
+ props.setHost(SERVER_NAME);
+ props.setPort(2000);
+ props.setUsername(CREDS);
+ props.setPassword(CREDS);
+ props.setTimeout(60);
+
+ // Implement the test gRPC server
+ BluePrintProcessingServiceImplBase testCdsBlueprintServerImpl = new BluePrintProcessingServiceImplBase() {};
+
+ // Create a server, add service, start, and register for automatic graceful shutdown.
+ grpcCleanup.register(InProcessServerBuilder.forName(SERVER_NAME).directExecutor()
+ .addService(ServerInterceptors.intercept(testCdsBlueprintServerImpl, mockCdsGrpcServerInterceptor)).build()
+ .start());
+
+ // Create a client channel and register for automatic graceful shutdown
+ channel = grpcCleanup.register(InProcessChannelBuilder.forName(SERVER_NAME).directExecutor().build());
+ }
+
+ @Test
+ public void testIfBasicAuthHeaderIsDeliveredToCdsServer() {
+ BluePrintProcessingServiceStub bpProcessingSvcStub = BluePrintProcessingServiceGrpc
+ .newStub(ClientInterceptors.intercept(channel, new BasicAuthClientHeaderInterceptor(props)));
+ ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class);
+ bpProcessingSvcStub.process(new StreamObserver<ExecutionServiceOutput>() {
+ @Override
+ public void onNext(final ExecutionServiceOutput executionServiceOutput) {
+ // Test purpose only
+ }
+
+ @Override
+ public void onError(final Throwable throwable) {
+ // Test purpose only
+ }
+
+ @Override
+ public void onCompleted() {
+ // Test purpose only
+ }
+ });
+ verify(mockCdsGrpcServerInterceptor).interceptCall(ArgumentMatchers.any(), metadataCaptor.capture(),
+ ArgumentMatchers.any());
+
+ Key<String> authHeader = Key
+ .of(BasicAuthClientHeaderInterceptor.BASIC_AUTH_HEADER_KEY, Metadata.ASCII_STRING_MARSHALLER);
+ String expectedBaseAuth = Base64.getEncoder().encodeToString(String.format("%s:%s", CREDS, CREDS)
+ .getBytes(StandardCharsets.UTF_8));
+ assertEquals(expectedBaseAuth, metadataCaptor.getValue().get(authHeader));
+ }
+
+ private static class TestServerInterceptor implements ServerInterceptor {
+
+ @Override
+ public <ReqT, RespT> Listener<ReqT> interceptCall(final ServerCall<ReqT, RespT> serverCall,
+ final Metadata metadata,
+ final ServerCallHandler<ReqT, RespT> serverCallHandler) {
+ return serverCallHandler.startCall(serverCall, metadata);
+ }
+ }
+}
+
+
diff --git a/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/CdsProcessorGrpcClientTest.java b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/CdsProcessorGrpcClientTest.java
new file mode 100644
index 000000000..b9a9a84cd
--- /dev/null
+++ b/models-interactions/model-impl/cds/src/test/java/org/onap/policy/cds/client/CdsProcessorGrpcClientTest.java
@@ -0,0 +1,182 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Bell Canada.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.cds.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import io.grpc.ManagedChannel;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcCleanupRule;
+import io.grpc.util.MutableHandlerRegistry;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
+import org.onap.policy.cds.api.CdsProcessorListener;
+import org.onap.policy.cds.api.TestCdsProcessorListenerImpl;
+import org.onap.policy.cds.properties.CdsServerProperties;
+
+public class CdsProcessorGrpcClientTest {
+
+ // Generate a unique in-process server name.
+ private static final String SERVER_NAME = InProcessServerBuilder.generateName();
+
+ // Manages automatic graceful shutdown for the registered server and client channels at the end of test.
+ @Rule
+ public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+
+ private final CdsProcessorListener listener = spy(new TestCdsProcessorListenerImpl());
+ private final CdsServerProperties props = new CdsServerProperties();
+ private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
+ private final AtomicReference<StreamObserver<ExecutionServiceOutput>> responseObserverRef = new AtomicReference<>();
+ private final List<String> messagesDelivered = new ArrayList<>();
+ private final CountDownLatch allRequestsDelivered = new CountDownLatch(1);
+
+ private CdsProcessorGrpcClient client;
+
+ /**
+ * Setup the test.
+ *
+ * @throws IOException on failure to register the test grpc server for graceful shutdown
+ */
+ @Before
+ public void setUp() throws IOException {
+ // Setup the CDS properties
+ props.setHost(SERVER_NAME);
+ props.setPort(2000);
+ props.setUsername("testUser");
+ props.setPassword("testPassword");
+ props.setTimeout(60);
+
+ // Create a server, add service, start, and register for automatic graceful shutdown.
+ grpcCleanup.register(InProcessServerBuilder.forName(SERVER_NAME)
+ .fallbackHandlerRegistry(serviceRegistry).directExecutor().build().start());
+
+ // Create a client channel and register for automatic graceful shutdown
+ ManagedChannel channel = grpcCleanup
+ .register(InProcessChannelBuilder.forName(SERVER_NAME).directExecutor().build());
+
+ // Create an instance of the gRPC client
+ client = new CdsProcessorGrpcClient(channel, new CdsProcessorHandler(listener));
+
+ // Implement the test gRPC server
+ BluePrintProcessingServiceImplBase testCdsBlueprintServerImpl = new BluePrintProcessingServiceImplBase() {
+ @Override
+ public StreamObserver<ExecutionServiceInput> process(
+ final StreamObserver<ExecutionServiceOutput> responseObserver) {
+ responseObserverRef.set(responseObserver);
+
+ return new StreamObserver<ExecutionServiceInput>() {
+ @Override
+ public void onNext(final ExecutionServiceInput executionServiceInput) {
+ messagesDelivered.add(executionServiceInput.getActionIdentifiers().getActionName());
+ }
+
+ @Override
+ public void onError(final Throwable throwable) {
+ // Test method
+ }
+
+ @Override
+ public void onCompleted() {
+ allRequestsDelivered.countDown();
+ }
+ };
+ }
+ };
+ serviceRegistry.addService(testCdsBlueprintServerImpl);
+ }
+
+ @After
+ public void tearDown() {
+ client.close();
+ }
+
+ @Test
+ public void testCdsProcessorGrpcClientConstructor() {
+ new CdsProcessorGrpcClient(listener, props);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testCdsProcessorGrpcClientConstructorFailure() {
+ props.setHost(null);
+ new CdsProcessorGrpcClient(listener, props);
+ }
+
+ @Test
+ public void testSendRequestFail() throws InterruptedException {
+ // Setup
+ ExecutionServiceInput testReq = ExecutionServiceInput.newBuilder()
+ .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds").build())
+ .build();
+
+ // Act
+ CountDownLatch finishLatch = client.sendRequest(testReq);
+ responseObserverRef.get().onError(new Throwable("failed to send testReq."));
+
+ verify(listener).onError(any(Throwable.class));
+ assertTrue(finishLatch.await(0, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testSendRequestSuccess() throws InterruptedException {
+ // Setup request
+ ExecutionServiceInput testReq1 = ExecutionServiceInput.newBuilder()
+ .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-req1").build()).build();
+
+ // Act
+ final CountDownLatch finishLatch = client.sendRequest(testReq1);
+
+ // Assert that request message was sent and delivered once to the server
+ assertTrue(allRequestsDelivered.await(1, TimeUnit.SECONDS));
+ assertEquals(Collections.singletonList("policy-to-cds-req1"), messagesDelivered);
+
+ // Setup the server to send out two simple response messages and verify that the client receives them.
+ ExecutionServiceOutput testResp1 = ExecutionServiceOutput.newBuilder()
+ .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-resp1").build()).build();
+ ExecutionServiceOutput testResp2 = ExecutionServiceOutput.newBuilder()
+ .setActionIdentifiers(ActionIdentifiers.newBuilder().setActionName("policy-to-cds-resp2").build()).build();
+ responseObserverRef.get().onNext(testResp1);
+ verify(listener).onMessage(testResp1);
+ responseObserverRef.get().onNext(testResp2);
+ verify(listener).onMessage(testResp2);
+
+ // let server complete.
+ responseObserverRef.get().onCompleted();
+ assertTrue(finishLatch.await(0, TimeUnit.SECONDS));
+ }
+}