diff options
author | a.sreekumar <ajith.sreekumar@est.tech> | 2020-03-09 10:56:39 +0000 |
---|---|---|
committer | a.sreekumar <ajith.sreekumar@est.tech> | 2020-03-11 15:49:31 +0000 |
commit | 31ece44c935c76252cf0835e91290e88999c4f1d (patch) | |
tree | d50ce35d57786a46df2b981f083187b1d2fb0066 /plugins/plugins-event | |
parent | 6bda5d7e4e80d1aefb4f19203361b7199448e70f (diff) |
Fixing the gRPC consumer side
Change-Id: I21d9253f41eee9b958e8fb723f6c19f266502cef
Issue-ID: POLICY-1656
Signed-off-by: a.sreekumar <ajith.sreekumar@est.tech>
Diffstat (limited to 'plugins/plugins-event')
5 files changed, 146 insertions, 83 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumer.java index 7333c8a05..0f6bdf676 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumer.java @@ -20,53 +20,54 @@ package org.onap.policy.apex.plugins.event.carrier.grpc; +import lombok.Getter; import org.onap.policy.apex.service.engine.event.ApexEventException; import org.onap.policy.apex.service.engine.event.ApexEventReceiver; -import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; import org.onap.policy.apex.service.engine.event.ApexPluginsEventConsumer; -import org.onap.policy.apex.service.engine.event.PeeredReference; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; /** - * This class implements an Apex gRPC consumer. It is not expected to receive events using gRPC. - * So, initializing a gRPC consumer will result in error. + * This class implements an Apex gRPC consumer. The consumer is used by it's peer gRPC producer to consume response + * events. * * @author Ajith Sreekumar (ajith.sreekumar@est.tech) */ public class ApexGrpcConsumer extends ApexPluginsEventConsumer { - private static final String GRPC_CONSUMER_ERROR_MSG = - "A gRPC Consumer may not be specified. Only sending events is possible using gRPC"; + // The event receiver that will receive events from this consumer + @Getter + private ApexEventReceiver eventReceiver; @Override public void init(final String consumerName, final EventHandlerParameters consumerParameters, final ApexEventReceiver incomingEventReceiver) throws ApexEventException { - throw new ApexEventException(GRPC_CONSUMER_ERROR_MSG); - } + this.eventReceiver = incomingEventReceiver; + this.name = consumerName; - @Override - public void run() { - throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG); - } + // Check and get the gRPC Properties + if (!(consumerParameters.getCarrierTechnologyParameters() instanceof GrpcCarrierTechnologyParameters)) { + final String errorMessage = + "specified consumer properties are not applicable to the gRPC consumer (" + this.name + ")"; + throw new ApexEventException(errorMessage); + } - @Override - public void start() { - throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG); + // Check if we are in peered mode + if (!consumerParameters.isPeeredMode(EventHandlerPeeredMode.REQUESTOR)) { + final String errorMessage = + "gRPC consumer (" + this.name + ") must run in peered requestor mode with a gRPC producer"; + throw new ApexEventException(errorMessage); + } } @Override public void stop() { - throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG); + // For gRPC requests, all the implementation is in the producer } @Override - public PeeredReference getPeeredReference(EventHandlerPeeredMode peeredMode) { - throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG); + public void run() { + // For gRPC requests, all the implementation is in the producer } - @Override - public void setPeeredReference(EventHandlerPeeredMode peeredMode, PeeredReference peeredReference) { - throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG); - } } diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java index 380ae1274..c98fa41ea 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java @@ -31,10 +31,13 @@ import org.onap.ccsdk.cds.controllerblueprints.common.api.Status; import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput; import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput.Builder; import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput; +import org.onap.policy.apex.service.engine.event.ApexEventConsumer; import org.onap.policy.apex.service.engine.event.ApexEventException; import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; import org.onap.policy.apex.service.engine.event.ApexPluginsEventProducer; +import org.onap.policy.apex.service.engine.event.PeeredReference; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; import org.onap.policy.cds.api.CdsProcessorListener; import org.onap.policy.cds.client.CdsProcessorGrpcClient; import org.onap.policy.cds.properties.CdsServerProperties; @@ -73,7 +76,7 @@ public class ApexGrpcProducer extends ApexPluginsEventProducer implements CdsPro } GrpcCarrierTechnologyParameters grpcProducerProperties = (GrpcCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters(); - + grpcProducerProperties.validateGrpcParameters(true); client = makeGrpcClient(grpcProducerProperties); } @@ -125,6 +128,32 @@ public class ApexGrpcProducer extends ApexPluginsEventProducer implements CdsPro + "response from CDS:\n" + cdsResponse.get(); throw new ApexEventRuntimeException(errorMessage); } + + consumeEvent(executionId, cdsResponse.get()); + } + + private void consumeEvent(long executionId, ExecutionServiceOutput event) { + // Find the peered consumer for this producer + final PeeredReference peeredRequestorReference = peerReferenceMap.get(EventHandlerPeeredMode.REQUESTOR); + if (peeredRequestorReference == null) { + return; + } + // Find the gRPC Response Consumer that will take in the response to APEX Engine + final ApexEventConsumer consumer = peeredRequestorReference.getPeeredConsumer(); + if (!(consumer instanceof ApexGrpcConsumer)) { + final String errorMessage = "Recieve of gRPC response by APEX failed," + + "The consumer is not an instance of ApexGrpcConsumer\n. The received gRPC response:" + event; + throw new ApexEventRuntimeException(errorMessage); + } + + // Use the consumer to consume this response event in APEX + final ApexGrpcConsumer grpcConsumer = (ApexGrpcConsumer) consumer; + try { + grpcConsumer.getEventReceiver().receiveEvent(executionId, new Properties(), + JsonFormat.printer().print(event)); + } catch (ApexEventException | InvalidProtocolBufferException e) { + throw new ApexEventRuntimeException("Consuming gRPC response failed.", e); + } } /** diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParameters.java index 59db16743..f13248ec5 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParameters.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParameters.java @@ -22,10 +22,9 @@ package org.onap.policy.apex.plugins.event.carrier.grpc; import lombok.Getter; import lombok.Setter; +import org.apache.commons.lang3.StringUtils; +import org.onap.policy.apex.service.engine.event.ApexEventException; import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; -import org.onap.policy.common.parameters.annotations.Max; -import org.onap.policy.common.parameters.annotations.Min; -import org.onap.policy.common.parameters.annotations.NotNull; // @formatter:off /** @@ -59,20 +58,10 @@ public class GrpcCarrierTechnologyParameters extends CarrierTechnologyParameters /** The consumer plugin class for the gRPC carrier technology. */ public static final String GRPC_EVENT_CONSUMER_PLUGIN_CLASS = ApexGrpcConsumer.class.getName(); - @Min(value = 1) private int timeout; - - @Min(value = MIN_USER_PORT) - @Max(value = MAX_USER_PORT) private int port; - - @NotNull private String host; - - @NotNull private String username; - - @NotNull private String password; @@ -87,4 +76,36 @@ public class GrpcCarrierTechnologyParameters extends CarrierTechnologyParameters this.setEventProducerPluginClass(GRPC_EVENT_PRODUCER_PLUGIN_CLASS); this.setEventConsumerPluginClass(GRPC_EVENT_CONSUMER_PLUGIN_CLASS); } + + /** + * The method validates the gRPC parameters. Host details are specified as parameters only for a gRPC producer. + * + * @param isProducer if the parameters specified are for the gRPC producer or consumer + * @throws ApexEventException exception thrown when invalid parameters are provided + */ + public void validateGrpcParameters(boolean isProducer) throws ApexEventException { + StringBuilder errorMessage = new StringBuilder(); + if (isProducer) { + if (timeout < 1) { + errorMessage.append("timeout should have a positive value.\n"); + } + if (MIN_USER_PORT > port || MAX_USER_PORT < port) { + errorMessage.append("port range should be between ").append(MIN_USER_PORT).append(" and ") + .append(MAX_USER_PORT).append("\n"); + } + if (StringUtils.isEmpty(host)) { + errorMessage.append("host should be specified.\n"); + } + if (StringUtils.isEmpty(username)) { + errorMessage.append("username should be specified.\n"); + } + if (StringUtils.isEmpty(password)) { + errorMessage.append("password should be specified.\n"); + } + } + if (errorMessage.length() > 0) { + errorMessage.insert(0, "Issues in specifying gRPC Producer parameters:\n"); + throw new ApexEventException(errorMessage.toString()); + } + } } diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/test/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumerTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/test/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumerTest.java index dc5cc3809..48fce1bee 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/test/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumerTest.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/test/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumerTest.java @@ -22,21 +22,24 @@ package org.onap.policy.apex.plugins.event.carrier.grpc; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import org.assertj.core.api.Assertions; import org.junit.Before; import org.junit.Test; import org.onap.policy.apex.service.engine.event.ApexEventException; import org.onap.policy.apex.service.engine.event.ApexEventReceiver; +import org.onap.policy.apex.service.engine.event.PeeredReference; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; public class ApexGrpcConsumerTest { - ApexGrpcConsumer grpcConsumer = null; - EventHandlerParameters consumerParameters = null; - ApexEventReceiver incomingEventReceiver = null; - - private static final String GRPC_CONSUMER_ERROR_MSG = - "A gRPC Consumer may not be specified. Only sending events is possible using gRPC"; + private static final String CONSUMER_NAME = "TestApexGrpcConsumer"; + private ApexGrpcConsumer grpcConsumer = null; + private ApexGrpcProducer grpcProducer = null; + private EventHandlerParameters consumerParameters = null; + private ApexEventReceiver incomingEventReceiver = null; /** * Set up testing. @@ -46,22 +49,22 @@ public class ApexGrpcConsumerTest { @Before public void setUp() throws ApexEventException { grpcConsumer = new ApexGrpcConsumer(); - consumerParameters = new EventHandlerParameters(); - consumerParameters.setCarrierTechnologyParameters(new GrpcCarrierTechnologyParameters() {}); + grpcProducer = new ApexGrpcProducer(); } @Test public void testInit() { - assertThatThrownBy(() -> { - grpcConsumer.init("TestApexGrpcConsumer", consumerParameters, incomingEventReceiver); - }).hasMessage(GRPC_CONSUMER_ERROR_MSG); + consumerParameters = populateConsumerParameters(true, true); + Assertions.assertThatCode(() -> grpcConsumer.init(CONSUMER_NAME, consumerParameters, incomingEventReceiver)) + .doesNotThrowAnyException(); } @Test - public void testStart() { - assertThatThrownBy(() -> { - grpcConsumer.start(); - }).hasMessage(GRPC_CONSUMER_ERROR_MSG); + public void testInit_invalidPeeredMode() { + consumerParameters = populateConsumerParameters(true, false); + assertThatThrownBy(() -> grpcConsumer.init(CONSUMER_NAME, consumerParameters, incomingEventReceiver)) + .hasMessageContaining( + "gRPC consumer (" + CONSUMER_NAME + ") must run in peered requestor mode with a gRPC producer"); } @Test @@ -70,24 +73,35 @@ public class ApexGrpcConsumerTest { } @Test - public void testGetPeeredReference() { - assertThatThrownBy(() -> { - grpcConsumer.getPeeredReference(EventHandlerPeeredMode.REQUESTOR); - }).hasMessage(GRPC_CONSUMER_ERROR_MSG); + public void testPeeredReference() throws ApexEventException { + consumerParameters = populateConsumerParameters(true, true); + grpcConsumer.setPeeredReference(EventHandlerPeeredMode.REQUESTOR, + new PeeredReference(EventHandlerPeeredMode.REQUESTOR, grpcConsumer, grpcProducer)); + grpcConsumer.init(CONSUMER_NAME, consumerParameters, incomingEventReceiver); + PeeredReference peeredReference = grpcConsumer.getPeeredReference(EventHandlerPeeredMode.REQUESTOR); + assertNotNull(peeredReference); + assertTrue(peeredReference.getPeeredConsumer().equals(grpcConsumer)); + assertTrue(peeredReference.getPeeredProducer().equals(grpcProducer)); } - @Test - public void testSetPeeredReference() { - assertThatThrownBy(() -> { - grpcConsumer.setPeeredReference(null, null); - }).hasMessage(GRPC_CONSUMER_ERROR_MSG); - } - - @Test() - public void testStop() { - assertThatThrownBy(() -> { - new ApexGrpcConsumer().stop(); - }).hasMessage(GRPC_CONSUMER_ERROR_MSG); + private EventHandlerParameters populateConsumerParameters(boolean isConsumer, boolean isPeeredMode) { + consumerParameters = new EventHandlerParameters(); + GrpcCarrierTechnologyParameters params = new GrpcCarrierTechnologyParameters(); + params.setLabel("GRPC"); + params.setEventProducerPluginClass(ApexGrpcProducer.class.getName()); + params.setEventConsumerPluginClass(ApexGrpcConsumer.class.getName()); + if (!isConsumer) { + params.setHost("hostname"); + params.setPort(3214); + params.setUsername("dummyUser"); + params.setPassword("dummyPassword"); + params.setTimeout(1); + } + consumerParameters.setCarrierTechnologyParameters(params); + if (isPeeredMode) { + consumerParameters.setPeeredMode(EventHandlerPeeredMode.REQUESTOR, true); + consumerParameters.setPeer(EventHandlerPeeredMode.REQUESTOR, "requestorPeerName"); + } + return consumerParameters; } - } diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/test/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParametersTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/test/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParametersTest.java index a3994c29e..480d8423b 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/test/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParametersTest.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/test/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParametersTest.java @@ -20,12 +20,14 @@ package org.onap.policy.apex.plugins.event.carrier.grpc; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import org.assertj.core.api.Assertions; import org.junit.Before; import org.junit.Test; +import org.onap.policy.apex.service.engine.event.ApexEventException; import org.onap.policy.common.parameters.GroupValidationResult; public class GrpcCarrierTechnologyParametersTest { @@ -42,19 +44,13 @@ public class GrpcCarrierTechnologyParametersTest { } @Test - public void testGrpcCarrierTechnologyParameters_invalid() { + public void testGrpcCarrierTechnologyParameters_invalid_producer_params() throws ApexEventException { GroupValidationResult result = params.validate(); - assertFalse(result.isValid()); - assertTrue(result.getResult().contains("field \"timeout\" type \"int\" value \"0\" INVALID, must be >= 1")); - assertTrue(result.getResult().contains("field \"port\" type \"int\" value \"0\" INVALID, must be >= 1024")); - assertTrue( - result.getResult().contains("field \"host\" type \"java.lang.String\" value \"null\" INVALID, is null")); - assertTrue(result.getResult() - .contains("field \"username\" type \"java.lang.String\" value \"null\" INVALID, is null")); - assertTrue(result.getResult() - .contains("field \"password\" type \"java.lang.String\" value \"null\" INVALID, is null")); - assertTrue(result.getResult().contains("")); - assertTrue(result.getResult().contains("")); + assertTrue(result.isValid()); + assertThatThrownBy(() -> params.validateGrpcParameters(true)) + .hasMessage("Issues in specifying gRPC Producer parameters:\ntimeout should have a positive value.\n" + + "port range should be between 1024 and 65535\n" + "host should be specified.\n" + + "username should be specified.\n" + "password should be specified.\n"); } @Test @@ -70,6 +66,7 @@ public class GrpcCarrierTechnologyParametersTest { params.setUsername(USERNAME); GroupValidationResult result = params.validate(); assertTrue(result.isValid()); + Assertions.assertThatCode(() -> params.validateGrpcParameters(true)).doesNotThrowAnyException(); } @Test @@ -81,7 +78,8 @@ public class GrpcCarrierTechnologyParametersTest { params.setPort(23); // invalid value GroupValidationResult result = params.validate(); - assertFalse(result.isValid()); - assertTrue(result.getResult().contains("field \"port\" type \"int\" value \"23\" INVALID, must be >= 1024")); + assertTrue(result.isValid()); + assertThatThrownBy(() -> params.validateGrpcParameters(true)) + .hasMessageContaining("port range should be between 1024 and 65535"); } } |