aboutsummaryrefslogtreecommitdiffstats
path: root/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/org/onap/a1pesimulator/service/fileready/RanFileReadyHolder.java83
-rw-r--r--src/main/java/org/onap/a1pesimulator/service/ves/RanVesHolder.java14
2 files changed, 70 insertions, 27 deletions
diff --git a/src/main/java/org/onap/a1pesimulator/service/fileready/RanFileReadyHolder.java b/src/main/java/org/onap/a1pesimulator/service/fileready/RanFileReadyHolder.java
index 4191ae7..cf3cf3a 100644
--- a/src/main/java/org/onap/a1pesimulator/service/fileready/RanFileReadyHolder.java
+++ b/src/main/java/org/onap/a1pesimulator/service/fileready/RanFileReadyHolder.java
@@ -14,11 +14,14 @@
package org.onap.a1pesimulator.service.fileready;
import static java.util.Objects.isNull;
+import static java.util.Objects.nonNull;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.onap.a1pesimulator.data.fileready.EventMemoryHolder;
import org.onap.a1pesimulator.data.fileready.FileData;
@@ -40,7 +43,7 @@ public class RanFileReadyHolder {
private static final Logger log = LoggerFactory.getLogger(RanFileReadyHolder.class);
- private List<EventMemoryHolder> collectedEvents;
+ private Map<String, List<EventMemoryHolder>> collectedEventsByCell;
private final RanVesSender ranVesSender;
private final FtpServerService ftpServerService;
private final PMBulkFileService xmlFileService;
@@ -55,31 +58,58 @@ public class RanFileReadyHolder {
}
/**
- * Run entire process: PM Bulk File creation-> upload to FTP -> delete temp PM Bulk File -> create File Ready Event - > send it to VES Collector
- * collectedEvents are synchronized to not be updated by other threads during PM Bulk File creation
+ * Run entire process for all cells collectedEventsByCell are synchronized to not be updated by other threads during PM Bulk File creation
*/
public void createPMBulkFileAndSendFileReadyMessage() {
- synchronized (getCollectedEvents()) {
- Mono.justOrEmpty(getCollectedEvents())
- .filter(this::areSomeEventsStored)
- .flatMap(xmlFileService::generatePMBulkFileXml)
- .map(ftpServerService::uploadFileToFtp)
- .flatMap(fileReadyEventService::createFileReadyEventAndDeleteTmpFile)
- .doOnNext(this::sendEventToVesCollector)
- .subscribe(fileData -> informAboutSuccess(), this::informAboutError);
+ synchronized (getCollectedEventsByCell()) {
+ getCollectedEventsByCell().forEach((cellId, cellEventList) -> createPMBulkFileAndSendFileReadyMessageForCell(cellEventList));
}
}
/**
- * Adds current event to the memory, which is List<EventMemoryHolder>
+ * Run entire process for one cell collectedEventsByCell are synchronized to not be updated by other threads during PM Bulk File creation
+ *
+ * @param cellId cell identifier
+ */
+ public void createPMBulkFileAndSendFileReadyMessageForCellId(String cellId) {
+ synchronized (getCollectedEventsByCell()) {
+ createPMBulkFileAndSendFileReadyMessageForCell(getCollectedEventsForCellId(cellId));
+ }
+ }
+
+ /**
+ * Run entire process for one cell: PM Bulk File creation-> upload to FTP -> delete temp PM Bulk File -> create File Ready Event - > send it to VES
+ * Collector.
+ *
+ * @param events list of events for one cell
+ */
+ public void createPMBulkFileAndSendFileReadyMessageForCell(List<EventMemoryHolder> events) {
+ Mono.justOrEmpty(events)
+ .filter(this::areSomeEventsStored)
+ .flatMap(xmlFileService::generatePMBulkFileXml)
+ .map(ftpServerService::uploadFileToFtp)
+ .flatMap(fileReadyEventService::createFileReadyEventAndDeleteTmpFile)
+ .doOnNext(this::sendEventToVesCollector)
+ .subscribe(fileData -> informAboutSuccess(), this::informAboutError);
+ }
+
+ /**
+ * Adds current event to the memory by cell, which is Map<String,List<EventMemoryHolder>>
*
* @param vesEvent event from specific cell
* @throws VesBrokerException in case of any problem with adding to List, it throws an exception
*/
public void saveEventToMemory(VesEvent vesEvent, String cellId, String jobId, Integer granPeriod) throws VesBrokerException {
try {
- getCollectedEvents().add(new EventMemoryHolder(cellId, jobId, granPeriod, ZonedDateTime.now(), vesEvent));
- log.trace("Saving VES event for cell {} with granularity period {} and sequence number {}", cellId, granPeriod, getCollectedEvents().size());
+ Map<String, List<EventMemoryHolder>> events = getCollectedEventsByCell();
+ if (events.containsKey(cellId)) {
+ events.get(cellId).add(new EventMemoryHolder(cellId, jobId, granPeriod, ZonedDateTime.now(), vesEvent));
+ } else {
+ List<EventMemoryHolder> cellEvents = Collections.synchronizedList(
+ new ArrayList<>(List.of(new EventMemoryHolder(cellId, jobId, granPeriod, ZonedDateTime.now(), vesEvent))));
+ events.put(cellId, cellEvents);
+ }
+ log.trace("Saving VES event for cell {} with granularity period {} and sequence number {}", cellId, granPeriod, events.get(cellId).size());
} catch (Exception e) {
String errorMsg = "Failed to save VES event to memory with exception:" + e;
throw new VesBrokerException(errorMsg);
@@ -122,14 +152,27 @@ public class RanFileReadyHolder {
}
/**
- * Factory to get List<EventMemoryHolder>
+ * Factory to get Map<String,List<EventMemoryHolder>>
+ *
+ * @return existing or newly created Map<String,List<EventMemoryHolder>>
+ */
+ public Map<String, List<EventMemoryHolder>> getCollectedEventsByCell() {
+ if (isNull(collectedEventsByCell)) {
+ collectedEventsByCell = Collections.synchronizedMap(new HashMap<>());
+ }
+ return collectedEventsByCell;
+ }
+
+ /**
+ * Get list of events for specific CellId
*
- * @return existing or newly created List<EventMemoryHolder>
+ * @param cellId cell identifier
+ * @return list of events
*/
- public List<EventMemoryHolder> getCollectedEvents() {
- if (isNull(collectedEvents)) {
- collectedEvents = Collections.synchronizedList(new ArrayList<>());
+ public List<EventMemoryHolder> getCollectedEventsForCellId(String cellId) {
+ if (nonNull(collectedEventsByCell) && collectedEventsByCell.containsKey(cellId)) {
+ return collectedEventsByCell.get(cellId);
}
- return collectedEvents;
+ return new ArrayList<>();
}
}
diff --git a/src/main/java/org/onap/a1pesimulator/service/ves/RanVesHolder.java b/src/main/java/org/onap/a1pesimulator/service/ves/RanVesHolder.java
index f711347..131f792 100644
--- a/src/main/java/org/onap/a1pesimulator/service/ves/RanVesHolder.java
+++ b/src/main/java/org/onap/a1pesimulator/service/ves/RanVesHolder.java
@@ -87,20 +87,20 @@ public class RanVesHolder {
/**
* Stops sending the report after the last cell was stopped. It send the last report before stop completely
*/
- private void stopSendingReports() {
+ private void stopSendingReports(String cellId) {
+ sendLastReport(cellId);
if (nonNull(threadSendReportFunction) && !isAnyEventRunning()) {
threadSendReportFunction.getRanPeriodicVesEvent().getScheduledFuture().cancel(false);
- sendLastReportAfterCancel();
log.info("Stop sending reports every {} seconds", vnfConfigReader.getVnfConfig().getRepPeriod());
}
}
/**
- * Sends the last report after all threads were stopped
+ * Sends the last report after specific cell was stopped
*/
- private void sendLastReportAfterCancel() {
- log.trace("Send last report after report thread was canceled");
- ranFileReadyHolder.createPMBulkFileAndSendFileReadyMessage();
+ private void sendLastReport(String cellId) {
+ log.trace("Send last report after stop for cell: {}", cellId);
+ ranFileReadyHolder.createPMBulkFileAndSendFileReadyMessageForCellId(cellId);
}
Map<String, RanPeriodicEvent> getPeriodicEventsCache() {
@@ -138,7 +138,7 @@ public class RanVesHolder {
return Optional.empty();
}
periodicEvent.getScheduledFuture().cancel(false);
- stopSendingReports();
+ stopSendingReports(identifier);
return Optional.of(periodicEvent);
}