aboutsummaryrefslogtreecommitdiffstats
path: root/appc-event-listener/appc-event-listener-bundle/src/main/java/org/openecomp/appc/listener/CL/impl/ListenerImpl.java
blob: 3f0f8a11264041afdcc100ccd38a9cc84419f825 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
/*-
 * ============LICENSE_START=======================================================
 * openECOMP : APP-C
 * ================================================================================
 * Copyright (C) 2017 AT&T Intellectual Property. All rights
 * 						reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */

package org.openecomp.appc.listener.CL.impl;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;
import java.util.concurrent.RejectedExecutionException;

import org.openecomp.appc.listener.AbstractListener;
import org.openecomp.appc.listener.ListenerProperties;
import org.openecomp.appc.listener.CL.model.IncomingMessage;
import org.openecomp.appc.listener.CL.model.Status;

import com.att.eelf.configuration.EELFLogger;
import com.att.eelf.configuration.EELFManager;
import com.att.eelf.i18n.EELFResourceManager;

/**
 * Listener that polls the DMaaP client for {@link IncomingMessage} events, acknowledges each one
 * with a {@link Status#PENDING} status and hands valid messages to the worker pool as
 * {@link WorkerImpl} tasks.
 */
public class ListenerImpl extends AbstractListener {

    private static final EELFLogger LOG = EELFManager.getInstance().getLogger(ListenerImpl.class);

    /** Wall-clock time (epoch millis) at which {@link #run()} was entered; 0 until then. */
    private long startTime = 0;

    /**
     * Creates the listener and configures {@link ProviderOperations} with the provider endpoint
     * and basic-auth credentials read from the supplied properties.
     *
     * @param props
     *            listener configuration; the keys {@code provider.url}, {@code provider.user} and
     *            {@code provider.pass} are read here
     */
    public ListenerImpl(ListenerProperties props) {
        super(props);
        String url = props.getProperty("provider.url");
        LOG.info("DMaaP Provider Endpoint: " + url);
        ProviderOperations.setUrl(url);

        // Set Basic Auth
        String user = props.getProperty("provider.user");
        String pass = props.getProperty("provider.pass");
        ProviderOperations.setAuthentication(user, pass);
    }

    /**
     * Main polling loop. While the {@code run} flag is set, fetches up to {@code QUEUED_MAX}
     * messages whenever the executor queue holds {@code QUEUED_MIN} or fewer tasks, posts a
     * PENDING status for every fetched message, then submits each valid message to the executor.
     * Any exception raised during one iteration is logged and the loop continues.
     */
    @Override
    public void run() {
        // Some vars for benchmarking
        startTime = System.currentTimeMillis();

        LOG.info("Running DMaaP Listener");

        // NOTE(review): when the executor queue is above QUEUED_MIN this loop iterates again
        // immediately, without pausing - confirm that this spinning is intended.
        while (run.get()) {
            // Only update if the queue is low. otherwise we read in more
            // messages than we need
            try {
                if (executor.getQueue().size() <= QUEUED_MIN) {
                    LOG.debug("DMaaP queue running low. Querying for more jobs");
                    List<IncomingMessage> messages = dmaap.getIncomingEvents(IncomingMessage.class, QUEUED_MAX);
                    LOG.debug(String.format("Read %d messages from dmaap", messages.size()));

                    // First pass: acknowledge the whole batch before any work is queued
                    for (IncomingMessage incoming : messages) {
                        // Acknowledge that we read the event
                        LOG.info("Acknowledging Message: " + incoming.getId());
                        dmaap.postStatus(incoming.toOutgoing(Status.PENDING, null).toString());
                    }

                    // Second pass: queue the work, re-checking the run flag per message so a
                    // stop request takes effect in the middle of a batch
                    for (IncomingMessage incoming : messages) {
                        // Add to pool if still running
                        if (run.get()) {
                            LOG.info(String.format("Adding DMaaP message to pool queue [%s]", incoming.getId()));
                            if (incoming.isValid()) {
                                try {
                                    executor.execute(new WorkerImpl(incoming, dmaap));
                                } catch (RejectedExecutionException rejectEx) {
                                    // A rejected task must not abort the rest of the batch
                                    LOG.error("Task Rejected: ", rejectEx);
                                }
                            } else {
                                // Badly formed message
                                LOG.error("Message was not valid. Rejecting");
                            }
                        } else {
                            LOG.info("Run stopped. Orphaning Message: " + incoming.getId());
                        }
                    }
                }
            } catch (Exception e) {
                // Deliberate best-effort: log and keep the listener thread alive
                LOG.error("Exception " + e.getClass().getSimpleName() + " caught in DMaaP listener");
                LOG.error(EELFResourceManager.format(e));
                LOG.error("DMaaP Listener logging and ignoring the exception, continue...");
            }
        }

        LOG.info("Stopping DMaaP Listener thread");

        // We've told the listener to stop
        // TODO - Should we:
        // 1) Put a message back on the queue indicating that APP-C never got to
        // the message
        // or
        // 2) Let downstream figure it out after timeout between PENDING and
        // ACTIVE messages
    }

    /**
     * Builds, logs and returns a one-line summary of how long the listener has been running, how
     * many tasks the executor has completed and how many threads its pool currently holds.
     *
     * <p>
     * The running time is rendered as {@code HH:mm:ss} where the hour field is the total number
     * of elapsed hours, so it keeps counting past 23 instead of wrapping around every 24 hours.
     * If {@link #run()} has not been entered yet, the running time is reported as
     * {@code 00:00:00}.
     * </p>
     *
     * @return the benchmark summary string
     */
    @Override
    public String getBenchmark() {
        // startTime stays 0 until run() has started; a clock step backwards is clamped to zero
        long elapsedMillis = (startTime > 0) ? Math.max(0L, System.currentTimeMillis() - startTime) : 0L;
        long totalSeconds = elapsedMillis / 1000L;

        // Format as a duration, not as a time of day: a date formatter's hour-of-day field would
        // wrap to 00 after 24 hours of uptime
        String runningTime = String.format("%02d:%02d:%02d", totalSeconds / 3600L,
            (totalSeconds % 3600L) / 60L, totalSeconds % 60L);

        String out = String.format("Running for %s and completed %d jobs using %d threads.", runningTime,
            executor.getCompletedTaskCount(), executor.getPoolSize());
        LOG.info("***BENCHMARK*** " + out);
        return out;
    }

}