diff options
Diffstat (limited to 'services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl')
27 files changed, 2980 insertions, 0 deletions
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventConsumerFactory.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventConsumerFactory.java new file mode 100644 index 000000000..8f54c049b --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventConsumerFactory.java @@ -0,0 +1,83 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl; + +import org.onap.policy.apex.service.engine.event.ApexEventConsumer; +import org.onap.policy.apex.service.engine.event.ApexEventException; +import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * This factory class creates event consumers of various technology types for Apex engines. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class EventConsumerFactory { + // The logger for this class + private static final XLogger LOGGER = XLoggerFactory.getXLogger(EventConsumerFactory.class); + + /** + * Empty constructor with no generic overloading. + */ + public EventConsumerFactory() {} + + /** + * Create an event consumer of the required type for the specified consumer technology. + * + * @param name the name of the consumer + * @param consumerParameters The parameters for the Apex engine, we use the technology type of + * the required consumer + * @return the event consumer + * @throws ApexEventException on errors creating the Apex event consumer + */ + public ApexEventConsumer createConsumer(final String name, final EventHandlerParameters consumerParameters) + throws ApexEventException { + // Get the carrier technology parameters + final CarrierTechnologyParameters technologyParameters = consumerParameters.getCarrierTechnologyParameters(); + + // Get the class for the event consumer using reflection + final String consumerPluginClass = technologyParameters.getEventConsumerPluginClass(); + Object consumerPluginObject = null; + try { + consumerPluginObject = Class.forName(consumerPluginClass).newInstance(); + } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { + final String errorMessage = "could not create an Apex event consumer for \"" + name + + "\" for the carrier technology \"" + technologyParameters.getLabel() + + "\", specified event consumer plugin class \"" + consumerPluginClass + "\" not found"; + LOGGER.error(errorMessage, e); + throw new ApexEventException(errorMessage, e); + } + + // Check the class is an event consumer + if (!(consumerPluginObject instanceof ApexEventConsumer)) { + final String errorMessage = "could not create an Apex event consumer \"" + name + + "\" for the carrier technology \"" + technologyParameters.getLabel() + + "\", specified event consumer plugin class \"" + consumerPluginClass + + "\" is not an instance of \"" + ApexEventConsumer.class.getCanonicalName() + "\""; + LOGGER.error(errorMessage); + throw new ApexEventException(errorMessage); + } + + return (ApexEventConsumer) consumerPluginObject; + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventProducerFactory.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventProducerFactory.java new file mode 100644 index 000000000..9bbbad362 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventProducerFactory.java @@ -0,0 +1,82 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl; + +import org.onap.policy.apex.service.engine.event.ApexEventException; +import org.onap.policy.apex.service.engine.event.ApexEventProducer; +import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * This factory class creates event producers for the defined technology type for Apex engines. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class EventProducerFactory { + // The logger for this class + private static final XLogger LOGGER = XLoggerFactory.getXLogger(EventProducerFactory.class); + + /** + * Empty constructor with no generic overloading. + */ + public EventProducerFactory() {} + + /** + * Create an event producer of the required type for the specified producer technology. + * + * @param name the name of the producer + * @param producerParameters The Apex parameters containing the configuration for the producer + * @return the event producer + * @throws ApexEventException on errors creating the Apex event producer + */ + public ApexEventProducer createProducer(final String name, final EventHandlerParameters producerParameters) + throws ApexEventException { + // Get the carrier technology parameters + final CarrierTechnologyParameters technologyParameters = producerParameters.getCarrierTechnologyParameters(); + + // Get the class for the event producer using reflection + final String producerPluginClass = technologyParameters.getEventProducerPluginClass(); + Object producerPluginObject = null; + try { + producerPluginObject = Class.forName(producerPluginClass).newInstance(); + } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { + final String errorMessage = "could not create an Apex event producer for Producer \"" + name + + "\" for the carrier technology \"" + technologyParameters.getLabel() + + "\", specified event producer plugin class \"" + producerPluginClass + "\" not found"; + LOGGER.error(errorMessage, e); + throw new ApexEventException(errorMessage, e); + } + + // Check the class is an event producer + if (!(producerPluginObject instanceof ApexEventProducer)) { + final String errorMessage = "could not create an Apex event producer for Producer \"" + name + + "\" for the carrier technology \"" + technologyParameters.getLabel() + + "\", specified event producer plugin class \"" + producerPluginClass + + "\" is not an instance of \"" + ApexEventProducer.class.getCanonicalName() + "\""; + LOGGER.error(errorMessage); + throw new ApexEventException(errorMessage); + } + + return (ApexEventProducer) producerPluginObject; + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventProtocolFactory.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventProtocolFactory.java new file mode 100644 index 000000000..85c5bf03f --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/EventProtocolFactory.java @@ -0,0 +1,76 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl; + +import org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter; +import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; +import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolParameters; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * This factory class uses the Apex event protocol parameters to create and return an instance of + * the correct Apex event protocol converter plugin for the specified event protocol. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class EventProtocolFactory { + // The logger for this class + private static final XLogger LOGGER = XLoggerFactory.getXLogger(EventProtocolFactory.class); + + /** + * Create an event converter that converts between an + * {@link org.onap.policy.apex.service.engine.event.ApexEvent} and the specified event protocol. + * + * @param name the name of the event protocol + * @param eventProtocolParameters the event protocol parameters defining what to convert from + * and to + * @return The event converter for converting events to and from Apex format + */ + public ApexEventProtocolConverter createConverter(final String name, + final EventProtocolParameters eventProtocolParameters) { + // Get the class for the event protocol plugin using reflection + final String eventProtocolPluginClass = eventProtocolParameters.getEventProtocolPluginClass(); + Object eventProtocolPluginObject = null; + try { + eventProtocolPluginObject = Class.forName(eventProtocolPluginClass).newInstance(); + } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { + final String errorMessage = "could not create an Apex event protocol converter for \"" + name + + "\" for the protocol \"" + eventProtocolParameters.getLabel() + + "\", specified event protocol converter plugin class \"" + eventProtocolPluginClass + + "\" not found"; + LOGGER.error(errorMessage, e); + throw new ApexEventRuntimeException(errorMessage, e); + } + + // Check the class is an event consumer + if (!(eventProtocolPluginObject instanceof ApexEventProtocolConverter)) { + final String errorMessage = "could not create an Apex event protocol converter for \"" + name + + "\" for the protocol \"" + eventProtocolParameters.getLabel() + + "\", specified event protocol converter plugin class \"" + eventProtocolPluginClass + + "\" is not an instance of \"" + ApexEventProtocolConverter.class.getCanonicalName() + "\""; + LOGGER.error(errorMessage); + throw new ApexEventRuntimeException(errorMessage); + } + ((ApexEventProtocolConverter) eventProtocolPluginObject).init(eventProtocolParameters); + return (ApexEventProtocolConverter) eventProtocolPluginObject; + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/Apex2ApexEventConverter.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/Apex2ApexEventConverter.java new file mode 100644 index 000000000..b73aeb567 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/Apex2ApexEventConverter.java @@ -0,0 +1,140 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.apexprotocolplugin; + +import java.util.ArrayList; +import java.util.List; + +import org.onap.policy.apex.service.engine.event.ApexEvent; +import org.onap.policy.apex.service.engine.event.ApexEventException; +import org.onap.policy.apex.service.engine.event.ApexEventList; +import org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter; +import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; +import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolParameters; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * The Class Apex2ApexEventConverter passes through {@link ApexEvent} instances. It is used for + * transferring Apex events directly as POJOs between APEX producers and consumers. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class Apex2ApexEventConverter implements ApexEventProtocolConverter { + private static final XLogger LOGGER = XLoggerFactory.getXLogger(Apex2ApexEventConverter.class); + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter#init(org.onap.policy. + * apex. service.parameters.eventprotocol.EventProtocolParameters) + */ + @Override + public void init(final EventProtocolParameters parameters) { + // Check and get the APEX parameters + if (!(parameters instanceof ApexEventProtocolParameters)) { + final String errorMessage = "specified consumer properties are not applicable to the APEX event protocol"; + LOGGER.warn(errorMessage); + throw new ApexEventRuntimeException(errorMessage); + } + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.event.ApexEventConverter#toApexEvent(java.lang.String, + * java.lang.Object) + */ + @Override + public List<ApexEvent> toApexEvent(final String eventName, final Object eventObject) throws ApexEventException { + // Check the event eventObject + if (eventObject == null) { + LOGGER.warn("event processing failed, event is null"); + throw new ApexEventException("event processing failed, event is null"); + } + + // The list of events we will return + final List<ApexEvent> eventList = new ArrayList<>(); + + try { + // Check if its a single APEX event + if (!(eventObject instanceof ApexEvent)) { + throw new ApexEventException("incoming event (" + eventObject + ") is not an ApexEvent"); + } + + final ApexEvent event = (ApexEvent) eventObject; + + // Check whether we have any ApexEventList fields, if so this is an event of events and + // all fields should be of type ApexEventList + boolean foundEventListFields = false; + boolean foundOtherFields = false; + for (final Object fieldObject : event.values()) { + if (fieldObject instanceof ApexEventList) { + foundEventListFields = true; + + // Add the events to the event list + eventList.addAll((ApexEventList) fieldObject); + } else { + foundOtherFields = true; + } + } + + // If we found both event list fields and other fields we're in trouble + if (foundEventListFields && foundOtherFields) { + throw new ApexEventException("incoming event (" + eventObject + + ") has both event list fields and other fields, it cannot be processed"); + } + + // Check if the incoming event just has other fields, if so it's just a regular event + // and we add it to the event list as the only event there + if (foundOtherFields) { + eventList.add(event); + } + } catch (final Exception e) { + final String errorString = "Failed to unmarshal APEX event: " + e.getMessage() + ", event=" + eventObject; + LOGGER.warn(errorString, e); + throw new ApexEventException(errorString, e); + } + + // Return the list of events we have unmarshalled + return eventList; + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.event.ApexEventConverter#fromApexEvent(org.onap.policy. + * apex.service.engine.event.ApexEvent) + */ + @Override + public Object fromApexEvent(final ApexEvent apexEvent) throws ApexEventException { + // Check the Apex event + if (apexEvent == null) { + LOGGER.warn("event processing failed, Apex event is null"); + throw new ApexEventException("event processing failed, Apex event is null"); + } + + return apexEvent; + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/ApexEventProtocolParameters.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/ApexEventProtocolParameters.java new file mode 100644 index 000000000..10cd58eb7 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/ApexEventProtocolParameters.java @@ -0,0 +1,58 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.apexprotocolplugin; + +import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolParameters; + +/** + * Event protocol parameters for JSON as an event protocol, there are no user defined parameters. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class ApexEventProtocolParameters extends EventProtocolParameters { + /** The label of this event protocol. */ + public static final String APEX_EVENT_PROTOCOL_LABEL = "APEX"; + + /** + * Constructor to create a JSON event protocol parameter instance and register the instance with + * the parameter service. + */ + public ApexEventProtocolParameters() { + this(ApexEventProtocolParameters.class.getCanonicalName(), APEX_EVENT_PROTOCOL_LABEL); + } + + /** + * Constructor to create an event protocol parameters instance with the name of a sub class of + * this class. + * + * @param parameterClassName the class name of a sub class of this class + * @param eventProtocolLabel the name of the event protocol for this plugin + */ + public ApexEventProtocolParameters(final String parameterClassName, final String eventProtocolLabel) { + super(parameterClassName); + + // Set the event protocol properties for the JSON event protocol + this.setLabel(eventProtocolLabel); + + // Set the event protocol plugin class + this.setEventProtocolPluginClass(Apex2ApexEventConverter.class.getCanonicalName()); + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/package-info.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/package-info.java new file mode 100644 index 000000000..a3c7d0d79 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/apexprotocolplugin/package-info.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Contains the implementation of the APEX event protocol converter plugin for events in Json + * format. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +package org.onap.policy.apex.service.engine.event.impl.apexprotocolplugin; diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/enevent/ApexEvent2EnEventConverter.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/enevent/ApexEvent2EnEventConverter.java new file mode 100644 index 000000000..90a19fff2 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/enevent/ApexEvent2EnEventConverter.java @@ -0,0 +1,143 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.enevent; + +import java.util.ArrayList; +import java.util.List; + +import org.onap.policy.apex.service.engine.event.ApexEvent; +import org.onap.policy.apex.service.engine.event.ApexEventConverter; +import org.onap.policy.apex.service.engine.event.ApexEventException; +import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +import org.onap.policy.apex.core.engine.engine.ApexEngine; +import org.onap.policy.apex.core.engine.event.EnEvent; +import org.onap.policy.apex.model.basicmodel.concepts.ApexException; +import org.onap.policy.apex.model.basicmodel.service.ModelService; +import org.onap.policy.apex.model.eventmodel.concepts.AxEvent; +import org.onap.policy.apex.model.eventmodel.concepts.AxEvents; + +/** + * The Class ApexEvent2EnEventConverter converts externally facing {@link ApexEvent} instances to + * and from instances of {@link EnEvent} that are used internally in the Apex engine core. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public final class ApexEvent2EnEventConverter implements ApexEventConverter { + private static final XLogger LOGGER = XLoggerFactory.getXLogger(ApexEvent2EnEventConverter.class); + + // The Apex engine with its event definitions + private final ApexEngine apexEngine; + + /** + * Set up the event converter. + * + * @param apexEngine The engine to use to create events to be converted + */ + public ApexEvent2EnEventConverter(final ApexEngine apexEngine) { + this.apexEngine = apexEngine; + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.event.ApexEventConverter#toApexEvent(java.lang.String, + * java.lang.Object) + */ + @Override + public List<ApexEvent> toApexEvent(final String eventName, final Object event) throws ApexException { + // Check the Engine event + if (event == null) { + LOGGER.warn("event processing failed, engine event is null"); + throw new ApexEventException("event processing failed, engine event is null"); + } + + // Cast the event to an Engine event event, if our conversion is correctly configured, this + // cast should always work + EnEvent enEvent = null; + try { + enEvent = (EnEvent) event; + } catch (final Exception e) { + final String errorMessage = "error transferring event \"" + event + "\" to the Apex engine"; + LOGGER.debug(errorMessage, e); + throw new ApexEventRuntimeException(errorMessage, e); + } + + // Create the Apex event + final AxEvent axEvent = enEvent.getAxEvent(); + final ApexEvent apexEvent = new ApexEvent(axEvent.getKey().getName(), axEvent.getKey().getVersion(), + axEvent.getNameSpace(), axEvent.getSource(), axEvent.getTarget()); + + // Copy the ExecutionID from the EnEvent into the ApexEvent + apexEvent.setExecutionID(enEvent.getExecutionID()); + + // Copy he exception message to the Apex event if it is set + if (enEvent.getExceptionMessage() != null) { + apexEvent.setExceptionMessage(enEvent.getExceptionMessage()); + } + + // Set the data on the apex event + apexEvent.putAll(enEvent); + + // Return the event in a single element + final ArrayList<ApexEvent> eventList = new ArrayList<ApexEvent>(); + eventList.add(apexEvent); + return eventList; + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.event.ApexEventConverter#fromApexEvent(org.onap.policy. + * apex.service.engine.event.ApexEvent) + */ + @Override + public EnEvent fromApexEvent(final ApexEvent apexEvent) throws ApexException { + // Check the Apex model + if (apexEngine == null) { + LOGGER.warn("event processing failed, apex engine is null"); + throw new ApexEventException("event processing failed, apex engine is null"); + } + + // Get the event definition + final AxEvent eventDefinition = ModelService.getModel(AxEvents.class).get(apexEvent.getName()); + if (eventDefinition == null) { + LOGGER.warn("event processing failed, event \"" + apexEvent.getName() + "\" not found in apex model"); + throw new ApexEventException( + "event processing failed, event \"" + apexEvent.getName() + "\" not found in apex model"); + } + + // Create the internal engine event + final EnEvent enEvent = apexEngine.createEvent(eventDefinition.getKey()); + + // Set the data on the engine event + enEvent.putAll(apexEvent); + + // copy the ExecutionID from the ApexEvent into the EnEvent + enEvent.setExecutionID(apexEvent.getExecutionID()); + + return enEvent; + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/enevent/package-info.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/enevent/package-info.java new file mode 100644 index 000000000..6bc6bc2b3 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/enevent/package-info.java @@ -0,0 +1,28 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Provides conversion between externally facing + * {@link org.onap.policy.apex.service.engine.event.ApexEvent} instances and internal + * {@link org.onap.policy.apex.core.engine.event.EnEvent} instances. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +package org.onap.policy.apex.service.engine.event.impl.enevent; diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorCarrierTechnologyParameters.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorCarrierTechnologyParameters.java new file mode 100644 index 000000000..fb722ea2f --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorCarrierTechnologyParameters.java @@ -0,0 +1,67 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.eventrequestor; + +import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; + +/** + * This class holds the parameters that allows an output event to to be sent back into APEX as one + * or multiple input events, there are no user defined parameters. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class EventRequestorCarrierTechnologyParameters extends CarrierTechnologyParameters { + // @formatter:off + /** The label of this carrier technology. */ + public static final String EVENT_REQUESTOR_CARRIER_TECHNOLOGY_LABEL = "EVENT_REQUESTOR"; + + /** The producer plugin class for the EVENT_REQUESTOR carrier technology. */ + public static final String EVENT_REQUESTOR_EVENT_PRODUCER_PLUGIN_CLASS = + EventRequestorProducer.class.getCanonicalName(); + + /** The consumer plugin class for the EVENT_REQUESTOR carrier technology. */ + public static final String EVENT_REQUESTOR_EVENT_CONSUMER_PLUGIN_CLASS = + EventRequestorConsumer.class.getCanonicalName(); + // @formatter:on + + /** + * Constructor to create an event requestor carrier technology parameters instance and register + * the instance with the parameter service. + */ + public EventRequestorCarrierTechnologyParameters() { + super(EventRequestorCarrierTechnologyParameters.class.getCanonicalName()); + + // Set the carrier technology properties for the EVENT_REQUESTOR carrier technology + this.setLabel(EVENT_REQUESTOR_CARRIER_TECHNOLOGY_LABEL); + this.setEventProducerPluginClass(EVENT_REQUESTOR_EVENT_PRODUCER_PLUGIN_CLASS); + this.setEventConsumerPluginClass(EVENT_REQUESTOR_EVENT_CONSUMER_PLUGIN_CLASS); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.apps.uservice.parameters.ApexParameterValidator#validate() + */ + @Override + public String validate() { + return ""; + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorConsumer.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorConsumer.java new file mode 100644 index 000000000..b472cc9c7 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorConsumer.java @@ -0,0 +1,218 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.eventrequestor; + +import java.util.EnumMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory; +import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; +import org.onap.policy.apex.service.engine.event.ApexEventConsumer; +import org.onap.policy.apex.service.engine.event.ApexEventException; +import org.onap.policy.apex.service.engine.event.ApexEventReceiver; +import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; +import org.onap.policy.apex.service.engine.event.PeeredReference; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class implements an Apex event consumer that receives events from its peered event requestor + * producer. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class EventRequestorConsumer implements ApexEventConsumer, Runnable { + // Get a reference to the logger + private static final Logger LOGGER = LoggerFactory.getLogger(EventRequestorConsumer.class); + + // The amount of time to wait in milliseconds between checks that the consumer thread has + // stopped + private static final long EVENT_REQUESTOR_WAIT_SLEEP_TIME = 50; + + // The event receiver that will receive events from this consumer + private ApexEventReceiver eventReceiver; + + // The name for this consumer + private String name = null; + + // The peer references for this event handler + private final Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = + new EnumMap<>(EventHandlerPeeredMode.class); + + // Temporary request holder for incoming event send requests + private final BlockingQueue<Object> incomingEventRequestQueue = new LinkedBlockingQueue<>(); + + // The consumer thread and stopping flag + private Thread consumerThread; + private boolean stopOrderedFlag = false; + + // The number of events received to date + private int eventsReceived = 0; + + @Override + public void init(final String consumerName, final EventHandlerParameters consumerParameters, + final ApexEventReceiver incomingEventReceiver) throws ApexEventException { + this.eventReceiver = incomingEventReceiver; + this.name = consumerName; + + // Check and get the event requestor consumer properties + if (!(consumerParameters + .getCarrierTechnologyParameters() instanceof EventRequestorCarrierTechnologyParameters)) { + final String errorMessage = + "specified consumer properties are not applicable to event Requestor consumer (" + this.name + ")"; + LOGGER.warn(errorMessage); + throw new ApexEventException(errorMessage); + } + + // Check if we are in peered mode + if (!consumerParameters.isPeeredMode(EventHandlerPeeredMode.REQUESTOR)) { + final String errorMessage = "event Requestor consumer (" + this.name + + ") must run in peered requestor mode with a event Requestor producer"; + LOGGER.warn(errorMessage); + throw new ApexEventException(errorMessage); + } + + } + + /** + * Receive an incoming event send request from the peered event Requestor producer and queue it + * + * @param eventObject the incoming event to process + * @throws ApexEventRuntimeException on queueing errors + */ + public void processEvent(final Object eventObject) { + // Push the event onto the queue for handling + try { + incomingEventRequestQueue.add(eventObject); + } catch (final Exception e) { + final String errorMessage = + "could not queue request \"" + eventObject + "\" on event Requestor consumer (" + this.name + ")"; + LOGGER.warn(errorMessage); + throw new ApexEventRuntimeException(errorMessage); + } + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#start() + */ + @Override + public void start() { + // Configure and start the event reception thread + final String threadName = this.getClass().getName() + ":" + this.name; + consumerThread = new ApplicationThreadFactory(threadName).newThread(this); + consumerThread.setDaemon(true); + consumerThread.start(); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getName() + */ + @Override + public String getName() { + return name; + } + + /** + * Get the number of events received to date + * + * @return the number of events received + */ + public int getEventsReceived() { + return eventsReceived; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getPeeredReference(org.onap. + * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode) + */ + @Override + public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { + return peerReferenceMap.get(peeredMode); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#setPeeredReference(org.onap. + * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode, + * org.onap.policy.apex.service.engine.event.PeeredReference) + */ + @Override + public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { + peerReferenceMap.put(peeredMode, peeredReference); + } + + /* + * (non-Javadoc) + * + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + // The endless loop that receives events using REST calls + while (consumerThread.isAlive() && !stopOrderedFlag) { + try { + // Take the next event from the queue + final Object eventObject = + incomingEventRequestQueue.poll(EVENT_REQUESTOR_WAIT_SLEEP_TIME, TimeUnit.MILLISECONDS); + if (eventObject == null) { + // Poll timed out, wait again + continue; + } + + // Send the event into Apex + eventReceiver.receiveEvent(eventObject); + + eventsReceived++; + } catch (final InterruptedException e) { + LOGGER.debug("Thread interrupted, Reason {}", e.getMessage()); + Thread.currentThread().interrupt(); + } catch (final Exception e) { + LOGGER.warn("error receiving events on thread {}", consumerThread.getName(), e); + } + } + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.apps.uservice.producer.ApexEventConsumer#stop() + */ + @Override + public void stop() { + stopOrderedFlag = true; + + while (consumerThread.isAlive()) { + ThreadUtilities.sleep(EVENT_REQUESTOR_WAIT_SLEEP_TIME); + } + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorProducer.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorProducer.java new file mode 100644 index 000000000..4a972f2ce --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/EventRequestorProducer.java @@ -0,0 +1,178 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.eventrequestor; + +import java.util.EnumMap; +import java.util.Map; + +import org.onap.policy.apex.service.engine.event.ApexEventConsumer; +import org.onap.policy.apex.service.engine.event.ApexEventException; +import org.onap.policy.apex.service.engine.event.ApexEventProducer; +import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; +import org.onap.policy.apex.service.engine.event.PeeredReference; +import org.onap.policy.apex.service.engine.event.SynchronousEventCache; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Concrete implementation of an Apex event producer that sends one or more events to its peered + * event requestor consumer. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + * + */ +public class EventRequestorProducer implements ApexEventProducer { + private static final Logger LOGGER = LoggerFactory.getLogger(EventRequestorProducer.class); + + // The name for this producer + private String name = null; + + // The peer references for this event handler + private final Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = + new EnumMap<>(EventHandlerPeeredMode.class); + + // The number of events sent + private int eventsSent = 0; + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#init(java.lang.String, + * org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters) + */ + @Override + public void init(final String producerName, final EventHandlerParameters producerParameters) + throws ApexEventException { + this.name = producerName; + + // Check and get the producer Properties + if (!(producerParameters + .getCarrierTechnologyParameters() instanceof EventRequestorCarrierTechnologyParameters)) { + final String errorMessage = + "specified consumer properties are not applicable to event requestor producer (" + this.name + ")"; + LOGGER.warn(errorMessage); + throw new ApexEventException(errorMessage); + } + + // Check if we are in peered mode + if (!producerParameters.isPeeredMode(EventHandlerPeeredMode.REQUESTOR)) { + final String errorMessage = "Event Requestor producer (" + this.name + + ") must run in peered requestor mode with a Event Requestor consumer"; + LOGGER.warn(errorMessage); + throw new ApexEventException(errorMessage); + } + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getName() + */ + @Override + public String getName() { + return name; + } + + /** + * Get the number of events sent to date + * + * @return the number of events received + */ + public int getEventsSent() { + return eventsSent; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getPeeredReference(org.onap. + * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode) + */ + @Override + public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { + return peerReferenceMap.get(peeredMode); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#setPeeredReference(org.onap. + * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode, + * org.onap.policy.apex.service.engine.event.PeeredReference) + */ + @Override + public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { + peerReferenceMap.put(peeredMode, peeredReference); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#sendEvent(long, java.lang. + * String, java.lang.Object) + */ + @Override + public void sendEvent(final long executionId, final String eventName, final Object eventObject) { + // Check if this is a synchronized event, if so we have received a reply + final SynchronousEventCache synchronousEventCache = + (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS); + if (synchronousEventCache != null) { + synchronousEventCache.removeCachedEventToApexIfExists(executionId); + } + + // Find the peered consumer for this producer + final PeeredReference peeredRequestorReference = peerReferenceMap.get(EventHandlerPeeredMode.REQUESTOR); + if (peeredRequestorReference != null) { + // Find the event Response Consumer that will handle this request + final ApexEventConsumer consumer = peeredRequestorReference.getPeeredConsumer(); + if (!(consumer instanceof EventRequestorConsumer)) { + final String errorMessage = "send of event to event consumer \"" + + peeredRequestorReference.getPeeredConsumer() + "\" failed," + + " event response consumer is not an instance of EventRequestorConsumer\n" + eventObject; + LOGGER.warn(errorMessage); + throw new ApexEventRuntimeException(errorMessage); + } + + // Use the consumer to handle this event + final EventRequestorConsumer eventRequstConsumer = (EventRequestorConsumer) consumer; + eventRequstConsumer.processEvent(eventObject); + + eventsSent++; + } else { + // No peered consumer defined + final String errorMessage = "send of event failed, event response consumer is not defined\n" + eventObject; + LOGGER.warn(errorMessage); + throw new ApexEventRuntimeException(errorMessage); + } + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#stop() + */ + @Override + public void stop() { + // For event requestor, all the implementation is in the consumer + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/package-info.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/package-info.java new file mode 100644 index 000000000..3b6da08a4 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/eventrequestor/package-info.java @@ -0,0 +1,26 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Implements the Event Requestor carrier technology for multiple event input from an output event. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +package org.onap.policy.apex.service.engine.event.impl.eventrequestor; diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/FILECarrierTechnologyParameters.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/FILECarrierTechnologyParameters.java new file mode 100644 index 000000000..f7f25cb9e --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/FILECarrierTechnologyParameters.java @@ -0,0 +1,208 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin; + +import org.onap.policy.apex.model.utilities.ResourceUtils; +import org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer.ApexFileEventConsumer; +import org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.producer.ApexFileEventProducer; +import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; + +/** + * This class holds the parameters that allows transport of events into and out of Apex using files + * and standard input and output. + * + * <p> + * The following parameters are defined: + * <ol> + * <li>fileName: The full path to the file from which to read events or to which to write events. + * <li>standardIO: If this flag is set to true, then standard input is used to read events in or + * standard output is used to write events and the fileName parameter is ignored if present + * <li>standardError: If this flag is set to true, then standard error is used to write events + * <li>streamingMode: If this flag is set to true, then streaming mode is set for reading events and + * event handling will wait on the input stream for events until the stream is closed. If streaming + * model is off, then event reading completes when the end of input is detected. + * <li>startDelay: The amount of milliseconds to wait at startup startup before processing the first + * event. + * </ol> + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class FILECarrierTechnologyParameters extends CarrierTechnologyParameters { + // @formatter:off + /** The label of this carrier technology. */ + public static final String FILE_CARRIER_TECHNOLOGY_LABEL = "FILE"; + + /** The producer plugin class for the FILE carrier technology. */ + public static final String FILE_EVENT_PRODUCER_PLUGIN_CLASS = ApexFileEventProducer.class.getCanonicalName(); + + /** The consumer plugin class for the FILE carrier technology. */ + public static final String FILE_EVENT_CONSUMER_PLUGIN_CLASS = ApexFileEventConsumer.class.getCanonicalName(); + + private String fileName; + private boolean standardIO = false; + private boolean standardError = false; + private boolean streamingMode = false; + private long startDelay = 0; + // @formatter:on + + /** + * Constructor to create a file carrier technology parameters instance and register the instance + * with the parameter service. + */ + public FILECarrierTechnologyParameters() { + super(FILECarrierTechnologyParameters.class.getCanonicalName()); + + // Set the carrier technology properties for the FILE carrier technology + this.setLabel(FILE_CARRIER_TECHNOLOGY_LABEL); + this.setEventProducerPluginClass(FILE_EVENT_PRODUCER_PLUGIN_CLASS); + this.setEventConsumerPluginClass(FILE_EVENT_CONSUMER_PLUGIN_CLASS); + } + + /** + * Gets the file name from which to read or to which to write events. + * + * @return the file name from which to read or to which to write events + */ + public String getFileName() { + return ResourceUtils.getFilePath4Resource(fileName); + } + + /** + * Checks if is standard IO should be used for input or output. + * + * @return true, if standard IO should be used for input or output + */ + public boolean isStandardIO() { + return standardIO; + } + + /** + * Checks if is standard error should be used for output. + * + * @return true, if standard error should be used for output + */ + public boolean isStandardError() { + return standardError; + } + + /** + * Checks if is streaming mode is on. + * + * @return true, if streaming mode is on + */ + public boolean isStreamingMode() { + return streamingMode; + } + + /** + * Sets the file name from which to read or to which to write events. + * + * @param fileName the file name from which to read or to which to write events + */ + public void setFileName(final String fileName) { + this.fileName = fileName; + } + + /** + * Sets if standard IO should be used for event input or output. + * + * @param standardIO if standard IO should be used for event input or output + */ + public void setStandardIO(final boolean standardIO) { + this.standardIO = standardIO; + } + + /** + * Sets if standard error should be used for event output. + * + * @param standardError if standard error should be used for event output + */ + public void setStandardError(final boolean standardError) { + this.standardError = standardError; + } + + /** + * Sets streaming mode. + * + * @param streamingMode the streaming mode value + */ + public void setStreamingMode(final boolean streamingMode) { + this.streamingMode = streamingMode; + } + + /** + * Gets the delay in milliseconds before the plugin starts processing + * + * @return the delay + */ + public long getStartDelay() { + return startDelay; + } + + /** + * Sets the delay in milliseconds before the plugin starts processing + * + * @param startDelay the delay + */ + public void setStartDelay(final long startDelay) { + this.startDelay = startDelay; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters# + * toString() + */ + @Override + public String toString() { + return "FILECarrierTechnologyParameters [fileName=" + fileName + ", standardIO=" + standardIO + + ", standardError=" + standardError + ", streamingMode=" + streamingMode + ", startDelay=" + startDelay + + "]"; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.apps.uservice.parameters.ApexParameterValidator#validate() + */ + @Override + public String validate() { + final StringBuilder errorMessageBuilder = new StringBuilder(); + + errorMessageBuilder.append(super.validate()); + + if (!standardIO && !standardError && (fileName == null || fileName.trim().length() == 0)) { + errorMessageBuilder.append( + " fileName not specified or is blank or null, it must be specified as a valid file location\n"); + } + + if (standardIO || standardError) { + streamingMode = true; + } + + if (startDelay < 0) { + errorMessageBuilder.append(" startDelay must be zero or a positive number of milliseconds\n"); + } + + return errorMessageBuilder.toString(); + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/ApexFileEventConsumer.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/ApexFileEventConsumer.java new file mode 100644 index 000000000..7521c3a08 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/ApexFileEventConsumer.java @@ -0,0 +1,247 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.EnumMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory; +import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; +import org.onap.policy.apex.service.engine.event.ApexEventConsumer; +import org.onap.policy.apex.service.engine.event.ApexEventException; +import org.onap.policy.apex.service.engine.event.ApexEventReceiver; +import org.onap.policy.apex.service.engine.event.PeeredReference; +import org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.FILECarrierTechnologyParameters; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Concrete implementation an Apex event consumer that reads events from a file. This consumer also + * implements ApexEventProducer and therefore can be used as a synchronous consumer. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class ApexFileEventConsumer implements ApexEventConsumer, Runnable { + + // Get a reference to the logger + private static final Logger LOGGER = LoggerFactory.getLogger(ApexFileEventConsumer.class); + + // The input stream to read events from + private InputStream eventInputStream; + + // The text block reader that will read text blocks from the contents of the file + private TextBlockReader textBlockReader; + + // The event receiver that will receive asynchronous events from this consumer + private ApexEventReceiver eventReceiver = null; + + // The consumer thread and stopping flag + private Thread consumerThread; + + // The name for this consumer + private String consumerName = null; + + // The specific carrier technology parameters for this consumer + private FILECarrierTechnologyParameters fileCarrierTechnologyParameters; + + // The peer references for this event handler + private final Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = + new EnumMap<>(EventHandlerPeeredMode.class); + + // Holds the next identifier for event execution. + private static AtomicLong nextExecutionID = new AtomicLong(0L); + + /** + * Private utility to get the next candidate value for a Execution ID. This value will always be + * unique in a single JVM + * + * @return the next candidate value for a Execution ID + */ + private static synchronized long getNextExecutionID() { + return nextExecutionID.getAndIncrement(); + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.apps.uservice.consumer.ApexEventConsumer#init(org.onap.policy.apex.apps. + * uservice.consumer.ApexEventReceiver) + */ + @Override + public void init(final String name, final EventHandlerParameters consumerParameters, + final ApexEventReceiver incomingEventReceiver) throws ApexEventException { + this.eventReceiver = incomingEventReceiver; + this.consumerName = name; + + // Get and check the Apex parameters from the parameter service + if (consumerParameters == null) { + final String errorMessage = "Consumer parameters for ApexFileConsumer \"" + consumerName + "\" is null"; + LOGGER.warn(errorMessage); + throw new ApexEventException(errorMessage); + } + + // Check and get the file Properties + if (!(consumerParameters.getCarrierTechnologyParameters() instanceof FILECarrierTechnologyParameters)) { + final String errorMessage = "specified consumer properties for ApexFileConsumer \"" + consumerName + + "\" are not applicable to a File consumer"; + LOGGER.warn(errorMessage); + throw new ApexEventException(errorMessage); + } + fileCarrierTechnologyParameters = + (FILECarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters(); + + // Open the file producing events + try { + if (fileCarrierTechnologyParameters.isStandardIO()) { + eventInputStream = System.in; + } else { + eventInputStream = new FileInputStream(fileCarrierTechnologyParameters.getFileName()); + } + + // Get an event composer for our event source + textBlockReader = new TextBlockReaderFactory().getTaggedReader(eventInputStream, + consumerParameters.getEventProtocolParameters()); + } catch (final IOException e) { + final String errorMessage = "ApexFileConsumer \"" + consumerName + "\" failed to open file for reading: \"" + + fileCarrierTechnologyParameters.getFileName() + "\""; + LOGGER.warn(errorMessage, e); + throw new ApexEventException(errorMessage, e); + } + + if (fileCarrierTechnologyParameters.getStartDelay() > 0) { + ThreadUtilities.sleep(fileCarrierTechnologyParameters.getStartDelay()); + } + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getName() + */ + @Override + public String getName() { + return consumerName; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#getPeeredReference(org.onap. + * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode) + */ + @Override + public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { + return peerReferenceMap.get(peeredMode); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#setPeeredReference(org.onap. + * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode, + * org.onap.policy.apex.service.engine.event.PeeredReference) + */ + @Override + public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { + peerReferenceMap.put(peeredMode, peeredReference); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventConsumer#start() + */ + @Override + public void start() { + // Configure and start the event reception thread + final String threadName = this.getClass().getName() + " : " + consumerName; + consumerThread = new ApplicationThreadFactory(threadName).newThread(this); + consumerThread.setDaemon(true); + consumerThread.start(); + } + + /* + * (non-Javadoc) + * + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + // Check that we have been initialized in async or sync mode + if (eventReceiver == null) { + LOGGER.warn("\"{}\" has not been initilaized for either asynchronous or synchronous event handling", + consumerName); + return; + } + + // Read the events from the file while there are still events in the file + try { + // Read all the text blocks + TextBlock textBlock; + do { + // Read the text block + textBlock = textBlockReader.readTextBlock(); + + // Process the event from the text block if there is one there + if (textBlock.getText() != null) { + eventReceiver.receiveEvent(getNextExecutionID(), textBlock.getText()); + } + } while (!textBlock.isEndOfText()); + } catch (final Exception e) { + LOGGER.warn("\"" + consumerName + "\" failed to read event from file: \"" + + fileCarrierTechnologyParameters.getFileName() + "\"", e); + } finally { + try { + eventInputStream.close(); + } catch (final IOException e) { + LOGGER.warn("ApexFileConsumer \"" + consumerName + "\" failed to close file: \"" + + fileCarrierTechnologyParameters.getFileName() + "\"", e); + } + } + + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#stop() + */ + @Override + public void stop() { + try { + eventInputStream.close(); + } catch (final IOException e) { + LOGGER.warn("ApexFileConsumer \"" + consumerName + "\" failed to close file for reading: \"" + + fileCarrierTechnologyParameters.getFileName() + "\"", e); + } + + if (consumerThread.isAlive() && !consumerThread.isInterrupted()) { + consumerThread.interrupt(); + } + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/CharacterDelimitedTextBlockReader.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/CharacterDelimitedTextBlockReader.java new file mode 100644 index 000000000..b286f8afe --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/CharacterDelimitedTextBlockReader.java @@ -0,0 +1,141 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer; + +import java.io.IOException; +import java.io.InputStream; + +import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolTextCharDelimitedParameters; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * The class CharacterDelimitedTextBlockReader reads the next block of text between two character + * tags from an input stream. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class CharacterDelimitedTextBlockReader implements TextBlockReader { + // The logger for this class + private static final XLogger LOGGER = XLoggerFactory.getXLogger(CharacterDelimitedTextBlockReader.class); + + // The character tags + private final char startTagChar; + private final char endTagChar; + + // The input stream for text + private InputStream inputStream; + + // Flag indicating we have seen EOF on the stream + private boolean eofOnInputStream = false; + + /** + * Constructor, set the delimiters. + * + * @param startTagChar The start tag for text blocks + * @param endTagChar The end tag for text blocks + */ + public CharacterDelimitedTextBlockReader(final char startTagChar, final char endTagChar) { + this.startTagChar = startTagChar; + this.endTagChar = endTagChar; + } + + /** + * Constructor, set the delimiters from a character delimited event protocol parameter class. + * + * @param charDelimitedParameters the character delimited event protocol parameter class + */ + public CharacterDelimitedTextBlockReader(final EventProtocolTextCharDelimitedParameters charDelimitedParameters) { + this.startTagChar = charDelimitedParameters.getStartChar(); + this.endTagChar = charDelimitedParameters.getEndChar(); + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer.TextBlockReader#init( + * java.io.InputStream) + */ + @Override + public void init(final InputStream incomingInputStream) { + this.inputStream = incomingInputStream; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer.TextBlockReader# + * readTextBlock() + */ + @Override + public TextBlock readTextBlock() throws IOException { + // Check if there was a previous end of a text block with a non-empty text block returned + if (eofOnInputStream) { + return new TextBlock(eofOnInputStream, null); + } + + // The initial nesting level of incoming text blocks is always zero + int nestingLevel = 0; + + // Holder for the text block + final StringBuilder textBlockBuilder = new StringBuilder(); + + // Read the next text block + while (true) { + final char nextChar = (char) inputStream.read(); + + // Check for EOF + if (nextChar == (char) -1) { + eofOnInputStream = true; + break; + } + + if (nextChar == startTagChar) { + nestingLevel++; + } else if (nestingLevel == 0 && !Character.isWhitespace(nextChar)) { + LOGGER.warn("invalid input on consumer: " + nextChar); + continue; + } + + textBlockBuilder.append(nextChar); + + // Check for end of the text block, we have come back to level 0 + if (nextChar == endTagChar) { + if (nestingLevel > 0) { + nestingLevel--; + } + + if (nestingLevel == 0) { + break; + } + } + } + + // Condition the text block and return it + final String textBlock = textBlockBuilder.toString().trim(); + if (textBlock.length() > 0) { + return new TextBlock(eofOnInputStream, textBlock); + } else { + return new TextBlock(eofOnInputStream, null); + } + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/HeaderDelimitedTextBlockReader.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/HeaderDelimitedTextBlockReader.java new file mode 100644 index 000000000..e40bc756c --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/HeaderDelimitedTextBlockReader.java @@ -0,0 +1,167 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.onap.policy.apex.core.infrastructure.threading.ApplicationThreadFactory; +import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; +import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolTextTokenDelimitedParameters; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * The Class TextBlockReader reads the next block of text from an input stream. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class HeaderDelimitedTextBlockReader implements TextBlockReader, Runnable { + // The logger for this class + private static final XLogger LOGGER = XLoggerFactory.getXLogger(HeaderDelimitedTextBlockReader.class); + + // The amount of time to wait for input on the text block reader + private static final long TEXT_BLOCK_DELAY = 250; + + // Tag for the start of a text block + private final String blockStartToken; + + // The input stream for text + private InputStream inputStream; + + // The lines of input read from the input stream + private final Queue<String> textLineQueue = new LinkedBlockingQueue<>(); + + // The thread used to read text from the input stream + private Thread textConsumputionThread; + + // True while EOF has not been seen on input + private boolean eofOnInputStream = false; + + /** + * Constructor, initialize the text block reader. + * + * @param blockStartToken the block start token for the start of a text block + */ + public HeaderDelimitedTextBlockReader(final String blockStartToken) { + this.blockStartToken = blockStartToken; + } + + /** + * Constructor, initialize the text block reader using token delimited event protocol + * parameters. + * + * @param tokenDelimitedParameters the token delimited event protocol parameters + */ + public HeaderDelimitedTextBlockReader(final EventProtocolTextTokenDelimitedParameters tokenDelimitedParameters) { + this.blockStartToken = tokenDelimitedParameters.getDelimiterToken(); + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer.TextBlockReader# + * init( java.io.InputStream) + */ + @Override + public void init(final InputStream incomingInputStream) { + this.inputStream = incomingInputStream; + + // Configure and start the text reading thread + textConsumputionThread = new ApplicationThreadFactory(this.getClass().getName()).newThread(this); + textConsumputionThread.setDaemon(true); + textConsumputionThread.start(); + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer.TextBlockReader# + * readTextBlock() + */ + @Override + public TextBlock readTextBlock() throws IOException { + // Holder for the current text block + final StringBuilder textBlockBuilder = new StringBuilder(); + + // Wait for the timeout period if there is no input + if (!eofOnInputStream && textLineQueue.size() == 0) { + ThreadUtilities.sleep(TEXT_BLOCK_DELAY); + } + + // Scan the lines in the queue + while (textLineQueue.size() > 0) { + // Scroll down in the available lines looking for the start of the text block + if (textLineQueue.peek().startsWith(blockStartToken)) { + // Process the input line header + textBlockBuilder.append(textLineQueue.remove()); + textBlockBuilder.append('\n'); + break; + } else { + LOGGER.warn("invalid input on consumer: " + textLineQueue.remove()); + } + } + + // Get the rest of the text document + while (textLineQueue.size() > 0 && !textLineQueue.peek().startsWith(blockStartToken)) { + textBlockBuilder.append(textLineQueue.remove()); + textBlockBuilder.append('\n'); + } + + // Condition the text block and return it + final String textBlock = textBlockBuilder.toString().trim(); + final boolean endOfText = (eofOnInputStream && textLineQueue.size() == 0 ? true : false); + + if (textBlock.length() > 0) { + return new TextBlock(endOfText, textBlock); + } else { + return new TextBlock(endOfText, null); + } + } + + /* + * (non-Javadoc) + * + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + final BufferedReader textReader = new BufferedReader(new InputStreamReader(inputStream)); + + try { + // Read the input line by line until we see end of file on the stream + String line; + while ((line = textReader.readLine()) != null) { + textLineQueue.add(line); + } + } catch (final IOException e) { + LOGGER.warn("I/O exception on text input on consumer: ", e); + } finally { + eofOnInputStream = true; + } + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlock.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlock.java new file mode 100644 index 000000000..526d9c318 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlock.java @@ -0,0 +1,78 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer; + +/** + * This class is a bean that holds a block of text read from an incoming text file. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class TextBlock { + private boolean endOfText = false; + private String text; + + /** + * Constructor to initiate the text block. + * + * @param endOfText the end of text + * @param text the text + */ + public TextBlock(final boolean endOfText, final String text) { + this.endOfText = endOfText; + this.text = text; + } + + /** + * Checks if is end of text. + * + * @return true, if checks if is end of text + */ + public boolean isEndOfText() { + return endOfText; + } + + /** + * Sets whether end of text has been reached. + * + * @param endOfText the end of text flag value + */ + public void setEndOfText(final boolean endOfText) { + this.endOfText = endOfText; + } + + /** + * Gets the text of the text block. + * + * @return the text of the text block + */ + public String getText() { + return text; + } + + /** + * Sets the text of the text block. + * + * @param text the text of the text block + */ + public void setText(final String text) { + this.text = text; + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlockReader.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlockReader.java new file mode 100644 index 000000000..627718402 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlockReader.java @@ -0,0 +1,46 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer; + +import java.io.IOException; +import java.io.InputStream; + +/** + * Implementers of the interface TextBlockReader read the next block of text from an input stream. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public interface TextBlockReader { + /** + * Initialize the text block reader reader. + * + * @param inputStream The stream to read from + */ + void init(InputStream inputStream); + + /** + * Read a block of text between two delimiters. + * + * @return The text block + * @throws IOException On reading errors + */ + TextBlock readTextBlock() throws IOException; +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlockReaderFactory.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlockReaderFactory.java new file mode 100644 index 000000000..e48266634 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/TextBlockReaderFactory.java @@ -0,0 +1,80 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer; + +import java.io.InputStream; + +import org.onap.policy.apex.service.engine.event.ApexEventException; +import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolParameters; +import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolTextCharDelimitedParameters; +import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolTextTokenDelimitedParameters; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +/** + * This factory creates text block readers for breaking character streams into blocks of text. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class TextBlockReaderFactory { + // The logger for this class + private static final XLogger LOGGER = XLoggerFactory.getXLogger(TextBlockReaderFactory.class); + + /** + * Get a text block reader for the given event protocol. + * + * @param inputStream the input stream that will be used for reading + * @param eventProtocolParameters the parameters that have been specified for event protocols + * @return the tagged reader + * @throws ApexEventException On an unsupported event protocol + */ + public TextBlockReader getTaggedReader(final InputStream inputStream, + final EventProtocolParameters eventProtocolParameters) throws ApexEventException { + // Check the type of event protocol we have + if (eventProtocolParameters instanceof EventProtocolTextCharDelimitedParameters) { + // We have character delimited textual input + final EventProtocolTextCharDelimitedParameters charDelimitedParameters = + (EventProtocolTextCharDelimitedParameters) eventProtocolParameters; + + // Create the text block reader + final TextBlockReader characterDelimitedTextBlockReader = + new CharacterDelimitedTextBlockReader(charDelimitedParameters); + characterDelimitedTextBlockReader.init(inputStream); + return characterDelimitedTextBlockReader; + } else if (eventProtocolParameters instanceof EventProtocolTextTokenDelimitedParameters) { + // We have token delimited textual input + final EventProtocolTextTokenDelimitedParameters tokenDelimitedParameters = + (EventProtocolTextTokenDelimitedParameters) eventProtocolParameters; + + // Create the text block reader + final HeaderDelimitedTextBlockReader headerDelimitedTextBlockReader = + new HeaderDelimitedTextBlockReader(tokenDelimitedParameters); + headerDelimitedTextBlockReader.init(inputStream); + return headerDelimitedTextBlockReader; + } else { + final String errorMessage = + "could not create text block reader for a textual event protocol, the required type " + + eventProtocolParameters.getLabel() + " is not supported"; + LOGGER.error(errorMessage); + throw new ApexEventException(errorMessage); + } + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/package-info.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/package-info.java new file mode 100644 index 000000000..05833ac7c --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/package-info.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Implements the FILE carrier technology consumer that sends events to APEX from files, standard IO + * or named pipes. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.consumer; diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/package-info.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/package-info.java new file mode 100644 index 000000000..de0b1b56e --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/package-info.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Implements the FILE carrier technology for event input and output to and from Apex using files, + * named pipes, and standard IO. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin; diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/ApexFileEventProducer.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/ApexFileEventProducer.java new file mode 100644 index 000000000..d5f9ff1b2 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/ApexFileEventProducer.java @@ -0,0 +1,181 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.producer; + +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.util.EnumMap; +import java.util.Map; + +import org.onap.policy.apex.core.infrastructure.threading.ThreadUtilities; +import org.onap.policy.apex.service.engine.event.ApexEventException; +import org.onap.policy.apex.service.engine.event.ApexEventProducer; +import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; +import org.onap.policy.apex.service.engine.event.PeeredReference; +import org.onap.policy.apex.service.engine.event.SynchronousEventCache; +import org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.FILECarrierTechnologyParameters; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; +import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Concrete implementation of an Apex event producer that sends events to a file. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class ApexFileEventProducer implements ApexEventProducer { + // Get a reference to the logger + private static final Logger LOGGER = LoggerFactory.getLogger(ApexFileEventProducer.class); + + // The name for this producer + private String producerName = null; + + // The output stream to write events to + private PrintStream eventOutputStream; + + // The peer references for this event handler + private final Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = + new EnumMap<>(EventHandlerPeeredMode.class); + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#init() + */ + @Override + public void init(final String name, final EventHandlerParameters producerParameters) throws ApexEventException { + producerName = name; + + // Get and check the Apex parameters from the parameter service + if (producerParameters == null) { + final String errorMessage = "Producer parameters for ApexFileProducer \"" + producerName + "\" is null"; + LOGGER.warn(errorMessage); + throw new ApexEventException(errorMessage); + } + + // Check and get the file Properties + if (!(producerParameters.getCarrierTechnologyParameters() instanceof FILECarrierTechnologyParameters)) { + final String errorMessage = "specified producer properties for ApexFileProducer \"" + producerName + + "\" are not applicable to a FILE producer"; + LOGGER.warn(errorMessage); + throw new ApexEventException(errorMessage); + } + final FILECarrierTechnologyParameters fileCarrierTechnologyParameters = + (FILECarrierTechnologyParameters) producerParameters.getCarrierTechnologyParameters(); + + // Now we create a writer for events + try { + if (fileCarrierTechnologyParameters.isStandardError()) { + eventOutputStream = System.err; + } else if (fileCarrierTechnologyParameters.isStandardIO()) { + eventOutputStream = System.out; + } else { + eventOutputStream = + new PrintStream(new FileOutputStream(fileCarrierTechnologyParameters.getFileName()), true); + } + } catch (final IOException e) { + final String errorMessage = "ApexFileProducer \"" + producerName + "\" failed to open file for writing: \"" + + fileCarrierTechnologyParameters.getFileName() + "\""; + LOGGER.warn(errorMessage, e); + throw new ApexEventException(errorMessage, e); + } + + if (fileCarrierTechnologyParameters.getStartDelay() > 0) { + ThreadUtilities.sleep(fileCarrierTechnologyParameters.getStartDelay()); + } + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getName() + */ + @Override + public String getName() { + return producerName; + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#getPeeredReference(org.onap. + * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode) + */ + @Override + public PeeredReference getPeeredReference(final EventHandlerPeeredMode peeredMode) { + return peerReferenceMap.get(peeredMode); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#setPeeredReference(org.onap. + * policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode, + * org.onap.policy.apex.service.engine.event.PeeredReference) + */ + @Override + public void setPeeredReference(final EventHandlerPeeredMode peeredMode, final PeeredReference peeredReference) { + peerReferenceMap.put(peeredMode, peeredReference); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.service.engine.event.ApexEventProducer#sendEvent(long, + * java.lang.String, java.lang.Object) + */ + @Override + public void sendEvent(final long executionId, final String eventName, final Object event) { + // Check if this is a synchronized event, if so we have received a reply + final SynchronousEventCache synchronousEventCache = + (SynchronousEventCache) peerReferenceMap.get(EventHandlerPeeredMode.SYNCHRONOUS); + if (synchronousEventCache != null) { + synchronousEventCache.removeCachedEventToApexIfExists(executionId); + } + + // Cast the event to a string, if our conversion is correctly configured, this cast should + // always work + String stringEvent = null; + try { + stringEvent = (String) event; + } catch (final Exception e) { + final String errorMessage = "error in ApexFileProducer \"" + producerName + "\" while transferring event \"" + + event + "\" to the output stream"; + LOGGER.debug(errorMessage, e); + throw new ApexEventRuntimeException(errorMessage, e); + } + + eventOutputStream.println(stringEvent); + eventOutputStream.flush(); + } + + /* + * (non-Javadoc) + * + * @see org.onap.policy.apex.apps.uservice.producer.ApexEventProducer#stop() + */ + @Override + public void stop() { + eventOutputStream.close(); + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/package-info.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/package-info.java new file mode 100644 index 000000000..f7d7cbfbd --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/producer/package-info.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Implements the FILE carrier technology producer that outputs events from APEX to files, standard + * IO or named pipes. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +package org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.producer; diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/Apex2JSONEventConverter.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/Apex2JSONEventConverter.java new file mode 100644 index 000000000..3b21a29ca --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/Apex2JSONEventConverter.java @@ -0,0 +1,433 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.jsonprotocolplugin; + +import java.util.ArrayList; +import java.util.List; + +import org.onap.policy.apex.context.SchemaHelper; +import org.onap.policy.apex.context.impl.schema.SchemaHelperFactory; +import org.onap.policy.apex.model.basicmodel.service.ModelService; +import org.onap.policy.apex.model.eventmodel.concepts.AxEvent; +import org.onap.policy.apex.model.eventmodel.concepts.AxEvents; +import org.onap.policy.apex.model.eventmodel.concepts.AxField; +import org.onap.policy.apex.service.engine.event.ApexEvent; +import org.onap.policy.apex.service.engine.event.ApexEventException; +import org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter; +import org.onap.policy.apex.service.engine.event.ApexEventRuntimeException; +import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolParameters; +import org.slf4j.ext.XLogger; +import org.slf4j.ext.XLoggerFactory; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; + +/** + * The Class Apex2JSONEventConverter converts {@link ApexEvent} instances to and from JSON string + * representations of Apex events. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class Apex2JSONEventConverter implements ApexEventProtocolConverter { + private static final XLogger LOGGER = XLoggerFactory.getXLogger(Apex2JSONEventConverter.class); + + // The parameters for the JSON event protocol + private JSONEventProtocolParameters jsonPars; + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.event.ApexEventProtocolConverter#init(org.onap.policy. + * apex.service.parameters.eventprotocol.EventProtocolParameters) + */ + @Override + public void init(final EventProtocolParameters parameters) { + // Check and get the JSON parameters + if (!(parameters instanceof JSONEventProtocolParameters)) { + final String errorMessage = "specified consumer properties are not applicable to the JSON event protocol"; + LOGGER.warn(errorMessage); + throw new ApexEventRuntimeException(errorMessage); + } + + jsonPars = (JSONEventProtocolParameters) parameters; + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.event.ApexEventConverter#toApexEvent(java.lang.String, + * java.lang.Object) + */ + @Override + public List<ApexEvent> toApexEvent(final String eventName, final Object eventObject) throws ApexEventException { + // Check the event eventObject + if (eventObject == null) { + LOGGER.warn("event processing failed, event is null"); + throw new ApexEventException("event processing failed, event is null"); + } + + // Cast the event to a string, if our conversion is correctly configured, this cast should + // always work + String jsonEventString = null; + try { + jsonEventString = (String) eventObject; + } catch (final Exception e) { + final String errorMessage = "error converting event \"" + eventObject + "\" to a string"; + LOGGER.debug(errorMessage, e); + throw new ApexEventRuntimeException(errorMessage, e); + } + + // The list of events we will return + final List<ApexEvent> eventList = new ArrayList<ApexEvent>(); + + try { + // We may have a single JSON object with a single event or an array of JSON objects + final Object decodedJsonObject = + new GsonBuilder().serializeNulls().create().fromJson(jsonEventString, Object.class); + + // Check if we have a list of objects + if (decodedJsonObject instanceof List) { + // Check if it's a list of JSON objects or a list of strings + @SuppressWarnings("unchecked") + final List<Object> decodedJsonList = (List<Object>) decodedJsonObject; + + // Decode each of the list elements in sequence + for (final Object jsonListObject : decodedJsonList) { + if (jsonListObject instanceof String) { + eventList.add(jsonStringApexEvent(eventName, (String) jsonListObject)); + } else if (jsonListObject instanceof JsonObject) { + eventList.add(jsonObject2ApexEvent(eventName, (JsonObject) jsonListObject)); + } else { + throw new ApexEventException("incoming event (" + jsonEventString + + ") is a JSON object array containing an invalid object " + jsonListObject); + } + } + } else { + eventList.add(jsonStringApexEvent(eventName, jsonEventString)); + } + } catch (final Exception e) { + final String errorString = + "Failed to unmarshal JSON event: " + e.getMessage() + ", event=" + jsonEventString; + LOGGER.warn(errorString, e); + throw new ApexEventException(errorString, e); + } + + // Return the list of events we have unmarshalled + return eventList; + } + + /* + * (non-Javadoc) + * + * @see + * org.onap.policy.apex.service.engine.event.ApexEventConverter#fromApexEvent(org.onap.policy. + * apex.service.engine.event.ApexEvent) + */ + @Override + public Object fromApexEvent(final ApexEvent apexEvent) throws ApexEventException { + // Check the Apex event + if (apexEvent == null) { + LOGGER.warn("event processing failed, Apex event is null"); + throw new ApexEventException("event processing failed, Apex event is null"); + } + + // Get the event definition for the event from the model service + final AxEvent eventDefinition = + ModelService.getModel(AxEvents.class).get(apexEvent.getName(), apexEvent.getVersion()); + + // Use a GSON Json object to marshal the Apex event to JSON + final Gson gson = new GsonBuilder().serializeNulls().setPrettyPrinting().create(); + final JsonObject jsonObject = new JsonObject(); + + jsonObject.addProperty(ApexEvent.NAME_HEADER_FIELD, apexEvent.getName()); + jsonObject.addProperty(ApexEvent.VERSION_HEADER_FIELD, apexEvent.getVersion()); + jsonObject.addProperty(ApexEvent.NAMESPACE_HEADER_FIELD, apexEvent.getNameSpace()); + jsonObject.addProperty(ApexEvent.SOURCE_HEADER_FIELD, apexEvent.getSource()); + jsonObject.addProperty(ApexEvent.TARGET_HEADER_FIELD, apexEvent.getTarget()); + + if (apexEvent.getExceptionMessage() != null) { + jsonObject.addProperty(ApexEvent.EXCEPTION_MESSAGE_HEADER_FIELD, apexEvent.getExceptionMessage()); + } + + for (final AxField eventField : eventDefinition.getFields()) { + final String fieldName = eventField.getKey().getLocalName(); + + if (!apexEvent.containsKey(fieldName)) { + if (!eventField.getOptional()) { + final String errorMessage = "error parsing " + eventDefinition.getID() + " event to Json. " + + "Field \"" + fieldName + "\" is missing, but is mandatory. Fields: " + apexEvent; + LOGGER.debug(errorMessage); + throw new ApexEventRuntimeException(errorMessage); + } + continue; + } + + final Object fieldValue = apexEvent.get(fieldName); + + // Get the schema helper + final SchemaHelper fieldSchemaHelper = + new SchemaHelperFactory().createSchemaHelper(eventField.getKey(), eventField.getSchema()); + jsonObject.add(fieldName, fieldSchemaHelper.marshal2JsonElement(fieldValue)); + } + + // Output JSON string in a pretty format + return gson.toJson(jsonObject); + } + + /** + * This method converts a JSON object into an Apex event. + * + * @param eventName the name of the event + * @param jsonEventString the JSON string that holds the event + * @return the apex event that we have converted the JSON object into + * @throws ApexEventException thrown on unmarshaling exceptions + */ + private ApexEvent jsonStringApexEvent(final String eventName, final String jsonEventString) + throws ApexEventException { + // Use GSON to read the event string + final JsonObject jsonObject = + new GsonBuilder().serializeNulls().create().fromJson(jsonEventString, JsonObject.class); + + if (jsonObject == null || !jsonObject.isJsonObject()) { + throw new ApexEventException( + "incoming event (" + jsonEventString + ") is not a JSON object or an JSON object array"); + } + + return jsonObject2ApexEvent(eventName, jsonObject); + } + + /** + * This method converts a JSON object into an Apex event. + * + * @param eventName the name of the event + * @param jsonObject the JSON object that holds the event + * @return the apex event that we have converted the JSON object into + * @throws ApexEventException thrown on unmarshaling exceptions + */ + private ApexEvent jsonObject2ApexEvent(final String eventName, final JsonObject jsonObject) + throws ApexEventException { + // Process the mandatory Apex header + final ApexEvent apexEvent = processApexEventHeader(eventName, jsonObject); + + // Get the event definition for the event from the model service + final AxEvent eventDefinition = + ModelService.getModel(AxEvents.class).get(apexEvent.getName(), apexEvent.getVersion()); + + // Iterate over the input fields in the event + for (final AxField eventField : eventDefinition.getFields()) { + final String fieldName = eventField.getKey().getLocalName(); + if (!hasJSONField(jsonObject, fieldName)) { + if (!eventField.getOptional()) { + final String errorMessage = "error parsing " + eventDefinition.getID() + " event from Json. " + + "Field \"" + fieldName + "\" is missing, but is mandatory."; + LOGGER.debug(errorMessage); + throw new ApexEventException(errorMessage); + } + continue; + } + + final JsonElement fieldValue = getJSONField(jsonObject, fieldName, null, !eventField.getOptional()); + + if (fieldValue != null && !fieldValue.isJsonNull()) { + // Get the schema helper + final SchemaHelper fieldSchemaHelper = + new SchemaHelperFactory().createSchemaHelper(eventField.getKey(), eventField.getSchema()); + apexEvent.put(fieldName, fieldSchemaHelper.createNewInstance(fieldValue)); + } else { + apexEvent.put(fieldName, null); + } + } + return apexEvent; + + } + + /** + * This method processes the event header of an Apex event. + * + * @param eventName the name of the event + * @param jsonObject the JSON object containing the JSON representation of the incoming event + * @return an apex event constructed using the header fields of the event + * @throws ApexEventRuntimeException the apex event runtime exception + * @throws ApexEventException on invalid events with missing header fields + */ + private ApexEvent processApexEventHeader(final String eventName, final JsonObject jsonObject) + throws ApexEventRuntimeException, ApexEventException { + // Get the event header fields + // @formatter:off + String name = getJSONStringField(jsonObject, ApexEvent.NAME_HEADER_FIELD, jsonPars.getNameAlias(), ApexEvent.NAME_REGEXP, false); + String version = getJSONStringField(jsonObject, ApexEvent.VERSION_HEADER_FIELD, jsonPars.getVersionAlias(), ApexEvent.VERSION_REGEXP, false); + String namespace = getJSONStringField(jsonObject, ApexEvent.NAMESPACE_HEADER_FIELD, jsonPars.getNameSpaceAlias(), ApexEvent.NAMESPACE_REGEXP, false); + String source = getJSONStringField(jsonObject, ApexEvent.SOURCE_HEADER_FIELD, jsonPars.getSourceAlias(), ApexEvent.SOURCE_REGEXP, false); + String target = getJSONStringField(jsonObject, ApexEvent.TARGET_HEADER_FIELD, jsonPars.getTargetAlias(), ApexEvent.TARGET_REGEXP, false); + // @formatter:on + + // Check if an event name was specified on the event parameters + if (eventName != null) { + if (name != null && !eventName.equals(name)) { + LOGGER.warn("The incoming event name \"" + name + "\" does not match the configured event name \"" + + eventName + "\", using configured event name"); + } + name = eventName; + } else { + if (name == null) { + throw new ApexEventRuntimeException( + "event received without mandatory parameter \"name\" on configuration or on event"); + } + } + + // Now, find the event definition in the model service. If version is null, the newest event + // definition in the model service is used + final AxEvent eventDefinition = ModelService.getModel(AxEvents.class).get(name, version); + if (eventDefinition == null) { + if (version == null) { + throw new ApexEventRuntimeException( + "an event definition for an event named \"" + name + "\" not found in Apex model"); + } else { + throw new ApexEventRuntimeException("an event definition for an event named \"" + name + + "\" with version \"" + version + "\" not found in Apex model"); + } + } + + // Use the defined event version if no version is specified on the incoming fields + if (version == null) { + version = eventDefinition.getKey().getVersion(); + } + + // Check the name space is OK if it is defined, if not, use the name space from the model + if (namespace != null) { + if (!namespace.equals(eventDefinition.getNameSpace())) { + throw new ApexEventRuntimeException( + "namespace \"" + namespace + "\" on event \"" + name + "\" does not match namespace \"" + + eventDefinition.getNameSpace() + "\" for that event in the Apex model"); + } + } else { + namespace = eventDefinition.getNameSpace(); + } + + // For source, use the defined source only if the source is not found on the incoming event + if (source == null) { + source = eventDefinition.getSource(); + } + + // For target, use the defined source only if the source is not found on the incoming event + if (target == null) { + target = eventDefinition.getTarget(); + } + + return new ApexEvent(name, version, namespace, source, target); + } + + /** + * This method gets an event string field from a JSON object. + * + * @param jsonObject the JSON object containing the JSON representation of the incoming event + * @param fieldName the field name to find in the event + * @param fieldAlias the alias for the field to find in the event, overrides the field name if + * it is not null + * @param fieldRE the regular expression to check the field against for validity + * @param mandatory true if the field is mandatory + * @return the value of the field in the JSON object or null if the field is optional + * @throws ApexEventRuntimeException the apex event runtime exception + */ + private String getJSONStringField(final JsonObject jsonObject, final String fieldName, final String fieldAlias, + final String fieldRE, final boolean mandatory) throws ApexEventRuntimeException { + // Get the JSON field for the string field + final JsonElement jsonField = getJSONField(jsonObject, fieldName, fieldAlias, mandatory); + + // Null strings are allowed + if (jsonField == null || jsonField.isJsonNull()) { + return null; + } + + // Check if this is a string field + String fieldValueString = null; + try { + fieldValueString = jsonField.getAsString(); + } catch (final Exception e) { + // The element is not a string so throw an error + throw new ApexEventRuntimeException("field \"" + fieldName + "\" with type \"" + + jsonField.getClass().getCanonicalName() + "\" is not a string value"); + } + + // Is regular expression checking required + if (fieldRE == null) { + return fieldValueString; + } + + // Check the event field against its regular expression + if (!fieldValueString.matches(fieldRE)) { + throw new ApexEventRuntimeException( + "field \"" + fieldName + "\" with value \"" + fieldValueString + "\" is invalid"); + } + + return fieldValueString; + } + + /** + * This method gets an event field from a JSON object. + * + * @param jsonObject the JSON object containing the JSON representation of the incoming event + * @param fieldName the field name to find in the event + * @param fieldAlias the alias for the field to find in the event, overrides the field name if + * it is not null + * @param mandatory true if the field is mandatory + * @return the value of the field in the JSON object or null if the field is optional + * @throws ApexEventRuntimeException the apex event runtime exception + */ + private JsonElement getJSONField(final JsonObject jsonObject, final String fieldName, final String fieldAlias, + final boolean mandatory) throws ApexEventRuntimeException { + + // Check if we should use the alias for this field + String fieldToFind = fieldName; + if (fieldAlias != null) { + fieldToFind = fieldAlias; + } + + // Get the event field + final JsonElement eventElement = jsonObject.get(fieldToFind); + if (eventElement == null) { + if (!mandatory) { + return null; + } else { + throw new ApexEventRuntimeException("mandatory field \"" + fieldToFind + "\" is missing"); + } + } + + return eventElement; + } + + /** + * This method if a JSON object has a named field. + * + * @param jsonObject the JSON object containing the JSON representation of the incoming event + * @param fieldName the field name to find in the event + * @return true if the field is present + * @throws ApexEventRuntimeException the apex event runtime exception + */ + private boolean hasJSONField(final JsonObject jsonObject, final String fieldName) throws ApexEventRuntimeException { + // check for the field + return jsonObject.has(fieldName); + } +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/JSONEventProtocolParameters.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/JSONEventProtocolParameters.java new file mode 100644 index 000000000..b4a4055d7 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/JSONEventProtocolParameters.java @@ -0,0 +1,135 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.policy.apex.service.engine.event.impl.jsonprotocolplugin; + +import org.onap.policy.apex.service.parameters.eventprotocol.EventProtocolTextCharDelimitedParameters; + +/** + * Event protocol parameters for JSON as an event protocol. + * + * The parameters for this plugin are: + * <ol> + * <li>nameAlias: The field in a JSON event to use as an alias for the event name. This parameter is + * optional. + * <li>versionAlias: The field in a JSON event to use as an alias for the event version. This + * parameter is optional. + * <li>nameSpaceAlias: The field in a JSON event to use as an alias for the event name space. This + * parameter is optional. + * <li>sourceAlias: The field in a JSON event to use as an alias for the event source. This + * parameter is optional. + * <li>targetAlias: The field in a JSON event to use as an alias for the event target. This + * parameter is optional. + * </ol> + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +public class JSONEventProtocolParameters extends EventProtocolTextCharDelimitedParameters { + /** The label of this event protocol. */ + public static final String JSON_EVENT_PROTOCOL_LABEL = "JSON"; + + // Constants for text block delimiters + private static final char JSON_TEXT_BLOCK_START_DELIMITER = '{'; + private static final char JSON_TEXT_BLOCK_END_DELIMITER = '}'; + + // Aliases for Apex event header fields + // @formatter:off + private final String nameAlias = null; + private final String versionAlias = null; + private final String nameSpaceAlias = null; + private final String sourceAlias = null; + private final String targetAlias = null; + // @formatter:on + + /** + * Constructor to create a JSON event protocol parameter instance and register the instance with + * the parameter service. + */ + public JSONEventProtocolParameters() { + this(JSONEventProtocolParameters.class.getCanonicalName(), JSON_EVENT_PROTOCOL_LABEL); + } + + /** + * Constructor to create an event protocol parameters instance with the name of a sub class of + * this class. + * + * @param parameterClassName the class name of a sub class of this class + * @param eventProtocolLabel the name of the event protocol for this plugin + */ + public JSONEventProtocolParameters(final String parameterClassName, final String eventProtocolLabel) { + super(parameterClassName); + + // Set the event protocol properties for the JSON event protocol + this.setLabel(eventProtocolLabel); + + // Set the starting and ending delimiters for text blocks of JSON events + this.setStartChar(JSON_TEXT_BLOCK_START_DELIMITER); + this.setEndChar(JSON_TEXT_BLOCK_END_DELIMITER); + + // Set the event protocol plugin class + this.setEventProtocolPluginClass(Apex2JSONEventConverter.class.getCanonicalName()); + } + + /** + * Gets the name alias. + * + * @return the name alias + */ + public String getNameAlias() { + return nameAlias; + } + + /** + * Gets the version alias. + * + * @return the version alias + */ + public String getVersionAlias() { + return versionAlias; + } + + /** + * Gets the name space alias. + * + * @return the name space alias + */ + public String getNameSpaceAlias() { + return nameSpaceAlias; + } + + /** + * Gets the source alias. + * + * @return the source alias + */ + public String getSourceAlias() { + return sourceAlias; + } + + /** + * Gets the target alias. + * + * @return the target alias + */ + public String getTargetAlias() { + return targetAlias; + } + +} diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/package-info.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/package-info.java new file mode 100644 index 000000000..65f4831ec --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/jsonprotocolplugin/package-info.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Contains the implementation of the APEX event protocol cinverter plugin for events in Json + * format. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +package org.onap.policy.apex.service.engine.event.impl.jsonprotocolplugin; diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/package-info.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/package-info.java new file mode 100644 index 000000000..ae2d58701 --- /dev/null +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/package-info.java @@ -0,0 +1,30 @@ +/*- + * ============LICENSE_START======================================================= + * Copyright (C) 2016-2018 Ericsson. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +/** + * Contains implementations for conversion between externally facing + * {@link org.onap.policy.apex.service.engine.event.ApexEvent} instances and internal APEX engine + * {@link org.onap.policy.apex.core.engine.event.EnEvent} instances. It also contains the + * implementation of the default APEX File carrier technology plugin as well as the protocol plugins + * for XML and JSON event protocols. + * + * @author Liam Fallon (liam.fallon@ericsson.com) + */ +package org.onap.policy.apex.service.engine.event.impl; |