summaryrefslogtreecommitdiffstats
path: root/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main
diff options
context:
space:
mode:
authora.sreekumar <ajith.sreekumar@est.tech>2020-03-09 10:56:39 +0000
committera.sreekumar <ajith.sreekumar@est.tech>2020-03-11 15:49:31 +0000
commit31ece44c935c76252cf0835e91290e88999c4f1d (patch)
treed50ce35d57786a46df2b981f083187b1d2fb0066 /plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main
parent6bda5d7e4e80d1aefb4f19203361b7199448e70f (diff)
Fixing the gRPC consumer side
Change-Id: I21d9253f41eee9b958e8fb723f6c19f266502cef Issue-ID: POLICY-1656 Signed-off-by: a.sreekumar <ajith.sreekumar@est.tech>
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main')
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumer.java45
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java31
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParameters.java47
3 files changed, 87 insertions, 36 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumer.java
index 7333c8a05..0f6bdf676 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumer.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumer.java
@@ -20,53 +20,54 @@
package org.onap.policy.apex.plugins.event.carrier.grpc;
+import lombok.Getter;
import org.onap.policy.apex.service.engine.event.ApexEventException;
import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
-import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
import org.onap.policy.apex.service.engine.event.ApexPluginsEventConsumer;
-import org.onap.policy.apex.service.engine.event.PeeredReference;
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
/**
- * This class implements an Apex gRPC consumer. It is not expected to receive events using gRPC.
- * So, initializing a gRPC consumer will result in error.
+ * This class implements an Apex gRPC consumer. The consumer is used by it's peer gRPC producer to consume response
+ * events.
*
* @author Ajith Sreekumar (ajith.sreekumar@est.tech)
*/
public class ApexGrpcConsumer extends ApexPluginsEventConsumer {
- private static final String GRPC_CONSUMER_ERROR_MSG =
- "A gRPC Consumer may not be specified. Only sending events is possible using gRPC";
+ // The event receiver that will receive events from this consumer
+ @Getter
+ private ApexEventReceiver eventReceiver;
@Override
public void init(final String consumerName, final EventHandlerParameters consumerParameters,
final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
- throw new ApexEventException(GRPC_CONSUMER_ERROR_MSG);
- }
+ this.eventReceiver = incomingEventReceiver;
+ this.name = consumerName;
- @Override
- public void run() {
- throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG);
- }
+ // Check and get the gRPC Properties
+ if (!(consumerParameters.getCarrierTechnologyParameters() instanceof GrpcCarrierTechnologyParameters)) {
+ final String errorMessage =
+ "specified consumer properties are not applicable to the gRPC consumer (" + this.name + ")";
+ throw new ApexEventException(errorMessage);
+ }
- @Override
- public void start() {
- throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG);
+ // Check if we are in peered mode
+ if (!consumerParameters.isPeeredMode(EventHandlerPeeredMode.REQUESTOR)) {
+ final String errorMessage =
+ "gRPC consumer (" + this.name + ") must run in peered requestor mode with a gRPC producer";
+ throw new ApexEventException(errorMessage);
+ }
}
@Override
public void stop() {
- throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG);
+ // For gRPC requests, all the implementation is in the producer
}
@Override
- public PeeredReference getPeeredReference(EventHandlerPeeredMode peeredMode) {
- throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG);
+ public void run() {
+ // For gRPC requests, all the implementation is in the producer
}
- @Override
- public void setPeeredReference(EventHandlerPeeredMode peeredMode, PeeredReference peeredReference) {
- throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG);
- }
}
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java
index 380ae1274..c98fa41ea 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java
@@ -31,10 +31,13 @@ import org.onap.ccsdk.cds.controllerblueprints.common.api.Status;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput.Builder;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
+import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
import org.onap.policy.apex.service.engine.event.ApexEventException;
import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
import org.onap.policy.apex.service.engine.event.ApexPluginsEventProducer;
+import org.onap.policy.apex.service.engine.event.PeeredReference;
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
import org.onap.policy.cds.api.CdsProcessorListener;
import org.onap.policy.cds.client.CdsProcessorGrpcClient;
import org.onap.policy.cds.properties.CdsServerProperties;
@@ -73,7 +76,7 @@ public class ApexGrpcProducer extends ApexPluginsEventProducer implements CdsPro
}
GrpcCarrierTechnologyParameters grpcProducerProperties =
(GrpcCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters();
-
+ grpcProducerProperties.validateGrpcParameters(true);
client = makeGrpcClient(grpcProducerProperties);
}
@@ -125,6 +128,32 @@ public class ApexGrpcProducer extends ApexPluginsEventProducer implements CdsPro
+ "response from CDS:\n" + cdsResponse.get();
throw new ApexEventRuntimeException(errorMessage);
}
+
+ consumeEvent(executionId, cdsResponse.get());
+ }
+
+ private void consumeEvent(long executionId, ExecutionServiceOutput event) {
+ // Find the peered consumer for this producer
+ final PeeredReference peeredRequestorReference = peerReferenceMap.get(EventHandlerPeeredMode.REQUESTOR);
+ if (peeredRequestorReference == null) {
+ return;
+ }
+ // Find the gRPC Response Consumer that will take in the response to APEX Engine
+ final ApexEventConsumer consumer = peeredRequestorReference.getPeeredConsumer();
+ if (!(consumer instanceof ApexGrpcConsumer)) {
+ final String errorMessage = "Recieve of gRPC response by APEX failed,"
+ + "The consumer is not an instance of ApexGrpcConsumer\n. The received gRPC response:" + event;
+ throw new ApexEventRuntimeException(errorMessage);
+ }
+
+ // Use the consumer to consume this response event in APEX
+ final ApexGrpcConsumer grpcConsumer = (ApexGrpcConsumer) consumer;
+ try {
+ grpcConsumer.getEventReceiver().receiveEvent(executionId, new Properties(),
+ JsonFormat.printer().print(event));
+ } catch (ApexEventException | InvalidProtocolBufferException e) {
+ throw new ApexEventRuntimeException("Consuming gRPC response failed.", e);
+ }
}
/**
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParameters.java
index 59db16743..f13248ec5 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParameters.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParameters.java
@@ -22,10 +22,9 @@ package org.onap.policy.apex.plugins.event.carrier.grpc;
import lombok.Getter;
import lombok.Setter;
+import org.apache.commons.lang3.StringUtils;
+import org.onap.policy.apex.service.engine.event.ApexEventException;
import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
-import org.onap.policy.common.parameters.annotations.Max;
-import org.onap.policy.common.parameters.annotations.Min;
-import org.onap.policy.common.parameters.annotations.NotNull;
// @formatter:off
/**
@@ -59,20 +58,10 @@ public class GrpcCarrierTechnologyParameters extends CarrierTechnologyParameters
/** The consumer plugin class for the gRPC carrier technology. */
public static final String GRPC_EVENT_CONSUMER_PLUGIN_CLASS = ApexGrpcConsumer.class.getName();
- @Min(value = 1)
private int timeout;
-
- @Min(value = MIN_USER_PORT)
- @Max(value = MAX_USER_PORT)
private int port;
-
- @NotNull
private String host;
-
- @NotNull
private String username;
-
- @NotNull
private String password;
@@ -87,4 +76,36 @@ public class GrpcCarrierTechnologyParameters extends CarrierTechnologyParameters
this.setEventProducerPluginClass(GRPC_EVENT_PRODUCER_PLUGIN_CLASS);
this.setEventConsumerPluginClass(GRPC_EVENT_CONSUMER_PLUGIN_CLASS);
}
+
+ /**
+ * The method validates the gRPC parameters. Host details are specified as parameters only for a gRPC producer.
+ *
+ * @param isProducer if the parameters specified are for the gRPC producer or consumer
+ * @throws ApexEventException exception thrown when invalid parameters are provided
+ */
+ public void validateGrpcParameters(boolean isProducer) throws ApexEventException {
+ StringBuilder errorMessage = new StringBuilder();
+ if (isProducer) {
+ if (timeout < 1) {
+ errorMessage.append("timeout should have a positive value.\n");
+ }
+ if (MIN_USER_PORT > port || MAX_USER_PORT < port) {
+ errorMessage.append("port range should be between ").append(MIN_USER_PORT).append(" and ")
+ .append(MAX_USER_PORT).append("\n");
+ }
+ if (StringUtils.isEmpty(host)) {
+ errorMessage.append("host should be specified.\n");
+ }
+ if (StringUtils.isEmpty(username)) {
+ errorMessage.append("username should be specified.\n");
+ }
+ if (StringUtils.isEmpty(password)) {
+ errorMessage.append("password should be specified.\n");
+ }
+ }
+ if (errorMessage.length() > 0) {
+ errorMessage.insert(0, "Issues in specifying gRPC Producer parameters:\n");
+ throw new ApexEventException(errorMessage.toString());
+ }
+ }
}