summaryrefslogtreecommitdiffstats
path: root/plugins/plugins-event
diff options
context:
space:
mode:
authora.sreekumar <ajith.sreekumar@est.tech>2020-03-09 10:56:39 +0000
committera.sreekumar <ajith.sreekumar@est.tech>2020-03-11 15:49:31 +0000
commit31ece44c935c76252cf0835e91290e88999c4f1d (patch)
treed50ce35d57786a46df2b981f083187b1d2fb0066 /plugins/plugins-event
parent6bda5d7e4e80d1aefb4f19203361b7199448e70f (diff)
Fixing the gRPC consumer side
Change-Id: I21d9253f41eee9b958e8fb723f6c19f266502cef Issue-ID: POLICY-1656 Signed-off-by: a.sreekumar <ajith.sreekumar@est.tech>
Diffstat (limited to 'plugins/plugins-event')
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumer.java45
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java31
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParameters.java47
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/test/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumerTest.java78
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/test/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParametersTest.java28
5 files changed, 146 insertions, 83 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumer.java
index 7333c8a05..0f6bdf676 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumer.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumer.java
@@ -20,53 +20,54 @@
package org.onap.policy.apex.plugins.event.carrier.grpc;
+import lombok.Getter;
import org.onap.policy.apex.service.engine.event.ApexEventException;
import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
-import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
import org.onap.policy.apex.service.engine.event.ApexPluginsEventConsumer;
-import org.onap.policy.apex.service.engine.event.PeeredReference;
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
/**
- * This class implements an Apex gRPC consumer. It is not expected to receive events using gRPC.
- * So, initializing a gRPC consumer will result in error.
+ * This class implements an Apex gRPC consumer. The consumer is used by it's peer gRPC producer to consume response
+ * events.
*
* @author Ajith Sreekumar (ajith.sreekumar@est.tech)
*/
public class ApexGrpcConsumer extends ApexPluginsEventConsumer {
- private static final String GRPC_CONSUMER_ERROR_MSG =
- "A gRPC Consumer may not be specified. Only sending events is possible using gRPC";
+ // The event receiver that will receive events from this consumer
+ @Getter
+ private ApexEventReceiver eventReceiver;
@Override
public void init(final String consumerName, final EventHandlerParameters consumerParameters,
final ApexEventReceiver incomingEventReceiver) throws ApexEventException {
- throw new ApexEventException(GRPC_CONSUMER_ERROR_MSG);
- }
+ this.eventReceiver = incomingEventReceiver;
+ this.name = consumerName;
- @Override
- public void run() {
- throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG);
- }
+ // Check and get the gRPC Properties
+ if (!(consumerParameters.getCarrierTechnologyParameters() instanceof GrpcCarrierTechnologyParameters)) {
+ final String errorMessage =
+ "specified consumer properties are not applicable to the gRPC consumer (" + this.name + ")";
+ throw new ApexEventException(errorMessage);
+ }
- @Override
- public void start() {
- throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG);
+ // Check if we are in peered mode
+ if (!consumerParameters.isPeeredMode(EventHandlerPeeredMode.REQUESTOR)) {
+ final String errorMessage =
+ "gRPC consumer (" + this.name + ") must run in peered requestor mode with a gRPC producer";
+ throw new ApexEventException(errorMessage);
+ }
}
@Override
public void stop() {
- throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG);
+ // For gRPC requests, all the implementation is in the producer
}
@Override
- public PeeredReference getPeeredReference(EventHandlerPeeredMode peeredMode) {
- throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG);
+ public void run() {
+ // For gRPC requests, all the implementation is in the producer
}
- @Override
- public void setPeeredReference(EventHandlerPeeredMode peeredMode, PeeredReference peeredReference) {
- throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG);
- }
}
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java
index 380ae1274..c98fa41ea 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java
@@ -31,10 +31,13 @@ import org.onap.ccsdk.cds.controllerblueprints.common.api.Status;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput.Builder;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
+import org.onap.policy.apex.service.engine.event.ApexEventConsumer;
import org.onap.policy.apex.service.engine.event.ApexEventException;
import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException;
import org.onap.policy.apex.service.engine.event.ApexPluginsEventProducer;
+import org.onap.policy.apex.service.engine.event.PeeredReference;
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
+import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
import org.onap.policy.cds.api.CdsProcessorListener;
import org.onap.policy.cds.client.CdsProcessorGrpcClient;
import org.onap.policy.cds.properties.CdsServerProperties;
@@ -73,7 +76,7 @@ public class ApexGrpcProducer extends ApexPluginsEventProducer implements CdsPro
}
GrpcCarrierTechnologyParameters grpcProducerProperties =
(GrpcCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters();
-
+ grpcProducerProperties.validateGrpcParameters(true);
client = makeGrpcClient(grpcProducerProperties);
}
@@ -125,6 +128,32 @@ public class ApexGrpcProducer extends ApexPluginsEventProducer implements CdsPro
+ "response from CDS:\n" + cdsResponse.get();
throw new ApexEventRuntimeException(errorMessage);
}
+
+ consumeEvent(executionId, cdsResponse.get());
+ }
+
+ private void consumeEvent(long executionId, ExecutionServiceOutput event) {
+ // Find the peered consumer for this producer
+ final PeeredReference peeredRequestorReference = peerReferenceMap.get(EventHandlerPeeredMode.REQUESTOR);
+ if (peeredRequestorReference == null) {
+ return;
+ }
+ // Find the gRPC Response Consumer that will take in the response to APEX Engine
+ final ApexEventConsumer consumer = peeredRequestorReference.getPeeredConsumer();
+ if (!(consumer instanceof ApexGrpcConsumer)) {
+ final String errorMessage = "Recieve of gRPC response by APEX failed,"
+ + "The consumer is not an instance of ApexGrpcConsumer\n. The received gRPC response:" + event;
+ throw new ApexEventRuntimeException(errorMessage);
+ }
+
+ // Use the consumer to consume this response event in APEX
+ final ApexGrpcConsumer grpcConsumer = (ApexGrpcConsumer) consumer;
+ try {
+ grpcConsumer.getEventReceiver().receiveEvent(executionId, new Properties(),
+ JsonFormat.printer().print(event));
+ } catch (ApexEventException | InvalidProtocolBufferException e) {
+ throw new ApexEventRuntimeException("Consuming gRPC response failed.", e);
+ }
}
/**
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParameters.java
index 59db16743..f13248ec5 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParameters.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParameters.java
@@ -22,10 +22,9 @@ package org.onap.policy.apex.plugins.event.carrier.grpc;
import lombok.Getter;
import lombok.Setter;
+import org.apache.commons.lang3.StringUtils;
+import org.onap.policy.apex.service.engine.event.ApexEventException;
import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
-import org.onap.policy.common.parameters.annotations.Max;
-import org.onap.policy.common.parameters.annotations.Min;
-import org.onap.policy.common.parameters.annotations.NotNull;
// @formatter:off
/**
@@ -59,20 +58,10 @@ public class GrpcCarrierTechnologyParameters extends CarrierTechnologyParameters
/** The consumer plugin class for the gRPC carrier technology. */
public static final String GRPC_EVENT_CONSUMER_PLUGIN_CLASS = ApexGrpcConsumer.class.getName();
- @Min(value = 1)
private int timeout;
-
- @Min(value = MIN_USER_PORT)
- @Max(value = MAX_USER_PORT)
private int port;
-
- @NotNull
private String host;
-
- @NotNull
private String username;
-
- @NotNull
private String password;
@@ -87,4 +76,36 @@ public class GrpcCarrierTechnologyParameters extends CarrierTechnologyParameters
this.setEventProducerPluginClass(GRPC_EVENT_PRODUCER_PLUGIN_CLASS);
this.setEventConsumerPluginClass(GRPC_EVENT_CONSUMER_PLUGIN_CLASS);
}
+
+ /**
+ * The method validates the gRPC parameters. Host details are specified as parameters only for a gRPC producer.
+ *
+ * @param isProducer if the parameters specified are for the gRPC producer or consumer
+ * @throws ApexEventException exception thrown when invalid parameters are provided
+ */
+ public void validateGrpcParameters(boolean isProducer) throws ApexEventException {
+ StringBuilder errorMessage = new StringBuilder();
+ if (isProducer) {
+ if (timeout < 1) {
+ errorMessage.append("timeout should have a positive value.\n");
+ }
+ if (MIN_USER_PORT > port || MAX_USER_PORT < port) {
+ errorMessage.append("port range should be between ").append(MIN_USER_PORT).append(" and ")
+ .append(MAX_USER_PORT).append("\n");
+ }
+ if (StringUtils.isEmpty(host)) {
+ errorMessage.append("host should be specified.\n");
+ }
+ if (StringUtils.isEmpty(username)) {
+ errorMessage.append("username should be specified.\n");
+ }
+ if (StringUtils.isEmpty(password)) {
+ errorMessage.append("password should be specified.\n");
+ }
+ }
+ if (errorMessage.length() > 0) {
+ errorMessage.insert(0, "Issues in specifying gRPC Producer parameters:\n");
+ throw new ApexEventException(errorMessage.toString());
+ }
+ }
}
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/test/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumerTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/test/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumerTest.java
index dc5cc3809..48fce1bee 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/test/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumerTest.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/test/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumerTest.java
@@ -22,21 +22,24 @@ package org.onap.policy.apex.plugins.event.carrier.grpc;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import org.onap.policy.apex.service.engine.event.ApexEventException;
import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
+import org.onap.policy.apex.service.engine.event.PeeredReference;
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
public class ApexGrpcConsumerTest {
- ApexGrpcConsumer grpcConsumer = null;
- EventHandlerParameters consumerParameters = null;
- ApexEventReceiver incomingEventReceiver = null;
-
- private static final String GRPC_CONSUMER_ERROR_MSG =
- "A gRPC Consumer may not be specified. Only sending events is possible using gRPC";
+ private static final String CONSUMER_NAME = "TestApexGrpcConsumer";
+ private ApexGrpcConsumer grpcConsumer = null;
+ private ApexGrpcProducer grpcProducer = null;
+ private EventHandlerParameters consumerParameters = null;
+ private ApexEventReceiver incomingEventReceiver = null;
/**
* Set up testing.
@@ -46,22 +49,22 @@ public class ApexGrpcConsumerTest {
@Before
public void setUp() throws ApexEventException {
grpcConsumer = new ApexGrpcConsumer();
- consumerParameters = new EventHandlerParameters();
- consumerParameters.setCarrierTechnologyParameters(new GrpcCarrierTechnologyParameters() {});
+ grpcProducer = new ApexGrpcProducer();
}
@Test
public void testInit() {
- assertThatThrownBy(() -> {
- grpcConsumer.init("TestApexGrpcConsumer", consumerParameters, incomingEventReceiver);
- }).hasMessage(GRPC_CONSUMER_ERROR_MSG);
+ consumerParameters = populateConsumerParameters(true, true);
+ Assertions.assertThatCode(() -> grpcConsumer.init(CONSUMER_NAME, consumerParameters, incomingEventReceiver))
+ .doesNotThrowAnyException();
}
@Test
- public void testStart() {
- assertThatThrownBy(() -> {
- grpcConsumer.start();
- }).hasMessage(GRPC_CONSUMER_ERROR_MSG);
+ public void testInit_invalidPeeredMode() {
+ consumerParameters = populateConsumerParameters(true, false);
+ assertThatThrownBy(() -> grpcConsumer.init(CONSUMER_NAME, consumerParameters, incomingEventReceiver))
+ .hasMessageContaining(
+ "gRPC consumer (" + CONSUMER_NAME + ") must run in peered requestor mode with a gRPC producer");
}
@Test
@@ -70,24 +73,35 @@ public class ApexGrpcConsumerTest {
}
@Test
- public void testGetPeeredReference() {
- assertThatThrownBy(() -> {
- grpcConsumer.getPeeredReference(EventHandlerPeeredMode.REQUESTOR);
- }).hasMessage(GRPC_CONSUMER_ERROR_MSG);
+ public void testPeeredReference() throws ApexEventException {
+ consumerParameters = populateConsumerParameters(true, true);
+ grpcConsumer.setPeeredReference(EventHandlerPeeredMode.REQUESTOR,
+ new PeeredReference(EventHandlerPeeredMode.REQUESTOR, grpcConsumer, grpcProducer));
+ grpcConsumer.init(CONSUMER_NAME, consumerParameters, incomingEventReceiver);
+ PeeredReference peeredReference = grpcConsumer.getPeeredReference(EventHandlerPeeredMode.REQUESTOR);
+ assertNotNull(peeredReference);
+ assertTrue(peeredReference.getPeeredConsumer().equals(grpcConsumer));
+ assertTrue(peeredReference.getPeeredProducer().equals(grpcProducer));
}
- @Test
- public void testSetPeeredReference() {
- assertThatThrownBy(() -> {
- grpcConsumer.setPeeredReference(null, null);
- }).hasMessage(GRPC_CONSUMER_ERROR_MSG);
- }
-
- @Test()
- public void testStop() {
- assertThatThrownBy(() -> {
- new ApexGrpcConsumer().stop();
- }).hasMessage(GRPC_CONSUMER_ERROR_MSG);
+ private EventHandlerParameters populateConsumerParameters(boolean isConsumer, boolean isPeeredMode) {
+ consumerParameters = new EventHandlerParameters();
+ GrpcCarrierTechnologyParameters params = new GrpcCarrierTechnologyParameters();
+ params.setLabel("GRPC");
+ params.setEventProducerPluginClass(ApexGrpcProducer.class.getName());
+ params.setEventConsumerPluginClass(ApexGrpcConsumer.class.getName());
+ if (!isConsumer) {
+ params.setHost("hostname");
+ params.setPort(3214);
+ params.setUsername("dummyUser");
+ params.setPassword("dummyPassword");
+ params.setTimeout(1);
+ }
+ consumerParameters.setCarrierTechnologyParameters(params);
+ if (isPeeredMode) {
+ consumerParameters.setPeeredMode(EventHandlerPeeredMode.REQUESTOR, true);
+ consumerParameters.setPeer(EventHandlerPeeredMode.REQUESTOR, "requestorPeerName");
+ }
+ return consumerParameters;
}
-
}
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/test/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParametersTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/test/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParametersTest.java
index a3994c29e..480d8423b 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/test/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParametersTest.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/test/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParametersTest.java
@@ -20,12 +20,14 @@
package org.onap.policy.apex.plugins.event.carrier.grpc;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
+import org.onap.policy.apex.service.engine.event.ApexEventException;
import org.onap.policy.common.parameters.GroupValidationResult;
public class GrpcCarrierTechnologyParametersTest {
@@ -42,19 +44,13 @@ public class GrpcCarrierTechnologyParametersTest {
}
@Test
- public void testGrpcCarrierTechnologyParameters_invalid() {
+ public void testGrpcCarrierTechnologyParameters_invalid_producer_params() throws ApexEventException {
GroupValidationResult result = params.validate();
- assertFalse(result.isValid());
- assertTrue(result.getResult().contains("field \"timeout\" type \"int\" value \"0\" INVALID, must be >= 1"));
- assertTrue(result.getResult().contains("field \"port\" type \"int\" value \"0\" INVALID, must be >= 1024"));
- assertTrue(
- result.getResult().contains("field \"host\" type \"java.lang.String\" value \"null\" INVALID, is null"));
- assertTrue(result.getResult()
- .contains("field \"username\" type \"java.lang.String\" value \"null\" INVALID, is null"));
- assertTrue(result.getResult()
- .contains("field \"password\" type \"java.lang.String\" value \"null\" INVALID, is null"));
- assertTrue(result.getResult().contains(""));
- assertTrue(result.getResult().contains(""));
+ assertTrue(result.isValid());
+ assertThatThrownBy(() -> params.validateGrpcParameters(true))
+ .hasMessage("Issues in specifying gRPC Producer parameters:\ntimeout should have a positive value.\n"
+ + "port range should be between 1024 and 65535\n" + "host should be specified.\n"
+ + "username should be specified.\n" + "password should be specified.\n");
}
@Test
@@ -70,6 +66,7 @@ public class GrpcCarrierTechnologyParametersTest {
params.setUsername(USERNAME);
GroupValidationResult result = params.validate();
assertTrue(result.isValid());
+ Assertions.assertThatCode(() -> params.validateGrpcParameters(true)).doesNotThrowAnyException();
}
@Test
@@ -81,7 +78,8 @@ public class GrpcCarrierTechnologyParametersTest {
params.setPort(23); // invalid value
GroupValidationResult result = params.validate();
- assertFalse(result.isValid());
- assertTrue(result.getResult().contains("field \"port\" type \"int\" value \"23\" INVALID, must be >= 1024"));
+ assertTrue(result.isValid());
+ assertThatThrownBy(() -> params.validateGrpcParameters(true))
+ .hasMessageContaining("port range should be between 1024 and 65535");
}
}