diff options
Diffstat (limited to 'src/main/java/org/onap/a1pesimulator/service/ves/RanVesHolder.java')
-rw-r--r-- | src/main/java/org/onap/a1pesimulator/service/ves/RanVesHolder.java | 161 |
1 files changed, 133 insertions, 28 deletions
diff --git a/src/main/java/org/onap/a1pesimulator/service/ves/RanVesHolder.java b/src/main/java/org/onap/a1pesimulator/service/ves/RanVesHolder.java index d53d8dd..f711347 100644 --- a/src/main/java/org/onap/a1pesimulator/service/ves/RanVesHolder.java +++ b/src/main/java/org/onap/a1pesimulator/service/ves/RanVesHolder.java @@ -13,6 +13,9 @@ package org.onap.a1pesimulator.service.ves; +import static java.util.Objects.isNull; +import static java.util.Objects.nonNull; + import java.text.MessageFormat; import java.util.Collection; import java.util.Map; @@ -20,61 +23,122 @@ import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; import java.util.function.BiFunction; -import org.onap.a1pesimulator.data.ves.Event; -import org.onap.a1pesimulator.data.ves.RanPeriodicVesEvent; + +import org.onap.a1pesimulator.data.ReportingMethodEnum; +import org.onap.a1pesimulator.data.RequestParameters; +import org.onap.a1pesimulator.data.fileready.RanPeriodicEvent; +import org.onap.a1pesimulator.data.fileready.RanPeriodicSendReport; +import org.onap.a1pesimulator.data.ves.VesEvent; +import org.onap.a1pesimulator.service.common.AbstractRanRunnable; +import org.onap.a1pesimulator.service.common.EventCustomizer; +import org.onap.a1pesimulator.service.fileready.RanFileReadyHolder; +import org.onap.a1pesimulator.service.fileready.RanSaveFileReadyRunnable; +import org.onap.a1pesimulator.service.fileready.RanSendReportsRunnable; import org.onap.a1pesimulator.service.ves.RanEventCustomizerFactory.Mode; -import org.onap.a1pesimulator.service.ves.RanSendVesRunnable.EventCustomizer; +import org.onap.a1pesimulator.util.VnfConfigReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.http.ResponseEntity; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.stereotype.Service; +import lombok.Getter; + @Service public class RanVesHolder { - private final Map<String, RanPeriodicVesEvent> periodicEventsCache = new ConcurrentHashMap<>(); + private static final Logger log = LoggerFactory.getLogger(RanVesHolder.class); + private final Map<String, RanPeriodicEvent> periodicEventsCache = new ConcurrentHashMap<>(); private final RanVesDataProvider vesDataProvider; private final RanEventCustomizerFactory eventCustomizerFactory; private final ThreadPoolTaskScheduler vesPmThreadPoolTaskScheduler; private final Collection<OnEventAction> onEventActions; + private final RanFileReadyHolder ranFileReadyHolder; private final RanVesSender vesSender; + private final VnfConfigReader vnfConfigReader; + private ThreadSendReportFunction threadSendReportFunction; - public RanVesHolder(ThreadPoolTaskScheduler vesPmThreadPoolTaskScheduler, RanVesSender vesSender, + public RanVesHolder(ThreadPoolTaskScheduler vesPmThreadPoolTaskScheduler, RanFileReadyHolder ranFileReadyHolder, RanVesSender vesSender, + VnfConfigReader vnfConfigReader, RanEventCustomizerFactory eventCustomizerFactory, RanVesDataProvider vesDataProvider, Collection<OnEventAction> onEventActions) { this.vesPmThreadPoolTaskScheduler = vesPmThreadPoolTaskScheduler; + this.ranFileReadyHolder = ranFileReadyHolder; this.vesSender = vesSender; + this.vnfConfigReader = vnfConfigReader; this.eventCustomizerFactory = eventCustomizerFactory; this.vesDataProvider = vesDataProvider; this.onEventActions = onEventActions; } - Map<String, RanPeriodicVesEvent> getPeriodicEventsCache() { + /** + * Thread for periodical sending of PM Bulk Files and fileReady Events + */ + private void startSendingReports() { + if (isNull(threadSendReportFunction) || !threadSendReportFunction.isProcessRunning()) { + int repPeriod = vnfConfigReader.getVnfConfig().getRepPeriod(); + threadSendReportFunction = new ThreadSendReportFunction(vesPmThreadPoolTaskScheduler, repPeriod, ranFileReadyHolder); + threadSendReportFunction.startEvent(); + log.info("Start sending reports every {} seconds", repPeriod); + } + } + + /** + * Stops sending the report after the last cell was stopped. It send the last report before stop completely + */ + private void stopSendingReports() { + if (nonNull(threadSendReportFunction) && !isAnyEventRunning()) { + threadSendReportFunction.getRanPeriodicVesEvent().getScheduledFuture().cancel(false); + sendLastReportAfterCancel(); + log.info("Stop sending reports every {} seconds", vnfConfigReader.getVnfConfig().getRepPeriod()); + } + } + + /** + * Sends the last report after all threads were stopped + */ + private void sendLastReportAfterCancel() { + log.trace("Send last report after report thread was canceled"); + ranFileReadyHolder.createPMBulkFileAndSendFileReadyMessage(); + } + + Map<String, RanPeriodicEvent> getPeriodicEventsCache() { return periodicEventsCache; } - ResponseEntity<String> startSendingVesEvents(String identifier, Event vesEvent, Integer interval) { + ResponseEntity<String> startSendingVesEvents(String identifier, VesEvent vesEvent, Integer interval, ReportingMethodEnum reportingMethod) { periodicEventsCache.compute(identifier, - new ThreadCacheUpdateFunction(vesPmThreadPoolTaskScheduler, vesEvent, interval, - eventCustomizerFactory.getEventCustomizer(vesEvent, Mode.REGULAR), onEventActions, vesSender)); + new ThreadCacheUpdateFunction(vesPmThreadPoolTaskScheduler, eventCustomizerFactory.getEventCustomizer(vesEvent, Mode.REGULAR), onEventActions, + ranFileReadyHolder, vesSender, RequestParameters.builder() + .vesEvent(vesEvent).identifier(identifier).reportingMethod(reportingMethod).interval(interval).build())); + if (ReportingMethodEnum.FILE_READY.equals(reportingMethod)) { + startSendingReports(); + } return ResponseEntity.accepted().body("VES Event sending started"); } - ResponseEntity<String> startSendingFailureVesEvents(String identifier, Event vesEvent) { + ResponseEntity<String> startSendingFailureVesEvents(String identifier, VesEvent vesEvent, ReportingMethodEnum reportingMethod) { - periodicEventsCache.compute(identifier, new ThreadCacheUpdateFunction(vesPmThreadPoolTaskScheduler, vesEvent, - vesDataProvider.getFailureVesInterval(), - eventCustomizerFactory.getEventCustomizer(vesEvent, Mode.FAILURE), onEventActions, vesSender)); + periodicEventsCache.compute(identifier, + new ThreadCacheUpdateFunction(vesPmThreadPoolTaskScheduler, eventCustomizerFactory.getEventCustomizer(vesEvent, Mode.FAILURE), onEventActions, + ranFileReadyHolder, + vesSender, RequestParameters.builder().vesEvent(vesEvent).identifier(identifier).interval(vesDataProvider.getFailureVesInterval()) + .reportingMethod(reportingMethod).build())); + if (ReportingMethodEnum.FILE_READY.equals(reportingMethod)) { + startSendingReports(); + } return ResponseEntity.accepted().body("Failure VES Event sending started"); } - Optional<RanPeriodicVesEvent> stopSendingVesEvents(String identifier) { - RanPeriodicVesEvent periodicEvent = periodicEventsCache.remove(identifier); + Optional<RanPeriodicEvent> stopSendingVesEvents(String identifier) { + RanPeriodicEvent periodicEvent = periodicEventsCache.remove(identifier); if (periodicEvent == null) { return Optional.empty(); } periodicEvent.getScheduledFuture().cancel(false); + stopSendingReports(); return Optional.of(periodicEvent); } @@ -86,7 +150,11 @@ public class RanVesHolder { return periodicEventsCache.containsKey(identifier); } - Event getEventStructure(String identifier) { + public boolean isAnyEventRunning() { + return !periodicEventsCache.isEmpty(); + } + + VesEvent getEventStructure(String identifier) { if (!periodicEventsCache.containsKey(identifier)) { throw new IllegalArgumentException( MessageFormat.format("Cannot find event for given source {0}", identifier)); @@ -95,39 +163,76 @@ public class RanVesHolder { } private static class ThreadCacheUpdateFunction - implements BiFunction<String, RanPeriodicVesEvent, RanPeriodicVesEvent> { + implements BiFunction<String, RanPeriodicEvent, RanPeriodicEvent> { private final Integer interval; private final ThreadPoolTaskScheduler vesPmThreadPoolTaskScheduler; - private final Event vesEvent; + private final VesEvent vesEvent; private final EventCustomizer eventCustomizer; private final Collection<OnEventAction> onEventActions; + private final RanFileReadyHolder fileReadyHolder; private final RanVesSender vesSender; + private final String cellId; + private final ReportingMethodEnum reportingMethod; - public ThreadCacheUpdateFunction(ThreadPoolTaskScheduler vesPmThreadPoolTaskScheduler, Event vesEvent, - Integer interval, EventCustomizer eventCustomizer, Collection<OnEventAction> onEventActions, - RanVesSender vesSender) { + public ThreadCacheUpdateFunction(ThreadPoolTaskScheduler vesPmThreadPoolTaskScheduler, EventCustomizer eventCustomizer, + Collection<OnEventAction> onEventActions, + RanFileReadyHolder fileReadyHolder, RanVesSender vesSender, RequestParameters requestParameters) { this.vesPmThreadPoolTaskScheduler = vesPmThreadPoolTaskScheduler; - this.vesEvent = vesEvent; - this.interval = interval; + this.vesEvent = requestParameters.getVesEvent(); + this.interval = requestParameters.getInterval(); this.eventCustomizer = eventCustomizer; this.onEventActions = onEventActions; + this.fileReadyHolder = fileReadyHolder; this.vesSender = vesSender; + this.cellId = requestParameters.getIdentifier(); + this.reportingMethod = requestParameters.getReportingMethod(); } @Override - public RanPeriodicVesEvent apply(String key, RanPeriodicVesEvent value) { + public RanPeriodicEvent apply(String key, RanPeriodicEvent value) { if (value != null) { // if thread is registered then cancel it and schedule a new one value.getScheduledFuture().cancel(false); } - RanSendVesRunnable sendVesRunnable = + AbstractRanRunnable ranRunnable = (ReportingMethodEnum.FILE_READY.equals(reportingMethod)) ? + new RanSaveFileReadyRunnable(fileReadyHolder, cellId, vesEvent, eventCustomizer, interval, onEventActions) : new RanSendVesRunnable(vesSender, vesEvent, eventCustomizer, onEventActions); + ScheduledFuture<?> scheduledFuture = - vesPmThreadPoolTaskScheduler.scheduleAtFixedRate(sendVesRunnable, interval * 1000L); - return RanPeriodicVesEvent.builder().event(vesEvent).interval(interval).scheduledFuture(scheduledFuture) - .sendVesRunnable(sendVesRunnable).build(); + vesPmThreadPoolTaskScheduler.scheduleAtFixedRate(ranRunnable, interval * 1000L); + return RanPeriodicEvent.builder().event(vesEvent).interval(interval).scheduledFuture(scheduledFuture) + .ranRunnable(ranRunnable).build(); } } + + @Getter + private static class ThreadSendReportFunction { + + protected final Integer interval; + protected final ThreadPoolTaskScheduler vesPmThreadPoolTaskScheduler; + protected RanPeriodicSendReport ranPeriodicVesEvent; + protected ScheduledFuture<?> scheduledFuture; + protected final RanFileReadyHolder ranFileReadyHolder; + + public ThreadSendReportFunction(ThreadPoolTaskScheduler vesPmThreadPoolTaskScheduler, Integer interval, RanFileReadyHolder ranFileReadyHolder) { + this.vesPmThreadPoolTaskScheduler = vesPmThreadPoolTaskScheduler; + this.interval = interval; + this.ranFileReadyHolder = ranFileReadyHolder; + } + + public void startEvent() { + RanSendReportsRunnable ranSendReportsRunnable = + new RanSendReportsRunnable(ranFileReadyHolder); + scheduledFuture = vesPmThreadPoolTaskScheduler.scheduleAtFixedRate(ranSendReportsRunnable, interval * 1000L); + this.ranPeriodicVesEvent = RanPeriodicSendReport.builder().interval(interval).scheduledFuture(scheduledFuture) + .ranSendReportsRunnable(ranSendReportsRunnable).build(); + } + + public boolean isProcessRunning() { + return (nonNull(scheduledFuture) && !(scheduledFuture.isCancelled() || scheduledFuture.isDone())); + } + } + } |