aboutsummaryrefslogtreecommitdiffstats
path: root/feature-pooling-messages/src/main/java/org/onap/policy/drools/pooling/state/ProcessingState.java
blob: 76914b756b37f747c985c286510ec932f5c3c6c6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
/*
 * ============LICENSE_START=======================================================
 * ONAP
 * ================================================================================
 * Copyright (C) 2018, 2021 AT&T Intellectual Property. All rights reserved.
 * Modifications Copyright (C) 2024 Nordix Foundation.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */

package org.onap.policy.drools.pooling.state;

import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import org.onap.policy.drools.pooling.PoolingManager;
import org.onap.policy.drools.pooling.message.BucketAssignments;
import org.onap.policy.drools.pooling.message.Leader;
import org.onap.policy.drools.pooling.message.Query;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Any state in which events are being processed locally and forwarded, as appropriate.
 *
 * <p>Besides tracking the currently known leader, this class holds the logic used by a
 * host that becomes the leader to build a new, balanced set of bucket assignments
 * from the set of hosts that are known to be alive.
 */
@Setter
@Getter
public class ProcessingState extends State {

    private static final Logger logger = LoggerFactory.getLogger(ProcessingState.class);

    /**
     * Current known leader, never {@code null}.
     */
    @NonNull
    private String leader;

    /**
     * Constructor.
     *
     * @param mgr pooling manager
     * @param leader current known leader, which need not be the same as the assignment
     *        leader. Never {@code null}
     * @throws IllegalArgumentException if the manager's current assignments contain a
     *         zero-length host array
     * @throws NullPointerException if {@code leader} is {@code null} (check generated
     *         by Lombok's {@code @NonNull})
     */
    public ProcessingState(PoolingManager mgr, @NonNull String leader) {
        super(mgr);

        BucketAssignments assignments = mgr.getAssignments();

        if (assignments != null) {
            // a null host array is tolerated here; only an explicitly empty one is rejected
            String[] arr = assignments.getHostArray();
            if (arr != null && arr.length == 0) {
                throw new IllegalArgumentException("zero-length bucket assignments");
            }
        }

        this.leader = leader;
    }

    /**
     * Generates an Identification message and goes to the query state.
     */
    @Override
    public State process(Query msg) {
        logger.info("received Query message on topic {}", getTopic());
        publish(makeIdentification());
        return goQuery();
    }

    /**
     * Sets the assignments. Non-null assignments are handed to
     * {@code startDistributing()}; a {@code null} argument is ignored.
     *
     * @param assignments new assignments, or {@code null}
     */
    protected final void setAssignments(BucketAssignments assignments) {
        if (assignments != null) {
            startDistributing(assignments);
        }
    }

    /**
     * Determines if this host is the leader, by comparing this host's name with the
     * currently known {@link #leader}.
     *
     * @return {@code true} if this host is the leader, {@code false} otherwise
     */
    public boolean isLeader() {
        return getHost().equals(leader);
    }

    /**
     * Becomes the leader. Publishes a Leader message and enters the {@link ActiveState}.
     *
     * @param alive hosts that are known to be alive
     *
     * @return the new state
     * @throws IllegalArgumentException if this host is not the first host in
     *         {@code alive}
     * @throws java.util.NoSuchElementException if {@code alive} is empty
     */
    protected State becomeLeader(SortedSet<String> alive) {
        String newLeader = getHost();

        // only the first host of the sorted set is allowed to take over leadership
        if (!newLeader.equals(alive.first())) {
            throw new IllegalArgumentException(newLeader + " cannot replace " + alive.first());
        }

        var msg = makeLeader(alive);
        logger.info("{}/{} hosts have an assignment", msg.getAssignments().getAllHosts().size(), alive.size());

        publish(msg);

        return goActive(msg.getAssignments());
    }

    /**
     * Makes a leader message. Assumes "this" host is the leader, and thus appears as the
     * first host in the set of hosts that are still alive.
     *
     * @param alive hosts that are known to be alive
     *
     * @return a new message
     */
    private Leader makeLeader(Set<String> alive) {
        return new Leader(getHost(), makeAssignments(alive));
    }

    /**
     * Makes a set of bucket assignments. Assumes "this" host is the leader.
     *
     * @param alive hosts that are known to be alive
     *
     * @return a new set of bucket assignments
     */
    private BucketAssignments makeAssignments(Set<String> alive) {

        // make a working array from the CURRENT assignments
        String[] bucket2host = makeBucketArray();

        // sorted working copy, so that removing hosts does not modify the caller's set
        SortedSet<String> avail = new TreeSet<>(alive);

        // if we have more hosts than buckets, then remove the extra hosts
        removeExcessHosts(bucket2host.length, avail);

        // create a host bucket for each available host
        Map<String, HostBucket> host2hb = new HashMap<>();
        avail.forEach(host -> host2hb.put(host, new HostBucket(host)));

        // add bucket indices to the appropriate host bucket
        addIndicesToHostBuckets(bucket2host, host2hb);

        /*
         * Convert the collection back to an array. This must happen before
         * re-balancing, because re-balancing only records the buckets that it moves.
         */
        fillArray(host2hb.values(), bucket2host);

        // update bucket2host with new assignments
        rebalanceBuckets(host2hb.values(), bucket2host);

        return new BucketAssignments(bucket2host);
    }

    /**
     * Makes a bucket array, copying the current assignments, if available. If there are
     * no current assignments, or their host array is {@code null} or empty, then a new
     * array of {@link BucketAssignments#MAX_BUCKETS} unassigned buckets is returned.
     *
     * @return a new bucket array
     */
    private String[] makeBucketArray() {
        BucketAssignments asgn = getAssignments();
        String[] oldArray = (asgn == null ? null : asgn.getHostArray());

        // the constructor permits a null host array, so it must be tolerated here, too
        if (oldArray == null || oldArray.length == 0) {
            return new String[BucketAssignments.MAX_BUCKETS];
        }

        // work on a copy so that the current assignments are left untouched
        return oldArray.clone();
    }

    /**
     * Removes excess hosts from the set of available hosts. Assumes "this" host is the
     * leader, and thus appears as the first host in the set.
     *
     * @param maxHosts maximum number of hosts to be retained
     * @param avail available hosts
     */
    private void removeExcessHosts(int maxHosts, SortedSet<String> avail) {
        while (avail.size() > maxHosts) {
            /*
             * Don't remove this host, as it's the leader. Since the leader is always at
             * the front of the sorted set, we'll just pick off hosts from the back of the
             * set.
             */
            String host = avail.last();
            avail.remove(host);

            logger.warn("not using extra host {} for topic {}", host, getTopic());
        }
    }

    /**
     * Adds bucket indices to {@link HostBucket} objects. Buckets that are unassigned or
     * assigned to a host that does not appear within the map are re-assigned to a host
     * that appears within the map.
     *
     * @param bucket2host bucket assignments
     * @param host2data maps a host name to its {@link HostBucket}
     */
    private void addIndicesToHostBuckets(String[] bucket2host, Map<String, HostBucket> host2data) {
        Queue<Integer> nullBuckets = new LinkedList<>();

        for (var x = 0; x < bucket2host.length; ++x) {
            String host = bucket2host[x];

            // no host bucket if the bucket is unassigned or its host is not in the map
            HostBucket hb = (host == null ? null : host2data.get(host));

            if (hb == null) {
                nullBuckets.add(x);

            } else {
                hb.add(x);
            }
        }

        // assign the null buckets to other hosts
        assignNullBuckets(nullBuckets, host2data.values());
    }

    /**
     * Assigns null buckets (i.e., those having no assignment) to available hosts.
     *
     * @param buckets buckets that still need to be assigned to hosts
     * @param coll collection of current host-bucket assignments; expected to be
     *        non-empty whenever {@code buckets} is non-empty
     */
    private void assignNullBuckets(Queue<Integer> buckets, Collection<HostBucket> coll) {
        // ordered by bucket count, so the first item is the host with the fewest buckets
        TreeSet<HostBucket> assignments = new TreeSet<>(coll);

        for (Integer index : buckets) {
            // add it to the host with the shortest bucket list
            HostBucket newhb = assignments.pollFirst();
            assert newhb != null;
            newhb.add(index);

            /*
             * Put the item back into the set, with its new count. It has to be removed
             * and re-added because its position in the set depends on its count.
             */
            assignments.add(newhb);
        }
    }

    /**
     * Re-balances the buckets, taking from those that have a larger count and giving to
     * those that have a smaller count. Each bucket that is moved is recorded in the
     * output array; entries for buckets that are not moved are left as they are.
     *
     * @param coll current bucket assignment
     * @param bucket2host array to be updated with the new assignments
     */
    private void rebalanceBuckets(Collection<HostBucket> coll, String[] bucket2host) {
        if (coll.size() <= 1) {
            // at most one host - nothing to rebalance
            return;
        }

        TreeSet<HostBucket> assignments = new TreeSet<>(coll);

        for (;;) {
            // the two extremes: the host with the fewest and the host with the most
            HostBucket smaller = assignments.pollFirst();
            HostBucket larger = assignments.pollLast();

            assert larger != null && smaller != null;
            if (larger.size() - smaller.size() <= 1) {
                // it's as balanced as it will get
                break;
            }

            // move the bucket from the larger to the smaller
            Integer bucket = larger.remove();
            smaller.add(bucket);

            bucket2host[bucket] = smaller.host;

            // put the items back, with their new counts
            assignments.add(larger);
            assignments.add(smaller);
        }

    }

    /**
     * Fills the array with the host assignments.
     *
     * @param coll the host assignments
     * @param bucket2host array to be filled
     */
    private void fillArray(Collection<HostBucket> coll, String[] bucket2host) {
        for (HostBucket hb : coll) {
            for (Integer index : hb.buckets) {
                bucket2host[index] = hb.host;
            }
        }
    }

    /**
     * Tracks buckets that have been assigned to a host. Instances are ordered by
     * {@link #compareTo(HostBucket)}; {@code equals()} and {@code hashCode()} are
     * deliberately unsupported, so instances cannot be used in hash-based collections.
     */
    protected static class HostBucket implements Comparable<HostBucket> {
        /**
         * Host to which the buckets have been assigned.
         */
        private final String host;

        /**
         * Buckets that have been assigned to this host.
         */
        private final Queue<Integer> buckets = new LinkedList<>();

        /**
         * Constructor.
         *
         * @param host host
         */
        public HostBucket(String host) {
            this.host = host;
        }

        /**
         * Removes the next bucket from the list.
         *
         * @return the next bucket
         * @throws java.util.NoSuchElementException if no buckets are assigned to this
         *         host
         */
        public final Integer remove() {
            return buckets.remove();
        }

        /**
         * Adds a bucket to the list.
         *
         * @param index index of the bucket to add
         */
        public final void add(Integer index) {
            buckets.add(index);
        }

        /**
         * Size.
         *
         * @return the number of buckets assigned to this host
         */
        public final int size() {
            return buckets.size();
        }

        /**
         * Compares host buckets, first by the number of buckets, and then by the host
         * name.
         */
        @Override
        public final int compareTo(HostBucket other) {
            // sizes are never negative, so the subtraction cannot overflow
            int diff = buckets.size() - other.buckets.size();
            if (diff == 0) {
                diff = host.compareTo(other.host);
            }
            return diff;
        }

        @Override
        public final int hashCode() {
            throw new UnsupportedOperationException("HostBucket cannot be hashed");
        }

        @Override
        public final boolean equals(Object obj) {
            throw new UnsupportedOperationException("cannot compare HostBuckets");
        }
    }
}