From 3db2a659f2d75a7303b39235dd81f4d55d3b87e2 Mon Sep 17 00:00:00 2001 From: "a.sreekumar" Date: Sun, 1 Mar 2020 19:38:50 +0000 Subject: GRPC support for APEX-CDS interaction Change-Id: I586153244dbd97a41e9b9d616ee9a84327b7c2da Issue-ID: POLICY-1656 Signed-off-by: a.sreekumar --- .../event/carrier/grpc/ApexGrpcConsumer.java | 72 ++++++++++ .../event/carrier/grpc/ApexGrpcProducer.java | 152 +++++++++++++++++++++ .../grpc/GrpcCarrierTechnologyParameters.java | 90 ++++++++++++ .../plugins/event/carrier/grpc/package-info.java | 21 +++ 4 files changed, 335 insertions(+) create mode 100644 plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumer.java create mode 100644 plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java create mode 100644 plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParameters.java create mode 100644 plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/package-info.java (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org') diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumer.java new file mode 100644 index 000000000..7333c8a05 --- /dev/null +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcConsumer.java @@ -0,0 +1,72 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2020 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.plugins.event.carrier.grpc; + +import org.onap.policy.apex.service.engine.event.ApexEventException; +import org.onap.policy.apex.service.engine.event.ApexEventReceiver; +import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; +import org.onap.policy.apex.service.engine.event.ApexPluginsEventConsumer; +import org.onap.policy.apex.service.engine.event.PeeredReference; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; + +/** + * This class implements an Apex gRPC consumer. It is not expected to receive events using gRPC. + * So, initializing a gRPC consumer will result in error. + * + * @author Ajith Sreekumar (ajith.sreekumar@est.tech) + */ +public class ApexGrpcConsumer extends ApexPluginsEventConsumer { + + private static final String GRPC_CONSUMER_ERROR_MSG = + "A gRPC Consumer may not be specified. Only sending events is possible using gRPC"; + + @Override + public void init(final String consumerName, final EventHandlerParameters consumerParameters, + final ApexEventReceiver incomingEventReceiver) throws ApexEventException { + throw new ApexEventException(GRPC_CONSUMER_ERROR_MSG); + } + + @Override + public void run() { + throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG); + } + + @Override + public void start() { + throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG); + } + + @Override + public void stop() { + throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG); + } + + @Override + public PeeredReference getPeeredReference(EventHandlerPeeredMode peeredMode) { + throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG); + } + + @Override + public void setPeeredReference(EventHandlerPeeredMode peeredMode, PeeredReference peeredReference) { + throw new ApexEventRuntimeException(GRPC_CONSUMER_ERROR_MSG); + } +} diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java new file mode 100644 index 000000000..380ae1274 --- /dev/null +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/ApexGrpcProducer.java @@ -0,0 +1,152 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2020 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.plugins.event.carrier.grpc; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.util.JsonFormat; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType; +import org.onap.ccsdk.cds.controllerblueprints.common.api.Status; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput.Builder; +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput; +import org.onap.policy.apex.service.engine.event.ApexEventException; +import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; +import org.onap.policy.apex.service.engine.event.ApexPluginsEventProducer; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; +import org.onap.policy.cds.api.CdsProcessorListener; +import org.onap.policy.cds.client.CdsProcessorGrpcClient; +import org.onap.policy.cds.properties.CdsServerProperties; +import org.onap.policy.controlloop.actor.cds.constants.CdsActorConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Concrete implementation of an Apex gRPC plugin that manages to send a GRPC request. + * + * @author Ajith Sreekumar(ajith.sreekumar@est.tech) + * + */ +public class ApexGrpcProducer extends ApexPluginsEventProducer implements CdsProcessorListener { + private static final Logger LOGGER = LoggerFactory.getLogger(ApexGrpcProducer.class); + + private CdsServerProperties props; + // The gRPC client + private CdsProcessorGrpcClient client; + + private AtomicReference cdsResponse = new AtomicReference<>(); + + /** + * {@inheritDoc}. + */ + @Override + public void init(final String producerName, final EventHandlerParameters producerParameters) + throws ApexEventException { + this.name = producerName; + + // Check and get the gRPC Properties + if (!(producerParameters.getCarrierTechnologyParameters() instanceof GrpcCarrierTechnologyParameters)) { + final String errorMessage = + "Specified producer properties are not applicable to gRPC producer (" + this.name + ")"; + throw new ApexEventException(errorMessage); + } + GrpcCarrierTechnologyParameters grpcProducerProperties = + (GrpcCarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters(); + + client = makeGrpcClient(grpcProducerProperties); + } + + private CdsProcessorGrpcClient makeGrpcClient(GrpcCarrierTechnologyParameters grpcProducerProperties) { + props = new CdsServerProperties(); + props.setHost(grpcProducerProperties.getHost()); + props.setPort(grpcProducerProperties.getPort()); + props.setUsername(grpcProducerProperties.getUsername()); + props.setPassword(grpcProducerProperties.getPassword()); + props.setTimeout(grpcProducerProperties.getTimeout()); + + return new CdsProcessorGrpcClient(this, props); + } + + /** + * {@inheritDoc}. + */ + @Override + public void sendEvent(final long executionId, final Properties executionProperties, final String eventName, + final Object event) { + + ExecutionServiceInput executionServiceInput; + Builder builder = ExecutionServiceInput.newBuilder(); + try { + JsonFormat.parser().ignoringUnknownFields().merge((String) event, builder); + executionServiceInput = builder.build(); + } catch (InvalidProtocolBufferException e) { + throw new ApexEventRuntimeException( + "Incoming Event cannot be converted to ExecutionServiceInput type for gRPC request." + e.getMessage()); + } + try { + CountDownLatch countDownLatch = client.sendRequest(executionServiceInput); + if (!countDownLatch.await(props.getTimeout(), TimeUnit.SECONDS)) { + cdsResponse.set(ExecutionServiceOutput.newBuilder().setStatus(Status.newBuilder() + .setErrorMessage(CdsActorConstants.TIMED_OUT).setEventType(EventType.EVENT_COMPONENT_FAILURE)) + .build()); + LOGGER.error("gRPC Request timed out."); + } + } catch (InterruptedException e) { + LOGGER.error("gRPC request failed. {}", e.getMessage()); + cdsResponse.set(ExecutionServiceOutput.newBuilder().setStatus(Status.newBuilder() + .setErrorMessage(CdsActorConstants.INTERRUPTED).setEventType(EventType.EVENT_COMPONENT_FAILURE)) + .build()); + Thread.currentThread().interrupt(); + } + + if (!EventType.EVENT_COMPONENT_EXECUTED.equals(cdsResponse.get().getStatus().getEventType())) { + String errorMessage = "Sending event \"" + eventName + "\" by " + this.name + " to CDS failed, " + + "response from CDS:\n" + cdsResponse.get(); + throw new ApexEventRuntimeException(errorMessage); + } + } + + /** + * {@inheritDoc}. + */ + @Override + public void stop() { + client.close(); + } + + @Override + public void onMessage(ExecutionServiceOutput message) { + LOGGER.info("Received notification from CDS: {}", message); + cdsResponse.set(message); + } + + @Override + public void onError(Throwable throwable) { + String errorMsg = throwable.getLocalizedMessage(); + cdsResponse.set(ExecutionServiceOutput.newBuilder() + .setStatus(Status.newBuilder().setErrorMessage(errorMsg).setEventType(EventType.EVENT_COMPONENT_FAILURE)) + .build()); + LOGGER.error("Failed processing blueprint {} {}", errorMsg, throwable); + } +} diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParameters.java new file mode 100644 index 000000000..59db16743 --- /dev/null +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/GrpcCarrierTechnologyParameters.java @@ -0,0 +1,90 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2020 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.plugins.event.carrier.grpc; + +import lombok.Getter; +import lombok.Setter; +import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; +import org.onap.policy.common.parameters.annotations.Max; +import org.onap.policy.common.parameters.annotations.Min; +import org.onap.policy.common.parameters.annotations.NotNull; + +// @formatter:off +/** + * Apex parameters for gRPC as an event carrier technology. + * + *

The parameters for this plugin are: + *

    + *
  1. host: The host on which CDS is running. This parameter is mandatory + *
  2. port: The port on the CDS host to connect to for CDS. This parameter is mandatory. + *
  3. username: The username for basic authentication to connect to CDS. This parameter is mandatory. + *
  4. password: The password for basic authentication to connect to CDS. This parameter is mandatory. + *
  5. timeout: The timeout in seconds for CDS requests. This parameter is mandatory. + *
+ * + * @author Ajith Sreekumar(ajith.sreekumar@est.tech) + */ +//@formatter:on +@Getter +@Setter +public class GrpcCarrierTechnologyParameters extends CarrierTechnologyParameters { + // @formatter:off + private static final int MIN_USER_PORT = 1024; + private static final int MAX_USER_PORT = 65535; + + /** The label of this carrier technology. */ + public static final String GRPC_CARRIER_TECHNOLOGY_LABEL = "GRPC"; + + /** The producer plugin class for the grpc carrier technology. */ + public static final String GRPC_EVENT_PRODUCER_PLUGIN_CLASS = ApexGrpcProducer.class.getName(); + + /** The consumer plugin class for the gRPC carrier technology. */ + public static final String GRPC_EVENT_CONSUMER_PLUGIN_CLASS = ApexGrpcConsumer.class.getName(); + + @Min(value = 1) + private int timeout; + + @Min(value = MIN_USER_PORT) + @Max(value = MAX_USER_PORT) + private int port; + + @NotNull + private String host; + + @NotNull + private String username; + + @NotNull + private String password; + + + /** + * Constructor to create a gRPC carrier technology parameters instance and register the instance with the + * parameter service. + */ + public GrpcCarrierTechnologyParameters() { + super(); + // Set the carrier technology properties for the gRPC carrier technology + this.setLabel(GRPC_CARRIER_TECHNOLOGY_LABEL); + this.setEventProducerPluginClass(GRPC_EVENT_PRODUCER_PLUGIN_CLASS); + this.setEventConsumerPluginClass(GRPC_EVENT_CONSUMER_PLUGIN_CLASS); + } +} diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/package-info.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/package-info.java new file mode 100644 index 000000000..77d26266d --- /dev/null +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-grpc/src/main/java/org/onap/policy/apex/plugins/event/carrier/grpc/package-info.java @@ -0,0 +1,21 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2020 Nordix Foundation. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.plugins.event.carrier.grpc; -- cgit 1.2.3-korg