aboutsummaryrefslogtreecommitdiffstats
path: root/models-interactions/model-actors
diff options
context:
space:
mode:
authorRam Krishna Verma <ram_krishna.verma@bell.ca>2020-02-21 17:24:47 -0500
committerRam Krishna Verma <ram_krishna.verma@bell.ca>2020-02-25 16:03:07 -0500
commit0c7aa23b5b429b33acc05867dffcfb97cdc5ca68 (patch)
treeab221541e82f2eccb6d1e76a4dcfb958c9ab7121 /models-interactions/model-actors
parentb2ed7d2a42ec2852f89a117f4448f044053e99b8 (diff)
Add actor for CDS
1) Create the operator, operation & manager classes for gRPC request. 2) Use CompletableFuture to track CDS request flow. Issue-ID: POLICY-2384 Change-Id: I84e30131a69c2d24c1871ceebced2b69194f619c Signed-off-by: Ram Krishna Verma <ram_krishna.verma@bell.ca>
Diffstat (limited to 'models-interactions/model-actors')
-rw-r--r--models-interactions/model-actors/actor.cds/pom.xml6
-rw-r--r--models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/CdsActorServiceManager.java81
-rw-r--r--models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/CdsActorServiceProvider.java88
-rw-r--r--models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcConfig.java51
-rw-r--r--models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperation.java203
-rw-r--r--models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperator.java106
6 files changed, 497 insertions, 38 deletions
diff --git a/models-interactions/model-actors/actor.cds/pom.xml b/models-interactions/model-actors/actor.cds/pom.xml
index 4a5979eeb..d17e0b230 100644
--- a/models-interactions/model-actors/actor.cds/pom.xml
+++ b/models-interactions/model-actors/actor.cds/pom.xml
@@ -40,6 +40,12 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.onap.policy.models.policy-models-interactions.model-actors</groupId>
+ <artifactId>actor.aai</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>org.onap.policy.models.policy-models-interactions.model-impl</groupId>
<artifactId>events</artifactId>
<version>${project.version}</version>
diff --git a/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/CdsActorServiceManager.java b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/CdsActorServiceManager.java
new file mode 100644
index 000000000..2afa9fa2e
--- /dev/null
+++ b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/CdsActorServiceManager.java
@@ -0,0 +1,81 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2020 Bell Canada. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actor.cds;
+
+import java.util.concurrent.CompletableFuture;
+import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
+import org.onap.policy.cds.api.CdsProcessorListener;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.policy.PolicyResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * CDS Actor service-manager implementation.
+ */
+public class CdsActorServiceManager implements CdsProcessorListener {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CdsActorServiceManager.class);
+
+ private final CompletableFuture<OperationOutcome> future;
+
+ private final OperationOutcome outcome;
+
+ /**
+ * Constructs the object.
+ *
+ * @param outcome the operation outcome to populate
+ * @param future the future to complete
+ */
+ public CdsActorServiceManager(OperationOutcome outcome, CompletableFuture<OperationOutcome> future) {
+ this.outcome = outcome;
+ this.future = future;
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public void onMessage(final ExecutionServiceOutput message) {
+ LOGGER.info("Received notification from CDS: {}", message);
+ EventType eventType = message.getStatus().getEventType();
+ switch (eventType) {
+ case EVENT_COMPONENT_PROCESSING:
+ LOGGER.info("CDS is processing the message: {}", message);
+ break;
+ case EVENT_COMPONENT_EXECUTED:
+ outcome.setResult(PolicyResult.SUCCESS);
+ future.complete(outcome);
+ break;
+ default:
+ outcome.setResult(PolicyResult.FAILURE);
+ future.complete(outcome);
+ break;
+ }
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public void onError(final Throwable throwable) {
+ future.completeExceptionally(throwable);
+ }
+}
diff --git a/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/CdsActorServiceProvider.java b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/CdsActorServiceProvider.java
index 05ff02e5c..91ee55dad 100644
--- a/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/CdsActorServiceProvider.java
+++ b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/CdsActorServiceProvider.java
@@ -54,16 +54,24 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * CDS Actor service-provider implementation. This is a deploy dark feature for El-Alto release.
+ * CDS Actor service-provider implementation. This is a deploy dark feature for El-Alto
+ * release.
*/
public class CdsActorServiceProvider extends ActorImpl {
private static final Logger LOGGER = LoggerFactory.getLogger(CdsActorServiceProvider.class);
+ /**
+ * Constructs the object.
+ */
public CdsActorServiceProvider() {
super(CdsActorConstants.CDS_ACTOR);
+
+ addOperator(new GrpcOperator(CdsActorConstants.CDS_ACTOR, GrpcOperation.NAME, GrpcOperation::new));
}
+ // TODO old code: remove lines down to **HERE**
+
/**
* {@inheritDoc}.
*/
@@ -97,20 +105,26 @@ public class CdsActorServiceProvider extends ActorImpl {
}
/**
- * Build the CDS ExecutionServiceInput request from the policy object and the AAI enriched parameters. TO-DO: Avoid
- * leaking Exceptions to the Kie Session thread. TBD item for Frankfurt release.
+ * Build the CDS ExecutionServiceInput request from the policy object and the AAI
+ * enriched parameters. TO-DO: Avoid leaking Exceptions to the Kie Session thread. TBD
+ * item for Frankfurt release.
*
* @param onset the event that is reporting the alert for policy to perform an action.
- * @param operation the control loop operation specifying the actor, operation, target, etc.
- * @param policy the policy specified from the yaml generated by CLAMP or through Policy API.
+ * @param operation the control loop operation specifying the actor, operation,
+ * target, etc.
+ * @param policy the policy specified from the yaml generated by CLAMP or through
+ * Policy API.
* @param aaiParams Map of enriched AAI attributes in node.attribute notation.
- * @return an Optional ExecutionServiceInput instance if valid else an Optional empty object is returned.
+ * @return an Optional ExecutionServiceInput instance if valid else an Optional empty
+ * object is returned.
*/
public Optional<ExecutionServiceInput> constructRequest(VirtualControlLoopEvent onset,
- ControlLoopOperation operation, Policy policy, Map<String, String> aaiParams) {
+ ControlLoopOperation operation, Policy policy, Map<String, String> aaiParams) {
- // For the current operational TOSCA policy model (yaml) CBA name and version are embedded in the payload
- // section, with the new policy type model being proposed in Frankfurt we will be able to move it out.
+ // For the current operational TOSCA policy model (yaml) CBA name and version are
+ // embedded in the payload
+ // section, with the new policy type model being proposed in Frankfurt we will be
+ // able to move it out.
Map<String, String> payload = policy.getPayload();
if (!validateCdsMandatoryParams(policy)) {
return Optional.empty();
@@ -118,12 +132,14 @@ public class CdsActorServiceProvider extends ActorImpl {
String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
- // Retain only the payload by removing CBA name and version once they are extracted
+ // Retain only the payload by removing CBA name and version once they are
+ // extracted
// to be put in CDS request header.
payload.remove(CdsActorConstants.KEY_CBA_NAME);
payload.remove(CdsActorConstants.KEY_CBA_VERSION);
- // Embed payload from policy to ConfigDeployRequest object, serialize and inject into grpc request.
+ // Embed payload from policy to ConfigDeployRequest object, serialize and inject
+ // into grpc request.
String cbaActionName = policy.getRecipe();
CdsActionRequest request = new CdsActionRequest();
request.setPolicyPayload(payload);
@@ -132,7 +148,8 @@ public class CdsActorServiceProvider extends ActorImpl {
// Inject AAI properties into payload map. Offer flexibility to the usecase
// implementation to inject whatever AAI parameters are of interest to them.
- // E.g. For vFW usecase El-Alto inject service-instance-id, generic-vnf-id as needed by CDS.
+ // E.g. For vFW usecase El-Alto inject service-instance-id, generic-vnf-id as
+ // needed by CDS.
request.setAaiProperties(aaiParams);
// Inject any additional event parameters that may be present in the onset event
@@ -143,36 +160,28 @@ public class CdsActorServiceProvider extends ActorImpl {
Builder struct = Struct.newBuilder();
try {
String requestStr = request.generateCdsPayload();
- Preconditions.checkState(!Strings.isNullOrEmpty(requestStr), "Unable to build "
- + "config-deploy-request from payload parameters: {}", payload);
+ Preconditions.checkState(!Strings.isNullOrEmpty(requestStr),
+ "Unable to build " + "config-deploy-request from payload parameters: {}", payload);
JsonFormat.parser().merge(requestStr, struct);
} catch (InvalidProtocolBufferException | CoderException e) {
LOGGER.error("Failed to embed CDS payload string into the input request. blueprint({}:{}) for action({})",
- cbaName, cbaVersion, cbaActionName, e);
+ cbaName, cbaVersion, cbaActionName, e);
return Optional.empty();
}
// Build CDS gRPC request common-header
- CommonHeader commonHeader = CommonHeader.newBuilder()
- .setOriginatorId(CdsActorConstants.ORIGINATOR_ID)
- .setRequestId(onset.getRequestId().toString())
- .setSubRequestId(operation.getSubRequestId())
- .build();
+ CommonHeader commonHeader = CommonHeader.newBuilder().setOriginatorId(CdsActorConstants.ORIGINATOR_ID)
+ .setRequestId(onset.getRequestId().toString()).setSubRequestId(operation.getSubRequestId())
+ .build();
// Build CDS gRPC request action-identifier
- ActionIdentifiers actionIdentifiers = ActionIdentifiers.newBuilder()
- .setBlueprintName(cbaName)
- .setBlueprintVersion(cbaVersion)
- .setActionName(cbaActionName)
- .setMode(CdsActorConstants.CDS_MODE)
- .build();
+ ActionIdentifiers actionIdentifiers =
+ ActionIdentifiers.newBuilder().setBlueprintName(cbaName).setBlueprintVersion(cbaVersion)
+ .setActionName(cbaActionName).setMode(CdsActorConstants.CDS_MODE).build();
// Finally build the ExecutionServiceInput gRPC request object.
- ExecutionServiceInput executionServiceInput = ExecutionServiceInput.newBuilder()
- .setCommonHeader(commonHeader)
- .setActionIdentifiers(actionIdentifiers)
- .setPayload(struct.build())
- .build();
+ ExecutionServiceInput executionServiceInput = ExecutionServiceInput.newBuilder().setCommonHeader(commonHeader)
+ .setActionIdentifiers(actionIdentifiers).setPayload(struct.build()).build();
return Optional.of(executionServiceInput);
}
@@ -184,8 +193,8 @@ public class CdsActorServiceProvider extends ActorImpl {
String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
String cbaActionName = policy.getRecipe();
- return !Strings.isNullOrEmpty(cbaName) && !Strings.isNullOrEmpty(cbaVersion) && !Strings
- .isNullOrEmpty(cbaActionName);
+ return !Strings.isNullOrEmpty(cbaName) && !Strings.isNullOrEmpty(cbaVersion)
+ && !Strings.isNullOrEmpty(cbaActionName);
}
public class CdsActorServiceManager implements CdsProcessorListener {
@@ -234,10 +243,11 @@ public class CdsActorServiceProvider extends ActorImpl {
* @return the cds response.
*/
public CdsResponse sendRequestToCds(CdsProcessorGrpcClient cdsClient, CdsServerProperties cdsProps,
- ExecutionServiceInput executionServiceInput) {
+ ExecutionServiceInput executionServiceInput) {
try {
LOGGER.trace("Start CdsActorServiceProvider.executeCdsBlueprintProcessor {}.", executionServiceInput);
- // TO-DO: Handle requests asynchronously once the callback support is added to actors.
+ // TO-DO: Handle requests asynchronously once the callback support is
+ // added to actors.
CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput);
boolean status = countDownLatch.await(cdsProps.getTimeout(), TimeUnit.SECONDS);
if (!status) {
@@ -252,9 +262,9 @@ public class CdsActorServiceProvider extends ActorImpl {
LOGGER.info("Status of the CDS gRPC request is: {}", getCdsStatus());
CdsResponse response = new CdsResponse();
- response.setRequestId(
- executionServiceInput != null && executionServiceInput.getCommonHeader() != null
- ? executionServiceInput.getCommonHeader().getRequestId() : null);
+ response.setRequestId(executionServiceInput != null && executionServiceInput.getCommonHeader() != null
+ ? executionServiceInput.getCommonHeader().getRequestId()
+ : null);
response.setStatus(this.getCdsStatus());
return response;
}
@@ -263,4 +273,6 @@ public class CdsActorServiceProvider extends ActorImpl {
return cdsStatus.get();
}
}
+
+ // **HERE**
}
diff --git a/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcConfig.java b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcConfig.java
new file mode 100644
index 000000000..3d79149c6
--- /dev/null
+++ b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcConfig.java
@@ -0,0 +1,51 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2020 Bell Canada. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actor.cds;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import org.onap.policy.cds.properties.CdsServerProperties;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig;
+
+/**
+ * Configuration for gRPC Operators.
+ */
+@Getter
+public class GrpcConfig extends OperatorConfig {
+
+ /**
+ * Default timeout, in milliseconds, if none specified in the request.
+ */
+ private final long timeoutMs;
+
+ private CdsServerProperties cdsServerProperties;
+
+ /**
+ * Constructs the object.
+ *
+ * @param blockingExecutor executor to be used for tasks that may perform blocking I/O
+ * @param params operator parameters
+ */
+ public GrpcConfig(Executor blockingExecutor, CdsServerProperties params) {
+ super(blockingExecutor);
+ cdsServerProperties = params;
+ timeoutMs = TimeUnit.MILLISECONDS.convert(params.getTimeout(), TimeUnit.SECONDS);
+ }
+}
diff --git a/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperation.java b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperation.java
new file mode 100644
index 000000000..efe358bad
--- /dev/null
+++ b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperation.java
@@ -0,0 +1,203 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2020 Bell Canada. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actor.cds;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Struct;
+import com.google.protobuf.Struct.Builder;
+import com.google.protobuf.util.JsonFormat;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import lombok.Getter;
+import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
+import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
+import org.onap.policy.aai.AaiConstants;
+import org.onap.policy.aai.AaiCqResponse;
+import org.onap.policy.cds.client.CdsProcessorGrpcClient;
+import org.onap.policy.common.utils.coder.CoderException;
+import org.onap.policy.controlloop.actor.aai.AaiCustomQueryOperation;
+import org.onap.policy.controlloop.actor.cds.constants.CdsActorConstants;
+import org.onap.policy.controlloop.actor.cds.request.CdsActionRequest;
+import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome;
+import org.onap.policy.controlloop.actorserviceprovider.impl.OperationPartial;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+
+/**
+ * Operation that uses gRPC to send request to CDS.
+ *
+ */
+@Getter
+public class GrpcOperation extends OperationPartial {
+
+ public static final String NAME = "gRPC";
+
+ private CdsProcessorGrpcClient client;
+
+ /**
+ * Configuration for this operation.
+ */
+ private final GrpcConfig config;
+
+ /**
+ * Constructs the object.
+ *
+ * @param params operation parameters
+ * @param config configuration for this operation
+ */
+ public GrpcOperation(ControlLoopOperationParams params, GrpcConfig config) {
+ super(params, config);
+ this.config = config;
+ }
+
+ /**
+ * If no timeout is specified, then it returns the operator's configured timeout.
+ */
+ @Override
+ protected long getTimeoutMs(Integer timeoutSec) {
+ return (timeoutSec == null || timeoutSec == 0 ? config.getTimeoutMs() : super.getTimeoutMs(timeoutSec));
+ }
+
+ /**
+ * Ensures that A&AI customer query has been performed.
+ */
+ @Override
+ @SuppressWarnings("unchecked")
+ protected CompletableFuture<OperationOutcome> startPreprocessorAsync() {
+ ControlLoopOperationParams cqParams = params.toBuilder().actor(AaiConstants.ACTOR_NAME)
+ .operation(AaiCustomQueryOperation.NAME).payload(null).retry(null).timeoutSec(null).build();
+
+ // run Custom Query and Guard, in parallel
+ return allOf(() -> params.getContext().obtain(AaiCqResponse.CONTEXT_KEY, cqParams), this::startGuardAsync);
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) {
+
+ CompletableFuture<OperationOutcome> future = new CompletableFuture<>();
+ client = new CdsProcessorGrpcClient(new CdsActorServiceManager(outcome, future),
+ config.getCdsServerProperties());
+
+ ExecutionServiceInput request = constructRequest(params);
+ client.sendRequest(request);
+ return future;
+ }
+
+ /**
+ * Build the CDS ExecutionServiceInput request from the policy object and the AAI
+ * enriched parameters. TO-DO: Avoid leaking Exceptions to the Kie Session thread. TBD
+ * item for Frankfurt release.
+ *
+ * @param params the control loop parameters specifying the onset, payload, etc.
+ * @return an ExecutionServiceInput instance.
+ */
+ public ExecutionServiceInput constructRequest(ControlLoopOperationParams params) {
+
+ // For the current operational TOSCA policy model (yaml) CBA name and version are
+ // embedded in the payload
+ // section, with the new policy type model being proposed in Frankfurt we will be
+ // able to move it out.
+ if (!validateCdsMandatoryParams(params)) {
+ throw new IllegalArgumentException("missing cds mandatory params - " + params);
+ }
+ Map<String, String> payload = convertPayloadMap(params.getPayload());
+ String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME);
+ String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION);
+
+ // Retain only the payload by removing CBA name and version once they are
+ // extracted
+ // to be put in CDS request header.
+ payload.remove(CdsActorConstants.KEY_CBA_NAME);
+ payload.remove(CdsActorConstants.KEY_CBA_VERSION);
+
+ // Embed payload from policy to ConfigDeployRequest object, serialize and inject
+ // into grpc request.
+ String cbaActionName = params.getOperation();
+ CdsActionRequest request = new CdsActionRequest();
+ request.setPolicyPayload(payload);
+ request.setActionName(cbaActionName);
+ request.setResolutionKey(UUID.randomUUID().toString());
+
+ // Inject AAI properties into payload map. Offer flexibility to the usecase
+ // implementation to inject whatever AAI parameters are of interest to them.
+ // E.g. For vFW usecase El-Alto inject service-instance-id, generic-vnf-id as
+ // needed by CDS.
+ request.setAaiProperties(params.getContext().getEnrichment());
+
+ // Inject any additional event parameters that may be present in the onset event
+ if (params.getContext().getEvent().getAdditionalEventParams() != null) {
+ request.setAdditionalEventParams(params.getContext().getEvent().getAdditionalEventParams());
+ }
+
+ Builder struct = Struct.newBuilder();
+ try {
+ String requestStr = request.generateCdsPayload();
+ Preconditions.checkState(!Strings.isNullOrEmpty(requestStr),
+ "Unable to build " + "config-deploy-request from payload parameters: {}", payload);
+ JsonFormat.parser().merge(requestStr, struct);
+ } catch (InvalidProtocolBufferException | CoderException e) {
+ throw new IllegalArgumentException("Failed to embed CDS payload string into the input request. blueprint({"
+ + cbaName + "}:{" + cbaVersion + "}) for action({" + cbaActionName + "})", e);
+ }
+
+ // Build CDS gRPC request common-header
+ CommonHeader commonHeader = CommonHeader.newBuilder().setOriginatorId(CdsActorConstants.ORIGINATOR_ID)
+ .setRequestId(params.getContext().getEvent().getRequestId().toString())
+ .setSubRequestId(Integer.toString(0)).build();
+
+ // Build CDS gRPC request action-identifier
+ ActionIdentifiers actionIdentifiers =
+ ActionIdentifiers.newBuilder().setBlueprintName(cbaName).setBlueprintVersion(cbaVersion)
+ .setActionName(cbaActionName).setMode(CdsActorConstants.CDS_MODE).build();
+
+ // Finally build & return the ExecutionServiceInput gRPC request object.
+ return ExecutionServiceInput.newBuilder().setCommonHeader(commonHeader).setActionIdentifiers(actionIdentifiers)
+ .setPayload(struct.build()).build();
+ }
+
+ private Map<String, String> convertPayloadMap(Map<String, Object> payload) {
+ Map<String, String> convertedPayload = new HashMap<>();
+ for (Entry<String, Object> entry : payload.entrySet()) {
+ convertedPayload.put(entry.getKey(), entry.getValue().toString());
+ }
+ return convertedPayload;
+ }
+
+ private boolean validateCdsMandatoryParams(ControlLoopOperationParams params) {
+ if (params == null || params.getPayload() == null) {
+ return false;
+ }
+ Map<String, Object> payload = params.getPayload();
+ if (payload.get(CdsActorConstants.KEY_CBA_NAME) == null
+ || payload.get(CdsActorConstants.KEY_CBA_VERSION) == null) {
+ return false;
+ }
+ return !Strings.isNullOrEmpty(payload.get(CdsActorConstants.KEY_CBA_NAME).toString())
+ && !Strings.isNullOrEmpty(payload.get(CdsActorConstants.KEY_CBA_VERSION).toString())
+ && !Strings.isNullOrEmpty(params.getOperation());
+ }
+}
diff --git a/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperator.java b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperator.java
new file mode 100644
index 000000000..cc38d7205
--- /dev/null
+++ b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperator.java
@@ -0,0 +1,106 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2020 Bell Canada. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.controlloop.actor.cds;
+
+import java.util.Map;
+import lombok.Getter;
+import org.onap.policy.cds.properties.CdsServerProperties;
+import org.onap.policy.common.parameters.ValidationResult;
+import org.onap.policy.controlloop.actorserviceprovider.Operation;
+import org.onap.policy.controlloop.actorserviceprovider.Util;
+import org.onap.policy.controlloop.actorserviceprovider.impl.OperationMaker;
+import org.onap.policy.controlloop.actorserviceprovider.impl.OperatorPartial;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams;
+import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException;
+
+/**
+ * Operator that uses gRPC. The operator's parameters must be a
+ * {@link CdsServerProperties}.
+ */
+@Getter
+public class GrpcOperator extends OperatorPartial {
+
+ /**
+ * Function to make an operation.
+ */
+ private final OperationMaker<GrpcConfig, GrpcOperation> operationMaker;
+
+ /**
+ * Current configuration. This is set by {@link #doConfigure(Map)}.
+ */
+ private GrpcConfig currentConfig;
+
+ /**
+ * Constructs the object.
+ *
+ * @param actorName name of the actor with which this operator is associated
+ * @param name operation name
+ */
+ public GrpcOperator(String actorName, String name) {
+ this(actorName, name, null);
+ }
+
+ /**
+ * Constructs the object.
+ *
+ * @param actorName name of the actor with which this operator is associated
+ * @param name operation name
+ * @param operationMaker function to make an operation
+ */
+ public GrpcOperator(String actorName, String name, OperationMaker<GrpcConfig, GrpcOperation> operationMaker) {
+ super(actorName, name);
+ this.operationMaker = operationMaker;
+ }
+
+ /**
+ * Translates the parameters to an {@link CdsServerProperties} and then extracts the
+ * relevant values.
+ */
+ @Override
+ protected void doConfigure(Map<String, Object> parameters) {
+ currentConfig = makeConfiguration(parameters);
+ }
+
+ /**
+ * Makes a new configuration using the specified parameters.
+ *
+ * @param parameters operator parameters
+ * @return a new configuration
+ */
+ protected GrpcConfig makeConfiguration(Map<String, Object> parameters) {
+ CdsServerProperties params = Util.translate(getFullName(), parameters, CdsServerProperties.class);
+ ValidationResult result = params.validate();
+ if (!result.isValid()) {
+ throw new ParameterValidationRuntimeException("invalid parameters", result);
+ }
+ return new GrpcConfig(getBlockingExecutor(), params);
+ }
+
+ /**
+ * {@inheritDoc}.
+ */
+ @Override
+ public Operation buildOperation(ControlLoopOperationParams params) {
+ if (operationMaker == null) {
+ throw new UnsupportedOperationException("cannot make operation for " + getFullName());
+ }
+ verifyRunning();
+ return operationMaker.apply(params, currentConfig);
+ }
+}