aboutsummaryrefslogtreecommitdiffstats
path: root/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java')
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java31
1 files changed, 30 insertions, 1 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java
index 380ae1274..c98fa41ea 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java
@@ -31,10 +31,13 @@ import org.onap.ccsdk.cds.controllerblueprints.common.api.Status;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput.Builder;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
+import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
import org.onap.policy.apex.service.engine.event.ApexEventException;
import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
import org.onap.policy.apex.service.engine.event.ApexPluginsEventProducer;
+import org.onap.policy.apex.service.engine.event.PeeredReference;
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
import org.onap.policy.cds.api.CdsProcessorListener;
import org.onap.policy.cds.client.CdsProcessorGrpcClient;
import org.onap.policy.cds.properties.CdsServerProperties;
@@ -73,7 +76,7 @@ public class ApexGrpcProducer extends ApexPluginsEventProducer implements CdsPro
}
GrpcCarrierTechnologyParameters grpcProducerProperties =
(GrpcCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters();
-
+ grpcProducerProperties.validateGrpcParameters(true);
client = makeGrpcClient(grpcProducerProperties);
}
@@ -125,6 +128,32 @@ public class ApexGrpcProducer extends ApexPluginsEventProducer implements CdsPro
+ "response from CDS:\n" + cdsResponse.get();
throw new ApexEventRuntimeException(errorMessage);
}
+
+ consumeEvent(executionId, cdsResponse.get());
+ }
+
+ private void consumeEvent(long executionId, ExecutionServiceOutput event) {
+ // Find the peered consumer for this producer
+ final PeeredReference peeredRequestorReference = peerReferenceMap.get(EventHandlerPeeredMode.REQUESTOR);
+ if (peeredRequestorReference == null) {
+ return;
+ }
+ // Find the gRPC Response Consumer that will take in the response to APEX Engine
+ final ApexEventConsumer consumer = peeredRequestorReference.getPeeredConsumer();
+ if (!(consumer instanceof ApexGrpcConsumer)) {
+ final String errorMessage = "Recieve of gRPC response by APEX failed,"
+ + "The consumer is not an instance of ApexGrpcConsumer\n. The received gRPC response:" + event;
+ throw new ApexEventRuntimeException(errorMessage);
+ }
+
+ // Use the consumer to consume this response event in APEX
+ final ApexGrpcConsumer grpcConsumer = (ApexGrpcConsumer) consumer;
+ try {
+ grpcConsumer.getEventReceiver().receiveEvent(executionId, new Properties(),
+ JsonFormat.printer().print(event));
+ } catch (ApexEventException | InvalidProtocolBufferException e) {
+ throw new ApexEventRuntimeException("Consuming gRPC response failed.", e);
+ }
}
/**