diff options
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java')
-rw-r--r-- | plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java | 31 |
1 files changed, 30 insertions, 1 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java index 380ae1274..c98fa41ea 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java @@ -31,10 +31,13 @@ import org.onap.ccsdk.cds.controllerblueprints.common.api.Status; import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput; import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput.Builder; import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput; +import org.onap.policy.apex.service.engine.event.ApexEventConsumer; import org.onap.policy.apex.service.engine.event.ApexEventException; import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; import org.onap.policy.apex.service.engine.event.ApexPluginsEventProducer; +import org.onap.policy.apex.service.engine.event.PeeredReference; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; import org.onap.policy.cds.api.CdsProcessorListener; import org.onap.policy.cds.client.CdsProcessorGrpcClient; import org.onap.policy.cds.properties.CdsServerProperties; @@ -73,7 +76,7 @@ public class ApexGrpcProducer extends ApexPluginsEventProducer implements CdsPro } GrpcCarrierTechnologyParameters grpcProducerProperties = (GrpcCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters(); - + grpcProducerProperties.validateGrpcParameters(true); client = makeGrpcClient(grpcProducerProperties); } @@ -125,6 +128,32 @@ public class ApexGrpcProducer extends ApexPluginsEventProducer implements CdsPro + "response from CDS:\n" + cdsResponse.get(); throw new ApexEventRuntimeException(errorMessage); } + + consumeEvent(executionId, cdsResponse.get()); + } + + private void consumeEvent(long executionId, ExecutionServiceOutput event) { + // Find the peered consumer for this producer + final PeeredReference peeredRequestorReference = peerReferenceMap.get(EventHandlerPeeredMode.REQUESTOR); + if (peeredRequestorReference == null) { + return; + } + // Find the gRPC Response Consumer that will take in the response to APEX Engine + final ApexEventConsumer consumer = peeredRequestorReference.getPeeredConsumer(); + if (!(consumer instanceof ApexGrpcConsumer)) { + final String errorMessage = "Recieve of gRPC response by APEX failed," + + "The consumer is not an instance of ApexGrpcConsumer\n. The received gRPC response:" + event; + throw new ApexEventRuntimeException(errorMessage); + } + + // Use the consumer to consume this response event in APEX + final ApexGrpcConsumer grpcConsumer = (ApexGrpcConsumer) consumer; + try { + grpcConsumer.getEventReceiver().receiveEvent(executionId, new Properties(), + JsonFormat.printer().print(event)); + } catch (ApexEventException | InvalidProtocolBufferException e) { + throw new ApexEventRuntimeException("Consuming gRPC response failed.", e); + } } /** |