aboutsummaryrefslogtreecommitdiffstats
path: root/dcae-analytics-cdap-common/src/main/java/org/openecomp/dcae/apod/analytics/cdap/common/utils/DMaaPMRUtils.java
blob: 950f8c8ef36d6d09e36be250aca30d6d7a762cdb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
/*
 * ===============================LICENSE_START======================================
 *  dcae-analytics
 * ================================================================================
 *    Copyright © 2017 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *          http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *  ============================LICENSE_END===========================================
 */

package org.openecomp.dcae.apod.analytics.cdap.common.utils;

import co.cask.cdap.api.metrics.Metrics;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import org.openecomp.dcae.apod.analytics.cdap.common.CDAPMetricsConstants;
import org.openecomp.dcae.apod.analytics.common.exception.DCAEAnalyticsRuntimeException;
import org.openecomp.dcae.apod.analytics.common.utils.HTTPUtils;
import org.openecomp.dcae.apod.analytics.dmaap.domain.response.DMaaPMRSubscriberResponse;
import org.openecomp.dcae.apod.analytics.dmaap.service.subscriber.DMaaPMRSubscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
 * Utility common methods for DMaaP MR functionality
 *
 * @author Rajiv Singla . Creation Date: 2/6/2017.
 */
public abstract class DMaaPMRUtils {

    private static final Logger LOG = LoggerFactory.getLogger(DMaaPMRUtils.class);

    private DMaaPMRUtils() {
        // private constructor
    }


    /**
     * Returns messages fetched from DMaaP MR Subscriber.
     *
     * @param subscriber DMaaP MR Subscriber instance
     * @param metrics CDAP metrics
     *
     * @return messages fetched from DMaaP MR topic
     */
    public static Optional<List<String>> getSubscriberMessages(final DMaaPMRSubscriber subscriber,
                                                               final Metrics metrics) {

        final Optional<DMaaPMRSubscriberResponse> subscriberResponseOptional =
                getSubscriberResponse(subscriber, metrics);

        // If response is not present, unable to proceed
        if (!subscriberResponseOptional.isPresent()) {
            return Optional.absent();
        }

        final DMaaPMRSubscriberResponse subscriberResponse = subscriberResponseOptional.get();

        // If response code return by the subscriber call is not successful, unable to do proceed
        if (!HTTPUtils.isSuccessfulResponseCode(subscriberResponse.getResponseCode())) {
            LOG.error("Subscriber was unable to fetch messages properly.Subscriber Response Code: {} " +
                    "Unable to proceed further....", subscriberResponse.getResponseCode());
            metrics.count(CDAPMetricsConstants.DMAAP_MR_SUBSCRIBER_UNSUCCESSFUL_RESPONSES_METRIC, 1);
            return Optional.absent();
        }

        LOG.debug("Subscriber HTTP Response Status Code match successful:  {}", subscriberResponse,
                HTTPUtils.HTTP_SUCCESS_STATUS_CODE);

        final List<String> actualMessages = subscriberResponse.getFetchedMessages();

        // If there are no message returned during from Subscriber, nothing to write to CDAP Stream
        if (actualMessages.isEmpty()) {
            LOG.debug("Subscriber Response has no messages. Nothing to write....");
            metrics.count(CDAPMetricsConstants.DMAAP_MR_SUBSCRIBER_RESPONSES_WITH_NO_MESSAGES_METRIC, 1);
            return Optional.absent();
        }

        LOG.debug("DMaaP MR Subscriber found new messages in DMaaP Topic. Message count: {}", actualMessages.size());
        metrics.count(CDAPMetricsConstants.DMAAP_MR_SUBSCRIBER_TOTAL_MESSAGES_PROCESSED_METRIC, actualMessages.size());

        return Optional.of(actualMessages);

    }


    /**
     * Get Subscriber response and records time taken to fetch messages. Returns Optional.None if Subscriber response
     * is null or response status code is not present
     *
     * @param subscriber - DMaaP Subscriber
     * @param metrics - CDAP Metrics collector
     *
     * @return - Optional of Subscriber Response
     */
    public static Optional<DMaaPMRSubscriberResponse> getSubscriberResponse(final DMaaPMRSubscriber subscriber,
                                                                            final Metrics metrics) {

        // Record all response count from subscriber
        metrics.count(CDAPMetricsConstants.DMAAP_MR_SUBSCRIBER_ALL_RESPONSES_COUNT_METRIC, 1);

        // Check how long it took for subscriber to respond
        final Stopwatch stopwatch = new Stopwatch();
        stopwatch.start();

        // Fetch messages from DMaaP MR Topic
        DMaaPMRSubscriberResponse subscriberResponse = null;
        try {
            subscriberResponse = subscriber.fetchMessages();
        } catch (DCAEAnalyticsRuntimeException e) {
            LOG.error("Error while fetching messages for DMaaP MR Topic: {}", e);
        }

        stopwatch.stop();
        final long subscriberResponseTimeMS = stopwatch.elapsedMillis();

        // If response is null is null or response code is null, unable to proceed nothing to do
        if (subscriberResponse == null || subscriberResponse.getResponseCode() == null) {
            LOG.error("Subscriber Response is null or subscriber Response code is null. Unable to proceed further...");
            return Optional.absent();
        }

        LOG.debug("Subscriber Response:{}, Subscriber HTTP Response Status Code {}, Subscriber Response Time(ms): {}",
                subscriberResponse, subscriberResponse.getResponseCode(), subscriberResponseTimeMS);

        // Record subscriber response time
        metrics.gauge(CDAPMetricsConstants.DMAAP_MR_SUBSCRIBER_RESPONSE_TIME_MS_METRIC, subscriberResponseTimeMS);

        return Optional.of(subscriberResponse);
    }

}