diff options
Diffstat (limited to 'models-interactions/model-actors/actor.cds/src')
2 files changed, 74 insertions, 2 deletions
diff --git a/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperation.java b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperation.java index 820f4de34..0a882ce93 100644 --- a/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperation.java +++ b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperation.java @@ -51,6 +51,7 @@ import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; import org.onap.policy.controlloop.actorserviceprovider.Util; import org.onap.policy.controlloop.actorserviceprovider.impl.OperationPartial; import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; +import org.onap.policy.controlloop.actorserviceprovider.pipeline.PipelineControllerFuture; import org.onap.policy.controlloop.policy.TargetType; /** @@ -202,13 +203,26 @@ public class GrpcOperation extends OperationPartial { @Override protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) { + /* + * construct the request first so that we don't have to clean up the "client" if + * an exception is thrown + */ + ExecutionServiceInput request = constructRequest(params); + CompletableFuture<OperationOutcome> future = new CompletableFuture<>(); + client = new CdsProcessorGrpcClient(new CdsActorServiceManager(outcome, future), config.getCdsServerProperties()); - ExecutionServiceInput request = constructRequest(params); client.sendRequest(request); - return future; + + // arrange to shutdown the client when the request completes + PipelineControllerFuture<OperationOutcome> controller = new PipelineControllerFuture<>(); + + controller.wrap(future).whenCompleteAsync(controller.delayedComplete(), params.getExecutor()) + .whenCompleteAsync((arg1, arg2) -> client.close(), getBlockingExecutor()); + + return controller; } /** diff --git a/models-interactions/model-actors/actor.cds/src/test/java/org/onap/policy/controlloop/actor/cds/GrpcOperationTest.java b/models-interactions/model-actors/actor.cds/src/test/java/org/onap/policy/controlloop/actor/cds/GrpcOperationTest.java index 9477a1585..fe8d42814 100644 --- a/models-interactions/model-actors/actor.cds/src/test/java/org/onap/policy/controlloop/actor/cds/GrpcOperationTest.java +++ b/models-interactions/model-actors/actor.cds/src/test/java/org/onap/policy/controlloop/actor/cds/GrpcOperationTest.java @@ -35,16 +35,20 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.onap.aai.domain.yang.GenericVnf; import org.onap.aai.domain.yang.ServiceInstance; import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput; import org.onap.policy.aai.AaiCqResponse; import org.onap.policy.cds.client.CdsProcessorGrpcClient; import org.onap.policy.cds.properties.CdsServerProperties; @@ -63,6 +67,8 @@ import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOp import org.onap.policy.controlloop.policy.PolicyResult; import org.onap.policy.controlloop.policy.Target; import org.onap.policy.controlloop.policy.TargetType; +import org.onap.policy.simulators.CdsSimulator; +import org.onap.policy.simulators.Util; public class GrpcOperationTest { private static final String TARGET_ENTITY = "entity"; @@ -74,6 +80,14 @@ public class GrpcOperationTest { private static final UUID REQUEST_ID = UUID.randomUUID(); private static final Coder coder = new StandardCoder(); + protected static final Executor blockingExecutor = command -> { + Thread thread = new Thread(command); + thread.setDaemon(true); + thread.start(); + }; + + private static CdsSimulator sim; + @Mock private CdsProcessorGrpcClient cdsClient; private CdsServerProperties cdsProps; @@ -82,6 +96,16 @@ public class GrpcOperationTest { private Target target; private GrpcOperation operation; + @BeforeClass + public static void setUpBeforeClass() throws Exception { + sim = Util.buildCdsSim(); + } + + @AfterClass + public static void tearDownAfterClass() { + sim.stop(); + } + /** * Sets up the fields. */ @@ -112,6 +136,40 @@ public class GrpcOperationTest { target.setResourceID(RESOURCE_ID); } + /** + * Tests "success" case with simulator. + */ + @Test + public void testSuccess() throws Exception { + ControlLoopEventContext context = new ControlLoopEventContext(onset); + loadCqData(context); + + Map<String, Object> payload = Map.of("artifact_name", "my_artifact", "artifact_version", "1.0"); + + final ControlLoopOperationParams params = ControlLoopOperationParams.builder() + .actor(CdsActorConstants.CDS_ACTOR).operation("subscribe").context(context) + .actorService(new ActorService()).targetEntity(TARGET_ENTITY).target(target).retry(0) + .timeoutSec(5).executor(blockingExecutor).payload(payload).build(); + + cdsProps.setHost("localhost"); + cdsProps.setPort(sim.getPort()); + cdsProps.setTimeout(3); + + GrpcConfig config = new GrpcConfig(blockingExecutor, cdsProps); + + operation = new GrpcOperation(params, config) { + @Override + protected CompletableFuture<OperationOutcome> startGuardAsync() { + // indicate that guard completed successfully + return CompletableFuture.completedFuture(params.makeOutcome()); + } + }; + + OperationOutcome outcome = operation.start().get(); + assertEquals(PolicyResult.SUCCESS, outcome.getResult()); + assertTrue(outcome.getResponse() instanceof ExecutionServiceOutput); + } + @Test public void testStartPreprocessorAsync() throws InterruptedException, ExecutionException, TimeoutException { |