diff options
author | Stanislav Marszalek <s.marszalek2@partner.samsung.com> | 2021-07-27 12:39:06 +0200 |
---|---|---|
committer | Stanislav Marszalek <s.marszalek2@partner.samsung.com> | 2021-08-05 10:49:01 +0200 |
commit | 44b1873c0f84022d90769f46abda160b3ea06bd3 (patch) | |
tree | e210ad60a7c7cc58ab7fc827c9cfacf0fa1126d6 | |
parent | 616ab68e1c2e31a63ed8b78aa959234458d91e47 (diff) |
O1 PM Bulk support in DataCollector RAPP
Issue-ID: INT-1947
Signed-off-by: Stanislav Marszalek <s.marszalek2@partner.samsung.com>
Change-Id: I02d3684c98d563d7f386de2fdf032e930ac44b3f
21 files changed, 601 insertions, 135 deletions
diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/fileready/FileReadyEvent.java b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/fileready/FileReadyEvent.java new file mode 100644 index 0000000..104047f --- /dev/null +++ b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/fileready/FileReadyEvent.java @@ -0,0 +1,24 @@ +package org.onap.rapp.datacollector.entity.fileready; + +import org.onap.rapp.datacollector.entity.ves.CommonEventHeader; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonTypeName; + +import lombok.Builder; +import lombok.Data; +import lombok.Getter; + +/** + * Object for storing PM Bulk File information coming from PM Mapper + */ +@JsonTypeName("event") +@JsonInclude(JsonInclude.Include.NON_NULL) +@Builder +@Data +@Getter +public class FileReadyEvent { + + private CommonEventHeader commonEventHeader; + private MeasDataCollection measDataCollection; +} diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/fileready/MeasDataCollection.java b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/fileready/MeasDataCollection.java new file mode 100644 index 0000000..53a9e74 --- /dev/null +++ b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/fileready/MeasDataCollection.java @@ -0,0 +1,58 @@ +package org.onap.rapp.datacollector.entity.fileready; + +import java.util.List; + +import lombok.Getter; + +/** + * MeasDataCollection section of PM Bulk File coming from PM Mapper + */ +@Getter +public class MeasDataCollection { + + private long granularityPeriod; + private String measuredEntityUserName; + private String measuredEntityDn; + private String measuredEntitySoftwareVersion; + + private List<MeasInfo> measInfoList; + + @Getter + public class MeasInfo { + + private MeasInfoId measInfoId; + + @Getter + public class MeasInfoId { + + private String sMeasInfoId; + } + + private MeasTypes measTypes; + + @Getter + public class MeasTypes { + + List<String> sMeasTypesList; + } + + private List<MeasValue> measValuesList; + + @Getter + public class MeasValue { + + private String measObjInstId; + private boolean suspectFlag; + private List<MeasResult> measResults; + + @Getter + public class MeasResult { + + private Integer p; + private String sValue; + + } + } + + } +} diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/CommonEventHeader.java b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/CommonEventHeader.java index e9d3f75..e087289 100644 --- a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/CommonEventHeader.java +++ b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/CommonEventHeader.java @@ -14,53 +14,42 @@ package org.onap.rapp.datacollector.entity.ves; +import org.springframework.data.annotation.Transient; + import com.fasterxml.jackson.annotation.JsonInclude; + +import lombok.AllArgsConstructor; import lombok.Builder; -import lombok.Data; import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; import lombok.ToString; -import org.springframework.data.annotation.Transient; -@Data +@Getter +@Setter @EqualsAndHashCode @ToString @Builder +@AllArgsConstructor @JsonInclude(JsonInclude.Include.NON_NULL) public class CommonEventHeader { - public final String eventType; - public final String version; - public final String sourceId; - public final String reportingEntityName; - public final Long startEpochMicrosec; - public final String eventId; - public final Long lastEpochMicrosec; - public final String priority; - public final Integer sequence; - public final String sourceName; - public final String domain; - public final String eventName; - public final String reportingEntityId; - public final String nfcNamingCode; - public final String nfNamingCode; + + private final String eventType; + private final String version; + private final String sourceId; + private final String reportingEntityName; + private Long startEpochMicrosec; + private final String eventId; + private Long lastEpochMicrosec; + private final String priority; + private final Integer sequence; + private final String sourceName; + private final String domain; + private final String eventName; + private final String reportingEntityId; + private final String nfcNamingCode; + private final String nfNamingCode; @Transient public final String timeZoneOffset; - protected CommonEventHeader(String eventType, String version, String sourceId, String reportingEntityName, Long startEpochMicrosec, String eventId, Long lastEpochMicrosec, String priority, Integer sequence, String sourceName, String domain, String eventName, String reportingEntityId, String nfcNamingCode, String nfNamingCode, String timeZone) { - this.eventType = eventType; - this.version = version; - this.sourceId = sourceId; - this.reportingEntityName = reportingEntityName; - this.startEpochMicrosec = startEpochMicrosec; - this.eventId = eventId; - this.lastEpochMicrosec = lastEpochMicrosec; - this.priority = priority; - this.sequence = sequence; - this.sourceName = sourceName; - this.domain = domain; - this.eventName = eventName; - this.reportingEntityId = reportingEntityId; - this.nfcNamingCode = nfcNamingCode; - this.nfNamingCode = nfNamingCode; - this.timeZoneOffset = timeZone; - } } diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/MeasurementFields.java b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/MeasurementFields.java index 802aace..c11c490 100644 --- a/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/MeasurementFields.java +++ b/datacollector/src/main/java/org/onap/rapp/datacollector/entity/ves/MeasurementFields.java @@ -15,12 +15,14 @@ package org.onap.rapp.datacollector.entity.ves; import java.util.Collections; import java.util.List; + +import org.springframework.data.relational.core.mapping.Column; +import org.springframework.data.relational.core.mapping.Table; + import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; -import org.springframework.data.relational.core.mapping.Column; -import org.springframework.data.relational.core.mapping.Table; @Data @ToString @@ -31,7 +33,7 @@ public class MeasurementFields { public static final MeasurementFields EMPTY = new MeasurementFields(-1L, -1L, Collections.emptyList()); public final Long eventId; public final long measurementInterval; - public final String measurementFieldsVersion = "4.0"; + public static final String MEASUREMENT_FIELDS_VERSION = "4.0"; @Column("event_id") public final List<AdditionalMeasurements> additionalMeasurements; diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/FileReadyParserImpl.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/FileReadyParserImpl.java new file mode 100644 index 0000000..d7f517e --- /dev/null +++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/FileReadyParserImpl.java @@ -0,0 +1,197 @@ +/* + * Copyright (C) 2021 Samsung Electronics + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + + +package org.onap.rapp.datacollector.service; + +import static java.util.Objects.nonNull; +import static org.onap.rapp.datacollector.service.PMService.CELL_FIELD_NAME; +import static org.onap.rapp.datacollector.service.PMService.VALUE_NAME; + +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import org.onap.rapp.datacollector.entity.fileready.FileReadyEvent; +import org.onap.rapp.datacollector.entity.fileready.MeasDataCollection; +import org.onap.rapp.datacollector.entity.fileready.MeasDataCollection.MeasInfo; +import org.onap.rapp.datacollector.entity.fileready.MeasDataCollection.MeasInfo.MeasValue; +import org.onap.rapp.datacollector.entity.ves.AdditionalMeasurements; +import org.onap.rapp.datacollector.entity.ves.CommonEventHeader; +import org.onap.rapp.datacollector.entity.ves.Event; +import org.onap.rapp.datacollector.entity.ves.MeasurementFields; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; + +@Service +public class FileReadyParserImpl extends ParserAbstractClass implements VesParser { + + private static final Logger logger = LoggerFactory.getLogger(FileReadyParserImpl.class); + + /** + * Parse incoming Json string into list of Events + * + * @param eventString json from PM Mapper + * @return list of events + */ + @Override + public List<Event> parse(final String eventString) { + logger.debug("parsing ves event {}", eventString); + FileReadyEvent fileReadyEvent = gson.fromJson(eventString, FileReadyEvent.class); + return convertFileReadyEventToEventList(fileReadyEvent, eventString); + } + + /** + * Convert FileReadyEvent event into list of events which will be stored in database + * + * @param fileReadyEvent object created from PM Mapper response + * @param eventString Json event in string + * @return list of events + */ + private List<Event> convertFileReadyEventToEventList(FileReadyEvent fileReadyEvent, String eventString) { + List<Event> events = new ArrayList<>(); + long averageMeasInterval = getAverageMeasInterval(fileReadyEvent); + fileReadyEvent.getMeasDataCollection().getMeasInfoList() + .forEach(measInfo -> measInfo.getMeasValuesList().stream() + .filter(measValue -> hasListOfTypesSameSizeAsListOfResults(measInfo, measValue)) + .forEach(measValue -> events.add(createEvent(fileReadyEvent, measInfo, measValue, eventString, averageMeasInterval)))); + return events; + } + + /** + * Creates individual event from FileReadyEvent data + * + * @param fileReadyEvent bject created from PM Mapper response + * @param measInfo measurement Info object + * @param measValue measurement Value object + * @param eventString Json event in string + * @param averageMeasInterval calculated average interval + * @return Event object + */ + private Event createEvent(FileReadyEvent fileReadyEvent, MeasInfo measInfo, MeasValue measValue, String eventString, long averageMeasInterval) { + List<AdditionalMeasurements> additionalMeasList = new ArrayList<>(); + // Adding measurement's results to additionalMeasList + measValue.getMeasResults() + .forEach(measResult -> { + Map<String, String> hashMap = new HashMap<>(); + hashMap.put(VALUE_NAME, measResult.getSValue()); + additionalMeasList.add(AdditionalMeasurements.builder() + .withName(measInfo.getMeasTypes().getSMeasTypesList().get(measResult.getP() - 1)) + .withHashMap(hashMap).build()); + } + ); + // Adding cell identifier record to additionalMeasList + additionalMeasList.add(AdditionalMeasurements.builder() + .withName(CELL_FIELD_NAME) + .withHashMap(Collections.singletonMap(CELL_FIELD_NAME, measValue.getMeasObjInstId())).build()); + + MeasurementFields measurementFields = MeasurementFields.builder() + .measurementInterval(averageMeasInterval) + .additionalMeasurements(additionalMeasList) + .build(); + Event createdEvent = Event.of(createEventHeader(fileReadyEvent, averageMeasInterval), measurementFields); + createdEvent.raw = eventString; + return createdEvent; + } + + /** + * Creates CommonEventHeader as new copy of initial CommonEventHeader and sets its start/end date by average interval + * + * @param fileReadyEvent object created from PM Mapper response + * @param averageMeasInterval calculated average interval + * @return created CommonEventHeader + */ + private CommonEventHeader createEventHeader(FileReadyEvent fileReadyEvent, long averageMeasInterval) { + CommonEventHeader headerCopy = gson.fromJson(gson.toJson(fileReadyEvent.getCommonEventHeader()), CommonEventHeader.class); + headerCopy.setStartEpochMicrosec(headerCopy.getStartEpochMicrosec() - averageMeasInterval); + headerCopy.setLastEpochMicrosec(fileReadyEvent.getCommonEventHeader().getStartEpochMicrosec()); + fileReadyEvent.getCommonEventHeader().setStartEpochMicrosec(headerCopy.getLastEpochMicrosec() + averageMeasInterval); + return headerCopy; + } + + /** + * As MeansType will be selected by its position in the list we need to make sure that MeasTypesList's size is the same size of MeasResults + * + * @param measInfo measurement Info object + * @param measValue measurement Value object + * @return true=size is the same, false=size is different we can not process it + */ + private boolean hasListOfTypesSameSizeAsListOfResults(MeasInfo measInfo, MeasValue measValue) { + return measInfo.getMeasTypes().getSMeasTypesList().size() == measValue.getMeasResults().size(); + } + + /** + * Average interval between last and start day, divided by number of measurements + * + * @param fileReadyEvent object created from PM Mapper response + * @return Average interval in microseconds + */ + private long getAverageMeasInterval(FileReadyEvent fileReadyEvent) { + int noOfMeasurment = fileReadyEvent.getMeasDataCollection().getMeasInfoList().size(); + int dividedBy = (noOfMeasurment == 0 || noOfMeasurment == 1) ? 1 : (noOfMeasurment - 1); + long difference = fileReadyEvent.getCommonEventHeader().getLastEpochMicrosec() - fileReadyEvent.getCommonEventHeader().getStartEpochMicrosec(); + return difference / dividedBy; + } + + /** + * Class which deserialize json event into FileReadyEvent object + */ + private static class FileReadyEventDeserializer implements JsonDeserializer<FileReadyEvent> { + + @Override + public FileReadyEvent deserialize(JsonElement jsonElement, Type type, JsonDeserializationContext jsonDeserializationContext) + throws JsonParseException { + Optional<JsonObject> eventJsonObject = getEventJsonObject(jsonElement); + CommonEventHeader header = getHeaderJsonObject(eventJsonObject.orElse(null), jsonDeserializationContext); + header.setStartEpochMicrosec(header.getStartEpochMicrosec() * 1000); + header.setLastEpochMicrosec(header.getLastEpochMicrosec() * 1000); + + Optional<JsonObject> measDataCollectionJson = getMeasDataCollectionJson(eventJsonObject.orElse(null)); + if (measDataCollectionJson.isPresent()) { + MeasDataCollection measDataCollection = jsonDeserializationContext.deserialize(measDataCollectionJson.get(), MeasDataCollection.class); + logger.trace("measDataCollection {}", measDataCollection); + return FileReadyEvent.builder().commonEventHeader(header).measDataCollection(measDataCollection).build(); + } else { + logger.error("MeasDataCollection was not found {}", eventJsonObject); + throw new JsonParseException("MeasDataCollection was not found"); + } + } + + private Optional<JsonObject> getMeasDataCollectionJson(JsonObject obj) { + if (nonNull(obj)) { + Optional<JsonObject> fileReadyJson = Optional.ofNullable(obj.getAsJsonObject(FILE_READY_EVENT_UNIQUE_ELEMENT)); + if (fileReadyJson.isPresent()) { + return Optional.ofNullable(fileReadyJson.get().getAsJsonObject("measDataCollection")); + } + } + return Optional.empty(); + } + } + + private final Gson gson = new GsonBuilder() + .registerTypeAdapter(FileReadyEvent.class, new FileReadyEventDeserializer()).create(); + +} diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/PMService.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/PMService.java index 7039d2e..1e2931c 100644 --- a/datacollector/src/main/java/org/onap/rapp/datacollector/service/PMService.java +++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/PMService.java @@ -33,14 +33,15 @@ public class PMService { private static final Logger logger = LoggerFactory.getLogger(PMService.class); public static final String CELL_FIELD_NAME = "identifier"; + public static final String VALUE_NAME = "value"; public static final int CELL_INDEX = 0; private static final int MICRO_SECONDS_OF_SECOND = 1_000_000; private final VesPersisterSqlImpl vesPersisterSql; private final DataAggregationService aggregationService; - private final VesParser parser; + private final ParserFactory parser; - public PMService(VesPersisterSqlImpl vesPersisterSql, DataAggregationService aggregationService, VesParser parser) { + public PMService(VesPersisterSqlImpl vesPersisterSql, DataAggregationService aggregationService, ParserFactory parser) { this.vesPersisterSql = vesPersisterSql; this.aggregationService = aggregationService; this.parser = parser; @@ -71,7 +72,7 @@ public class PMService { } private Map<String, List<Event>> groupByCell(List<EventAPI> events) { - return events.stream().map(e -> parser.parse(e.getRawdata())) + return events.stream().flatMap(e -> parser.getParsedEvents(e.getRawdata()).stream()) .collect(Collectors.groupingBy(this::getCellFromVes)); } diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/ParserAbstractClass.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/ParserAbstractClass.java new file mode 100644 index 0000000..6f539b3 --- /dev/null +++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/ParserAbstractClass.java @@ -0,0 +1,82 @@ +/* + * Copyright (C) 2021 Samsung Electronics + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + + +package org.onap.rapp.datacollector.service; + +import static java.util.Objects.nonNull; + +import java.util.Optional; + +import org.onap.rapp.datacollector.entity.ves.CommonEventHeader; + +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; + +public abstract class ParserAbstractClass { + + public static final String VES_EVENT_UNIQUE_ELEMENT = "measurementFields"; + public static final String FILE_READY_EVENT_UNIQUE_ELEMENT = "perf3gppFields"; + public static final String EVENT_JSON_ELEMENT_NAME = "event"; + public static final String COMMON_EVENT_HEADER = "commonEventHeader"; + + /** + * Scans the Json Object if contains VES_EVENT_UNIQUE_ELEMENT + * + * @param obj json object + * @return true=it is VES event, false=not VES event + */ + protected boolean isVesEvent(JsonObject obj) { + return getEventJsonObject(obj).filter(jsonObject -> jsonObject.has(VES_EVENT_UNIQUE_ELEMENT)).isPresent(); + } + + /** + * Scans the Json Object if contains FILE_READY_EVENT_UNIQUE_ELEMENT + * + * @param obj json object + * @return true=it is FileReadyEvent event, false=not FileReadyEvent + */ + protected boolean isFileReadyEvent(JsonObject obj) { + return getEventJsonObject(obj).filter(jsonObject -> jsonObject.has(FILE_READY_EVENT_UNIQUE_ELEMENT)).isPresent(); + } + + /** + * Gets Event json element from incoming json + * + * @param jsonElement top json elemnt + * @return Event Json element + */ + protected static Optional<JsonObject> getEventJsonObject(JsonElement jsonElement) { + JsonObject obj = jsonElement.getAsJsonObject(); + return Optional.ofNullable(obj.getAsJsonObject(EVENT_JSON_ELEMENT_NAME)); + } + + /** + * Gets CommonEventHeader from json + * + * @param obj Event json element + * @param jsonDeserializationContext json context + * @return CommonEventHeader object from json object + */ + protected static CommonEventHeader getHeaderJsonObject(JsonObject obj, JsonDeserializationContext jsonDeserializationContext) { + if (nonNull(obj) && obj.has(COMMON_EVENT_HEADER)) { + JsonObject headerJson = obj.getAsJsonObject(COMMON_EVENT_HEADER); + return jsonDeserializationContext.deserialize(headerJson, CommonEventHeader.class); + } else { + throw new JsonParseException("Common header not found"); + } + } + +} diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/ParserFactory.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/ParserFactory.java new file mode 100644 index 0000000..77b3877 --- /dev/null +++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/ParserFactory.java @@ -0,0 +1,43 @@ +package org.onap.rapp.datacollector.service; + +import java.util.List; + +import org.onap.rapp.datacollector.entity.ves.Event; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; + +/** + * Factory which decided what parser to use to deserialize json event coming from PM Mapper Supported are FileReadyEvent and VES event + */ +@Service +public class ParserFactory extends ParserAbstractClass { + + private static final Logger logger = LoggerFactory.getLogger(ParserFactory.class); + private final VesParserImpl vesParser; + private final FileReadyParserImpl fileReadyParser; + + public ParserFactory(VesParserImpl vesParser, FileReadyParserImpl fileReadyParser) { + this.vesParser = vesParser; + this.fileReadyParser = fileReadyParser; + } + + public List<Event> getParsedEvents(String eventString) { + JsonObject json = gson.fromJson(eventString, JsonObject.class); + if (isFileReadyEvent(json)) { + return fileReadyParser.parse(eventString); + } else if (isVesEvent(json)) { + return vesParser.parse(eventString); + } else { + logger.error("Not supported event structure {}", eventString); + throw new JsonParseException("Not supported event structure"); + } + } + + private final Gson gson = new GsonBuilder().create(); +} diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesParser.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesParser.java index 9a453b4..9149032 100644 --- a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesParser.java +++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesParser.java @@ -14,8 +14,10 @@ package org.onap.rapp.datacollector.service; +import java.util.List; + import org.onap.rapp.datacollector.entity.ves.Event; public interface VesParser { - Event parse(final String event); + List<Event> parse(final String event); } diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesParserImpl.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesParserImpl.java index cf90bfd..849c567 100644 --- a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesParserImpl.java +++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesParserImpl.java @@ -14,20 +14,15 @@ package org.onap.rapp.datacollector.service; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonArray; -import com.google.gson.JsonDeserializationContext; -import com.google.gson.JsonDeserializer; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParseException; +import static java.util.Objects.nonNull; + import java.lang.reflect.Type; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; + import org.onap.rapp.datacollector.entity.ves.AdditionalMeasurements; import org.onap.rapp.datacollector.entity.ves.CommonEventHeader; import org.onap.rapp.datacollector.entity.ves.Event; @@ -36,64 +31,87 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonArray; +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; + @Service -public class VesParserImpl implements VesParser { +public class VesParserImpl extends ParserAbstractClass implements VesParser { + private static final Logger logger = LoggerFactory.getLogger(VesParserImpl.class); + /** + * Parse incoming Json string into list of Events + * + * @param eventString json from PM Mapper + * @return list of events + */ + @Override + public List<Event> parse(final String eventString) { + logger.debug("parsing ves event {}", eventString); + Event event = gson.fromJson(eventString, Event.class); + event.raw = eventString; + + return Collections.singletonList(event); + } + + /** + * Class which deserialize json event into Event object + */ private static class VesEventDeserializer implements JsonDeserializer<Event> { + private static class AdditionalMeasurementsRawValue { + String name; Map<String, String> hashMap; } @Override public Event deserialize(JsonElement jsonElement, Type type, JsonDeserializationContext jsonDeserializationContext) throws JsonParseException { - JsonObject obj = jsonElement.getAsJsonObject(); - obj = obj.getAsJsonObject("event"); - CommonEventHeader header; - Optional<MeasurementFields> measurementFields = Optional.empty(); - List<AdditionalMeasurements> additionalMeasurements = Collections.emptyList(); - if (obj.has("commonEventHeader")) { - JsonObject h = obj.getAsJsonObject("commonEventHeader"); - header = jsonDeserializationContext.deserialize(h, CommonEventHeader.class); - } else { - throw new JsonParseException("Common header not found"); - } - if (obj.has("measurementFields")) { - JsonObject h = obj.getAsJsonObject("measurementFields"); - measurementFields = Optional.ofNullable(jsonDeserializationContext.deserialize(h, MeasurementFields.class)); - if (h.has("additionalMeasurements")) { - JsonArray arr = h.getAsJsonArray("additionalMeasurements"); + Optional<JsonObject> eventJsonObject = getEventJsonObject(jsonElement); + CommonEventHeader header = getHeaderJsonObject(eventJsonObject.orElse(null), jsonDeserializationContext); + + Optional<MeasurementFields> measurementFields; + List<AdditionalMeasurements> additionalMeasurements = new ArrayList<>(); + + Optional<JsonObject> vesEventJson = getVesEventJson(eventJsonObject.orElse(null)); + if (vesEventJson.isPresent()) { + measurementFields = Optional.ofNullable(jsonDeserializationContext.deserialize(vesEventJson.get(), MeasurementFields.class)); + if (vesEventJson.get().has("additionalMeasurements")) { + JsonArray additionalMeasurementsArray = vesEventJson.get().getAsJsonArray("additionalMeasurements"); additionalMeasurements = new ArrayList<>(); - for (int i = 0; i < arr.size(); i++) { - AdditionalMeasurementsRawValue tmp = jsonDeserializationContext.deserialize(arr.get(i).getAsJsonObject(), AdditionalMeasurementsRawValue.class); + for (int i = 0; i < additionalMeasurementsArray.size(); i++) { + AdditionalMeasurementsRawValue tmp = jsonDeserializationContext + .deserialize(additionalMeasurementsArray.get(i).getAsJsonObject(), AdditionalMeasurementsRawValue.class); additionalMeasurements.add(AdditionalMeasurements.builder() .withName(tmp.name) .withHashMap(tmp.hashMap) .build()); } } + logger.trace("measurement fields {}", measurementFields); + logger.trace("additional measurements {}", additionalMeasurements); + measurementFields = Optional.of(MeasurementFields.builder() + .measurementInterval(measurementFields.orElse(MeasurementFields.EMPTY).measurementInterval) + .additionalMeasurements(additionalMeasurements) + .build()); + return Event.of(header, measurementFields.get()); + } else { + logger.error("MeasurementFields was not found {}", eventJsonObject); + throw new JsonParseException("MeasurementFields was not found"); } - logger.trace("measurement fields {}", measurementFields); - logger.trace("additional measurements {}", additionalMeasurements); - measurementFields = Optional.of(MeasurementFields.builder() - .measurementInterval(measurementFields.orElse(MeasurementFields.EMPTY).measurementInterval) - .additionalMeasurements(additionalMeasurements) - .build()); - - return Event.of(header, measurementFields.get()); } - } - private final Gson gson = new GsonBuilder() - .registerTypeAdapter(Event.class, new VesEventDeserializer()) - .create(); - - public Event parse(final String event) { - logger.debug("parsing ves event {}", event); - final Event result = gson.fromJson(event, Event.class); - result.raw = event; - return result; + private Optional<JsonObject> getVesEventJson(JsonObject obj) { + return Optional.ofNullable(nonNull(obj) ? obj.getAsJsonObject(VES_EVENT_UNIQUE_ELEMENT) : null); + } } + private final Gson gson = new GsonBuilder() + .registerTypeAdapter(Event.class, new VesEventDeserializer()).create(); } diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesPersister.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesPersister.java index 391f762..1f29042 100644 --- a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesPersister.java +++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesPersister.java @@ -19,8 +19,11 @@ import org.onap.rapp.datacollector.entity.ves.Event; import org.onap.rapp.datacollector.entity.ves.EventAPI; public interface VesPersister { + void persists(Event event); + void persistAll(List<Event> events); + List<EventAPI> findTopNVesEvent(int n); List<EventAPI> findAll(); diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesPersisterSqlImpl.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesPersisterSqlImpl.java index c30ff41..71c4209 100644 --- a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesPersisterSqlImpl.java +++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesPersisterSqlImpl.java @@ -45,6 +45,12 @@ public class VesPersisterSqlImpl implements VesPersister { } @Override + public void persistAll(List<Event> events) { + logger.debug("persisting all events {}", events); + repository.saveAll(events); + } + + @Override public List<EventAPI> findTopNVesEvent(int n) { logger.debug("finding top {} events", n); return repositoryAPI.findTopNVesEvent(n); diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java index 517bb8b..379e32d 100644 --- a/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java +++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/VesRetrievalService.java @@ -16,9 +16,11 @@ package org.onap.rapp.datacollector.service; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; + import org.onap.rapp.datacollector.entity.ves.AdditionalMeasurementValues; import org.onap.rapp.datacollector.entity.ves.AdditionalMeasurements; import org.onap.rapp.datacollector.entity.ves.Event; @@ -26,6 +28,9 @@ import org.onap.rapp.datacollector.service.configuration.DmaapRestReaderConfigur import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; import org.springframework.http.ResponseEntity; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; @@ -40,13 +45,13 @@ public class VesRetrievalService implements DmaapRestReader { private final RestTemplate restTemplate; private final DmaapRestReaderConfiguration config; - private final VesParser parser; + private final ParserFactory parser; private final VesPersister persister; private final UEHolder ueHolder; @Autowired - public VesRetrievalService(RestTemplate restTemplate, VesParser parser, VesPersister persister, - DmaapRestReaderConfiguration configuration, UEHolder ueHolder) { + public VesRetrievalService(RestTemplate restTemplate, ParserFactory parser, VesPersister persister, + DmaapRestReaderConfiguration configuration, UEHolder ueHolder) { this.restTemplate = restTemplate; this.parser = parser; this.persister = persister; @@ -59,7 +64,9 @@ public class VesRetrievalService implements DmaapRestReader { logger.info("Reaching from dmaap: {}", config.getMeasurementsTopicUrl()); try { ResponseEntity<String[]> responseEntity = - restTemplate.getForEntity(config.getMeasurementsTopicUrl(), String[].class); + restTemplate.exchange(config.getMeasurementsTopicUrl(), HttpMethod.GET, + new HttpEntity<String>(createHeaders(config.getDmaapProperties().getUsername(), config.getDmaapProperties().getPassword())), + String[].class); if (responseEntity.hasBody()) { String[] events = responseEntity.getBody(); return Arrays.stream(events).collect(Collectors.toList()); @@ -70,18 +77,25 @@ public class VesRetrievalService implements DmaapRestReader { return Collections.emptyList(); } + private HttpHeaders createHeaders(String username, String password) { + HttpHeaders headers = new HttpHeaders(); + headers.setBasicAuth(username, password); + return headers; + } + @Scheduled(fixedRate = 5000) public void retrieveAndStoreVesEvents() { - retrieveEvents().stream().map(parser::parse).forEach(this::saveEvent); + retrieveEvents().stream().map(parser::getParsedEvents).forEach(this::saveAllEvents); } - private void saveEvent(Event event) { - persister.persists(event); - saveUesOfVes(event); + private void saveAllEvents(List<Event> events) { + persister.persistAll(events); + saveUesOfVes(events); } - private void saveUesOfVes(Event event){ - Set<String> uesOfVes = getUserEquipmentData(event); + private void saveUesOfVes(List<Event> events) { + Set<String> uesOfVes = Optional.ofNullable(events).orElse(Collections.emptyList()).stream().flatMap(event -> getUserEquipmentData(event).stream()) + .collect(Collectors.toSet()); uesOfVes.forEach(ueHolder::addUE); } diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java index adc3695..3560e89 100644 --- a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java +++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapProperties.java @@ -13,16 +13,20 @@ package org.onap.rapp.datacollector.service.configuration; +import org.springframework.boot.context.properties.ConfigurationProperties; + import lombok.Getter; import lombok.Setter; -import org.springframework.boot.context.properties.ConfigurationProperties; @Getter @Setter @ConfigurationProperties(prefix = "dmaap") public class DmaapProperties { + private String protocol; private String host; + private String username; + private String password; private int port; private String measurementsTopic; diff --git a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java index 36dee70..9752fb5 100644 --- a/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java +++ b/datacollector/src/main/java/org/onap/rapp/datacollector/service/configuration/DmaapRestReaderConfiguration.java @@ -47,7 +47,8 @@ import org.springframework.web.client.RestTemplate; @Configuration public class DmaapRestReaderConfiguration { - private final static class TrustAllSSLSocketFactory extends SSLSocketFactory { + private static final class TrustAllSSLSocketFactory extends SSLSocketFactory { + SSLContext sslContext = SSLContext.getInstance("TLS"); public TrustAllSSLSocketFactory() throws NoSuchAlgorithmException, KeyManagementException { @@ -120,6 +121,10 @@ public class DmaapRestReaderConfiguration { return dmaapProperties.getMeasurementsTopicUrl(); } + public DmaapProperties getDmaapProperties() { + return dmaapProperties; + } + @Bean public DataSource dataSource() { diff --git a/datacollector/src/main/resources/application.yml b/datacollector/src/main/resources/application.yml index 0b8b661..502d8de 100644 --- a/datacollector/src/main/resources/application.yml +++ b/datacollector/src/main/resources/application.yml @@ -4,6 +4,8 @@ dmaap: prtocol: "http" host: "localhost" port: 8181 + username: dcae@dcae.onap.org + password: demo123456! measurements-topic: "measurements" database: host: mariadb-host diff --git a/datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/CommonEventHeaderTest.java b/datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/CommonEventHeaderTest.java index 873ce1b..003463f 100644 --- a/datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/CommonEventHeaderTest.java +++ b/datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/CommonEventHeaderTest.java @@ -67,21 +67,21 @@ public class CommonEventHeaderTest { public void builder() { CommonEventHeader actual = createDumyCommonEventHeader(); - assertEquals("version", actual.version); - assertEquals("domain", actual.domain); - assertEquals("eventId", actual.eventId); - assertEquals("eventName", actual.eventName); - assertEquals("eventType", actual.eventType); - assertEquals(12345L, actual.lastEpochMicrosec.longValue()); - assertEquals("nfcNamingCode", actual.nfcNamingCode); - assertEquals("nfNamingCode", actual.nfNamingCode); - assertEquals("priority", actual.priority); - assertEquals("entityId", actual.reportingEntityId); - assertEquals("reportingEntityName", actual.reportingEntityName); - assertEquals(567, actual.sequence.intValue()); - assertEquals("sourceId", actual.sourceId); - assertEquals("sourceName", actual.sourceName); - assertEquals(123456789L, actual.startEpochMicrosec.longValue()); + assertEquals("version", actual.getVersion()); + assertEquals("domain", actual.getDomain()); + assertEquals("eventId", actual.getEventId()); + assertEquals("eventName", actual.getEventName()); + assertEquals("eventType", actual.getEventType()); + assertEquals(12345L, actual.getLastEpochMicrosec().longValue()); + assertEquals("nfcNamingCode", actual.getNfcNamingCode()); + assertEquals("nfNamingCode", actual.getNfNamingCode()); + assertEquals("priority", actual.getPriority()); + assertEquals("entityId", actual.getReportingEntityId()); + assertEquals("reportingEntityName", actual.getReportingEntityName()); + assertEquals(567, actual.getSequence().intValue()); + assertEquals("sourceId", actual.getSourceId()); + assertEquals("sourceName", actual.getSourceName()); + assertEquals(123456789L, actual.getStartEpochMicrosec().longValue()); assertEquals("UTC+2", actual.timeZoneOffset); diff --git a/datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/EventTest.java b/datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/EventTest.java index 9b6700b..63bb440 100644 --- a/datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/EventTest.java +++ b/datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/EventTest.java @@ -15,6 +15,10 @@ package org.onap.rapp.datacollector.entity.ves; import static org.junit.Assert.assertEquals; + +import java.util.Collections; +import java.util.List; + import org.junit.Test; public class EventTest { @@ -27,6 +31,10 @@ public class EventTest { return Event.of(header, fields); } + public static List<Event> createDumyListOfEvents() { + return Collections.singletonList(createDumyEvent()); + } + public static Event createDumyEventWithUe() { CommonEventHeader header = CommonEventHeaderTest.createDumyCommonEventHeader(); @@ -35,6 +43,10 @@ public class EventTest { return Event.of(header, fields); } + public static List<Event> createDumyListOfEventsWithUe() { + return Collections.singletonList(createDumyEventWithUe()); + } + @Test public void of() { CommonEventHeader header = diff --git a/datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/MeasurementFieldsTest.java b/datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/MeasurementFieldsTest.java index b3ca614..1283bd5 100644 --- a/datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/MeasurementFieldsTest.java +++ b/datacollector/src/test/java/org/onap/rapp/datacollector/entity/ves/MeasurementFieldsTest.java @@ -48,7 +48,7 @@ public class MeasurementFieldsTest { MeasurementFields actual = createDummy(v); assertEquals(1234567L, actual.measurementInterval); - assertEquals("4.0", actual.measurementFieldsVersion); + assertEquals("4.0", actual.MEASUREMENT_FIELDS_VERSION); assertEquals(List.of(v), actual.additionalMeasurements); } } diff --git a/datacollector/src/test/java/org/onap/rapp/datacollector/service/VesParserImplTest.java b/datacollector/src/test/java/org/onap/rapp/datacollector/service/VesParserImplTest.java index f6cd0e5..ba2a22f 100644 --- a/datacollector/src/test/java/org/onap/rapp/datacollector/service/VesParserImplTest.java +++ b/datacollector/src/test/java/org/onap/rapp/datacollector/service/VesParserImplTest.java @@ -20,12 +20,15 @@ import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; import java.util.stream.Collectors; + import org.junit.Before; import org.junit.Test; -import com.google.gson.JsonParseException; import org.onap.rapp.datacollector.entity.ves.Event; +import com.google.gson.JsonParseException; + public class VesParserImplTest { + String testVesContent; VesParser parser = new VesParserImpl(); @@ -40,17 +43,17 @@ public class VesParserImplTest { @Test public void parse() { - Event actual = parser.parse(testVesContent); - assertEquals("4.0.1", actual.commonEventHeader.version); - assertEquals(1413378172000000L, (long) actual.commonEventHeader.lastEpochMicrosec); - assertEquals(1413378172000000L, (long) actual.commonEventHeader.startEpochMicrosec); - assertEquals(3, (int) actual.commonEventHeader.sequence); - assertEquals("measurement", actual.commonEventHeader.domain); + Event actual = parser.parse(testVesContent).get(0); + assertEquals("4.0.1", actual.commonEventHeader.getVersion()); + assertEquals(1413378172000000L, (long) actual.commonEventHeader.getLastEpochMicrosec()); + assertEquals(1413378172000000L, (long) actual.commonEventHeader.getStartEpochMicrosec()); + assertEquals(3, (int) actual.commonEventHeader.getSequence()); + assertEquals("measurement", actual.commonEventHeader.getDomain()); assertEquals("UTC-05:30", actual.commonEventHeader.timeZoneOffset); } @Test(expected = JsonParseException.class) public void parseEmpty() { - Event actual = parser.parse("{\"event\":{}}"); + Event actual = parser.parse("{\"event\":{}}").get(0); } }
\ No newline at end of file diff --git a/datacollector/src/test/java/org/onap/rapp/datacollector/service/VesRetrievalServiceTest.java b/datacollector/src/test/java/org/onap/rapp/datacollector/service/VesRetrievalServiceTest.java index 0126019..e1a285c 100644 --- a/datacollector/src/test/java/org/onap/rapp/datacollector/service/VesRetrievalServiceTest.java +++ b/datacollector/src/test/java/org/onap/rapp/datacollector/service/VesRetrievalServiceTest.java @@ -15,7 +15,9 @@ package org.onap.rapp.datacollector.service; import java.util.Collection; import java.util.HashSet; +import java.util.List; import java.util.Set; + import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -23,13 +25,12 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; -import org.onap.rapp.datacollector.entity.ves.Event; +import org.onap.rapp.datacollector.entity.ves.EventTest; import org.onap.rapp.datacollector.service.configuration.DmaapRestReaderConfiguration; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.client.RestClientException; import org.springframework.web.client.RestTemplate; -import org.onap.rapp.datacollector.entity.ves.EventTest; @RunWith(MockitoJUnitRunner.class) public class VesRetrievalServiceTest { @@ -41,7 +42,7 @@ public class VesRetrievalServiceTest { private DmaapRestReaderConfiguration config; @Mock - private VesParser parser; + private ParserFactory parser; @Mock private VesPersister persister; @@ -83,30 +84,30 @@ public class VesRetrievalServiceTest { public void whenRetrievedThenAlsoStored() { Mockito.when(config.getMeasurementsTopicUrl()).thenReturn("http://localhost/a-topic"); Mockito.when(restTemplate.getForEntity("http://localhost/a-topic", String[].class)) - .thenReturn(new ResponseEntity<String[]>(new String[]{"dead", "beef"}, HttpStatus.OK)); - Mockito.when(parser.parse(Mockito.any(String.class))) - .thenReturn(EventTest.createDumyEvent()); + .thenReturn(new ResponseEntity<>(new String[]{"dead", "beef"}, HttpStatus.OK)); + Mockito.when(parser.getParsedEvents(Mockito.any(String.class))) + .thenReturn(EventTest.createDumyListOfEvents()); service = new VesRetrievalService(restTemplate, parser, persister, config, ueHolder); service.retrieveAndStoreVesEvents(); - Mockito.verify(persister, Mockito.times(2)).persists(Mockito.any(Event.class)); + Mockito.verify(persister, Mockito.times(2)).persistAll(Mockito.any(List.class)); } @Test public void whenRetrievedThenAlsoStoredWithUE() { Mockito.when(config.getMeasurementsTopicUrl()).thenReturn("http://localhost/a-topic"); Mockito.when(restTemplate.getForEntity("http://localhost/a-topic", String[].class)) - .thenReturn(new ResponseEntity<String[]>(new String[]{"dead", "beef"}, HttpStatus.OK)); - Mockito.when(parser.parse(Mockito.any(String.class))) - .thenReturn(EventTest.createDumyEventWithUe()); + .thenReturn(new ResponseEntity<>(new String[]{"dead", "beef"}, HttpStatus.OK)); + Mockito.when(parser.getParsedEvents(Mockito.any(String.class))) + .thenReturn(EventTest.createDumyListOfEventsWithUe()); UEHolder ueHolder = new UEHolder(); service = new VesRetrievalService(restTemplate, parser, persister, config, ueHolder); service.retrieveAndStoreVesEvents(); - Mockito.verify(persister, Mockito.times(2)).persists(Mockito.any(Event.class)); + Mockito.verify(persister, Mockito.times(2)).persistAll(Mockito.any(List.class)); Assert.assertEquals(ueHolder.getUes(), Set.of("emergency_samsung_01", "mobile_samsung_s10")); } } |