aboutsummaryrefslogtreecommitdiffstats
path: root/dcae-analytics-tca/src/main/java/org/openecomp/dcae/analytics/tca/worker/TCADMaaPMRSubscriberJob.java
blob: 7212527b7973dba7153ae4f3b631c2d14c57fe80 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
/*
 * ============LICENSE_START=========================================================
 * dcae-analytics
 * ================================================================================
 *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *        http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 * ============LICENSE_END=========================================================
 */

package org.openecomp.dcae.analytics.tca.worker;

import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.api.worker.WorkerContext;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import org.openecomp.dcae.analytics.common.AnalyticsConstants;
import org.openecomp.dcae.analytics.common.CDAPMetricsConstants;
import org.openecomp.dcae.analytics.common.exception.DCAEAnalyticsRuntimeException;
import org.openecomp.dcae.analytics.common.utils.HTTPUtils;
import org.openecomp.dcae.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse;
import org.openecomp.dcae.analytics.dmaap.service.subscriber.DMaaPMRSubscriber;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.PersistJobDataAfterExecution;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;

import static java.lang.String.format;

/**
 * Quartz Job which polls DMaaP MR VES Collector Topic for messages and writes them to
 * a given CDAP Stream
 *
 * @author Rajiv Singla. Creation Date: 10/24/2016.
 */
@DisallowConcurrentExecution
@PersistJobDataAfterExecution
public class TCADMaaPMRSubscriberJob implements Job {

    private static final Logger LOG = LoggerFactory.getLogger(TCADMaaPMRSubscriberJob.class);

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {

        LOG.debug("Starting DMaaP MR Topic Subscriber fetch Job. Next firing time will be: {}",
                jobExecutionContext.getNextFireTime());

        // Get Job Data Map
        final JobDataMap jobDataMap = jobExecutionContext.getMergedJobDataMap();

        // Fetch all Job Params from Job Data Map
        final String cdapStreamName = jobDataMap.getString(AnalyticsConstants.CDAP_STREAM_VARIABLE_NAME);
        final WorkerContext workerContext =
                (WorkerContext) jobDataMap.get(AnalyticsConstants.WORKER_CONTEXT_VARIABLE_NAME);
        final DMaaPMRSubscriber subscriber =
                (DMaaPMRSubscriber) jobDataMap.get(AnalyticsConstants.DMAAP_SUBSCRIBER_VARIABLE_NAME);
        final Metrics metrics = (Metrics) jobDataMap.get(AnalyticsConstants.DMAAP_METRICS_VARIABLE_NAME);

        final Optional<DMaaPMRSubscriberResponse> subscriberResponseOptional =
                getSubscriberResponse(subscriber, metrics);

        // If response is not present, unable to proceed
        if (!subscriberResponseOptional.isPresent()) {
            return;
        }

        final DMaaPMRSubscriberResponse subscriberResponse = subscriberResponseOptional.get();

        // If response code return by the subscriber call is not successful, unable to do proceed
        if (!HTTPUtils.isSuccessfulResponseCode(subscriberResponse.getResponseCode())) {
            LOG.error("Subscriber was unable to fetch messages properly. Subscriber Response Code: {} " +
                    "Unable to proceed further....", subscriberResponse.getResponseCode());
            metrics.count(CDAPMetricsConstants.TCA_SUBSCRIBER_UNSUCCESSFUL_RESPONSES_METRIC, 1);
            return;
        }

        LOG.debug("Subscriber HTTP Response Status Code match successful:  {}", subscriberResponse,
                HTTPUtils.HTTP_SUCCESS_STATUS_CODE);

        final List<String> actualMessages = subscriberResponse.getFetchedMessages();

        // If there are no message returned during from Subscriber, nothing to write to CDAP Stream
        if (actualMessages.isEmpty()) {
            LOG.debug("Subscriber Response has no messages. Nothing to write to CDAP stream....");
            metrics.count(CDAPMetricsConstants.TCA_SUBSCRIBER_RESPONSES_WITH_NO_MESSAGES_METRIC, 1);
            return;
        }

        LOG.debug("DMaaP MR Subscriber found new messages in DMaaP Topic. Message count: {}", actualMessages.size());
        metrics.count(CDAPMetricsConstants.TCA_SUBSCRIBER_TOTAL_MESSAGES_PROCESSED_METRIC, actualMessages.size());

        // Write message to CDAP Stream using Stream Batch Writer
        LOG.debug("Writing message to CDAP Stream: {}, Message Count: {}", cdapStreamName, actualMessages.size());
        try {

            for (String message : actualMessages) {
                workerContext.write(cdapStreamName, message);
            }

        } catch (IOException e) {
            metrics.count(CDAPMetricsConstants.TCA_SUBSCRIBER_FAILURE_TO_WRITE_TO_STREAM_METRIC, 1);
            final String errorMessage =
                    format("Error while DMaaP message router subscriber attempting to write to CDAP Stream: %s, " +
                            "Exception: %s", cdapStreamName, e);
            throw new DCAEAnalyticsRuntimeException(errorMessage, LOG, e);
        }

        LOG.debug("DMaaP MR Subscriber successfully finished writing messages to CDAP Stream: {}, Message count: {}",
                cdapStreamName, actualMessages.size());
    }


    /**
     * Get Subscriber response and records time taken to fetch messages. Returns Optional.None if Subscriber response
     * is null or response status code is not present
     *
     * @param subscriber - DMaaP Subscriber
     * @param metrics - CDAP Metrics collector
     *
     * @return - Optional of Subscriber Response
     */
    private static Optional<DMaaPMRSubscriberResponse> getSubscriberResponse(final DMaaPMRSubscriber subscriber,
                                                                             final Metrics metrics) {

        // Check how long it took for subscriber to respond
        final Stopwatch stopwatch = new Stopwatch();
        stopwatch.start();

        DMaaPMRSubscriberResponse subscriberResponse = null;
        // Fetch messages from DMaaP MR Topic
        try {
            subscriberResponse = subscriber.fetchMessages();
        } catch (DCAEAnalyticsRuntimeException e) {
            LOG.error("Error while fetching messages for DMaaP MR Topic: {}", e);
        }

        stopwatch.stop();
        final long subscriberResponseTimeMS = stopwatch.elapsedMillis();

        // If response is null is null or response code is null, unable to proceed nothing to do
        if (subscriberResponse == null || subscriberResponse.getResponseCode() == null) {
            LOG.error("Subscriber Response is null or subscriber Response code is null. Unable to proceed further...");
            return Optional.absent();
        }

        LOG.debug("Subscriber Response:{}, Subscriber HTTP Response Status Code {}, Subscriber Response Time(ms): {}",
                subscriberResponse, subscriberResponse.getResponseCode(), subscriberResponseTimeMS);

        // Record subscriber response time
        metrics.gauge(CDAPMetricsConstants.TCA_SUBSCRIBER_RESPONSE_TIME_MS_METRIC, subscriberResponseTimeMS);

        // Record all response count from subscriber
        metrics.count(CDAPMetricsConstants.TCA_SUBSCRIBER_ALL_RESPONSES_COUNT_METRIC, 1);

        return Optional.of(subscriberResponse);
    }

}