diff options
Diffstat (limited to 'src/main/java/org/onap/a1pesimulator/service/fileready/RanFileReadyHolder.java')
-rw-r--r-- | src/main/java/org/onap/a1pesimulator/service/fileready/RanFileReadyHolder.java | 83 |
1 files changed, 63 insertions, 20 deletions
diff --git a/src/main/java/org/onap/a1pesimulator/service/fileready/RanFileReadyHolder.java b/src/main/java/org/onap/a1pesimulator/service/fileready/RanFileReadyHolder.java index 4191ae7..cf3cf3a 100644 --- a/src/main/java/org/onap/a1pesimulator/service/fileready/RanFileReadyHolder.java +++ b/src/main/java/org/onap/a1pesimulator/service/fileready/RanFileReadyHolder.java @@ -14,11 +14,14 @@ package org.onap.a1pesimulator.service.fileready; import static java.util.Objects.isNull; +import static java.util.Objects.nonNull; import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.onap.a1pesimulator.data.fileready.EventMemoryHolder; import org.onap.a1pesimulator.data.fileready.FileData; @@ -40,7 +43,7 @@ public class RanFileReadyHolder { private static final Logger log = LoggerFactory.getLogger(RanFileReadyHolder.class); - private List<EventMemoryHolder> collectedEvents; + private Map<String, List<EventMemoryHolder>> collectedEventsByCell; private final RanVesSender ranVesSender; private final FtpServerService ftpServerService; private final PMBulkFileService xmlFileService; @@ -55,31 +58,58 @@ public class RanFileReadyHolder { } /** - * Run entire process: PM Bulk File creation-> upload to FTP -> delete temp PM Bulk File -> create File Ready Event - > send it to VES Collector - * collectedEvents are synchronized to not be updated by other threads during PM Bulk File creation + * Run entire process for all cells collectedEventsByCell are synchronized to not be updated by other threads during PM Bulk File creation */ public void createPMBulkFileAndSendFileReadyMessage() { - synchronized (getCollectedEvents()) { - Mono.justOrEmpty(getCollectedEvents()) - .filter(this::areSomeEventsStored) - .flatMap(xmlFileService::generatePMBulkFileXml) - .map(ftpServerService::uploadFileToFtp) - .flatMap(fileReadyEventService::createFileReadyEventAndDeleteTmpFile) - .doOnNext(this::sendEventToVesCollector) - .subscribe(fileData -> informAboutSuccess(), this::informAboutError); + synchronized (getCollectedEventsByCell()) { + getCollectedEventsByCell().forEach((cellId, cellEventList) -> createPMBulkFileAndSendFileReadyMessageForCell(cellEventList)); } } /** - * Adds current event to the memory, which is List<EventMemoryHolder> + * Run entire process for one cell collectedEventsByCell are synchronized to not be updated by other threads during PM Bulk File creation + * + * @param cellId cell identifier + */ + public void createPMBulkFileAndSendFileReadyMessageForCellId(String cellId) { + synchronized (getCollectedEventsByCell()) { + createPMBulkFileAndSendFileReadyMessageForCell(getCollectedEventsForCellId(cellId)); + } + } + + /** + * Run entire process for one cell: PM Bulk File creation-> upload to FTP -> delete temp PM Bulk File -> create File Ready Event - > send it to VES + * Collector. + * + * @param events list of events for one cell + */ + public void createPMBulkFileAndSendFileReadyMessageForCell(List<EventMemoryHolder> events) { + Mono.justOrEmpty(events) + .filter(this::areSomeEventsStored) + .flatMap(xmlFileService::generatePMBulkFileXml) + .map(ftpServerService::uploadFileToFtp) + .flatMap(fileReadyEventService::createFileReadyEventAndDeleteTmpFile) + .doOnNext(this::sendEventToVesCollector) + .subscribe(fileData -> informAboutSuccess(), this::informAboutError); + } + + /** + * Adds current event to the memory by cell, which is Map<String,List<EventMemoryHolder>> * * @param vesEvent event from specific cell * @throws VesBrokerException in case of any problem with adding to List, it throws an exception */ public void saveEventToMemory(VesEvent vesEvent, String cellId, String jobId, Integer granPeriod) throws VesBrokerException { try { - getCollectedEvents().add(new EventMemoryHolder(cellId, jobId, granPeriod, ZonedDateTime.now(), vesEvent)); - log.trace("Saving VES event for cell {} with granularity period {} and sequence number {}", cellId, granPeriod, getCollectedEvents().size()); + Map<String, List<EventMemoryHolder>> events = getCollectedEventsByCell(); + if (events.containsKey(cellId)) { + events.get(cellId).add(new EventMemoryHolder(cellId, jobId, granPeriod, ZonedDateTime.now(), vesEvent)); + } else { + List<EventMemoryHolder> cellEvents = Collections.synchronizedList( + new ArrayList<>(List.of(new EventMemoryHolder(cellId, jobId, granPeriod, ZonedDateTime.now(), vesEvent)))); + events.put(cellId, cellEvents); + } + log.trace("Saving VES event for cell {} with granularity period {} and sequence number {}", cellId, granPeriod, events.get(cellId).size()); } catch (Exception e) { String errorMsg = "Failed to save VES event to memory with exception:" + e; throw new VesBrokerException(errorMsg); @@ -122,14 +152,27 @@ public class RanFileReadyHolder { } /** - * Factory to get List<EventMemoryHolder> + * Factory to get Map<String,List<EventMemoryHolder>> + * + * @return existing or newly created Map<String,List<EventMemoryHolder>> + */ + public Map<String, List<EventMemoryHolder>> getCollectedEventsByCell() { + if (isNull(collectedEventsByCell)) { + collectedEventsByCell = Collections.synchronizedMap(new HashMap<>()); + } + return collectedEventsByCell; + } + + /** + * Get list of events for specific CellId * - * @return existing or newly created List<EventMemoryHolder> + * @param cellId cell identifier + * @return list of events */ - public List<EventMemoryHolder> getCollectedEvents() { - if (isNull(collectedEvents)) { - collectedEvents = Collections.synchronizedList(new ArrayList<>()); + public List<EventMemoryHolder> getCollectedEventsForCellId(String cellId) { + if (nonNull(collectedEventsByCell) && collectedEventsByCell.containsKey(cellId)) { + return collectedEventsByCell.get(cellId); } - return collectedEvents; + return new ArrayList<>(); } } |