aboutsummaryrefslogtreecommitdiffstats
path: root/models-interactions/model-actors
diff options
context:
space:
mode:
Diffstat (limited to 'models-interactions/model-actors')
-rw-r--r--models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperation.java18
-rw-r--r--models-interactions/model-actors/actor.cds/src/test/java/org/onap/policy/controlloop/actor/cds/GrpcOperationTest.java58
2 files changed, 74 insertions, 2 deletions
diff --git a/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperation.java b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperation.java
index 820f4de34..0a882ce93 100644
--- a/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperation.java
+++ b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperation.java
@@ -51,6 +51,7 @@ import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
import org.onap.policy.controlloop.actorserviceprovider.Util;
import org.onap.policy.controlloop.actorserviceprovider.impl.OperationPartial;
import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture;
import org.onap.policy.controlloop.policy.TargetType;
/**
@@ -202,13 +203,26 @@ public class GrpcOperation extends OperationPartial {
@Override
protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
+ /*
+ * construct the request first so that we don't have to clean up the "client" if
+ * an exception is thrown
+ */
+ ExecutionServiceInput request = constructRequest(params);
+
CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
+
client = new CdsProcessorGrpcClient(new CdsActorServiceManager(outcome, future),
config.getCdsServerProperties());
- ExecutionServiceInput request = constructRequest(params);
client.sendRequest(request);
- return future;
+
+ // arrange to shutdown the client when the request completes
+ PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>();
+
+ controller.wrap(future).whenCompleteAsync(controller.delayedComplete(), params.getExecutor())
+ .whenCompleteAsync((arg1, arg2) -> client.close(), getBlockingExecutor());
+
+ return controller;
}
/**
diff --git a/models-interactions/model-actors/actor.cds/src/test/java/org/onap/policy/controlloop/actor/cds/GrpcOperationTest.java b/models-interactions/model-actors/actor.cds/src/test/java/org/onap/policy/controlloop/actor/cds/GrpcOperationTest.java
index 9477a1585..fe8d42814 100644
--- a/models-interactions/model-actors/actor.cds/src/test/java/org/onap/policy/controlloop/actor/cds/GrpcOperationTest.java
+++ b/models-interactions/model-actors/actor.cds/src/test/java/org/onap/policy/controlloop/actor/cds/GrpcOperationTest.java
@@ -35,16 +35,20 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.onap.aai.domain.yang.GenericVnf;
import org.onap.aai.domain.yang.ServiceInstance;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
import org.onap.policy.aai.AaiCqResponse;
import org.onap.policy.cds.client.CdsProcessorGrpcClient;
import org.onap.policy.cds.properties.CdsServerProperties;
@@ -63,6 +67,8 @@ import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOp
import org.onap.policy.controlloop.policy.PolicyResult;
import org.onap.policy.controlloop.policy.Target;
import org.onap.policy.controlloop.policy.TargetType;
+import org.onap.policy.simulators.CdsSimulator;
+import org.onap.policy.simulators.Util;
public class GrpcOperationTest {
private static final String TARGET_ENTITY = "entity";
@@ -74,6 +80,14 @@ public class GrpcOperationTest {
private static final UUID REQUEST_ID = UUID.randomUUID();
private static final Coder coder = new StandardCoder();
+ protected static final Executor blockingExecutor = command -> {
+ Thread thread = new Thread(command);
+ thread.setDaemon(true);
+ thread.start();
+ };
+
+ private static CdsSimulator sim;
+
@Mock
private CdsProcessorGrpcClient cdsClient;
private CdsServerProperties cdsProps;
@@ -82,6 +96,16 @@ public class GrpcOperationTest {
private Target target;
private GrpcOperation operation;
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ sim = Util.buildCdsSim();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() {
+ sim.stop();
+ }
+
/**
* Sets up the fields.
*/
@@ -112,6 +136,40 @@ public class GrpcOperationTest {
target.setResourceID(RESOURCE_ID);
}
+ /**
+ * Tests "success" case with simulator.
+ */
+ @Test
+ public void testSuccess() throws Exception {
+ ControlLoopEventContext context = new ControlLoopEventContext(onset);
+ loadCqData(context);
+
+ Map<String, Object> payload = Map.of("artifact_name", "my_artifact", "artifact_version", "1.0");
+
+ final ControlLoopOperationParams params = ControlLoopOperationParams.builder()
+ .actor(CdsActorConstants.CDS_ACTOR).operation("subscribe").context(context)
+ .actorService(new ActorService()).targetEntity(TARGET_ENTITY).target(target).retry(0)
+ .timeoutSec(5).executor(blockingExecutor).payload(payload).build();
+
+ cdsProps.setHost("localhost");
+ cdsProps.setPort(sim.getPort());
+ cdsProps.setTimeout(3);
+
+ GrpcConfig config = new GrpcConfig(blockingExecutor, cdsProps);
+
+ operation = new GrpcOperation(params, config) {
+ @Override
+ protected CompletableFuture<OperationOutcome> startGuardAsync() {
+ // indicate that guard completed successfully
+ return CompletableFuture.completedFuture(params.makeOutcome());
+ }
+ };
+
+ OperationOutcome outcome = operation.start().get();
+ assertEquals(PolicyResult.SUCCESS, outcome.getResult());
+ assertTrue(outcome.getResponse() instanceof ExecutionServiceOutput);
+ }
+
@Test
public void testStartPreprocessorAsync() throws InterruptedException, ExecutionException, TimeoutException {