diff options
5 files changed, 124 insertions, 37 deletions
diff --git a/src/main/java/org/onap/a1pesimulator/service/fileready/RanFileReadyHolder.java b/src/main/java/org/onap/a1pesimulator/service/fileready/RanFileReadyHolder.java index 4191ae7..cf3cf3a 100644 --- a/src/main/java/org/onap/a1pesimulator/service/fileready/RanFileReadyHolder.java +++ b/src/main/java/org/onap/a1pesimulator/service/fileready/RanFileReadyHolder.java @@ -14,11 +14,14 @@ package org.onap.a1pesimulator.service.fileready; import static java.util.Objects.isNull; +import static java.util.Objects.nonNull; import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.onap.a1pesimulator.data.fileready.EventMemoryHolder; import org.onap.a1pesimulator.data.fileready.FileData; @@ -40,7 +43,7 @@ public class RanFileReadyHolder { private static final Logger log = LoggerFactory.getLogger(RanFileReadyHolder.class); - private List<EventMemoryHolder> collectedEvents; + private Map<String, List<EventMemoryHolder>> collectedEventsByCell; private final RanVesSender ranVesSender; private final FtpServerService ftpServerService; private final PMBulkFileService xmlFileService; @@ -55,31 +58,58 @@ public class RanFileReadyHolder { } /** - * Run entire process: PM Bulk File creation-> upload to FTP -> delete temp PM Bulk File -> create File Ready Event - > send it to VES Collector - * collectedEvents are synchronized to not be updated by other threads during PM Bulk File creation + * Run entire process for all cells collectedEventsByCell are synchronized to not be updated by other threads during PM Bulk File creation */ public void createPMBulkFileAndSendFileReadyMessage() { - synchronized (getCollectedEvents()) { - Mono.justOrEmpty(getCollectedEvents()) - .filter(this::areSomeEventsStored) - .flatMap(xmlFileService::generatePMBulkFileXml) - .map(ftpServerService::uploadFileToFtp) - .flatMap(fileReadyEventService::createFileReadyEventAndDeleteTmpFile) - .doOnNext(this::sendEventToVesCollector) - .subscribe(fileData -> informAboutSuccess(), this::informAboutError); + synchronized (getCollectedEventsByCell()) { + getCollectedEventsByCell().forEach((cellId, cellEventList) -> createPMBulkFileAndSendFileReadyMessageForCell(cellEventList)); } } /** - * Adds current event to the memory, which is List<EventMemoryHolder> + * Run entire process for one cell collectedEventsByCell are synchronized to not be updated by other threads during PM Bulk File creation + * + * @param cellId cell identifier + */ + public void createPMBulkFileAndSendFileReadyMessageForCellId(String cellId) { + synchronized (getCollectedEventsByCell()) { + createPMBulkFileAndSendFileReadyMessageForCell(getCollectedEventsForCellId(cellId)); + } + } + + /** + * Run entire process for one cell: PM Bulk File creation-> upload to FTP -> delete temp PM Bulk File -> create File Ready Event - > send it to VES + * Collector. + * + * @param events list of events for one cell + */ + public void createPMBulkFileAndSendFileReadyMessageForCell(List<EventMemoryHolder> events) { + Mono.justOrEmpty(events) + .filter(this::areSomeEventsStored) + .flatMap(xmlFileService::generatePMBulkFileXml) + .map(ftpServerService::uploadFileToFtp) + .flatMap(fileReadyEventService::createFileReadyEventAndDeleteTmpFile) + .doOnNext(this::sendEventToVesCollector) + .subscribe(fileData -> informAboutSuccess(), this::informAboutError); + } + + /** + * Adds current event to the memory by cell, which is Map<String,List<EventMemoryHolder>> * * @param vesEvent event from specific cell * @throws VesBrokerException in case of any problem with adding to List, it throws an exception */ public void saveEventToMemory(VesEvent vesEvent, String cellId, String jobId, Integer granPeriod) throws VesBrokerException { try { - getCollectedEvents().add(new EventMemoryHolder(cellId, jobId, granPeriod, ZonedDateTime.now(), vesEvent)); - log.trace("Saving VES event for cell {} with granularity period {} and sequence number {}", cellId, granPeriod, getCollectedEvents().size()); + Map<String, List<EventMemoryHolder>> events = getCollectedEventsByCell(); + if (events.containsKey(cellId)) { + events.get(cellId).add(new EventMemoryHolder(cellId, jobId, granPeriod, ZonedDateTime.now(), vesEvent)); + } else { + List<EventMemoryHolder> cellEvents = Collections.synchronizedList( + new ArrayList<>(List.of(new EventMemoryHolder(cellId, jobId, granPeriod, ZonedDateTime.now(), vesEvent)))); + events.put(cellId, cellEvents); + } + log.trace("Saving VES event for cell {} with granularity period {} and sequence number {}", cellId, granPeriod, events.get(cellId).size()); } catch (Exception e) { String errorMsg = "Failed to save VES event to memory with exception:" + e; throw new VesBrokerException(errorMsg); @@ -122,14 +152,27 @@ public class RanFileReadyHolder { } /** - * Factory to get List<EventMemoryHolder> + * Factory to get Map<String,List<EventMemoryHolder>> + * + * @return existing or newly created Map<String,List<EventMemoryHolder>> + */ + public Map<String, List<EventMemoryHolder>> getCollectedEventsByCell() { + if (isNull(collectedEventsByCell)) { + collectedEventsByCell = Collections.synchronizedMap(new HashMap<>()); + } + return collectedEventsByCell; + } + + /** + * Get list of events for specific CellId * - * @return existing or newly created List<EventMemoryHolder> + * @param cellId cell identifier + * @return list of events */ - public List<EventMemoryHolder> getCollectedEvents() { - if (isNull(collectedEvents)) { - collectedEvents = Collections.synchronizedList(new ArrayList<>()); + public List<EventMemoryHolder> getCollectedEventsForCellId(String cellId) { + if (nonNull(collectedEventsByCell) && collectedEventsByCell.containsKey(cellId)) { + return collectedEventsByCell.get(cellId); } - return collectedEvents; + return new ArrayList<>(); } } diff --git a/src/main/java/org/onap/a1pesimulator/service/ves/RanVesHolder.java b/src/main/java/org/onap/a1pesimulator/service/ves/RanVesHolder.java index f711347..131f792 100644 --- a/src/main/java/org/onap/a1pesimulator/service/ves/RanVesHolder.java +++ b/src/main/java/org/onap/a1pesimulator/service/ves/RanVesHolder.java @@ -87,20 +87,20 @@ public class RanVesHolder { /** * Stops sending the report after the last cell was stopped. It send the last report before stop completely */ - private void stopSendingReports() { + private void stopSendingReports(String cellId) { + sendLastReport(cellId); if (nonNull(threadSendReportFunction) && !isAnyEventRunning()) { threadSendReportFunction.getRanPeriodicVesEvent().getScheduledFuture().cancel(false); - sendLastReportAfterCancel(); log.info("Stop sending reports every {} seconds", vnfConfigReader.getVnfConfig().getRepPeriod()); } } /** - * Sends the last report after all threads were stopped + * Sends the last report after specific cell was stopped */ - private void sendLastReportAfterCancel() { - log.trace("Send last report after report thread was canceled"); - ranFileReadyHolder.createPMBulkFileAndSendFileReadyMessage(); + private void sendLastReport(String cellId) { + log.trace("Send last report after stop for cell: {}", cellId); + ranFileReadyHolder.createPMBulkFileAndSendFileReadyMessageForCellId(cellId); } Map<String, RanPeriodicEvent> getPeriodicEventsCache() { @@ -138,7 +138,7 @@ public class RanVesHolder { return Optional.empty(); } periodicEvent.getScheduledFuture().cancel(false); - stopSendingReports(); + stopSendingReports(identifier); return Optional.of(periodicEvent); } diff --git a/src/test/java/org/onap/a1pesimulator/service/fileready/CommonFileReady.java b/src/test/java/org/onap/a1pesimulator/service/fileready/CommonFileReady.java index d65ad7c..6b306b0 100644 --- a/src/test/java/org/onap/a1pesimulator/service/fileready/CommonFileReady.java +++ b/src/test/java/org/onap/a1pesimulator/service/fileready/CommonFileReady.java @@ -12,7 +12,9 @@ import java.nio.file.Paths; import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import org.junit.jupiter.api.AfterEach; @@ -36,7 +38,9 @@ public class CommonFileReady { public List<File> filesToDelete; //we collect files created during testing and then delete them public static final String PM_BULK_FILE = "pmBulkFile.xml"; public static final String ARCHIVED_PM_BULK_FILE = "pmBulkFile.xml.gz"; + public static final String TEST_CELL_ID = "Cell1"; public static final Integer NO_OF_EVENTS = 3; + public static final Integer NO_OF_CELLS = 2; @InjectMocks private ObjectMapper mapper; @@ -80,13 +84,27 @@ public class CommonFileReady { protected List<EventMemoryHolder> getTestEvents() { List<EventMemoryHolder> collectedEvents = new ArrayList<>(); for (int i = 0; i < NO_OF_EVENTS; i++) { - EventMemoryHolder eventMemoryHolder = new EventMemoryHolder("Cell1", UUID.randomUUID().toString(), 10, ZonedDateTime.now(), loadEventFromFile()); + EventMemoryHolder eventMemoryHolder = new EventMemoryHolder(TEST_CELL_ID, UUID.randomUUID().toString(), 10, ZonedDateTime.now(), + loadEventFromFile()); collectedEvents.add(eventMemoryHolder); } return collectedEvents; } /** + * Generate events by CellId + * + * @return Map by CellId and list of events + */ + protected Map<String, List<EventMemoryHolder>> getTestEventsByCells(List<EventMemoryHolder> eventList) { + Map<String, List<EventMemoryHolder>> collectedEventsByCell = new HashMap<>(); + for (int cellId = 0; cellId < NO_OF_CELLS; cellId++) { + collectedEventsByCell.put("Cell" + cellId, eventList); + } + return collectedEventsByCell; + } + + /** * Converts json to VESEvent object * * @return created VESEvent diff --git a/src/test/java/org/onap/a1pesimulator/service/fileready/RanFileReadyHolderTest.java b/src/test/java/org/onap/a1pesimulator/service/fileready/RanFileReadyHolderTest.java index 10014c5..f4c6b84 100644 --- a/src/test/java/org/onap/a1pesimulator/service/fileready/RanFileReadyHolderTest.java +++ b/src/test/java/org/onap/a1pesimulator/service/fileready/RanFileReadyHolderTest.java @@ -9,6 +9,7 @@ import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; import java.util.List; +import java.util.Map; import java.util.UUID; import org.apache.http.HttpStatus; @@ -53,7 +54,16 @@ class RanFileReadyHolderTest extends CommonFileReady { ranFileReadyHolder.createPMBulkFileAndSendFileReadyMessage(); assertThat(appender.list).extracting(ILoggingEvent::getFormattedMessage) - .containsExactly("PM Bulk file was generated, uploaded to FTP and File ready event was send to VES Collector"); + .contains("PM Bulk file was generated, uploaded to FTP and File ready event was send to VES Collector"); + } + + @Test + void createPMBulkFileAndSendFileReadyMessageForOneCell() { + ListAppender<ILoggingEvent> appender = createCommonLogAndMock(); + + ranFileReadyHolder.createPMBulkFileAndSendFileReadyMessageForCellId(TEST_CELL_ID); + assertThat(appender.list).extracting(ILoggingEvent::getFormattedMessage) + .contains("PM Bulk file was generated, uploaded to FTP and File ready event was send to VES Collector"); } @Test @@ -62,18 +72,19 @@ class RanFileReadyHolderTest extends CommonFileReady { doReturn(Mono.error(new Exception("error"))).when(fileReadyEventService).createFileReadyEventAndDeleteTmpFile(any()); ranFileReadyHolder.createPMBulkFileAndSendFileReadyMessage(); - assertThat(appender.list).extracting(ILoggingEvent::getFormattedMessage).containsExactly("File ready event was unsuccessful: error"); + assertThat(appender.list).extracting(ILoggingEvent::getFormattedMessage).contains("File ready event was unsuccessful: error"); } @Test void saveEventToMemory() { ranFileReadyHolder = spy(new RanFileReadyHolder(ranVesSender, ftpServerService, pmBulkFileService, fileReadyEventService)); try { - ranFileReadyHolder.saveEventToMemory(loadEventFromFile(), "Cell1", UUID.randomUUID().toString(), 30); + ranFileReadyHolder.saveEventToMemory(loadEventFromFile(), TEST_CELL_ID, UUID.randomUUID().toString(), 30); + ranFileReadyHolder.saveEventToMemory(loadEventFromFile(), TEST_CELL_ID, UUID.randomUUID().toString(), 30); } catch (VesBrokerException e) { e.printStackTrace(); } - assertThat(ranFileReadyHolder.getCollectedEvents()).hasSize(1); + assertThat(ranFileReadyHolder.getCollectedEventsByCell().get(TEST_CELL_ID)).hasSize(2); } @Test @@ -81,15 +92,28 @@ class RanFileReadyHolderTest extends CommonFileReady { doThrow(new VesBrokerException("error")).when(ranFileReadyHolder).saveEventToMemory(any(), any(), any(), any()); Throwable exception = assertThrows(VesBrokerException.class, - () -> ranFileReadyHolder.saveEventToMemory(loadEventFromFile(), "Cell1", UUID.randomUUID().toString(), 30)); + () -> ranFileReadyHolder.saveEventToMemory(loadEventFromFile(), TEST_CELL_ID, UUID.randomUUID().toString(), 30)); assertThat(exception.getMessage()).contains("error"); - assertThat(ranFileReadyHolder.getCollectedEvents()).isEmpty(); + assertThat(ranFileReadyHolder.getCollectedEventsByCell()).isEmpty(); + } + + @Test + void getCollectedEventsByCell() { + Map<String, List<EventMemoryHolder>> collectedEvents = ranFileReadyHolder.getCollectedEventsByCell(); + assertNotNull(collectedEvents); } @Test void getCollectedEvents() { - List<EventMemoryHolder> collectedEvents = ranFileReadyHolder.getCollectedEvents(); + List<EventMemoryHolder> collectedEvents = ranFileReadyHolder.getCollectedEventsForCellId(TEST_CELL_ID); assertNotNull(collectedEvents); + try { + ranFileReadyHolder.saveEventToMemory(loadEventFromFile(), TEST_CELL_ID, UUID.randomUUID().toString(), 30); + collectedEvents = ranFileReadyHolder.getCollectedEventsForCellId(TEST_CELL_ID); + assertNotNull(collectedEvents); + } catch (VesBrokerException e) { + e.printStackTrace(); + } } /** @@ -101,9 +125,11 @@ class RanFileReadyHolderTest extends CommonFileReady { ListAppender<ILoggingEvent> appender = createCommonLog(RanFileReadyHolder.class); List<EventMemoryHolder> collectedEvents = getTestEvents(); + Map<String, List<EventMemoryHolder>> collectedEventsByCell = getTestEventsByCells(collectedEvents); FileData testFileData = FileData.builder().pmBulkFile(createTempFile(PM_BULK_FILE)).build(); - doReturn(collectedEvents).when(ranFileReadyHolder).getCollectedEvents(); + doReturn(collectedEventsByCell).when(ranFileReadyHolder).getCollectedEventsByCell(); + doReturn(collectedEvents).when(ranFileReadyHolder).getCollectedEventsForCellId(any()); doReturn(Mono.just(testFileData)).when(pmBulkFileService).generatePMBulkFileXml(collectedEvents); testFileData.setArchivedPmBulkFile(createTempFile(ARCHIVED_PM_BULK_FILE)); doReturn(Mono.just(testFileData)).when(ftpServerService).uploadFileToFtp(any()); diff --git a/src/test/java/org/onap/a1pesimulator/service/fileready/RanSaveFileReadyRunnableTest.java b/src/test/java/org/onap/a1pesimulator/service/fileready/RanSaveFileReadyRunnableTest.java index 4ee18de..574b4b0 100644 --- a/src/test/java/org/onap/a1pesimulator/service/fileready/RanSaveFileReadyRunnableTest.java +++ b/src/test/java/org/onap/a1pesimulator/service/fileready/RanSaveFileReadyRunnableTest.java @@ -41,7 +41,7 @@ class RanSaveFileReadyRunnableTest extends CommonFileReady { super.setUp(); doReturn(new RanCellEventCustomizer(ranUeHolder)).when(ranEventCustomizerFactory).getEventCustomizer(any(), any()); ranSaveFileReadyRunnable = spy( - new RanSaveFileReadyRunnable(ranFileReadyHolder, "Cell1", loadEventFromFile(), ranEventCustomizerFactory.getEventCustomizer(new VesEvent(), + new RanSaveFileReadyRunnable(ranFileReadyHolder, TEST_CELL_ID, loadEventFromFile(), ranEventCustomizerFactory.getEventCustomizer(new VesEvent(), Mode.REGULAR), 60, Collections.emptyList())); } |