aboutsummaryrefslogtreecommitdiffstats
path: root/dcae-analytics-common/src/main/java/org/openecomp/dcae/apod/analytics/common/service/processor/AbstractMessageProcessor.java
blob: 70b1e31c6c3f42d483517379a4fc7fd3d2a7af95 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
/*
 * ===============================LICENSE_START======================================
 *  dcae-analytics
 * ================================================================================
 *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *          http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *  ============================LICENSE_END===========================================
 */

package org.openecomp.dcae.apod.analytics.common.service.processor;

import com.google.common.base.Optional;
import org.openecomp.dcae.apod.analytics.common.exception.MessageProcessingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

import static java.lang.String.format;

/**
 * An abstract Message Processor which can be extended by {@link MessageProcessor} implementations
 * to get default behavior for Message Processors
 *
 * @param <P> Processor Context sub classes
 *
 * @author Rajiv Singla . Creation Date: 11/8/2016.
 */
public abstract class AbstractMessageProcessor<P extends ProcessorContext> implements MessageProcessor<P> {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractMessageProcessor.class);

    /**
     * Message describing the outcome of the most recent processing run.
     * {@code null} (reported as an absent {@link Optional}) until one of the
     * {@code set...ProcessingMessage} helpers is called.
     */
    private String processingMessage = null;

    /**
     * Current processing state. Starts as {@link ProcessingState#PROCESSING_NOT_REQUIRED};
     * {@link #preProcessor(ProcessorContext)} moves it to {@link ProcessingState#PROCESSING_STARTED}.
     * Sub classes must set it to {@link ProcessingState#PROCESSING_FINISHED_SUCCESSFULLY} on successful
     * processing or {@link ProcessingState#PROCESSING_TERMINATED_EARLY} if processing fails - any state
     * other than finished successfully makes {@link #postProcessor(ProcessorContext)} stop the chain.
     */
    protected ProcessingState processingState = ProcessingState.PROCESSING_NOT_REQUIRED;

    /**
     * Sub classes must provide a description of the processor
     *
     * @return description of processor
     */
    public abstract String getProcessorDescription();


    /**
     * Sub classes must provide the implementation that processes the message
     *
     * @param processorContext incoming {@link ProcessorContext}
     * @return outgoing {@link ProcessorContext}
     */
    public abstract P processMessage(P processorContext);

    /**
     * Builds the processor info using the simple class name of the concrete processor as the
     * processor name and {@link #getProcessorDescription()} as its description
     *
     * @return processor info for this processor
     */
    @Override
    public ProcessorInfo getProcessorInfo() {
        // by default the class of the Processor is assigned as Processor Name
        return new GenericProcessorInfo(getClass().getSimpleName(), getProcessorDescription());
    }

    /**
     * Verifies that the incoming context still allows processing and marks this processor as started
     *
     * @param processorContext incoming {@link ProcessorContext}
     * @return the same processor context that was passed in
     * @throws MessageProcessingException if the processor context reports that processing cannot continue
     */
    @Override
    public P preProcessor(P processorContext) {
        final String processorName = getProcessorInfo().getProcessorName();
        LOG.debug("Processing Started for Processor: {}", processorName);
        // by default check to see if continue processing Flag is not false
        if (!processorContext.canProcessingContinue()) {
            final String errorMessage =
                    format("Processor: %s. Processing Context flag okToContinue is false. Unable to proceed...",
                            processorName);
            throw new MessageProcessingException(errorMessage, LOG, new IllegalStateException(errorMessage));
        }
        // a fresh run starts here: drop any message left over from an earlier run so that
        // getProcessingMessage() never reports an outcome that does not belong to this run
        processingMessage = null;
        processingState = ProcessingState.PROCESSING_STARTED;
        return processorContext;
    }

    /**
     * @return current processing state of this processor
     */
    @Override
    public ProcessingState getProcessingState() {
        return processingState;
    }

    /**
     * @return processing message wrapped in an {@link Optional}; absent when no message has been set
     */
    @Override
    public Optional<String> getProcessingMessage() {
        return Optional.fromNullable(processingMessage);
    }

    /**
     * Flags the context to stop further processing unless this processor finished successfully,
     * and registers this processor with the context
     *
     * @param processorContext {@link ProcessorContext} returned by {@link #processMessage(ProcessorContext)}
     * @return the same processor context that was passed in
     */
    @Override
    public P postProcessor(P processorContext) {
        final String processorName = getProcessorInfo().getProcessorName();
        // Default implementation updates the processing continue flag if processing did not
        // complete successfully
        if (processingState != ProcessingState.PROCESSING_FINISHED_SUCCESSFULLY) {
            LOG.debug("Processor: {}, Update Process Context State to stop Processing.", processorName);
            processorContext.setProcessingContinueFlag(false);
        }
        // attaches itself to message processor context
        processorContext.getMessageProcessors().add(this);
        // log the processor name, matching the "Processing Started" entry written by preProcessor
        LOG.debug("Processing Completed for Processor: {}", processorName);
        return processorContext;
    }


    /**
     * Runs the complete processing sequence: {@link #preProcessor(ProcessorContext)}, then
     * {@link #processMessage(ProcessorContext)}, then {@link #postProcessor(ProcessorContext)}.
     * The method is final so that sub classes cannot change the order of these steps.
     *
     * @param processorContext incoming {@link ProcessorContext}
     * @return processor context returned by the post processor
     */
    @Override
    public final P apply(@Nonnull P processorContext) {
        final P preProcessedProcessorContext = preProcessor(processorContext);
        final P processedProcessorContext = processMessage(preProcessedProcessorContext);
        return postProcessor(processedProcessorContext);
    }


    /**
     * Helper method that updates processing state in case of early termination, logs the processing
     * termination reason, updates Processor processing state as Terminated and sets its processing message
     *
     * @param terminatingMessage error Message
     * @param processorContext message processor context
     */
    protected void setTerminatingProcessingMessage(final String terminatingMessage,
                                                   final P processorContext) {

        final String message = processorContext.getMessage();
        this.processingState = ProcessingState.PROCESSING_TERMINATED_EARLY;
        this.processingMessage = terminatingMessage;
        LOG.debug("Processor: {}, Early Terminating Message: {}, Incoming Message: {}",
                getProcessorInfo().getProcessorName(), terminatingMessage, message);
    }

    /**
     * Helper method that updates processing state to finished successfully, sets the processing
     * message and logs the completion message passed
     *
     * @param processorPassingMessage Processor passing message
     * @param processorContext message processor context
     */
    protected void setFinishedProcessingMessage(final String processorPassingMessage, P processorContext) {
        final String message = processorContext.getMessage();
        this.processingState = ProcessingState.PROCESSING_FINISHED_SUCCESSFULLY;
        this.processingMessage = processorPassingMessage;
        LOG.debug("Processor: {}, Successful Completion Message: {}, Incoming Message: {}",
                getProcessorInfo().getProcessorName(), processorPassingMessage, message);
    }


}