diff options
Diffstat (limited to 'services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/ApexFileEventConsumer.java')
-rw-r--r-- | services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/ApexFileEventConsumer.java | 61 |
1 files changed, 32 insertions, 29 deletions
diff --git a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/ApexFileEventConsumer.java b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/ApexFileEventConsumer.java index 7521c3a08..0f0996fb8 100644 --- a/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/ApexFileEventConsumer.java +++ b/services/services-engine/src/main/java/org/onap/policy/apex/service/engine/event/impl/filecarrierplugin/consumer/ApexFileEventConsumer.java @@ -33,23 +33,25 @@ import org.onap.policy.apex.service.engine.event.ApexEventConsumer; import org.onap.policy.apex.service.engine.event.ApexEventException; import org.onap.policy.apex.service.engine.event.ApexEventReceiver; import org.onap.policy.apex.service.engine.event.PeeredReference; -import org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.FILECarrierTechnologyParameters; +import org.onap.policy.apex.service.engine.event.impl.filecarrierplugin.FileCarrierTechnologyParameters; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Concrete implementation an Apex event consumer that reads events from a file. This consumer also - * implements ApexEventProducer and therefore can be used as a synchronous consumer. + * Concrete implementation an Apex event consumer that reads events from a file. This consumer also implements + * ApexEventProducer and therefore can be used as a synchronous consumer. * * @author Liam Fallon (liam.fallon@ericsson.com) */ public class ApexFileEventConsumer implements ApexEventConsumer, Runnable { - // Get a reference to the logger private static final Logger LOGGER = LoggerFactory.getLogger(ApexFileEventConsumer.class); + // Recurring string constants + private static final String APEX_FILE_CONSUMER_PREAMBLE = "ApexFileConsumer \""; + // The input stream to read events from private InputStream eventInputStream; @@ -66,35 +68,34 @@ public class ApexFileEventConsumer implements ApexEventConsumer, Runnable { private String consumerName = null; // The specific carrier technology parameters for this consumer - private FILECarrierTechnologyParameters fileCarrierTechnologyParameters; + private FileCarrierTechnologyParameters fileCarrierTechnologyParameters; // The peer references for this event handler - private final Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = - new EnumMap<>(EventHandlerPeeredMode.class); + private final Map<EventHandlerPeeredMode, PeeredReference> peerReferenceMap = new EnumMap<>( + EventHandlerPeeredMode.class); // Holds the next identifier for event execution. private static AtomicLong nextExecutionID = new AtomicLong(0L); /** - * Private utility to get the next candidate value for a Execution ID. This value will always be - * unique in a single JVM + * Private utility to get the next candidate value for a Execution ID. This value will always be unique in a single + * JVM * * @return the next candidate value for a Execution ID */ - private static synchronized long getNextExecutionID() { + private static synchronized long getNextExecutionId() { return nextExecutionID.getAndIncrement(); } /* * (non-Javadoc) * - * @see - * org.onap.policy.apex.apps.uservice.consumer.ApexEventConsumer#init(org.onap.policy.apex.apps. + * @see org.onap.policy.apex.apps.uservice.consumer.ApexEventConsumer#init(org.onap.policy.apex.apps. * uservice.consumer.ApexEventReceiver) */ @Override public void init(final String name, final EventHandlerParameters consumerParameters, - final ApexEventReceiver incomingEventReceiver) throws ApexEventException { + final ApexEventReceiver incomingEventReceiver) throws ApexEventException { this.eventReceiver = incomingEventReceiver; this.consumerName = name; @@ -106,18 +107,18 @@ public class ApexFileEventConsumer implements ApexEventConsumer, Runnable { } // Check and get the file Properties - if (!(consumerParameters.getCarrierTechnologyParameters() instanceof FILECarrierTechnologyParameters)) { + if (!(consumerParameters.getCarrierTechnologyParameters() instanceof FileCarrierTechnologyParameters)) { final String errorMessage = "specified consumer properties for ApexFileConsumer \"" + consumerName - + "\" are not applicable to a File consumer"; + + "\" are not applicable to a File consumer"; LOGGER.warn(errorMessage); throw new ApexEventException(errorMessage); } - fileCarrierTechnologyParameters = - (FILECarrierTechnologyParameters) consumerParameters.getCarrierTechnologyParameters(); + fileCarrierTechnologyParameters = (FileCarrierTechnologyParameters) consumerParameters + .getCarrierTechnologyParameters(); // Open the file producing events try { - if (fileCarrierTechnologyParameters.isStandardIO()) { + if (fileCarrierTechnologyParameters.isStandardIo()) { eventInputStream = System.in; } else { eventInputStream = new FileInputStream(fileCarrierTechnologyParameters.getFileName()); @@ -125,10 +126,11 @@ public class ApexFileEventConsumer implements ApexEventConsumer, Runnable { // Get an event composer for our event source textBlockReader = new TextBlockReaderFactory().getTaggedReader(eventInputStream, - consumerParameters.getEventProtocolParameters()); + consumerParameters.getEventProtocolParameters()); } catch (final IOException e) { - final String errorMessage = "ApexFileConsumer \"" + consumerName + "\" failed to open file for reading: \"" - + fileCarrierTechnologyParameters.getFileName() + "\""; + final String errorMessage = APEX_FILE_CONSUMER_PREAMBLE + consumerName + + "\" failed to open file for reading: \"" + fileCarrierTechnologyParameters.getFileName() + + "\""; LOGGER.warn(errorMessage, e); throw new ApexEventException(errorMessage, e); } @@ -195,7 +197,7 @@ public class ApexFileEventConsumer implements ApexEventConsumer, Runnable { // Check that we have been initialized in async or sync mode if (eventReceiver == null) { LOGGER.warn("\"{}\" has not been initilaized for either asynchronous or synchronous event handling", - consumerName); + consumerName); return; } @@ -209,18 +211,19 @@ public class ApexFileEventConsumer implements ApexEventConsumer, Runnable { // Process the event from the text block if there is one there if (textBlock.getText() != null) { - eventReceiver.receiveEvent(getNextExecutionID(), textBlock.getText()); + eventReceiver.receiveEvent(getNextExecutionId(), textBlock.getText()); } - } while (!textBlock.isEndOfText()); + } + while (!textBlock.isEndOfText()); } catch (final Exception e) { LOGGER.warn("\"" + consumerName + "\" failed to read event from file: \"" - + fileCarrierTechnologyParameters.getFileName() + "\"", e); + + fileCarrierTechnologyParameters.getFileName() + "\"", e); } finally { try { eventInputStream.close(); } catch (final IOException e) { - LOGGER.warn("ApexFileConsumer \"" + consumerName + "\" failed to close file: \"" - + fileCarrierTechnologyParameters.getFileName() + "\"", e); + LOGGER.warn(APEX_FILE_CONSUMER_PREAMBLE + consumerName + "\" failed to close file: \"" + + fileCarrierTechnologyParameters.getFileName() + "\"", e); } } @@ -236,8 +239,8 @@ public class ApexFileEventConsumer implements ApexEventConsumer, Runnable { try { eventInputStream.close(); } catch (final IOException e) { - LOGGER.warn("ApexFileConsumer \"" + consumerName + "\" failed to close file for reading: \"" - + fileCarrierTechnologyParameters.getFileName() + "\"", e); + LOGGER.warn(APEX_FILE_CONSUMER_PREAMBLE + consumerName + "\" failed to close file for reading: \"" + + fileCarrierTechnologyParameters.getFileName() + "\"", e); } if (consumerThread.isAlive() && !consumerThread.isInterrupted()) { |