diff options
author | Ram Krishna Verma <ram_krishna.verma@bell.ca> | 2020-02-21 17:24:47 -0500 |
---|---|---|
committer | Ram Krishna Verma <ram_krishna.verma@bell.ca> | 2020-02-25 16:03:07 -0500 |
commit | 0c7aa23b5b429b33acc05867dffcfb97cdc5ca68 (patch) | |
tree | ab221541e82f2eccb6d1e76a4dcfb958c9ab7121 /models-interactions/model-actors/actor.cds/src | |
parent | b2ed7d2a42ec2852f89a117f4448f044053e99b8 (diff) |
Add actor for CDS
1) Create the operator, operation & manager classes for gRPC request.
2) Use CompletableFuture to track CDS request flow.
Issue-ID: POLICY-2384
Change-Id: I84e30131a69c2d24c1871ceebced2b69194f619c
Signed-off-by: Ram Krishna Verma <ram_krishna.verma@bell.ca>
Diffstat (limited to 'models-interactions/model-actors/actor.cds/src')
5 files changed, 491 insertions, 38 deletions
diff --git a/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/CdsActorServiceManager.java b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/CdsActorServiceManager.java new file mode 100644 index 000000000..2afa9fa2e --- /dev/null +++ b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/CdsActorServiceManager.java @@ -0,0 +1,81 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2020 Bell Canada. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actor.cds; + +import java.util.concurrent.CompletableFuture; +import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput; +import org.onap.policy.cds.api.CdsProcessorListener; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; +import org.onap.policy.controlloop.policy.PolicyResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * CDS Actor service-manager implementation. + */ +public class CdsActorServiceManager implements CdsProcessorListener { + + private static final Logger LOGGER = LoggerFactory.getLogger(CdsActorServiceManager.class); + + private final CompletableFuture<OperationOutcome> future; + + private final OperationOutcome outcome; + + /** + * Constructs the object. + * + * @param outcome the operation outcome to populate + * @param future the future to complete + */ + public CdsActorServiceManager(OperationOutcome outcome, CompletableFuture<OperationOutcome> future) { + this.outcome = outcome; + this.future = future; + } + + /** + * {@inheritDoc}. + */ + @Override + public void onMessage(final ExecutionServiceOutput message) { + LOGGER.info("Received notification from CDS: {}", message); + EventType eventType = message.getStatus().getEventType(); + switch (eventType) { + case EVENT_COMPONENT_PROCESSING: + LOGGER.info("CDS is processing the message: {}", message); + break; + case EVENT_COMPONENT_EXECUTED: + outcome.setResult(PolicyResult.SUCCESS); + future.complete(outcome); + break; + default: + outcome.setResult(PolicyResult.FAILURE); + future.complete(outcome); + break; + } + } + + /** + * {@inheritDoc}. + */ + @Override + public void onError(final Throwable throwable) { + future.completeExceptionally(throwable); + } +} diff --git a/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/CdsActorServiceProvider.java b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/CdsActorServiceProvider.java index 05ff02e5c..91ee55dad 100644 --- a/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/CdsActorServiceProvider.java +++ b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/CdsActorServiceProvider.java @@ -54,16 +54,24 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * CDS Actor service-provider implementation. This is a deploy dark feature for El-Alto release. + * CDS Actor service-provider implementation. This is a deploy dark feature for El-Alto + * release. */ public class CdsActorServiceProvider extends ActorImpl { private static final Logger LOGGER = LoggerFactory.getLogger(CdsActorServiceProvider.class); + /** + * Constructs the object. + */ public CdsActorServiceProvider() { super(CdsActorConstants.CDS_ACTOR); + + addOperator(new GrpcOperator(CdsActorConstants.CDS_ACTOR, GrpcOperation.NAME, GrpcOperation::new)); } + // TODO old code: remove lines down to **HERE** + /** * {@inheritDoc}. */ @@ -97,20 +105,26 @@ public class CdsActorServiceProvider extends ActorImpl { } /** - * Build the CDS ExecutionServiceInput request from the policy object and the AAI enriched parameters. TO-DO: Avoid - * leaking Exceptions to the Kie Session thread. TBD item for Frankfurt release. + * Build the CDS ExecutionServiceInput request from the policy object and the AAI + * enriched parameters. TO-DO: Avoid leaking Exceptions to the Kie Session thread. TBD + * item for Frankfurt release. * * @param onset the event that is reporting the alert for policy to perform an action. - * @param operation the control loop operation specifying the actor, operation, target, etc. - * @param policy the policy specified from the yaml generated by CLAMP or through Policy API. + * @param operation the control loop operation specifying the actor, operation, + * target, etc. + * @param policy the policy specified from the yaml generated by CLAMP or through + * Policy API. * @param aaiParams Map of enriched AAI attributes in node.attribute notation. - * @return an Optional ExecutionServiceInput instance if valid else an Optional empty object is returned. + * @return an Optional ExecutionServiceInput instance if valid else an Optional empty + * object is returned. */ public Optional<ExecutionServiceInput> constructRequest(VirtualControlLoopEvent onset, - ControlLoopOperation operation, Policy policy, Map<String, String> aaiParams) { + ControlLoopOperation operation, Policy policy, Map<String, String> aaiParams) { - // For the current operational TOSCA policy model (yaml) CBA name and version are embedded in the payload - // section, with the new policy type model being proposed in Frankfurt we will be able to move it out. + // For the current operational TOSCA policy model (yaml) CBA name and version are + // embedded in the payload + // section, with the new policy type model being proposed in Frankfurt we will be + // able to move it out. Map<String, String> payload = policy.getPayload(); if (!validateCdsMandatoryParams(policy)) { return Optional.empty(); @@ -118,12 +132,14 @@ public class CdsActorServiceProvider extends ActorImpl { String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME); String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION); - // Retain only the payload by removing CBA name and version once they are extracted + // Retain only the payload by removing CBA name and version once they are + // extracted // to be put in CDS request header. payload.remove(CdsActorConstants.KEY_CBA_NAME); payload.remove(CdsActorConstants.KEY_CBA_VERSION); - // Embed payload from policy to ConfigDeployRequest object, serialize and inject into grpc request. + // Embed payload from policy to ConfigDeployRequest object, serialize and inject + // into grpc request. String cbaActionName = policy.getRecipe(); CdsActionRequest request = new CdsActionRequest(); request.setPolicyPayload(payload); @@ -132,7 +148,8 @@ public class CdsActorServiceProvider extends ActorImpl { // Inject AAI properties into payload map. Offer flexibility to the usecase // implementation to inject whatever AAI parameters are of interest to them. - // E.g. For vFW usecase El-Alto inject service-instance-id, generic-vnf-id as needed by CDS. + // E.g. For vFW usecase El-Alto inject service-instance-id, generic-vnf-id as + // needed by CDS. request.setAaiProperties(aaiParams); // Inject any additional event parameters that may be present in the onset event @@ -143,36 +160,28 @@ public class CdsActorServiceProvider extends ActorImpl { Builder struct = Struct.newBuilder(); try { String requestStr = request.generateCdsPayload(); - Preconditions.checkState(!Strings.isNullOrEmpty(requestStr), "Unable to build " - + "config-deploy-request from payload parameters: {}", payload); + Preconditions.checkState(!Strings.isNullOrEmpty(requestStr), + "Unable to build " + "config-deploy-request from payload parameters: {}", payload); JsonFormat.parser().merge(requestStr, struct); } catch (InvalidProtocolBufferException | CoderException e) { LOGGER.error("Failed to embed CDS payload string into the input request. blueprint({}:{}) for action({})", - cbaName, cbaVersion, cbaActionName, e); + cbaName, cbaVersion, cbaActionName, e); return Optional.empty(); } // Build CDS gRPC request common-header - CommonHeader commonHeader = CommonHeader.newBuilder() - .setOriginatorId(CdsActorConstants.ORIGINATOR_ID) - .setRequestId(onset.getRequestId().toString()) - .setSubRequestId(operation.getSubRequestId()) - .build(); + CommonHeader commonHeader = CommonHeader.newBuilder().setOriginatorId(CdsActorConstants.ORIGINATOR_ID) + .setRequestId(onset.getRequestId().toString()).setSubRequestId(operation.getSubRequestId()) + .build(); // Build CDS gRPC request action-identifier - ActionIdentifiers actionIdentifiers = ActionIdentifiers.newBuilder() - .setBlueprintName(cbaName) - .setBlueprintVersion(cbaVersion) - .setActionName(cbaActionName) - .setMode(CdsActorConstants.CDS_MODE) - .build(); + ActionIdentifiers actionIdentifiers = + ActionIdentifiers.newBuilder().setBlueprintName(cbaName).setBlueprintVersion(cbaVersion) + .setActionName(cbaActionName).setMode(CdsActorConstants.CDS_MODE).build(); // Finally build the ExecutionServiceInput gRPC request object. - ExecutionServiceInput executionServiceInput = ExecutionServiceInput.newBuilder() - .setCommonHeader(commonHeader) - .setActionIdentifiers(actionIdentifiers) - .setPayload(struct.build()) - .build(); + ExecutionServiceInput executionServiceInput = ExecutionServiceInput.newBuilder().setCommonHeader(commonHeader) + .setActionIdentifiers(actionIdentifiers).setPayload(struct.build()).build(); return Optional.of(executionServiceInput); } @@ -184,8 +193,8 @@ public class CdsActorServiceProvider extends ActorImpl { String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME); String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION); String cbaActionName = policy.getRecipe(); - return !Strings.isNullOrEmpty(cbaName) && !Strings.isNullOrEmpty(cbaVersion) && !Strings - .isNullOrEmpty(cbaActionName); + return !Strings.isNullOrEmpty(cbaName) && !Strings.isNullOrEmpty(cbaVersion) + && !Strings.isNullOrEmpty(cbaActionName); } public class CdsActorServiceManager implements CdsProcessorListener { @@ -234,10 +243,11 @@ public class CdsActorServiceProvider extends ActorImpl { * @return the cds response. */ public CdsResponse sendRequestToCds(CdsProcessorGrpcClient cdsClient, CdsServerProperties cdsProps, - ExecutionServiceInput executionServiceInput) { + ExecutionServiceInput executionServiceInput) { try { LOGGER.trace("Start CdsActorServiceProvider.executeCdsBlueprintProcessor {}.", executionServiceInput); - // TO-DO: Handle requests asynchronously once the callback support is added to actors. + // TO-DO: Handle requests asynchronously once the callback support is + // added to actors. CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput); boolean status = countDownLatch.await(cdsProps.getTimeout(), TimeUnit.SECONDS); if (!status) { @@ -252,9 +262,9 @@ public class CdsActorServiceProvider extends ActorImpl { LOGGER.info("Status of the CDS gRPC request is: {}", getCdsStatus()); CdsResponse response = new CdsResponse(); - response.setRequestId( - executionServiceInput != null && executionServiceInput.getCommonHeader() != null - ? executionServiceInput.getCommonHeader().getRequestId() : null); + response.setRequestId(executionServiceInput != null && executionServiceInput.getCommonHeader() != null + ? executionServiceInput.getCommonHeader().getRequestId() + : null); response.setStatus(this.getCdsStatus()); return response; } @@ -263,4 +273,6 @@ public class CdsActorServiceProvider extends ActorImpl { return cdsStatus.get(); } } + + // **HERE** } diff --git a/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcConfig.java b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcConfig.java new file mode 100644 index 000000000..3d79149c6 --- /dev/null +++ b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcConfig.java @@ -0,0 +1,51 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2020 Bell Canada. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actor.cds; + +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import lombok.Getter; +import org.onap.policy.cds.properties.CdsServerProperties; +import org.onap.policy.controlloop.actorserviceprovider.parameters.OperatorConfig; + +/** + * Configuration for gRPC Operators. + */ +@Getter +public class GrpcConfig extends OperatorConfig { + + /** + * Default timeout, in milliseconds, if none specified in the request. + */ + private final long timeoutMs; + + private CdsServerProperties cdsServerProperties; + + /** + * Constructs the object. + * + * @param blockingExecutor executor to be used for tasks that may perform blocking I/O + * @param params operator parameters + */ + public GrpcConfig(Executor blockingExecutor, CdsServerProperties params) { + super(blockingExecutor); + cdsServerProperties = params; + timeoutMs = TimeUnit.MILLISECONDS.convert(params.getTimeout(), TimeUnit.SECONDS); + } +} diff --git a/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperation.java b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperation.java new file mode 100644 index 000000000..efe358bad --- /dev/null +++ b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperation.java @@ -0,0 +1,203 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2020 Bell Canada. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actor.cds; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Struct; +import com.google.protobuf.Struct.Builder; +import com.google.protobuf.util.JsonFormat; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import lombok.Getter; +import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers; +import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput; +import org.onap.policy.aai.AaiConstants; +import org.onap.policy.aai.AaiCqResponse; +import org.onap.policy.cds.client.CdsProcessorGrpcClient; +import org.onap.policy.common.utils.coder.CoderException; +import org.onap.policy.controlloop.actor.aai.AaiCustomQueryOperation; +import org.onap.policy.controlloop.actor.cds.constants.CdsActorConstants; +import org.onap.policy.controlloop.actor.cds.request.CdsActionRequest; +import org.onap.policy.controlloop.actorserviceprovider.OperationOutcome; +import org.onap.policy.controlloop.actorserviceprovider.impl.OperationPartial; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; + +/** + * Operation that uses gRPC to send request to CDS. + * + */ +@Getter +public class GrpcOperation extends OperationPartial { + + public static final String NAME = "gRPC"; + + private CdsProcessorGrpcClient client; + + /** + * Configuration for this operation. + */ + private final GrpcConfig config; + + /** + * Constructs the object. + * + * @param params operation parameters + * @param config configuration for this operation + */ + public GrpcOperation(ControlLoopOperationParams params, GrpcConfig config) { + super(params, config); + this.config = config; + } + + /** + * If no timeout is specified, then it returns the operator's configured timeout. + */ + @Override + protected long getTimeoutMs(Integer timeoutSec) { + return (timeoutSec == null || timeoutSec == 0 ? config.getTimeoutMs() : super.getTimeoutMs(timeoutSec)); + } + + /** + * Ensures that A&AI customer query has been performed. + */ + @Override + @SuppressWarnings("unchecked") + protected CompletableFuture<OperationOutcome> startPreprocessorAsync() { + ControlLoopOperationParams cqParams = params.toBuilder().actor(AaiConstants.ACTOR_NAME) + .operation(AaiCustomQueryOperation.NAME).payload(null).retry(null).timeoutSec(null).build(); + + // run Custom Query and Guard, in parallel + return allOf(() -> params.getContext().obtain(AaiCqResponse.CONTEXT_KEY, cqParams), this::startGuardAsync); + } + + /** + * {@inheritDoc}. + */ + @Override + protected CompletableFuture<OperationOutcome> startOperationAsync(int attempt, OperationOutcome outcome) { + + CompletableFuture<OperationOutcome> future = new CompletableFuture<>(); + client = new CdsProcessorGrpcClient(new CdsActorServiceManager(outcome, future), + config.getCdsServerProperties()); + + ExecutionServiceInput request = constructRequest(params); + client.sendRequest(request); + return future; + } + + /** + * Build the CDS ExecutionServiceInput request from the policy object and the AAI + * enriched parameters. TO-DO: Avoid leaking Exceptions to the Kie Session thread. TBD + * item for Frankfurt release. + * + * @param params the control loop parameters specifying the onset, payload, etc. + * @return an ExecutionServiceInput instance. + */ + public ExecutionServiceInput constructRequest(ControlLoopOperationParams params) { + + // For the current operational TOSCA policy model (yaml) CBA name and version are + // embedded in the payload + // section, with the new policy type model being proposed in Frankfurt we will be + // able to move it out. + if (!validateCdsMandatoryParams(params)) { + throw new IllegalArgumentException("missing cds mandatory params - " + params); + } + Map<String, String> payload = convertPayloadMap(params.getPayload()); + String cbaName = payload.get(CdsActorConstants.KEY_CBA_NAME); + String cbaVersion = payload.get(CdsActorConstants.KEY_CBA_VERSION); + + // Retain only the payload by removing CBA name and version once they are + // extracted + // to be put in CDS request header. + payload.remove(CdsActorConstants.KEY_CBA_NAME); + payload.remove(CdsActorConstants.KEY_CBA_VERSION); + + // Embed payload from policy to ConfigDeployRequest object, serialize and inject + // into grpc request. + String cbaActionName = params.getOperation(); + CdsActionRequest request = new CdsActionRequest(); + request.setPolicyPayload(payload); + request.setActionName(cbaActionName); + request.setResolutionKey(UUID.randomUUID().toString()); + + // Inject AAI properties into payload map. Offer flexibility to the usecase + // implementation to inject whatever AAI parameters are of interest to them. + // E.g. For vFW usecase El-Alto inject service-instance-id, generic-vnf-id as + // needed by CDS. + request.setAaiProperties(params.getContext().getEnrichment()); + + // Inject any additional event parameters that may be present in the onset event + if (params.getContext().getEvent().getAdditionalEventParams() != null) { + request.setAdditionalEventParams(params.getContext().getEvent().getAdditionalEventParams()); + } + + Builder struct = Struct.newBuilder(); + try { + String requestStr = request.generateCdsPayload(); + Preconditions.checkState(!Strings.isNullOrEmpty(requestStr), + "Unable to build " + "config-deploy-request from payload parameters: {}", payload); + JsonFormat.parser().merge(requestStr, struct); + } catch (InvalidProtocolBufferException | CoderException e) { + throw new IllegalArgumentException("Failed to embed CDS payload string into the input request. blueprint({" + + cbaName + "}:{" + cbaVersion + "}) for action({" + cbaActionName + "})", e); + } + + // Build CDS gRPC request common-header + CommonHeader commonHeader = CommonHeader.newBuilder().setOriginatorId(CdsActorConstants.ORIGINATOR_ID) + .setRequestId(params.getContext().getEvent().getRequestId().toString()) + .setSubRequestId(Integer.toString(0)).build(); + + // Build CDS gRPC request action-identifier + ActionIdentifiers actionIdentifiers = + ActionIdentifiers.newBuilder().setBlueprintName(cbaName).setBlueprintVersion(cbaVersion) + .setActionName(cbaActionName).setMode(CdsActorConstants.CDS_MODE).build(); + + // Finally build & return the ExecutionServiceInput gRPC request object. + return ExecutionServiceInput.newBuilder().setCommonHeader(commonHeader).setActionIdentifiers(actionIdentifiers) + .setPayload(struct.build()).build(); + } + + private Map<String, String> convertPayloadMap(Map<String, Object> payload) { + Map<String, String> convertedPayload = new HashMap<>(); + for (Entry<String, Object> entry : payload.entrySet()) { + convertedPayload.put(entry.getKey(), entry.getValue().toString()); + } + return convertedPayload; + } + + private boolean validateCdsMandatoryParams(ControlLoopOperationParams params) { + if (params == null || params.getPayload() == null) { + return false; + } + Map<String, Object> payload = params.getPayload(); + if (payload.get(CdsActorConstants.KEY_CBA_NAME) == null + || payload.get(CdsActorConstants.KEY_CBA_VERSION) == null) { + return false; + } + return !Strings.isNullOrEmpty(payload.get(CdsActorConstants.KEY_CBA_NAME).toString()) + && !Strings.isNullOrEmpty(payload.get(CdsActorConstants.KEY_CBA_VERSION).toString()) + && !Strings.isNullOrEmpty(params.getOperation()); + } +} diff --git a/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperator.java b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperator.java new file mode 100644 index 000000000..cc38d7205 --- /dev/null +++ b/models-interactions/model-actors/actor.cds/src/main/java/org/onap/policy/controlloop/actor/cds/GrpcOperator.java @@ -0,0 +1,106 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2020 Bell Canada. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.controlloop.actor.cds; + +import java.util.Map; +import lombok.Getter; +import org.onap.policy.cds.properties.CdsServerProperties; +import org.onap.policy.common.parameters.ValidationResult; +import org.onap.policy.controlloop.actorserviceprovider.Operation; +import org.onap.policy.controlloop.actorserviceprovider.Util; +import org.onap.policy.controlloop.actorserviceprovider.impl.OperationMaker; +import org.onap.policy.controlloop.actorserviceprovider.impl.OperatorPartial; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ControlLoopOperationParams; +import org.onap.policy.controlloop.actorserviceprovider.parameters.ParameterValidationRuntimeException; + +/** + * Operator that uses gRPC. The operator's parameters must be a + * {@link CdsServerProperties}. + */ +@Getter +public class GrpcOperator extends OperatorPartial { + + /** + * Function to make an operation. + */ + private final OperationMaker<GrpcConfig, GrpcOperation> operationMaker; + + /** + * Current configuration. This is set by {@link #doConfigure(Map)}. + */ + private GrpcConfig currentConfig; + + /** + * Constructs the object. + * + * @param actorName name of the actor with which this operator is associated + * @param name operation name + */ + public GrpcOperator(String actorName, String name) { + this(actorName, name, null); + } + + /** + * Constructs the object. + * + * @param actorName name of the actor with which this operator is associated + * @param name operation name + * @param operationMaker function to make an operation + */ + public GrpcOperator(String actorName, String name, OperationMaker<GrpcConfig, GrpcOperation> operationMaker) { + super(actorName, name); + this.operationMaker = operationMaker; + } + + /** + * Translates the parameters to an {@link CdsServerProperties} and then extracts the + * relevant values. + */ + @Override + protected void doConfigure(Map<String, Object> parameters) { + currentConfig = makeConfiguration(parameters); + } + + /** + * Makes a new configuration using the specified parameters. + * + * @param parameters operator parameters + * @return a new configuration + */ + protected GrpcConfig makeConfiguration(Map<String, Object> parameters) { + CdsServerProperties params = Util.translate(getFullName(), parameters, CdsServerProperties.class); + ValidationResult result = params.validate(); + if (!result.isValid()) { + throw new ParameterValidationRuntimeException("invalid parameters", result); + } + return new GrpcConfig(getBlockingExecutor(), params); + } + + /** + * {@inheritDoc}. + */ + @Override + public Operation buildOperation(ControlLoopOperationParams params) { + if (operationMaker == null) { + throw new UnsupportedOperationException("cannot make operation for " + getFullName()); + } + verifyRunning(); + return operationMaker.apply(params, currentConfig); + } +} |