aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/a1pesimulator/service/pm
diff options
context:
space:
mode:
authork.kedron <k.kedron@partner.samsung.com>2021-09-20 10:06:26 +0200
committerk.kedron <k.kedron@partner.samsung.com>2021-09-20 13:04:05 +0200
commitb7e91e0a92ecc0254bb66d560e38cf06e6f76ebb (patch)
tree690c4526ada4fa8754df770f2397fc08e5bec04a /src/main/java/org/onap/a1pesimulator/service/pm
parentf1563a03cabc572d1cd15260befdaa8808048fbf (diff)
Refactoring
- ves to report package - fileready to pm package - according method name Issue-ID: INT-1945 Signed-off-by: Krystian Kedron <k.kedron@partner.samsung.com> Change-Id: I2f22c828145727ba772440fe111b5fde34446b3a
Diffstat (limited to 'src/main/java/org/onap/a1pesimulator/service/pm')
-rw-r--r--src/main/java/org/onap/a1pesimulator/service/pm/FileReadyEventService.java159
-rw-r--r--src/main/java/org/onap/a1pesimulator/service/pm/FtpServerService.java215
-rw-r--r--src/main/java/org/onap/a1pesimulator/service/pm/PMBulkFileService.java333
-rw-r--r--src/main/java/org/onap/a1pesimulator/service/pm/RanFileReadyHolder.java178
-rw-r--r--src/main/java/org/onap/a1pesimulator/service/pm/RanSaveFileReadyRunnable.java54
-rw-r--r--src/main/java/org/onap/a1pesimulator/service/pm/RanSendReportsRunnable.java28
6 files changed, 967 insertions, 0 deletions
diff --git a/src/main/java/org/onap/a1pesimulator/service/pm/FileReadyEventService.java b/src/main/java/org/onap/a1pesimulator/service/pm/FileReadyEventService.java
new file mode 100644
index 0000000..df2ce97
--- /dev/null
+++ b/src/main/java/org/onap/a1pesimulator/service/pm/FileReadyEventService.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.a1pesimulator.service.pm;
+
+import static org.onap.a1pesimulator.service.pm.FtpServerService.deletePMBulkFile;
+import static org.onap.a1pesimulator.util.Constants.FILE_READY_CHANGE_IDENTIFIER;
+import static org.onap.a1pesimulator.util.Constants.FILE_READY_CHANGE_TYPE;
+
+import java.io.File;
+import java.time.Instant;
+import java.time.ZonedDateTime;
+import java.time.temporal.ChronoUnit;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.onap.a1pesimulator.data.fileready.FileData;
+import org.onap.a1pesimulator.data.fileready.FileReadyEvent;
+import org.onap.a1pesimulator.data.fileready.NotificationFields;
+import org.onap.a1pesimulator.data.fileready.NotificationFields.ArrayOfNamedHashMap;
+import org.onap.a1pesimulator.data.ves.CommonEventHeader;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * Service for PM Bulk File creation and its handling
+ */
+
+@Service
+public class FileReadyEventService {
+
+ private final FtpServerService ftpServerService;
+
+ @Value("${file.ready.version}")
+ private String version;
+
+ @Value("${file.ready.vesEventListenerVersion}")
+ private String vesEventListenerVersion;
+
+ @Value("${file.ready.domain}")
+ private String domain;
+
+ @Value("${file.ready.eventName}")
+ private String eventName;
+
+ @Value("${file.ready.fileFormatType}")
+ private String fileFormatType;
+
+ @Value("${file.ready.fileFormatVersion}")
+ private String fileFormatVersion;
+
+ @Value("${file.ready.notificationFieldsVersion}")
+ private String notificationFieldsVersion;
+
+ @Value("${file.ready.priority}")
+ private String priority;
+
+ @Value("${file.ready.reportingEntityName}")
+ private String reportingEntityName;
+
+ public FileReadyEventService(FtpServerService ftpServerService) {
+ this.ftpServerService = ftpServerService;
+ }
+
+ /**
+ * It will create FileReadyEvent.json which will go to VES Collector
+ *
+ * @return created FileReadyEvent
+ */
+ protected Mono<FileData> createFileReadyEventAndDeleteTmpFile(Mono<FileData> fileMono) {
+ return fileMono
+ .map(this::createFileReadyEvent)
+ .doOnNext(file -> deleteTempArchivedBulkFile(file.getArchivedPmBulkFile()));
+ }
+
+ /**
+ * Creates File Ready Event
+ *
+ * @param fileData information about PM Bulk Files created in previous steps
+ * @return added newly created FileReadyEvent to FileData
+ */
+ protected FileData createFileReadyEvent(FileData fileData) {
+ FileReadyEvent event = new FileReadyEvent();
+ CommonEventHeader commonEventHeader = getCommonHeader();
+ event.setCommonEventHeader(commonEventHeader);
+ commonEventHeader.setStartEpochMicrosec(ChronoUnit.MICROS.between(Instant.EPOCH, fileData.getStartEventDate()));
+ commonEventHeader.setLastEpochMicrosec(ChronoUnit.MICROS.between(Instant.EPOCH, fileData.getEndEventDate()));
+ event.setNotificationFields(getNotificationFields(fileData.getArchivedPmBulkFile().getName()));
+ fileData.setFileReadyEvent(event);
+ return fileData;
+ }
+
+ /**
+ * Creates NotificationFields section in FileReadyEvent
+ *
+ * @param fileName name of archived PM Bulk File
+ * @return NotificationFields object
+ */
+ private NotificationFields getNotificationFields(String fileName) {
+ NotificationFields notificationFields = NotificationFields.builder()
+ .changeIdentifier(FILE_READY_CHANGE_IDENTIFIER)
+ .changeType(FILE_READY_CHANGE_TYPE)
+ .notificationFieldsVersion(notificationFieldsVersion).build();
+
+ ArrayOfNamedHashMap arrayOfNamedHashMap = new ArrayOfNamedHashMap();
+ Map<String, String> hashMapItems = new HashMap<>();
+ hashMapItems.put("location", ftpServerService.getFtpPath() + fileName);
+ hashMapItems.put("compression", "gzip");
+ hashMapItems.put("fileFormatType", fileFormatType);
+ hashMapItems.put("fileFormatVersion", fileFormatVersion);
+
+ arrayOfNamedHashMap.setName(fileName);
+ arrayOfNamedHashMap.setHashMap(hashMapItems);
+ notificationFields.setArrayOfNamedHashMap(Collections.singletonList(arrayOfNamedHashMap));
+ return notificationFields;
+ }
+
+ /**
+ * Creates CommonEventHeader
+ *
+ * @return created CommonEventHeader
+ */
+ private CommonEventHeader getCommonHeader() {
+ return CommonEventHeader.builder()
+ .version(version)
+ .vesEventListenerVersion(vesEventListenerVersion)
+ .domain(domain)
+ .eventName(eventName)
+ .eventId(UUID.randomUUID().toString())
+ .priority(priority)
+ .reportingEntityName(reportingEntityName)
+ .sequence(0)
+ .timeZoneOffset(ZonedDateTime.now().getOffset().toString())
+ .build();
+ }
+
+ /**
+ * Deletes temporary archived PM Bulk File
+ *
+ * @param fileMono temporary archived PM Bulk File
+ */
+ private void deleteTempArchivedBulkFile(File fileMono) {
+ deletePMBulkFile(fileMono);
+ }
+}
diff --git a/src/main/java/org/onap/a1pesimulator/service/pm/FtpServerService.java b/src/main/java/org/onap/a1pesimulator/service/pm/FtpServerService.java
new file mode 100644
index 0000000..01b0672
--- /dev/null
+++ b/src/main/java/org/onap/a1pesimulator/service/pm/FtpServerService.java
@@ -0,0 +1,215 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.a1pesimulator.service.pm;
+
+import static java.util.Objects.nonNull;
+import static org.onap.a1pesimulator.util.Constants.TEMP_DIR;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.zip.GZIPOutputStream;
+
+
+import org.onap.a1pesimulator.data.fileready.FileData;
+import org.onap.a1pesimulator.exception.NotUploadedToFtpException;
+import org.onap.a1pesimulator.util.VnfConfigReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import org.springframework.util.FileCopyUtils;
+
+import net.schmizz.sshj.SSHClient;
+import net.schmizz.sshj.sftp.SFTPClient;
+import net.schmizz.sshj.transport.verification.PromiscuousVerifier;
+import reactor.core.publisher.Mono;
+
+@Service
+public class FtpServerService {
+
+ private static final Logger log = LoggerFactory.getLogger(FtpServerService.class);
+
+ //true - file will be uploaded to FTP; false - file will be copied into xmlPmLocation
+ @Value("${ftp.server.upload}")
+ private boolean ftpServerUpload;
+
+ // location where archived file will be copied
+ @Value("${xml.pm.location}")
+ private String xmlPmLocation;
+
+ @Value("${ftp.server.protocol}")
+ private String ftpServerProtocol;
+
+ @Value("${ftp.server.filepath}")
+ private String ftpServerFilepath;
+
+ @Value("${ftp.server.username}")
+ private String ftpServerUsername;
+
+ @Value("${ftp.server.password}")
+ private String ftpServerPassword;
+
+ private VnfConfigReader vnfConfigReader;
+
+ public FtpServerService(VnfConfigReader vnfConfigReader) {
+ this.vnfConfigReader = vnfConfigReader;
+ }
+
+ public Mono<FileData> uploadFileToFtp(FileData fileData) {
+ return Mono.just(fileData)
+ .flatMap(this::tryToCompressFile)
+ .flatMap(this::tryToUploadOrSaveFileToFtp)
+ .onErrorResume(throwable -> resumeError(throwable, fileData))
+ .doOnNext(file -> deletePMBulkFile(file.getPmBulkFile()));
+ }
+
+ /**
+ * Trying to compress file into .gz
+ *
+ * @param fileData file to be archived
+ * @return archived file
+ */
+ private Mono<FileData> tryToCompressFile(FileData fileData) {
+ File archiveBulkFile = new File(TEMP_DIR, fileData.getPmBulkFile().getName() + ".gz");
+
+ try (GZIPOutputStream zos = new GZIPOutputStream(
+ new FileOutputStream(archiveBulkFile.getAbsolutePath())); FileInputStream inputStream = new FileInputStream(fileData.getPmBulkFile())) {
+ byte[] buffer = new byte[1024];
+ int len;
+ while ((len = inputStream.read(buffer)) > 0) {
+ zos.write(buffer, 0, len);
+ }
+ fileData.setArchivedPmBulkFile(archiveBulkFile);
+ log.trace("Compressing file {}", fileData.getPmBulkFile().getName());
+ return Mono.just(fileData);
+ } catch (IOException e) {
+ log.error("Could not compress file", e);
+ return Mono.empty();
+ }
+ }
+
+ /**
+ * Upload file to FTP or copy it to mounted location
+ *
+ * @param fileData data about file
+ * @return fileData for fileReadyEvent
+ */
+ private Mono<FileData> tryToUploadOrSaveFileToFtp(FileData fileData) {
+ if (ftpServerUpload) {
+ return tryToUploadFileToFtp(fileData);
+ } else {
+ File fileOnFtp = new File(xmlPmLocation, fileData.getArchivedPmBulkFile().getName());
+ try {
+ FileCopyUtils.copy(fileData.getArchivedPmBulkFile(), fileOnFtp);
+ log.info("Uploading file to the location: {}", fileOnFtp);
+ return Mono.just(fileData);
+ } catch (IOException e) {
+ return Mono.error(new NotUploadedToFtpException("File was not copied to FTP location", e));
+ }
+ }
+ }
+
+ /**
+ * Upload file to FTP
+ *
+ * @param fileData archived file in Mono
+ * @return archived file for fileReadyEvent
+ */
+ private Mono<FileData> tryToUploadFileToFtp(FileData fileData) {
+ if (nonNull(fileData.getArchivedPmBulkFile())) {
+ SSHClient client = getSSHClient();
+ if (nonNull(client)) {
+ try (client; SFTPClient sftpClient = client.newSFTPClient()) {
+ File archiveBulkFile = fileData.getArchivedPmBulkFile();
+ sftpClient.put(archiveBulkFile.getAbsolutePath(), ftpServerFilepath + "/" + archiveBulkFile.getName());
+
+ log.info("Uploading file to FTP: {}", archiveBulkFile.getAbsoluteFile());
+ return Mono.just(fileData);
+ } catch (IOException e) {
+ log.error("Exception while trying to upload a file", e);
+ }
+ } else {
+ log.error("Could not connect to FTP server");
+ }
+ } else {
+ log.error("There is no file to upload");
+ }
+ return Mono.error(new NotUploadedToFtpException("File was not uploaded to FTP"));
+ }
+
+ /**
+ * Creates SSHClient instance
+ *
+ * @return SSHClient
+ */
+ protected SSHClient getSSHClient() {
+ SSHClient client = new SSHClient();
+ try {
+ client.addHostKeyVerifier(new PromiscuousVerifier());
+ client.connect(vnfConfigReader.getVnfConfig().getFtpHost(), Integer.parseInt(vnfConfigReader.getVnfConfig().getFtpPort()));
+ client.authPassword(ftpServerUsername, ftpServerPassword);
+ return client;
+ } catch (IOException e) {
+ log.error("There was an error while connecting to FTP server", e);
+ try {
+ client.close();
+ } catch (IOException ioException) {
+ log.error("There was an error while closing the connection to FTP server", e);
+ }
+ return null;
+ }
+ }
+
+ /**
+ * Deletes created PM Bulk File xml from temp storage after successful upload to FTP
+ *
+ * @param file file which we gonna delete
+ */
+ public static void deletePMBulkFile(File file) {
+ try {
+ log.trace("Deleting file: {}", file.getAbsoluteFile());
+ Files.delete(file.toPath());
+ } catch (IOException e) {
+ log.warn("Could not delete file: {}", file.getName(), e);
+ }
+ }
+
+ /**
+ * Get path to FTP server
+ *
+ * @return for example: "sftp://foo:pass@106.120.119.170:2222/upload/"
+ */
+ public String getFtpPath() {
+ return ftpServerProtocol + "://" + ftpServerUsername + ":" + ftpServerPassword + "@" + vnfConfigReader.getVnfConfig().getFtpHost() + ":"
+ + vnfConfigReader.getVnfConfig().getFtpPort() + "/" + ftpServerFilepath
+ + "/";
+ }
+
+ /**
+ * Try to clean up things after an exception
+ *
+ * @param throwable error thrown
+ * @param fileData data about files which needs to be deleted
+ * @return empty Mono object
+ */
+ protected Mono<FileData> resumeError(Throwable throwable, FileData fileData) {
+ log.error("Error occurs while uploading file to FTP server", throwable);
+ deletePMBulkFile(fileData.getPmBulkFile());
+ deletePMBulkFile(fileData.getArchivedPmBulkFile());
+ return Mono.empty();
+ }
+}
diff --git a/src/main/java/org/onap/a1pesimulator/service/pm/PMBulkFileService.java b/src/main/java/org/onap/a1pesimulator/service/pm/PMBulkFileService.java
new file mode 100644
index 0000000..aec473c
--- /dev/null
+++ b/src/main/java/org/onap/a1pesimulator/service/pm/PMBulkFileService.java
@@ -0,0 +1,333 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.a1pesimulator.service.pm;
+
+import static java.util.Comparator.comparing;
+import static java.util.Objects.isNull;
+import static org.onap.a1pesimulator.util.Constants.EMPTY_STRING;
+import static org.onap.a1pesimulator.util.Constants.MEASUREMENT_FIELD_IDENTIFIER;
+import static org.onap.a1pesimulator.util.Constants.TEMP_DIR;
+import static org.onap.a1pesimulator.util.Convertors.ISO_8601_DATE;
+import static org.onap.a1pesimulator.util.Convertors.YYYYMMDD_PATTERN;
+import static org.onap.a1pesimulator.util.Convertors.truncateToSpecifiedMinutes;
+import static org.onap.a1pesimulator.util.Convertors.zonedDateTimeToString;
+
+import java.io.File;
+import java.time.ZonedDateTime;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
+
+import javax.xml.XMLConstants;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+
+import org.onap.a1pesimulator.data.fileready.EventMemoryHolder;
+import org.onap.a1pesimulator.data.fileready.FileData;
+import org.onap.a1pesimulator.data.ves.VesEvent;
+import org.onap.a1pesimulator.util.VnfConfigReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * Service for PM Bulk File creation and handling
+ */
+
+@Service
+public class PMBulkFileService {
+
+ private static final Logger log = LoggerFactory.getLogger(PMBulkFileService.class);
+ private static Map<String, AtomicInteger> uniqueFileNamesWithCount;
+ private final VnfConfigReader vnfConfigReader;
+
+ @Value("${xml.pm.bulk.fileFormatVersion}")
+ private String fileFormatVersion;
+
+ @Value("${xml.pm.bulk.vendorName}")
+ private String vendorName;
+
+ @Value("${xml.pm.bulk.fileSender}")
+ private String fileSenderValue;
+
+ @Value("${xml.pm.bulk.userLabel}")
+ private String userLabel;
+
+ @Value("${xml.pm.bulk.domainId}")
+ private String domainId;
+
+ public PMBulkFileService(VnfConfigReader vnfConfigReader) {
+ this.vnfConfigReader = vnfConfigReader;
+ }
+
+ /**
+ * Generate PM Bulk File xml from stored events
+ *
+ * @param collectedEvents list of stored events
+ * @return generated file in Mono object
+ */
+ public Mono<FileData> generatePMBulkFileXml(List<EventMemoryHolder> collectedEvents) {
+
+ try {
+ DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
+ docFactory.setAttribute(XMLConstants.ACCESS_EXTERNAL_DTD, "");
+ docFactory.setAttribute(XMLConstants.ACCESS_EXTERNAL_SCHEMA, "");
+
+ DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
+
+ //root elements
+ Document doc = docBuilder.newDocument();
+
+ Element measCollecFile = doc.createElement("measCollecFile");
+ doc.appendChild(measCollecFile);
+ measCollecFile.setAttribute("xmlns", "http://www.3gpp.org/ftp/specs/archive/32_series/32.435#measCollec");
+ measCollecFile.setAttribute("xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance");
+ measCollecFile.setAttribute("xsi:schemaLocation",
+ "http://www.3gpp.org/ftp/specs/archive/32_series/32.435#measCollec http://www.3gpp.org/ftp/specs/archive/32_series/32.435#measCollec");
+
+ //fileHeader elements
+ Element fileHeader = doc.createElement("fileHeader");
+ measCollecFile.appendChild(fileHeader);
+ fileHeader.setAttribute("fileFormatVersion", fileFormatVersion);
+ fileHeader.setAttribute("vendorName", vendorName);
+
+ //fileSender elements
+ Element fileSender = doc.createElement("fileSender");
+ fileHeader.appendChild(fileSender);
+ fileSender.setAttribute("elementType", fileSenderValue);
+
+ //measCollec elements
+ Element measCollec = doc.createElement("measCollec");
+ fileHeader.appendChild(measCollec);
+ measCollec.setAttribute("beginTime", zonedDateTimeToString(earliestEventTime(collectedEvents), ISO_8601_DATE));
+
+ //measData elements
+ Element measData = doc.createElement("measData");
+ measCollecFile.appendChild(measData);
+
+ //managedElement elements
+ Element managedElement = doc.createElement("managedElement");
+ measData.appendChild(managedElement);
+ managedElement.setAttribute("userLabel", userLabel);
+
+ //add measInfo elements
+ addMeansInfo(doc, measData, collectedEvents);
+
+ //fileFooter elements
+ Element fileFooter = doc.createElement("fileFooter");
+ measCollecFile.appendChild(fileFooter);
+
+ Element measCollecFooter = doc.createElement("measCollec");
+ fileFooter.appendChild(measCollecFooter);
+ measCollecFooter.setAttribute("endTime", zonedDateTimeToString(latestEventTime(collectedEvents), ISO_8601_DATE));
+
+ File xmlFile = writeDocumentIntoXmlFile(doc, collectedEvents);
+
+ Mono<FileData> justMono = Mono.just(FileData.builder().pmBulkFile(xmlFile).startEventDate(earliestEventTime(collectedEvents))
+ .endEventDate(latestEventTime(collectedEvents)).build());
+ log.trace("Removing all VES events from memory: {}", collectedEvents.size());
+ collectedEvents.clear();
+ return justMono;
+
+ } catch (ParserConfigurationException | TransformerException pce) {
+ log.error("Error occurs while creating PM Bulk File", pce);
+ return Mono.empty();
+ }
+ }
+
+ /**
+ * Add measurement elements for each cell and measurement time into PM Bulk File
+ *
+ * @param doc Document
+ * @param measData main element of document, which stores meansData
+ * @param collectedEvents list of stored events
+ */
+ private void addMeansInfo(Document doc, Element measData, List<EventMemoryHolder> collectedEvents) {
+ collectedEvents.stream().sorted(comparing(EventMemoryHolder::getEventDate)).forEach(eventMemoryHolder -> {
+ VesEvent event = eventMemoryHolder.getEvent();
+
+ Element measInfo = doc.createElement("measInfo");
+ measData.appendChild(measInfo);
+
+ //job element
+ Element job = doc.createElement("job");
+ measInfo.appendChild(job);
+ job.setAttribute("jobId", eventMemoryHolder.getJobId());
+
+ //granPeriod elements
+ Element granPeriod = doc.createElement("granPeriod");
+ measInfo.appendChild(granPeriod);
+ granPeriod.setAttribute("duration", getDurationString(eventMemoryHolder.getGranPeriod()));
+ ZonedDateTime endDate = eventMemoryHolder.getEventDate();
+ granPeriod.setAttribute("endTime", zonedDateTimeToString(endDate, ISO_8601_DATE));
+
+ //repPeriod elements
+ Element repPeriod = doc.createElement("repPeriod");
+ measInfo.appendChild(repPeriod);
+ repPeriod.setAttribute("duration", getDurationString(vnfConfigReader.getVnfConfig().getRepPeriod()));
+
+ //measType definition
+ HashMap<String, String> measurmentMap = new HashMap<>();
+ AtomicInteger i = new AtomicInteger(1);
+ event.getMeasurementFields().getAdditionalMeasurements().forEach(additionalMeasurement -> {
+ if (Stream.of(MEASUREMENT_FIELD_IDENTIFIER)
+ .noneMatch(elementName -> elementName.equalsIgnoreCase(additionalMeasurement.getName()))) {
+ Element measType = doc.createElement("measType");
+ measInfo.appendChild(measType);
+ measType.setAttribute("p", String.valueOf(i));
+ measType.setTextContent(additionalMeasurement.getName());
+ measurmentMap.put(additionalMeasurement.getName(), String.valueOf(i));
+ i.incrementAndGet();
+ }
+ });
+
+ //measValue elements
+ Element measValue = doc.createElement("measValue");
+ measInfo.appendChild(measValue);
+ measValue.setAttribute("measObjLdn", eventMemoryHolder.getCellId());
+ event.getMeasurementFields().getAdditionalMeasurements().stream()
+ .filter(additionalMeasurement -> measurmentMap.containsKey(additionalMeasurement.getName()))
+ .forEach(additionalMeasurement -> {
+ if (!additionalMeasurement.getMeasurementValue().isEmpty()) {
+
+ //r elements
+ Element r = doc.createElement("r");
+ measValue.appendChild(r);
+ r.setAttribute("p", measurmentMap.get(additionalMeasurement.getName()));
+ r.setTextContent(additionalMeasurement.getMeasurementValue());
+ }
+ });
+ });
+ }
+
+ /**
+ * Converts Document into XML file and adds proper headers
+ *
+ * @param doc Document
+ * @param collectedEvents list of stored events
+ * @return newly created File in xml format
+ */
+ private File writeDocumentIntoXmlFile(Document doc, List<EventMemoryHolder> collectedEvents) throws TransformerException {
+ TransformerFactory transformerFactory = TransformerFactory.newInstance();
+ transformerFactory.setAttribute(XMLConstants.ACCESS_EXTERNAL_DTD, "");
+ transformerFactory.setAttribute(XMLConstants.ACCESS_EXTERNAL_STYLESHEET, "");
+
+ Transformer tr = transformerFactory.newTransformer();
+ tr.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
+ tr.setOutputProperty(OutputKeys.VERSION, "1.0");
+ Node pi = doc.createProcessingInstruction("xml-stylesheet", "type=\"text/xsl\" href=\"MeasDataCollection.xsl\"");
+ doc.insertBefore(pi, doc.getDocumentElement());
+
+ File xmlFile = getXmlFile(collectedEvents);
+ StreamResult result = new StreamResult(xmlFile);
+ DOMSource source = new DOMSource(doc);
+ tr.transform(source, result);
+ return xmlFile;
+ }
+
+ /**
+ * Generate PM Bulk File and its name
+ *
+ * @param collectedEvents list of stored events
+ * @return newly created File
+ */
+ private File getXmlFile(List<EventMemoryHolder> collectedEvents) {
+ StringBuilder fileNameBuilder = new StringBuilder("C");
+ ZonedDateTime firstEventTime = earliestEventTime(collectedEvents);
+ ZonedDateTime lastEventTime = latestEventTime(collectedEvents);
+ fileNameBuilder.append(zonedDateTimeToString(firstEventTime, YYYYMMDD_PATTERN)).append(".");
+ fileNameBuilder.append(zonedDateTimeToString(truncateToSpecifiedMinutes(firstEventTime, 5), "HHmmZ")).append("-");
+ fileNameBuilder.append(zonedDateTimeToString(lastEventTime, YYYYMMDD_PATTERN)).append(".");
+ fileNameBuilder.append(zonedDateTimeToString(truncateToSpecifiedMinutes(lastEventTime, 5), "HHmmZ"));
+ fileNameBuilder.append("_").append(domainId);
+ fileNameBuilder.append(appendRcIfNecessary(fileNameBuilder));
+ fileNameBuilder.append(".xml");
+
+ return new File(TEMP_DIR, fileNameBuilder.toString());
+ }
+
+ /**
+ * The RC parameter is a running count and shall be appended only if the filename is otherwise not unique, i.e. more than one file is generated and all
+ * other parameters of the file name are identical.
+ *
+ * @param fileNameBuilder stringBuilder which contains currently generated file name
+ * @return sequence number or empty string
+ */
+ private static String appendRcIfNecessary(StringBuilder fileNameBuilder) {
+ String fileName = fileNameBuilder.toString();
+ int sequence = 0;
+ if (isNull(uniqueFileNamesWithCount)) {
+ uniqueFileNamesWithCount = Collections.synchronizedMap(new HashMap<>());
+ }
+ if (uniqueFileNamesWithCount.containsKey(fileName)) {
+ sequence = uniqueFileNamesWithCount.get(fileName).incrementAndGet();
+ } else {
+ uniqueFileNamesWithCount.clear(); //we have new dates, so we can clear existing list to not grow infinitely
+ uniqueFileNamesWithCount.put(fileName, new AtomicInteger(0));
+ }
+ return sequence > 0 ? "_-_" + sequence : EMPTY_STRING;
+ }
+
+ /**
+ * Get ZonedDateTime of the earliest event in that reporting period
+ *
+ * @param collectedEvents list of compared events
+ * @return the earliest ZonedDateTime
+ */
+ private static ZonedDateTime earliestEventTime(List<EventMemoryHolder> collectedEvents) {
+ return collectedEvents.stream()
+ .map(EventMemoryHolder::getEventDate)
+ .min(comparing(ZonedDateTime::toEpochSecond, Comparator.nullsLast(Comparator.naturalOrder())))
+ .orElse(ZonedDateTime.now());
+ }
+
+ /**
+ * Get ZonedDateTime of the latest event in that reporting period
+ *
+ * @param collectedEvents list of compared events
+ * @return the latest ZonedDateTime
+ */
+ private static ZonedDateTime latestEventTime(List<EventMemoryHolder> collectedEvents) {
+ return collectedEvents.stream().map(EventMemoryHolder::getEventDate)
+ .max(comparing(ZonedDateTime::toEpochSecond, Comparator.nullsLast(Comparator.naturalOrder())))
+ .orElse(ZonedDateTime.now());
+ }
+
+ /**
+ * Convert duration interval in seconds to xml element required by the specification Examples: PT10S, PT900S
+ *
+ * @param interval interval in seconds
+ * @return duration xml element representation
+ */
+ private static String getDurationString(int interval) {
+ return "PT" + interval + "S";
+ }
+}
diff --git a/src/main/java/org/onap/a1pesimulator/service/pm/RanFileReadyHolder.java b/src/main/java/org/onap/a1pesimulator/service/pm/RanFileReadyHolder.java
new file mode 100644
index 0000000..d3a7970
--- /dev/null
+++ b/src/main/java/org/onap/a1pesimulator/service/pm/RanFileReadyHolder.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.a1pesimulator.service.pm;
+
+import static java.util.Objects.isNull;
+import static java.util.Objects.nonNull;
+
+import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.onap.a1pesimulator.data.fileready.EventMemoryHolder;
+import org.onap.a1pesimulator.data.fileready.FileData;
+import org.onap.a1pesimulator.data.ves.VesEvent;
+import org.onap.a1pesimulator.exception.VesBrokerException;
+import org.onap.a1pesimulator.service.report.RanVesSender;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * Entry point for PM Bulk File event
+ */
+@Service
+public class RanFileReadyHolder {
+
+ private static final Logger log = LoggerFactory.getLogger(RanFileReadyHolder.class);
+
+ private Map<String, List<EventMemoryHolder>> collectedEventsByCell;
+ private final RanVesSender ranVesSender;
+ private final FtpServerService ftpServerService;
+ private final PMBulkFileService xmlFileService;
+ private final FileReadyEventService fileReadyEventService;
+
+ public RanFileReadyHolder(RanVesSender ranVesSender, FtpServerService ftpServerService, PMBulkFileService xmlFileService,
+ FileReadyEventService fileReadyEventService) {
+ this.ranVesSender = ranVesSender;
+ this.ftpServerService = ftpServerService;
+ this.xmlFileService = xmlFileService;
+ this.fileReadyEventService = fileReadyEventService;
+ }
+
+ /**
+ * Run entire process for all cells collectedEventsByCell are synchronized to not be updated by other threads during PM Bulk File creation
+ */
+ public void createPMBulkFileAndSendFileReadyMessage() {
+ synchronized (getCollectedEventsByCell()) {
+ getCollectedEventsByCell().forEach((cellId, cellEventList) -> createPMBulkFileAndSendFileReadyMessageForCell(cellEventList));
+ }
+ }
+
+ /**
+ * Run entire process for one cell collectedEventsByCell are synchronized to not be updated by other threads during PM Bulk File creation
+ *
+ * @param cellId cell identifier
+ */
+ public void createPMBulkFileAndSendFileReadyMessageForCellId(String cellId) {
+ synchronized (getCollectedEventsByCell()) {
+ createPMBulkFileAndSendFileReadyMessageForCell(getCollectedEventsForCellId(cellId));
+ }
+ }
+
+ /**
+ * Run entire process for one cell: PM Bulk File creation-> upload to FTP -> delete temp PM Bulk File -> create File Ready Event - > send it to VES
+ * Collector.
+ *
+ * @param events list of events for one cell
+ */
+ public void createPMBulkFileAndSendFileReadyMessageForCell(List<EventMemoryHolder> events) {
+ Mono.justOrEmpty(events)
+ .filter(this::areSomeEventsStored)
+ .flatMap(xmlFileService::generatePMBulkFileXml)
+ .map(ftpServerService::uploadFileToFtp)
+ .flatMap(fileReadyEventService::createFileReadyEventAndDeleteTmpFile)
+ .doOnNext(this::sendEventToVesCollector)
+ .subscribe(fileData -> informAboutSuccess(), this::informAboutError);
+ }
+
+ /**
+ * Adds current event to the memory by cell, which is Map<String,List<EventMemoryHolder>>
+ *
+ * @param vesEvent event from specific cell
+ * @throws VesBrokerException in case of any problem with adding to List, it throws an exception
+ */
+ public void saveEventToMemory(VesEvent vesEvent, String cellId, String jobId, Integer granPeriod) throws VesBrokerException {
+ try {
+ Map<String, List<EventMemoryHolder>> events = getCollectedEventsByCell();
+ if (events.containsKey(cellId)) {
+ events.get(cellId).add(new EventMemoryHolder(cellId, jobId, granPeriod, ZonedDateTime.now(), vesEvent));
+ } else {
+ List<EventMemoryHolder> cellEvents = Collections.synchronizedList(
+ new ArrayList<>(List.of(new EventMemoryHolder(cellId, jobId, granPeriod, ZonedDateTime.now(), vesEvent))));
+ events.put(cellId, cellEvents);
+ }
+ log.trace("Saving VES event for cell {} with granularity period {} and sequence number {}", cellId, granPeriod, events.get(cellId).size());
+ } catch (Exception e) {
+ String errorMsg = "Failed to save VES event to memory with exception:" + e;
+ throw new VesBrokerException(errorMsg);
+ }
+ }
+
+ /**
+ * Sends FileReadyEvent to VES Collector
+ *
+ * @param fileData object with FileReadyEvent file
+ */
+ protected void sendEventToVesCollector(FileData fileData) {
+ ranVesSender.send(fileData.getFileReadyEvent());
+ }
+
+ /**
+ * Log about successful operation
+ */
+ private void informAboutSuccess() {
+ log.info("PM Bulk file was generated, uploaded to FTP and File ready event was send to VES Collector");
+ }
+
+ /**
+ * Log an error if occurs during the process
+ *
+ * @param throwable - error raised in some of the steps
+ */
+ private void informAboutError(Throwable throwable) {
+ log.info("File ready event was unsuccessful: {}", throwable.getMessage());
+ }
+
+ /**
+ * Check if there are any Events stored in the memory. Used before creating PM Bulk File xml
+ *
+ * @param collectedEvents list of stored events
+ * @return true there is at least one event / false - no event at all
+ */
+ private boolean areSomeEventsStored(List<EventMemoryHolder> collectedEvents) {
+ return !CollectionUtils.isEmpty(collectedEvents);
+ }
+
+ /**
+ * Factory to get Map<String,List<EventMemoryHolder>>
+ *
+ * @return existing or newly created Map<String,List<EventMemoryHolder>>
+ */
+ public Map<String, List<EventMemoryHolder>> getCollectedEventsByCell() {
+ if (isNull(collectedEventsByCell)) {
+ collectedEventsByCell = Collections.synchronizedMap(new HashMap<>());
+ }
+ return collectedEventsByCell;
+ }
+
+ /**
+ * Get list of events for specific CellId
+ *
+ * @param cellId cell identifier
+ * @return list of events
+ */
+ public List<EventMemoryHolder> getCollectedEventsForCellId(String cellId) {
+ if (nonNull(collectedEventsByCell) && collectedEventsByCell.containsKey(cellId)) {
+ return collectedEventsByCell.get(cellId);
+ }
+ return new ArrayList<>();
+ }
+}
diff --git a/src/main/java/org/onap/a1pesimulator/service/pm/RanSaveFileReadyRunnable.java b/src/main/java/org/onap/a1pesimulator/service/pm/RanSaveFileReadyRunnable.java
new file mode 100644
index 0000000..222680f
--- /dev/null
+++ b/src/main/java/org/onap/a1pesimulator/service/pm/RanSaveFileReadyRunnable.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.a1pesimulator.service.pm;
+
+import java.util.Collection;
+import java.util.UUID;
+
+import org.onap.a1pesimulator.data.ves.VesEvent;
+import org.onap.a1pesimulator.exception.VesBrokerException;
+import org.onap.a1pesimulator.service.common.AbstractRanRunnable;
+import org.onap.a1pesimulator.service.common.EventCustomizer;
+import org.onap.a1pesimulator.service.report.OnEventAction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RanSaveFileReadyRunnable extends AbstractRanRunnable {
+
+ private static final Logger log = LoggerFactory.getLogger(RanSaveFileReadyRunnable.class);
+ private final Integer granPeriod;
+ private final String cellId;
+ private final String jobId;
+ private final RanFileReadyHolder ranFileReadyHolder;
+
+ public RanSaveFileReadyRunnable(RanFileReadyHolder ranFileReadyHolder, String cellId, VesEvent event, EventCustomizer eventCustomizer, Integer interval,
+ Collection<OnEventAction> onEventActions) {
+ super(event, eventCustomizer, onEventActions);
+ this.ranFileReadyHolder = ranFileReadyHolder;
+ this.granPeriod = interval;
+ this.cellId = cellId;
+ this.jobId = UUID.randomUUID() + "-" + cellId;
+ }
+
+ @Override
+ public void run() {
+ try {
+ VesEvent customizedEvent = eventCustomizer.apply(event);
+ onEventAction.forEach(action -> action.onEvent(customizedEvent));
+ ranFileReadyHolder.saveEventToMemory(customizedEvent, cellId, jobId, granPeriod);
+ } catch (VesBrokerException e) {
+ log.error("Saving file ready event failed: {}", e.getMessage());
+ }
+ }
+}
diff --git a/src/main/java/org/onap/a1pesimulator/service/pm/RanSendReportsRunnable.java b/src/main/java/org/onap/a1pesimulator/service/pm/RanSendReportsRunnable.java
new file mode 100644
index 0000000..88cb5a2
--- /dev/null
+++ b/src/main/java/org/onap/a1pesimulator/service/pm/RanSendReportsRunnable.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (C) 2021 Samsung Electronics
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package org.onap.a1pesimulator.service.pm;
+
+public class RanSendReportsRunnable implements Runnable {
+
+ protected final RanFileReadyHolder ranFileReadyHolder;
+
+ public RanSendReportsRunnable(RanFileReadyHolder ranFileReadyHolder) {
+ this.ranFileReadyHolder = ranFileReadyHolder;
+ }
+
+ @Override
+ public void run() {
+ ranFileReadyHolder.createPMBulkFileAndSendFileReadyMessage();
+ }
+}