summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openecomp/sparky/synchronizer/SyncController.java
blob: 85cbeb5915c7bbd2d2f0afaaf1bb73e362d8f97f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
/**
 * ============LICENSE_START===================================================
 * SPARKY (AAI UI service)
 * ============================================================================
 * Copyright © 2017 AT&T Intellectual Property.
 * Copyright © 2017 Amdocs
 * All rights reserved.
 * ============================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=====================================================
 *
 * ECOMP and OpenECOMP are trademarks
 * and service marks of AT&T Intellectual Property.
 */

package org.openecomp.sparky.synchronizer;

import static java.util.concurrent.CompletableFuture.supplyAsync;

import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

import org.openecomp.cl.api.Logger;
import org.openecomp.cl.eelf.LoggerFactory;
import org.openecomp.sparky.logging.AaiUiMsgs;
import org.openecomp.sparky.synchronizer.enumeration.SynchronizerState;
import org.openecomp.sparky.util.NodeUtils;

/**
 * The Class SyncController.
 *
 * @author davea.
 */
public class SyncController {
  private static final Logger LOG = LoggerFactory.getInstance().getLogger(SyncController.class);

  /**
   * The Enum InternalState.
   */
  private enum InternalState {
    IDLE, PRE_SYNC, SYNC_OPERATION, SELECTIVE_DELETE, ABORTING_SYNC, REPAIRING_INDEX, POST_SYNC,
    TEST_INDEX_INTEGRITY, GENERATE_FINAL_REPORT
  }

  /**
   * The Enum SyncActions.
   */
  public enum SyncActions {
    SYNCHRONIZE, REPAIR_INDEX, INDEX_INTEGRITY_VALIDATION_COMPLETE, PRE_SYNC_COMPLETE,
    SYNC_COMPLETE, SYNC_ABORTED, SYNC_FAILURE, POST_SYNC_COMPLETE, PURGE_COMPLETE, REPORT_COMPLETE
  }

  private Collection<IndexSynchronizer> registeredSynchronizers;
  private Collection<IndexValidator> registeredIndexValidators;
  private Collection<IndexCleaner> registeredIndexCleaners;
  private InternalState currentInternalState;
  private ExecutorService syncControllerExecutor;
  private ExecutorService statReporterExecutor;
  private final String controllerName;

  /**
   * Instantiates a new sync controller.
   *
   * @param name the name
   * @throws Exception the exception
   */
  public SyncController(String name) throws Exception {

    this.controllerName = name;
    /*
     * Does LHS result in a non-duplicated object collection?? What happens if you double-add an
     * object?
     */

    registeredSynchronizers = new LinkedHashSet<IndexSynchronizer>();
    registeredIndexValidators = new LinkedHashSet<IndexValidator>();
    registeredIndexCleaners = new LinkedHashSet<IndexCleaner>();

    this.syncControllerExecutor = NodeUtils.createNamedExecutor("SyncController", 5, LOG);
    this.statReporterExecutor = NodeUtils.createNamedExecutor("StatReporter", 1, LOG);

    this.currentInternalState = InternalState.IDLE;
  }
  
  /**
   * Change internal state.
   *
   * @param newState the new state
   * @param causedByAction the caused by action
   */
  private void changeInternalState(InternalState newState, SyncActions causedByAction) {
    LOG.info(AaiUiMsgs.SYNC_INTERNAL_STATE_CHANGED, controllerName,
        currentInternalState.toString(), newState.toString(), causedByAction.toString());

    this.currentInternalState = newState;

    performStateAction();
  }

  public String getControllerName() {
    return controllerName;
  }

  /**
   * Perform action.
   *
   * @param requestedAction the requested action
   */
  public void performAction(SyncActions requestedAction) {

    if (currentInternalState == InternalState.IDLE) {

      try {
        switch (requestedAction) {
          case SYNCHRONIZE:
            changeInternalState(InternalState.TEST_INDEX_INTEGRITY, requestedAction);
            break;

          default:
            break;
        }

      } catch (Exception exc) {
        String message = "An error occurred while performing action = " + requestedAction
            + ". Error = " + exc.getMessage();
        LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
      }
    } else {
      LOG.error(AaiUiMsgs.SYNC_NOT_VALID_STATE_DURING_REQUEST, currentInternalState.toString());
    }
  }

  /**
   * Perform state action.
   */
  private void performStateAction() {

    try {
      switch (currentInternalState) {

        case TEST_INDEX_INTEGRITY:
          performIndexIntegrityValidation();
          break;

        case PRE_SYNC:
          performPreSyncCleanupCollection();
          break;

        case SYNC_OPERATION:
          performSynchronization();
          break;

        case POST_SYNC:
          performIndexSyncPostCollection();
          changeInternalState(InternalState.SELECTIVE_DELETE, SyncActions.POST_SYNC_COMPLETE);
          break;

        case SELECTIVE_DELETE:
          performIndexCleanup();
          changeInternalState(InternalState.GENERATE_FINAL_REPORT, SyncActions.PURGE_COMPLETE);
          break;

        case GENERATE_FINAL_REPORT:

          dumpStatReport(true);
          clearCaches();
          changeInternalState(InternalState.IDLE, SyncActions.REPORT_COMPLETE);
          break;

        case ABORTING_SYNC:
          performSyncAbort();
          break;

        default:
          break;
      }
    } catch (Exception exc) {
      String message = "Caught an error which performing action. Error = " + exc.getMessage();
      LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
    }
  }

  /**
   * Register entity synchronizer.
   *
   * @param entitySynchronizer the entity synchronizer
   */
  public void registerEntitySynchronizer(IndexSynchronizer entitySynchronizer) {

    String indexName = entitySynchronizer.getIndexName();

    if (indexName != null) {
      registeredSynchronizers.add(entitySynchronizer);
    } else {
      String message = "Failed to register entity synchronizer because index name is null";
      LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
    }

  }

  /**
   * Register index validator.
   *
   * @param indexValidator the index validator
   */
  public void registerIndexValidator(IndexValidator indexValidator) {

    String indexName = indexValidator.getIndexName();

    if (indexName != null) {
      registeredIndexValidators.add(indexValidator);
    } else {
      String message = "Failed to register index validator because index name is null";
      LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
    }

  }

  /**
   * Register index cleaner.
   *
   * @param indexCleaner the index cleaner
   */
  public void registerIndexCleaner(IndexCleaner indexCleaner) {

    String indexName = indexCleaner.getIndexName();

    if (indexName != null) {
      registeredIndexCleaners.add(indexCleaner);
    } else {
      String message = "Failed to register index cleaner because index name is null";
      LOG.error(AaiUiMsgs.FAILED_TO_REGISTER_DUE_TO_NULL, message);
    }
  }

  /*
   * State machine should drive our flow dosync just dispatches an action and the state machine
   * determines what is in play and what is next
   */

  /**
   * Dump stat report.
   *
   * @param showFinalReport the show final report
   */
  private void dumpStatReport(boolean showFinalReport) {

    for (IndexSynchronizer synchronizer : registeredSynchronizers) {

      String statReport = synchronizer.getStatReport(showFinalReport);

      if (statReport != null) {
        LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
      }
    }
  }

  /**
   * Clear caches.
   */
  private void clearCaches() {

    /*
     * Any entity caches that were built as part of the sync operation should be cleared to save
     * memory. The original intent of the caching was to provide a short-lived cache to satisfy
     * entity requests from multiple synchronizers yet minimizing interactions with the AAI.
     */

    for (IndexSynchronizer synchronizer : registeredSynchronizers) {
      synchronizer.clearCache();
    }
  }

  /**
   * Perform pre sync cleanup collection.
   */
  private void performPreSyncCleanupCollection() {

    /*
     * ask the index cleaners to collect the their pre-sync object id collections
     */

    for (IndexCleaner cleaner : registeredIndexCleaners) {
      cleaner.populatePreOperationCollection();
    }

    changeInternalState(InternalState.SYNC_OPERATION, SyncActions.PRE_SYNC_COMPLETE);

  }

  /**
   * Perform index sync post collection.
   */
  private void performIndexSyncPostCollection() {

    /*
     * ask the entity purgers to collect the their pre-sync object id collections
     */

    for (IndexCleaner cleaner : registeredIndexCleaners) {
      cleaner.populatePostOperationCollection();
    }

  }

  /**
   * Perform index cleanup.
   */
  private void performIndexCleanup() {

    /*
     * ask the entity purgers to collect the their pre-sync object id collections
     */

    for (IndexCleaner cleaner : registeredIndexCleaners) {
      cleaner.performCleanup();
    }

  }

  /**
   * Perform sync abort.
   */
  private void performSyncAbort() {
    changeInternalState(InternalState.IDLE, SyncActions.SYNC_ABORTED);
  }

  /**
   * Perform index integrity validation.
   */
  private void performIndexIntegrityValidation() {

    /*
     * loop through registered index validators and test and fix, if needed
     */

    for (IndexValidator validator : registeredIndexValidators) {
      try {
        if (!validator.exists()) {
          validator.createOrRepair();
        }
      } catch (Exception exc) {
        String message = "Index validator caused an error = " + exc.getMessage();
        LOG.error(AaiUiMsgs.ERROR_GENERIC, message);
      }
    }

    changeInternalState(InternalState.PRE_SYNC, SyncActions.INDEX_INTEGRITY_VALIDATION_COMPLETE);

  }

  /**
   * Shutdown.
   */
  public void shutdown() {

    this.syncControllerExecutor.shutdown();
    for (IndexSynchronizer synchronizer : registeredSynchronizers) {

      try {
        synchronizer.shutdown();
      } catch (Exception exc) {
        LOG.error(AaiUiMsgs.ERROR_GENERIC,
            "Synchronizer shutdown caused an error = " + exc.getMessage());
      }

    }
    this.statReporterExecutor.shutdown();
  }

  /*
   * Need some kind of task running that responds to a transient boolean to kill it or we just stop
   * the executor that it is in?
   */



  /**
   * Perform synchronization.
   */
  private void performSynchronization() {

    /*
     * Get all the synchronizers running in parallel
     */

    for (IndexSynchronizer synchronizer : registeredSynchronizers) {
      supplyAsync(new Supplier<Void>() {

        @Override
        public Void get() {

          synchronizer.doSync();
          return null;
        }

      }, this.syncControllerExecutor).whenComplete((result, error) -> {

        /*
         * We don't bother checking the result, because it will always be null as the doSync() is
         * non-blocking.
         */

        if (error != null) {
          LOG.error(AaiUiMsgs.ERROR_GENERIC,
              "doSync operation failed with an error = " + error.getMessage());
        }
      });
    }

    boolean allDone = false;
    long nextReportTimeStampInMs = System.currentTimeMillis() + 30000L;

    while (!allDone) {

      // allDone = false;

      int totalFinished = 0;

      for (IndexSynchronizer synchronizer : registeredSynchronizers) {
        if (System.currentTimeMillis() > nextReportTimeStampInMs) {

          nextReportTimeStampInMs = System.currentTimeMillis() + 30000L;

          String statReport = synchronizer.getStatReport(false);

          if (statReport != null) {
            LOG.info(AaiUiMsgs.INFO_GENERIC, statReport);
          }
        }

        if (synchronizer.getState() == SynchronizerState.IDLE) {
          totalFinished++;
        }
      }

      allDone = (totalFinished == registeredSynchronizers.size());

      try {
        Thread.sleep(250);
      } catch (InterruptedException exc) {
        LOG.error(AaiUiMsgs.ERROR_GENERIC,
            "An error occurred while waiting for sync to complete. Error = " + exc.getMessage());
      }

    }

    changeInternalState(InternalState.POST_SYNC, SyncActions.SYNC_COMPLETE);

  }

  public SynchronizerState getState() {

    switch (currentInternalState) {

      case IDLE: {
        return SynchronizerState.IDLE;
      }

      default: {
        return SynchronizerState.PERFORMING_SYNCHRONIZATION;

      }
    }

  }

}