/* * ===============================LICENSE_START====================================== * dcae-analytics * ================================================================================ * Copyright © 2017 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============================LICENSE_END=========================================== */ package org.openecomp.dcae.apod.analytics.common.utils; import com.google.common.base.Preconditions; import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException; import org.openecomp.dcae.apod.analytics.common.service.filter.GenericJsonMessageFilter; import org.openecomp.dcae.apod.analytics.common.service.filter.JsonMessageFilterProcessorContext; import org.openecomp.dcae.apod.analytics.common.service.processor.GenericMessageChainProcessor; import org.openecomp.dcae.apod.analytics.common.service.processor.MessageProcessor; import org.openecomp.dcae.apod.analytics.common.service.processor.ProcessorContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import javax.annotation.Nonnull; /** * * @author Rajiv Singla . Creation Date: 11/8/2016. */ public abstract class MessageProcessorUtils { private static final Logger LOG = LoggerFactory.getLogger(MessageProcessorUtils.class); /** * Provides an abstraction how to apply {@link ProcessorContext} to next {@link MessageProcessor} * in the message processor chain * * @param
Sub classes of Processor Context */ public interface MessageProcessorFunction
{
/**
* Method which provides accumulated {@link ProcessorContext} from previous processors and a reference
* to next processor in the chain
*
* @param p accumulated {@link ProcessorContext} from previous processors
* @param m current {@link MessageProcessor} in the chain
* @param Sub classes for Processor Context
*
* @return processing context which results after computing the whole chain
*/
public static P computeMessageProcessorChain(
final Iterable extends MessageProcessor > messageProcessors,
final P initialProcessorContext,
final MessageProcessorFunction messageProcessorFunction) {
// Get message processor iterator
final Iterator extends MessageProcessor > processorIterator = messageProcessors.iterator();
// If no next message processor - return initial processor context
if (!processorIterator.hasNext()) {
return initialProcessorContext;
}
// An accumulator for processor Context
P processorContextAccumulator = initialProcessorContext;
while (processorIterator.hasNext()) {
final MessageProcessor nextProcessor = processorIterator.next();
// If Initial Processor Context is null
if (processorContextAccumulator == null) {
final String errorMessage =
String.format("Processor Context must not be null for Message Process: %s",
nextProcessor.getProcessorInfo().getProcessorName());
throw new MessageProcessingException(errorMessage, LOG, new IllegalStateException(errorMessage));
}
if (!processorContextAccumulator.canProcessingContinue()) {
LOG.debug("Triggering Early Termination, before Message Processor: {}, Incoming Message: {}",
nextProcessor.getProcessorInfo().getProcessorName(), processorContextAccumulator.getMessage());
break;
}
processorContextAccumulator = messageProcessorFunction.apply(processorContextAccumulator, nextProcessor);
}
return processorContextAccumulator;
}
/**
* Utility method to process Json Filter Mappings. Processes incoming json message and applies a list of json
* filter mappings and returns the resulting {@link JsonMessageFilterProcessorContext}
*
* @param jsonMessage json message to which filter mappings will be applies
* @param jsonFilterMappings Filter mappings contains a Map containing keys as filter json path
* and values as set of expected value corresponding to filter path
*
* @return json message processor context which contains the {@link JsonMessageFilterProcessorContext#isMatched}
* status after applying all filter mappings
*/
public static JsonMessageFilterProcessorContext processJsonFilterMappings(
final String jsonMessage, @Nonnull final Map