diff options
Diffstat (limited to 'src/main')
-rw-r--r-- | src/main/java/org/onap/a1pesimulator/service/fileready/RanFileReadyHolder.java | 83 | ||||
-rw-r--r-- | src/main/java/org/onap/a1pesimulator/service/ves/RanVesHolder.java | 14 |
2 files changed, 70 insertions, 27 deletions
diff --git a/src/main/java/org/onap/a1pesimulator/service/fileready/RanFileReadyHolder.java b/src/main/java/org/onap/a1pesimulator/service/fileready/RanFileReadyHolder.java index 4191ae7..cf3cf3a 100644 --- a/src/main/java/org/onap/a1pesimulator/service/fileready/RanFileReadyHolder.java +++ b/src/main/java/org/onap/a1pesimulator/service/fileready/RanFileReadyHolder.java @@ -14,11 +14,14 @@ package org.onap.a1pesimulator.service.fileready; import static java.util.Objects.isNull; +import static java.util.Objects.nonNull; import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.onap.a1pesimulator.data.fileready.EventMemoryHolder; import org.onap.a1pesimulator.data.fileready.FileData; @@ -40,7 +43,7 @@ public class RanFileReadyHolder { private static final Logger log = LoggerFactory.getLogger(RanFileReadyHolder.class); - private List<EventMemoryHolder> collectedEvents; + private Map<String, List<EventMemoryHolder>> collectedEventsByCell; private final RanVesSender ranVesSender; private final FtpServerService ftpServerService; private final PMBulkFileService xmlFileService; @@ -55,31 +58,58 @@ public class RanFileReadyHolder { } /** - * Run entire process: PM Bulk File creation-> upload to FTP -> delete temp PM Bulk File -> create File Ready Event - > send it to VES Collector - * collectedEvents are synchronized to not be updated by other threads during PM Bulk File creation + * Run entire process for all cells collectedEventsByCell are synchronized to not be updated by other threads during PM Bulk File creation */ public void createPMBulkFileAndSendFileReadyMessage() { - synchronized (getCollectedEvents()) { - Mono.justOrEmpty(getCollectedEvents()) - .filter(this::areSomeEventsStored) - .flatMap(xmlFileService::generatePMBulkFileXml) - .map(ftpServerService::uploadFileToFtp) - .flatMap(fileReadyEventService::createFileReadyEventAndDeleteTmpFile) - .doOnNext(this::sendEventToVesCollector) - .subscribe(fileData -> informAboutSuccess(), this::informAboutError); + synchronized (getCollectedEventsByCell()) { + getCollectedEventsByCell().forEach((cellId, cellEventList) -> createPMBulkFileAndSendFileReadyMessageForCell(cellEventList)); } } /** - * Adds current event to the memory, which is List<EventMemoryHolder> + * Run entire process for one cell collectedEventsByCell are synchronized to not be updated by other threads during PM Bulk File creation + * + * @param cellId cell identifier + */ + public void createPMBulkFileAndSendFileReadyMessageForCellId(String cellId) { + synchronized (getCollectedEventsByCell()) { + createPMBulkFileAndSendFileReadyMessageForCell(getCollectedEventsForCellId(cellId)); + } + } + + /** + * Run entire process for one cell: PM Bulk File creation-> upload to FTP -> delete temp PM Bulk File -> create File Ready Event - > send it to VES + * Collector. + * + * @param events list of events for one cell + */ + public void createPMBulkFileAndSendFileReadyMessageForCell(List<EventMemoryHolder> events) { + Mono.justOrEmpty(events) + .filter(this::areSomeEventsStored) + .flatMap(xmlFileService::generatePMBulkFileXml) + .map(ftpServerService::uploadFileToFtp) + .flatMap(fileReadyEventService::createFileReadyEventAndDeleteTmpFile) + .doOnNext(this::sendEventToVesCollector) + .subscribe(fileData -> informAboutSuccess(), this::informAboutError); + } + + /** + * Adds current event to the memory by cell, which is Map<String,List<EventMemoryHolder>> * * @param vesEvent event from specific cell * @throws VesBrokerException in case of any problem with adding to List, it throws an exception */ public void saveEventToMemory(VesEvent vesEvent, String cellId, String jobId, Integer granPeriod) throws VesBrokerException { try { - getCollectedEvents().add(new EventMemoryHolder(cellId, jobId, granPeriod, ZonedDateTime.now(), vesEvent)); - log.trace("Saving VES event for cell {} with granularity period {} and sequence number {}", cellId, granPeriod, getCollectedEvents().size()); + Map<String, List<EventMemoryHolder>> events = getCollectedEventsByCell(); + if (events.containsKey(cellId)) { + events.get(cellId).add(new EventMemoryHolder(cellId, jobId, granPeriod, ZonedDateTime.now(), vesEvent)); + } else { + List<EventMemoryHolder> cellEvents = Collections.synchronizedList( + new ArrayList<>(List.of(new EventMemoryHolder(cellId, jobId, granPeriod, ZonedDateTime.now(), vesEvent)))); + events.put(cellId, cellEvents); + } + log.trace("Saving VES event for cell {} with granularity period {} and sequence number {}", cellId, granPeriod, events.get(cellId).size()); } catch (Exception e) { String errorMsg = "Failed to save VES event to memory with exception:" + e; throw new VesBrokerException(errorMsg); @@ -122,14 +152,27 @@ public class RanFileReadyHolder { } /** - * Factory to get List<EventMemoryHolder> + * Factory to get Map<String,List<EventMemoryHolder>> + * + * @return existing or newly created Map<String,List<EventMemoryHolder>> + */ + public Map<String, List<EventMemoryHolder>> getCollectedEventsByCell() { + if (isNull(collectedEventsByCell)) { + collectedEventsByCell = Collections.synchronizedMap(new HashMap<>()); + } + return collectedEventsByCell; + } + + /** + * Get list of events for specific CellId * - * @return existing or newly created List<EventMemoryHolder> + * @param cellId cell identifier + * @return list of events */ - public List<EventMemoryHolder> getCollectedEvents() { - if (isNull(collectedEvents)) { - collectedEvents = Collections.synchronizedList(new ArrayList<>()); + public List<EventMemoryHolder> getCollectedEventsForCellId(String cellId) { + if (nonNull(collectedEventsByCell) && collectedEventsByCell.containsKey(cellId)) { + return collectedEventsByCell.get(cellId); } - return collectedEvents; + return new ArrayList<>(); } } diff --git a/src/main/java/org/onap/a1pesimulator/service/ves/RanVesHolder.java b/src/main/java/org/onap/a1pesimulator/service/ves/RanVesHolder.java index f711347..131f792 100644 --- a/src/main/java/org/onap/a1pesimulator/service/ves/RanVesHolder.java +++ b/src/main/java/org/onap/a1pesimulator/service/ves/RanVesHolder.java @@ -87,20 +87,20 @@ public class RanVesHolder { /** * Stops sending the report after the last cell was stopped. It send the last report before stop completely */ - private void stopSendingReports() { + private void stopSendingReports(String cellId) { + sendLastReport(cellId); if (nonNull(threadSendReportFunction) && !isAnyEventRunning()) { threadSendReportFunction.getRanPeriodicVesEvent().getScheduledFuture().cancel(false); - sendLastReportAfterCancel(); log.info("Stop sending reports every {} seconds", vnfConfigReader.getVnfConfig().getRepPeriod()); } } /** - * Sends the last report after all threads were stopped + * Sends the last report after specific cell was stopped */ - private void sendLastReportAfterCancel() { - log.trace("Send last report after report thread was canceled"); - ranFileReadyHolder.createPMBulkFileAndSendFileReadyMessage(); + private void sendLastReport(String cellId) { + log.trace("Send last report after stop for cell: {}", cellId); + ranFileReadyHolder.createPMBulkFileAndSendFileReadyMessageForCellId(cellId); } Map<String, RanPeriodicEvent> getPeriodicEventsCache() { @@ -138,7 +138,7 @@ public class RanVesHolder { return Optional.empty(); } periodicEvent.getScheduledFuture().cancel(false); - stopSendingReports(); + stopSendingReports(identifier); return Optional.of(periodicEvent); } |