diff options
author | k.kedron <k.kedron@partner.samsung.com> | 2021-09-20 10:06:26 +0200 |
---|---|---|
committer | k.kedron <k.kedron@partner.samsung.com> | 2021-09-20 13:04:05 +0200 |
commit | b7e91e0a92ecc0254bb66d560e38cf06e6f76ebb (patch) | |
tree | 690c4526ada4fa8754df770f2397fc08e5bec04a /src/main/java/org/onap/a1pesimulator/service/pm | |
parent | f1563a03cabc572d1cd15260befdaa8808048fbf (diff) |
Refactoring
- ves to report package
- fileready to pm package
- according method name
Issue-ID: INT-1945
Signed-off-by: Krystian Kedron <k.kedron@partner.samsung.com>
Change-Id: I2f22c828145727ba772440fe111b5fde34446b3a
Diffstat (limited to 'src/main/java/org/onap/a1pesimulator/service/pm')
6 files changed, 967 insertions, 0 deletions
diff --git a/src/main/java/org/onap/a1pesimulator/service/pm/FileReadyEventService.java b/src/main/java/org/onap/a1pesimulator/service/pm/FileReadyEventService.java new file mode 100644 index 0000000..df2ce97 --- /dev/null +++ b/src/main/java/org/onap/a1pesimulator/service/pm/FileReadyEventService.java @@ -0,0 +1,159 @@ +/* + * Copyright (C) 2021 Samsung Electronics + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.onap.a1pesimulator.service.pm; + +import static org.onap.a1pesimulator.service.pm.FtpServerService.deletePMBulkFile; +import static org.onap.a1pesimulator.util.Constants.FILE_READY_CHANGE_IDENTIFIER; +import static org.onap.a1pesimulator.util.Constants.FILE_READY_CHANGE_TYPE; + +import java.io.File; +import java.time.Instant; +import java.time.ZonedDateTime; +import java.time.temporal.ChronoUnit; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import org.onap.a1pesimulator.data.fileready.FileData; +import org.onap.a1pesimulator.data.fileready.FileReadyEvent; +import org.onap.a1pesimulator.data.fileready.NotificationFields; +import org.onap.a1pesimulator.data.fileready.NotificationFields.ArrayOfNamedHashMap; +import org.onap.a1pesimulator.data.ves.CommonEventHeader; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import reactor.core.publisher.Mono; + +/** + * Service for PM Bulk File creation and its handling + */ + +@Service +public class FileReadyEventService { + + private final FtpServerService ftpServerService; + + @Value("${file.ready.version}") + private String version; + + @Value("${file.ready.vesEventListenerVersion}") + private String vesEventListenerVersion; + + @Value("${file.ready.domain}") + private String domain; + + @Value("${file.ready.eventName}") + private String eventName; + + @Value("${file.ready.fileFormatType}") + private String fileFormatType; + + @Value("${file.ready.fileFormatVersion}") + private String fileFormatVersion; + + @Value("${file.ready.notificationFieldsVersion}") + private String notificationFieldsVersion; + + @Value("${file.ready.priority}") + private String priority; + + @Value("${file.ready.reportingEntityName}") + private String reportingEntityName; + + public FileReadyEventService(FtpServerService ftpServerService) { + this.ftpServerService = ftpServerService; + } + + /** + * It will create FileReadyEvent.json which will go to VES Collector + * + * @return created FileReadyEvent + */ + protected Mono<FileData> createFileReadyEventAndDeleteTmpFile(Mono<FileData> fileMono) { + return fileMono + .map(this::createFileReadyEvent) + .doOnNext(file -> deleteTempArchivedBulkFile(file.getArchivedPmBulkFile())); + } + + /** + * Creates File Ready Event + * + * @param fileData information about PM Bulk Files created in previous steps + * @return added newly created FileReadyEvent to FileData + */ + protected FileData createFileReadyEvent(FileData fileData) { + FileReadyEvent event = new FileReadyEvent(); + CommonEventHeader commonEventHeader = getCommonHeader(); + event.setCommonEventHeader(commonEventHeader); + commonEventHeader.setStartEpochMicrosec(ChronoUnit.MICROS.between(Instant.EPOCH, fileData.getStartEventDate())); + commonEventHeader.setLastEpochMicrosec(ChronoUnit.MICROS.between(Instant.EPOCH, fileData.getEndEventDate())); + event.setNotificationFields(getNotificationFields(fileData.getArchivedPmBulkFile().getName())); + fileData.setFileReadyEvent(event); + return fileData; + } + + /** + * Creates NotificationFields section in FileReadyEvent + * + * @param fileName name of archived PM Bulk File + * @return NotificationFields object + */ + private NotificationFields getNotificationFields(String fileName) { + NotificationFields notificationFields = NotificationFields.builder() + .changeIdentifier(FILE_READY_CHANGE_IDENTIFIER) + .changeType(FILE_READY_CHANGE_TYPE) + .notificationFieldsVersion(notificationFieldsVersion).build(); + + ArrayOfNamedHashMap arrayOfNamedHashMap = new ArrayOfNamedHashMap(); + Map<String, String> hashMapItems = new HashMap<>(); + hashMapItems.put("location", ftpServerService.getFtpPath() + fileName); + hashMapItems.put("compression", "gzip"); + hashMapItems.put("fileFormatType", fileFormatType); + hashMapItems.put("fileFormatVersion", fileFormatVersion); + + arrayOfNamedHashMap.setName(fileName); + arrayOfNamedHashMap.setHashMap(hashMapItems); + notificationFields.setArrayOfNamedHashMap(Collections.singletonList(arrayOfNamedHashMap)); + return notificationFields; + } + + /** + * Creates CommonEventHeader + * + * @return created CommonEventHeader + */ + private CommonEventHeader getCommonHeader() { + return CommonEventHeader.builder() + .version(version) + .vesEventListenerVersion(vesEventListenerVersion) + .domain(domain) + .eventName(eventName) + .eventId(UUID.randomUUID().toString()) + .priority(priority) + .reportingEntityName(reportingEntityName) + .sequence(0) + .timeZoneOffset(ZonedDateTime.now().getOffset().toString()) + .build(); + } + + /** + * Deletes temporary archived PM Bulk File + * + * @param fileMono temporary archived PM Bulk File + */ + private void deleteTempArchivedBulkFile(File fileMono) { + deletePMBulkFile(fileMono); + } +} diff --git a/src/main/java/org/onap/a1pesimulator/service/pm/FtpServerService.java b/src/main/java/org/onap/a1pesimulator/service/pm/FtpServerService.java new file mode 100644 index 0000000..01b0672 --- /dev/null +++ b/src/main/java/org/onap/a1pesimulator/service/pm/FtpServerService.java @@ -0,0 +1,215 @@ +/* + * Copyright (C) 2021 Samsung Electronics + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.onap.a1pesimulator.service.pm; + +import static java.util.Objects.nonNull; +import static org.onap.a1pesimulator.util.Constants.TEMP_DIR; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.util.zip.GZIPOutputStream; + + +import org.onap.a1pesimulator.data.fileready.FileData; +import org.onap.a1pesimulator.exception.NotUploadedToFtpException; +import org.onap.a1pesimulator.util.VnfConfigReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import org.springframework.util.FileCopyUtils; + +import net.schmizz.sshj.SSHClient; +import net.schmizz.sshj.sftp.SFTPClient; +import net.schmizz.sshj.transport.verification.PromiscuousVerifier; +import reactor.core.publisher.Mono; + +@Service +public class FtpServerService { + + private static final Logger log = LoggerFactory.getLogger(FtpServerService.class); + + //true - file will be uploaded to FTP; false - file will be copied into xmlPmLocation + @Value("${ftp.server.upload}") + private boolean ftpServerUpload; + + // location where archived file will be copied + @Value("${xml.pm.location}") + private String xmlPmLocation; + + @Value("${ftp.server.protocol}") + private String ftpServerProtocol; + + @Value("${ftp.server.filepath}") + private String ftpServerFilepath; + + @Value("${ftp.server.username}") + private String ftpServerUsername; + + @Value("${ftp.server.password}") + private String ftpServerPassword; + + private VnfConfigReader vnfConfigReader; + + public FtpServerService(VnfConfigReader vnfConfigReader) { + this.vnfConfigReader = vnfConfigReader; + } + + public Mono<FileData> uploadFileToFtp(FileData fileData) { + return Mono.just(fileData) + .flatMap(this::tryToCompressFile) + .flatMap(this::tryToUploadOrSaveFileToFtp) + .onErrorResume(throwable -> resumeError(throwable, fileData)) + .doOnNext(file -> deletePMBulkFile(file.getPmBulkFile())); + } + + /** + * Trying to compress file into .gz + * + * @param fileData file to be archived + * @return archived file + */ + private Mono<FileData> tryToCompressFile(FileData fileData) { + File archiveBulkFile = new File(TEMP_DIR, fileData.getPmBulkFile().getName() + ".gz"); + + try (GZIPOutputStream zos = new GZIPOutputStream( + new FileOutputStream(archiveBulkFile.getAbsolutePath())); FileInputStream inputStream = new FileInputStream(fileData.getPmBulkFile())) { + byte[] buffer = new byte[1024]; + int len; + while ((len = inputStream.read(buffer)) > 0) { + zos.write(buffer, 0, len); + } + fileData.setArchivedPmBulkFile(archiveBulkFile); + log.trace("Compressing file {}", fileData.getPmBulkFile().getName()); + return Mono.just(fileData); + } catch (IOException e) { + log.error("Could not compress file", e); + return Mono.empty(); + } + } + + /** + * Upload file to FTP or copy it to mounted location + * + * @param fileData data about file + * @return fileData for fileReadyEvent + */ + private Mono<FileData> tryToUploadOrSaveFileToFtp(FileData fileData) { + if (ftpServerUpload) { + return tryToUploadFileToFtp(fileData); + } else { + File fileOnFtp = new File(xmlPmLocation, fileData.getArchivedPmBulkFile().getName()); + try { + FileCopyUtils.copy(fileData.getArchivedPmBulkFile(), fileOnFtp); + log.info("Uploading file to the location: {}", fileOnFtp); + return Mono.just(fileData); + } catch (IOException e) { + return Mono.error(new NotUploadedToFtpException("File was not copied to FTP location", e)); + } + } + } + + /** + * Upload file to FTP + * + * @param fileData archived file in Mono + * @return archived file for fileReadyEvent + */ + private Mono<FileData> tryToUploadFileToFtp(FileData fileData) { + if (nonNull(fileData.getArchivedPmBulkFile())) { + SSHClient client = getSSHClient(); + if (nonNull(client)) { + try (client; SFTPClient sftpClient = client.newSFTPClient()) { + File archiveBulkFile = fileData.getArchivedPmBulkFile(); + sftpClient.put(archiveBulkFile.getAbsolutePath(), ftpServerFilepath + "/" + archiveBulkFile.getName()); + + log.info("Uploading file to FTP: {}", archiveBulkFile.getAbsoluteFile()); + return Mono.just(fileData); + } catch (IOException e) { + log.error("Exception while trying to upload a file", e); + } + } else { + log.error("Could not connect to FTP server"); + } + } else { + log.error("There is no file to upload"); + } + return Mono.error(new NotUploadedToFtpException("File was not uploaded to FTP")); + } + + /** + * Creates SSHClient instance + * + * @return SSHClient + */ + protected SSHClient getSSHClient() { + SSHClient client = new SSHClient(); + try { + client.addHostKeyVerifier(new PromiscuousVerifier()); + client.connect(vnfConfigReader.getVnfConfig().getFtpHost(), Integer.parseInt(vnfConfigReader.getVnfConfig().getFtpPort())); + client.authPassword(ftpServerUsername, ftpServerPassword); + return client; + } catch (IOException e) { + log.error("There was an error while connecting to FTP server", e); + try { + client.close(); + } catch (IOException ioException) { + log.error("There was an error while closing the connection to FTP server", e); + } + return null; + } + } + + /** + * Deletes created PM Bulk File xml from temp storage after successful upload to FTP + * + * @param file file which we gonna delete + */ + public static void deletePMBulkFile(File file) { + try { + log.trace("Deleting file: {}", file.getAbsoluteFile()); + Files.delete(file.toPath()); + } catch (IOException e) { + log.warn("Could not delete file: {}", file.getName(), e); + } + } + + /** + * Get path to FTP server + * + * @return for example: "sftp://foo:pass@106.120.119.170:2222/upload/" + */ + public String getFtpPath() { + return ftpServerProtocol + "://" + ftpServerUsername + ":" + ftpServerPassword + "@" + vnfConfigReader.getVnfConfig().getFtpHost() + ":" + + vnfConfigReader.getVnfConfig().getFtpPort() + "/" + ftpServerFilepath + + "/"; + } + + /** + * Try to clean up things after an exception + * + * @param throwable error thrown + * @param fileData data about files which needs to be deleted + * @return empty Mono object + */ + protected Mono<FileData> resumeError(Throwable throwable, FileData fileData) { + log.error("Error occurs while uploading file to FTP server", throwable); + deletePMBulkFile(fileData.getPmBulkFile()); + deletePMBulkFile(fileData.getArchivedPmBulkFile()); + return Mono.empty(); + } +} diff --git a/src/main/java/org/onap/a1pesimulator/service/pm/PMBulkFileService.java b/src/main/java/org/onap/a1pesimulator/service/pm/PMBulkFileService.java new file mode 100644 index 0000000..aec473c --- /dev/null +++ b/src/main/java/org/onap/a1pesimulator/service/pm/PMBulkFileService.java @@ -0,0 +1,333 @@ +/* + * Copyright (C) 2021 Samsung Electronics + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.onap.a1pesimulator.service.pm; + +import static java.util.Comparator.comparing; +import static java.util.Objects.isNull; +import static org.onap.a1pesimulator.util.Constants.EMPTY_STRING; +import static org.onap.a1pesimulator.util.Constants.MEASUREMENT_FIELD_IDENTIFIER; +import static org.onap.a1pesimulator.util.Constants.TEMP_DIR; +import static org.onap.a1pesimulator.util.Convertors.ISO_8601_DATE; +import static org.onap.a1pesimulator.util.Convertors.YYYYMMDD_PATTERN; +import static org.onap.a1pesimulator.util.Convertors.truncateToSpecifiedMinutes; +import static org.onap.a1pesimulator.util.Convertors.zonedDateTimeToString; + +import java.io.File; +import java.time.ZonedDateTime; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; + +import javax.xml.XMLConstants; +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.transform.OutputKeys; +import javax.xml.transform.Transformer; +import javax.xml.transform.TransformerException; +import javax.xml.transform.TransformerFactory; +import javax.xml.transform.dom.DOMSource; +import javax.xml.transform.stream.StreamResult; + +import org.onap.a1pesimulator.data.fileready.EventMemoryHolder; +import org.onap.a1pesimulator.data.fileready.FileData; +import org.onap.a1pesimulator.data.ves.VesEvent; +import org.onap.a1pesimulator.util.VnfConfigReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.Node; + +import reactor.core.publisher.Mono; + +/** + * Service for PM Bulk File creation and handling + */ + +@Service +public class PMBulkFileService { + + private static final Logger log = LoggerFactory.getLogger(PMBulkFileService.class); + private static Map<String, AtomicInteger> uniqueFileNamesWithCount; + private final VnfConfigReader vnfConfigReader; + + @Value("${xml.pm.bulk.fileFormatVersion}") + private String fileFormatVersion; + + @Value("${xml.pm.bulk.vendorName}") + private String vendorName; + + @Value("${xml.pm.bulk.fileSender}") + private String fileSenderValue; + + @Value("${xml.pm.bulk.userLabel}") + private String userLabel; + + @Value("${xml.pm.bulk.domainId}") + private String domainId; + + public PMBulkFileService(VnfConfigReader vnfConfigReader) { + this.vnfConfigReader = vnfConfigReader; + } + + /** + * Generate PM Bulk File xml from stored events + * + * @param collectedEvents list of stored events + * @return generated file in Mono object + */ + public Mono<FileData> generatePMBulkFileXml(List<EventMemoryHolder> collectedEvents) { + + try { + DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance(); + docFactory.setAttribute(XMLConstants.ACCESS_EXTERNAL_DTD, ""); + docFactory.setAttribute(XMLConstants.ACCESS_EXTERNAL_SCHEMA, ""); + + DocumentBuilder docBuilder = docFactory.newDocumentBuilder(); + + //root elements + Document doc = docBuilder.newDocument(); + + Element measCollecFile = doc.createElement("measCollecFile"); + doc.appendChild(measCollecFile); + measCollecFile.setAttribute("xmlns", "http://www.3gpp.org/ftp/specs/archive/32_series/32.435#measCollec"); + measCollecFile.setAttribute("xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance"); + measCollecFile.setAttribute("xsi:schemaLocation", + "http://www.3gpp.org/ftp/specs/archive/32_series/32.435#measCollec http://www.3gpp.org/ftp/specs/archive/32_series/32.435#measCollec"); + + //fileHeader elements + Element fileHeader = doc.createElement("fileHeader"); + measCollecFile.appendChild(fileHeader); + fileHeader.setAttribute("fileFormatVersion", fileFormatVersion); + fileHeader.setAttribute("vendorName", vendorName); + + //fileSender elements + Element fileSender = doc.createElement("fileSender"); + fileHeader.appendChild(fileSender); + fileSender.setAttribute("elementType", fileSenderValue); + + //measCollec elements + Element measCollec = doc.createElement("measCollec"); + fileHeader.appendChild(measCollec); + measCollec.setAttribute("beginTime", zonedDateTimeToString(earliestEventTime(collectedEvents), ISO_8601_DATE)); + + //measData elements + Element measData = doc.createElement("measData"); + measCollecFile.appendChild(measData); + + //managedElement elements + Element managedElement = doc.createElement("managedElement"); + measData.appendChild(managedElement); + managedElement.setAttribute("userLabel", userLabel); + + //add measInfo elements + addMeansInfo(doc, measData, collectedEvents); + + //fileFooter elements + Element fileFooter = doc.createElement("fileFooter"); + measCollecFile.appendChild(fileFooter); + + Element measCollecFooter = doc.createElement("measCollec"); + fileFooter.appendChild(measCollecFooter); + measCollecFooter.setAttribute("endTime", zonedDateTimeToString(latestEventTime(collectedEvents), ISO_8601_DATE)); + + File xmlFile = writeDocumentIntoXmlFile(doc, collectedEvents); + + Mono<FileData> justMono = Mono.just(FileData.builder().pmBulkFile(xmlFile).startEventDate(earliestEventTime(collectedEvents)) + .endEventDate(latestEventTime(collectedEvents)).build()); + log.trace("Removing all VES events from memory: {}", collectedEvents.size()); + collectedEvents.clear(); + return justMono; + + } catch (ParserConfigurationException | TransformerException pce) { + log.error("Error occurs while creating PM Bulk File", pce); + return Mono.empty(); + } + } + + /** + * Add measurement elements for each cell and measurement time into PM Bulk File + * + * @param doc Document + * @param measData main element of document, which stores meansData + * @param collectedEvents list of stored events + */ + private void addMeansInfo(Document doc, Element measData, List<EventMemoryHolder> collectedEvents) { + collectedEvents.stream().sorted(comparing(EventMemoryHolder::getEventDate)).forEach(eventMemoryHolder -> { + VesEvent event = eventMemoryHolder.getEvent(); + + Element measInfo = doc.createElement("measInfo"); + measData.appendChild(measInfo); + + //job element + Element job = doc.createElement("job"); + measInfo.appendChild(job); + job.setAttribute("jobId", eventMemoryHolder.getJobId()); + + //granPeriod elements + Element granPeriod = doc.createElement("granPeriod"); + measInfo.appendChild(granPeriod); + granPeriod.setAttribute("duration", getDurationString(eventMemoryHolder.getGranPeriod())); + ZonedDateTime endDate = eventMemoryHolder.getEventDate(); + granPeriod.setAttribute("endTime", zonedDateTimeToString(endDate, ISO_8601_DATE)); + + //repPeriod elements + Element repPeriod = doc.createElement("repPeriod"); + measInfo.appendChild(repPeriod); + repPeriod.setAttribute("duration", getDurationString(vnfConfigReader.getVnfConfig().getRepPeriod())); + + //measType definition + HashMap<String, String> measurmentMap = new HashMap<>(); + AtomicInteger i = new AtomicInteger(1); + event.getMeasurementFields().getAdditionalMeasurements().forEach(additionalMeasurement -> { + if (Stream.of(MEASUREMENT_FIELD_IDENTIFIER) + .noneMatch(elementName -> elementName.equalsIgnoreCase(additionalMeasurement.getName()))) { + Element measType = doc.createElement("measType"); + measInfo.appendChild(measType); + measType.setAttribute("p", String.valueOf(i)); + measType.setTextContent(additionalMeasurement.getName()); + measurmentMap.put(additionalMeasurement.getName(), String.valueOf(i)); + i.incrementAndGet(); + } + }); + + //measValue elements + Element measValue = doc.createElement("measValue"); + measInfo.appendChild(measValue); + measValue.setAttribute("measObjLdn", eventMemoryHolder.getCellId()); + event.getMeasurementFields().getAdditionalMeasurements().stream() + .filter(additionalMeasurement -> measurmentMap.containsKey(additionalMeasurement.getName())) + .forEach(additionalMeasurement -> { + if (!additionalMeasurement.getMeasurementValue().isEmpty()) { + + //r elements + Element r = doc.createElement("r"); + measValue.appendChild(r); + r.setAttribute("p", measurmentMap.get(additionalMeasurement.getName())); + r.setTextContent(additionalMeasurement.getMeasurementValue()); + } + }); + }); + } + + /** + * Converts Document into XML file and adds proper headers + * + * @param doc Document + * @param collectedEvents list of stored events + * @return newly created File in xml format + */ + private File writeDocumentIntoXmlFile(Document doc, List<EventMemoryHolder> collectedEvents) throws TransformerException { + TransformerFactory transformerFactory = TransformerFactory.newInstance(); + transformerFactory.setAttribute(XMLConstants.ACCESS_EXTERNAL_DTD, ""); + transformerFactory.setAttribute(XMLConstants.ACCESS_EXTERNAL_STYLESHEET, ""); + + Transformer tr = transformerFactory.newTransformer(); + tr.setOutputProperty(OutputKeys.ENCODING, "UTF-8"); + tr.setOutputProperty(OutputKeys.VERSION, "1.0"); + Node pi = doc.createProcessingInstruction("xml-stylesheet", "type=\"text/xsl\" href=\"MeasDataCollection.xsl\""); + doc.insertBefore(pi, doc.getDocumentElement()); + + File xmlFile = getXmlFile(collectedEvents); + StreamResult result = new StreamResult(xmlFile); + DOMSource source = new DOMSource(doc); + tr.transform(source, result); + return xmlFile; + } + + /** + * Generate PM Bulk File and its name + * + * @param collectedEvents list of stored events + * @return newly created File + */ + private File getXmlFile(List<EventMemoryHolder> collectedEvents) { + StringBuilder fileNameBuilder = new StringBuilder("C"); + ZonedDateTime firstEventTime = earliestEventTime(collectedEvents); + ZonedDateTime lastEventTime = latestEventTime(collectedEvents); + fileNameBuilder.append(zonedDateTimeToString(firstEventTime, YYYYMMDD_PATTERN)).append("."); + fileNameBuilder.append(zonedDateTimeToString(truncateToSpecifiedMinutes(firstEventTime, 5), "HHmmZ")).append("-"); + fileNameBuilder.append(zonedDateTimeToString(lastEventTime, YYYYMMDD_PATTERN)).append("."); + fileNameBuilder.append(zonedDateTimeToString(truncateToSpecifiedMinutes(lastEventTime, 5), "HHmmZ")); + fileNameBuilder.append("_").append(domainId); + fileNameBuilder.append(appendRcIfNecessary(fileNameBuilder)); + fileNameBuilder.append(".xml"); + + return new File(TEMP_DIR, fileNameBuilder.toString()); + } + + /** + * The RC parameter is a running count and shall be appended only if the filename is otherwise not unique, i.e. more than one file is generated and all + * other parameters of the file name are identical. + * + * @param fileNameBuilder stringBuilder which contains currently generated file name + * @return sequence number or empty string + */ + private static String appendRcIfNecessary(StringBuilder fileNameBuilder) { + String fileName = fileNameBuilder.toString(); + int sequence = 0; + if (isNull(uniqueFileNamesWithCount)) { + uniqueFileNamesWithCount = Collections.synchronizedMap(new HashMap<>()); + } + if (uniqueFileNamesWithCount.containsKey(fileName)) { + sequence = uniqueFileNamesWithCount.get(fileName).incrementAndGet(); + } else { + uniqueFileNamesWithCount.clear(); //we have new dates, so we can clear existing list to not grow infinitely + uniqueFileNamesWithCount.put(fileName, new AtomicInteger(0)); + } + return sequence > 0 ? "_-_" + sequence : EMPTY_STRING; + } + + /** + * Get ZonedDateTime of the earliest event in that reporting period + * + * @param collectedEvents list of compared events + * @return the earliest ZonedDateTime + */ + private static ZonedDateTime earliestEventTime(List<EventMemoryHolder> collectedEvents) { + return collectedEvents.stream() + .map(EventMemoryHolder::getEventDate) + .min(comparing(ZonedDateTime::toEpochSecond, Comparator.nullsLast(Comparator.naturalOrder()))) + .orElse(ZonedDateTime.now()); + } + + /** + * Get ZonedDateTime of the latest event in that reporting period + * + * @param collectedEvents list of compared events + * @return the latest ZonedDateTime + */ + private static ZonedDateTime latestEventTime(List<EventMemoryHolder> collectedEvents) { + return collectedEvents.stream().map(EventMemoryHolder::getEventDate) + .max(comparing(ZonedDateTime::toEpochSecond, Comparator.nullsLast(Comparator.naturalOrder()))) + .orElse(ZonedDateTime.now()); + } + + /** + * Convert duration interval in seconds to xml element required by the specification Examples: PT10S, PT900S + * + * @param interval interval in seconds + * @return duration xml element representation + */ + private static String getDurationString(int interval) { + return "PT" + interval + "S"; + } +} diff --git a/src/main/java/org/onap/a1pesimulator/service/pm/RanFileReadyHolder.java b/src/main/java/org/onap/a1pesimulator/service/pm/RanFileReadyHolder.java new file mode 100644 index 0000000..d3a7970 --- /dev/null +++ b/src/main/java/org/onap/a1pesimulator/service/pm/RanFileReadyHolder.java @@ -0,0 +1,178 @@ +/* + * Copyright (C) 2021 Samsung Electronics + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.onap.a1pesimulator.service.pm; + +import static java.util.Objects.isNull; +import static java.util.Objects.nonNull; + +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.onap.a1pesimulator.data.fileready.EventMemoryHolder; +import org.onap.a1pesimulator.data.fileready.FileData; +import org.onap.a1pesimulator.data.ves.VesEvent; +import org.onap.a1pesimulator.exception.VesBrokerException; +import org.onap.a1pesimulator.service.report.RanVesSender; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; + +import reactor.core.publisher.Mono; + +/** + * Entry point for PM Bulk File event + */ +@Service +public class RanFileReadyHolder { + + private static final Logger log = LoggerFactory.getLogger(RanFileReadyHolder.class); + + private Map<String, List<EventMemoryHolder>> collectedEventsByCell; + private final RanVesSender ranVesSender; + private final FtpServerService ftpServerService; + private final PMBulkFileService xmlFileService; + private final FileReadyEventService fileReadyEventService; + + public RanFileReadyHolder(RanVesSender ranVesSender, FtpServerService ftpServerService, PMBulkFileService xmlFileService, + FileReadyEventService fileReadyEventService) { + this.ranVesSender = ranVesSender; + this.ftpServerService = ftpServerService; + this.xmlFileService = xmlFileService; + this.fileReadyEventService = fileReadyEventService; + } + + /** + * Run entire process for all cells collectedEventsByCell are synchronized to not be updated by other threads during PM Bulk File creation + */ + public void createPMBulkFileAndSendFileReadyMessage() { + synchronized (getCollectedEventsByCell()) { + getCollectedEventsByCell().forEach((cellId, cellEventList) -> createPMBulkFileAndSendFileReadyMessageForCell(cellEventList)); + } + } + + /** + * Run entire process for one cell collectedEventsByCell are synchronized to not be updated by other threads during PM Bulk File creation + * + * @param cellId cell identifier + */ + public void createPMBulkFileAndSendFileReadyMessageForCellId(String cellId) { + synchronized (getCollectedEventsByCell()) { + createPMBulkFileAndSendFileReadyMessageForCell(getCollectedEventsForCellId(cellId)); + } + } + + /** + * Run entire process for one cell: PM Bulk File creation-> upload to FTP -> delete temp PM Bulk File -> create File Ready Event - > send it to VES + * Collector. + * + * @param events list of events for one cell + */ + public void createPMBulkFileAndSendFileReadyMessageForCell(List<EventMemoryHolder> events) { + Mono.justOrEmpty(events) + .filter(this::areSomeEventsStored) + .flatMap(xmlFileService::generatePMBulkFileXml) + .map(ftpServerService::uploadFileToFtp) + .flatMap(fileReadyEventService::createFileReadyEventAndDeleteTmpFile) + .doOnNext(this::sendEventToVesCollector) + .subscribe(fileData -> informAboutSuccess(), this::informAboutError); + } + + /** + * Adds current event to the memory by cell, which is Map<String,List<EventMemoryHolder>> + * + * @param vesEvent event from specific cell + * @throws VesBrokerException in case of any problem with adding to List, it throws an exception + */ + public void saveEventToMemory(VesEvent vesEvent, String cellId, String jobId, Integer granPeriod) throws VesBrokerException { + try { + Map<String, List<EventMemoryHolder>> events = getCollectedEventsByCell(); + if (events.containsKey(cellId)) { + events.get(cellId).add(new EventMemoryHolder(cellId, jobId, granPeriod, ZonedDateTime.now(), vesEvent)); + } else { + List<EventMemoryHolder> cellEvents = Collections.synchronizedList( + new ArrayList<>(List.of(new EventMemoryHolder(cellId, jobId, granPeriod, ZonedDateTime.now(), vesEvent)))); + events.put(cellId, cellEvents); + } + log.trace("Saving VES event for cell {} with granularity period {} and sequence number {}", cellId, granPeriod, events.get(cellId).size()); + } catch (Exception e) { + String errorMsg = "Failed to save VES event to memory with exception:" + e; + throw new VesBrokerException(errorMsg); + } + } + + /** + * Sends FileReadyEvent to VES Collector + * + * @param fileData object with FileReadyEvent file + */ + protected void sendEventToVesCollector(FileData fileData) { + ranVesSender.send(fileData.getFileReadyEvent()); + } + + /** + * Log about successful operation + */ + private void informAboutSuccess() { + log.info("PM Bulk file was generated, uploaded to FTP and File ready event was send to VES Collector"); + } + + /** + * Log an error if occurs during the process + * + * @param throwable - error raised in some of the steps + */ + private void informAboutError(Throwable throwable) { + log.info("File ready event was unsuccessful: {}", throwable.getMessage()); + } + + /** + * Check if there are any Events stored in the memory. Used before creating PM Bulk File xml + * + * @param collectedEvents list of stored events + * @return true there is at least one event / false - no event at all + */ + private boolean areSomeEventsStored(List<EventMemoryHolder> collectedEvents) { + return !CollectionUtils.isEmpty(collectedEvents); + } + + /** + * Factory to get Map<String,List<EventMemoryHolder>> + * + * @return existing or newly created Map<String,List<EventMemoryHolder>> + */ + public Map<String, List<EventMemoryHolder>> getCollectedEventsByCell() { + if (isNull(collectedEventsByCell)) { + collectedEventsByCell = Collections.synchronizedMap(new HashMap<>()); + } + return collectedEventsByCell; + } + + /** + * Get list of events for specific CellId + * + * @param cellId cell identifier + * @return list of events + */ + public List<EventMemoryHolder> getCollectedEventsForCellId(String cellId) { + if (nonNull(collectedEventsByCell) && collectedEventsByCell.containsKey(cellId)) { + return collectedEventsByCell.get(cellId); + } + return new ArrayList<>(); + } +} diff --git a/src/main/java/org/onap/a1pesimulator/service/pm/RanSaveFileReadyRunnable.java b/src/main/java/org/onap/a1pesimulator/service/pm/RanSaveFileReadyRunnable.java new file mode 100644 index 0000000..222680f --- /dev/null +++ b/src/main/java/org/onap/a1pesimulator/service/pm/RanSaveFileReadyRunnable.java @@ -0,0 +1,54 @@ +/* + * Copyright (C) 2021 Samsung Electronics + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.onap.a1pesimulator.service.pm; + +import java.util.Collection; +import java.util.UUID; + +import org.onap.a1pesimulator.data.ves.VesEvent; +import org.onap.a1pesimulator.exception.VesBrokerException; +import org.onap.a1pesimulator.service.common.AbstractRanRunnable; +import org.onap.a1pesimulator.service.common.EventCustomizer; +import org.onap.a1pesimulator.service.report.OnEventAction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RanSaveFileReadyRunnable extends AbstractRanRunnable { + + private static final Logger log = LoggerFactory.getLogger(RanSaveFileReadyRunnable.class); + private final Integer granPeriod; + private final String cellId; + private final String jobId; + private final RanFileReadyHolder ranFileReadyHolder; + + public RanSaveFileReadyRunnable(RanFileReadyHolder ranFileReadyHolder, String cellId, VesEvent event, EventCustomizer eventCustomizer, Integer interval, + Collection<OnEventAction> onEventActions) { + super(event, eventCustomizer, onEventActions); + this.ranFileReadyHolder = ranFileReadyHolder; + this.granPeriod = interval; + this.cellId = cellId; + this.jobId = UUID.randomUUID() + "-" + cellId; + } + + @Override + public void run() { + try { + VesEvent customizedEvent = eventCustomizer.apply(event); + onEventAction.forEach(action -> action.onEvent(customizedEvent)); + ranFileReadyHolder.saveEventToMemory(customizedEvent, cellId, jobId, granPeriod); + } catch (VesBrokerException e) { + log.error("Saving file ready event failed: {}", e.getMessage()); + } + } +} diff --git a/src/main/java/org/onap/a1pesimulator/service/pm/RanSendReportsRunnable.java b/src/main/java/org/onap/a1pesimulator/service/pm/RanSendReportsRunnable.java new file mode 100644 index 0000000..88cb5a2 --- /dev/null +++ b/src/main/java/org/onap/a1pesimulator/service/pm/RanSendReportsRunnable.java @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2021 Samsung Electronics + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.onap.a1pesimulator.service.pm; + +public class RanSendReportsRunnable implements Runnable { + + protected final RanFileReadyHolder ranFileReadyHolder; + + public RanSendReportsRunnable(RanFileReadyHolder ranFileReadyHolder) { + this.ranFileReadyHolder = ranFileReadyHolder; + } + + @Override + public void run() { + ranFileReadyHolder.createPMBulkFileAndSendFileReadyMessage(); + } +} |