diff options
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main')
3 files changed, 87 insertions, 36 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumer.java index 7333c8a05..0f6bdf676 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumer.java @@ -20,53 +20,54 @@ package org.onap.policy.apex.plugins.event.carrier.grpc; +import lombok.Getter; import org.onap.policy.apex.service.engine.event.ApexEventException; import org.onap.policy.apex.service.engine.event.ApexEventReceiver; -import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; import org.onap.policy.apex.service.engine.event.ApexPluginsEventConsumer; -import org.onap.policy.apex.service.engine.event.PeeredReference; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; /** - * This class implements an Apex gRPC consumer. It is not expected to receive events using gRPC. - * So, initializing a gRPC consumer will result in error. + * This class implements an Apex gRPC consumer. The consumer is used by it's peer gRPC producer to consume response + * events. * * @author Ajith Sreekumar (ajith.sreekumar@est.tech) */ public class ApexGrpcConsumer extends ApexPluginsEventConsumer { - private static final String GRPC_CONSUMER_ERROR_MSG = - "A gRPC Consumer may not be specified. Only sending events is possible using gRPC"; + // The event receiver that will receive events from this consumer + @Getter + private ApexEventReceiver eventReceiver; @Override public void init(final String consumerName, final EventHandlerParameters consumerParameters, final ApexEventReceiver incomingEventReceiver) throws ApexEventException { - throw new ApexEventException(GRPC_CONSUMER_ERROR_MSG); - } + this.eventReceiver = incomingEventReceiver; + this.name = consumerName; - @Override - public void run() { - throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG); - } + // Check and get the gRPC Properties + if (!(consumerParameters.getCarrierTechnologyParameters() instanceof GrpcCarrierTechnologyParameters)) { + final String errorMessage = + "specified consumer properties are not applicable to the gRPC consumer (" + this.name + ")"; + throw new ApexEventException(errorMessage); + } - @Override - public void start() { - throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG); + // Check if we are in peered mode + if (!consumerParameters.isPeeredMode(EventHandlerPeeredMode.REQUESTOR)) { + final String errorMessage = + "gRPC consumer (" + this.name + ") must run in peered requestor mode with a gRPC producer"; + throw new ApexEventException(errorMessage); + } } @Override public void stop() { - throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG); + // For gRPC requests, all the implementation is in the producer } @Override - public PeeredReference getPeeredReference(EventHandlerPeeredMode peeredMode) { - throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG); + public void run() { + // For gRPC requests, all the implementation is in the producer } - @Override - public void setPeeredReference(EventHandlerPeeredMode peeredMode, PeeredReference peeredReference) { - throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG); - } } diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java index 380ae1274..c98fa41ea 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java @@ -31,10 +31,13 @@ import org.onap.ccsdk.cds.controllerblueprints.common.api.Status; import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput; import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput.Builder; import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput; +import org.onap.policy.apex.service.engine.event.ApexEventConsumer; import org.onap.policy.apex.service.engine.event.ApexEventException; import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; import org.onap.policy.apex.service.engine.event.ApexPluginsEventProducer; +import org.onap.policy.apex.service.engine.event.PeeredReference; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; import org.onap.policy.cds.api.CdsProcessorListener; import org.onap.policy.cds.client.CdsProcessorGrpcClient; import org.onap.policy.cds.properties.CdsServerProperties; @@ -73,7 +76,7 @@ public class ApexGrpcProducer extends ApexPluginsEventProducer implements CdsPro } GrpcCarrierTechnologyParameters grpcProducerProperties = (GrpcCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters(); - + grpcProducerProperties.validateGrpcParameters(true); client = makeGrpcClient(grpcProducerProperties); } @@ -125,6 +128,32 @@ public class ApexGrpcProducer extends ApexPluginsEventProducer implements CdsPro + "response from CDS:\n" + cdsResponse.get(); throw new ApexEventRuntimeException(errorMessage); } + + consumeEvent(executionId, cdsResponse.get()); + } + + private void consumeEvent(long executionId, ExecutionServiceOutput event) { + // Find the peered consumer for this producer + final PeeredReference peeredRequestorReference = peerReferenceMap.get(EventHandlerPeeredMode.REQUESTOR); + if (peeredRequestorReference == null) { + return; + } + // Find the gRPC Response Consumer that will take in the response to APEX Engine + final ApexEventConsumer consumer = peeredRequestorReference.getPeeredConsumer(); + if (!(consumer instanceof ApexGrpcConsumer)) { + final String errorMessage = "Recieve of gRPC response by APEX failed," + + "The consumer is not an instance of ApexGrpcConsumer\n. The received gRPC response:" + event; + throw new ApexEventRuntimeException(errorMessage); + } + + // Use the consumer to consume this response event in APEX + final ApexGrpcConsumer grpcConsumer = (ApexGrpcConsumer) consumer; + try { + grpcConsumer.getEventReceiver().receiveEvent(executionId, new Properties(), + JsonFormat.printer().print(event)); + } catch (ApexEventException | InvalidProtocolBufferException e) { + throw new ApexEventRuntimeException("Consuming gRPC response failed.", e); + } } /** diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParameters.java index 59db16743..f13248ec5 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParameters.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParameters.java @@ -22,10 +22,9 @@ package org.onap.policy.apex.plugins.event.carrier.grpc; import lombok.Getter; import lombok.Setter; +import org.apache.commons.lang3.StringUtils; +import org.onap.policy.apex.service.engine.event.ApexEventException; import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; -import org.onap.policy.common.parameters.annotations.Max; -import org.onap.policy.common.parameters.annotations.Min; -import org.onap.policy.common.parameters.annotations.NotNull; // @formatter:off /** @@ -59,20 +58,10 @@ public class GrpcCarrierTechnologyParameters extends CarrierTechnologyParameters /** The consumer plugin class for the gRPC carrier technology. */ public static final String GRPC_EVENT_CONSUMER_PLUGIN_CLASS = ApexGrpcConsumer.class.getName(); - @Min(value = 1) private int timeout; - - @Min(value = MIN_USER_PORT) - @Max(value = MAX_USER_PORT) private int port; - - @NotNull private String host; - - @NotNull private String username; - - @NotNull private String password; @@ -87,4 +76,36 @@ public class GrpcCarrierTechnologyParameters extends CarrierTechnologyParameters this.setEventProducerPluginClass(GRPC_EVENT_PRODUCER_PLUGIN_CLASS); this.setEventConsumerPluginClass(GRPC_EVENT_CONSUMER_PLUGIN_CLASS); } + + /** + * The method validates the gRPC parameters. Host details are specified as parameters only for a gRPC producer. + * + * @param isProducer if the parameters specified are for the gRPC producer or consumer + * @throws ApexEventException exception thrown when invalid parameters are provided + */ + public void validateGrpcParameters(boolean isProducer) throws ApexEventException { + StringBuilder errorMessage = new StringBuilder(); + if (isProducer) { + if (timeout < 1) { + errorMessage.append("timeout should have a positive value.\n"); + } + if (MIN_USER_PORT > port || MAX_USER_PORT < port) { + errorMessage.append("port range should be between ").append(MIN_USER_PORT).append(" and ") + .append(MAX_USER_PORT).append("\n"); + } + if (StringUtils.isEmpty(host)) { + errorMessage.append("host should be specified.\n"); + } + if (StringUtils.isEmpty(username)) { + errorMessage.append("username should be specified.\n"); + } + if (StringUtils.isEmpty(password)) { + errorMessage.append("password should be specified.\n"); + } + } + if (errorMessage.length() > 0) { + errorMessage.insert(0, "Issues in specifying gRPC Producer parameters:\n"); + throw new ApexEventException(errorMessage.toString()); + } + } } |