diff options
Diffstat (limited to 'src/main/java/org/onap/a1pesimulator/service/report')
11 files changed, 1166 insertions, 0 deletions
diff --git a/src/main/java/org/onap/a1pesimulator/service/report/OnEventAction.java b/src/main/java/org/onap/a1pesimulator/service/report/OnEventAction.java new file mode 100644 index 0000000..f95f749 --- /dev/null +++ b/src/main/java/org/onap/a1pesimulator/service/report/OnEventAction.java @@ -0,0 +1,22 @@ +/* + * Copyright (C) 2021 Samsung Electronics + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.onap.a1pesimulator.service.report; + +import org.onap.a1pesimulator.data.ves.VesEvent; + +@FunctionalInterface +public interface OnEventAction { + + void onEvent(VesEvent event); +} diff --git a/src/main/java/org/onap/a1pesimulator/service/report/RanCellEventCustomizer.java b/src/main/java/org/onap/a1pesimulator/service/report/RanCellEventCustomizer.java new file mode 100644 index 0000000..f16e29c --- /dev/null +++ b/src/main/java/org/onap/a1pesimulator/service/report/RanCellEventCustomizer.java @@ -0,0 +1,73 @@ +/* + * Copyright (C) 2021 Samsung Electronics + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.onap.a1pesimulator.service.report; + +import java.util.List; +import java.util.Optional; + +import org.onap.a1pesimulator.data.ves.VesEvent; +import org.onap.a1pesimulator.data.ves.MeasurementFields.AdditionalMeasurement; +import org.onap.a1pesimulator.service.common.EventCustomizer; +import org.onap.a1pesimulator.service.ue.RanUeHolder; +import org.onap.a1pesimulator.util.Constants; +import org.onap.a1pesimulator.util.JsonUtils; +import org.onap.a1pesimulator.util.RanVesUtils; +import org.springframework.stereotype.Service; + +@Service +public class RanCellEventCustomizer implements EventCustomizer { + + private static final String UE_PARAM_TRAFFIC_MODEL_RANGE = "[[20-50]]"; + private final RanUeHolder ranUeHolder; + + public RanCellEventCustomizer(RanUeHolder ueHolder) { + this.ranUeHolder = ueHolder; + } + + @Override + public VesEvent apply(VesEvent t) { + VesEvent event = JsonUtils.INSTANCE.clone(t); + return customizeEvent(event); + } + + private VesEvent customizeEvent(VesEvent event) { + RanVesUtils.updateHeader(event); + enrichWithUeData(event); + randomizeEvent(event); + return event; + } + + private void randomizeEvent(VesEvent event) { + List<AdditionalMeasurement> additionalMeasurementsToRandomize = + event.getMeasurementFields().getAdditionalMeasurements(); + event.getMeasurementFields().setAdditionalMeasurements( + RanVesUtils.randomizeAdditionalMeasurements(additionalMeasurementsToRandomize)); + } + + private void enrichWithUeData(VesEvent event) { + + Optional<AdditionalMeasurement> identity = event.getMeasurementFields().getAdditionalMeasurements().stream() + .filter(msrmnt -> Constants.MEASUREMENT_FIELD_IDENTIFIER + .equalsIgnoreCase( + msrmnt.getName())) + .findAny(); + identity.ifPresent(m -> addTrafficModelMeasurement(event)); + } + + private void addTrafficModelMeasurement(VesEvent event) { + AdditionalMeasurement trafficModelMeasurement = + RanVesUtils.buildTrafficModelMeasurement( ranUeHolder, UE_PARAM_TRAFFIC_MODEL_RANGE); + event.getMeasurementFields().getAdditionalMeasurements().add(trafficModelMeasurement); + } +} diff --git a/src/main/java/org/onap/a1pesimulator/service/report/RanCellFailureEventCustomizer.java b/src/main/java/org/onap/a1pesimulator/service/report/RanCellFailureEventCustomizer.java new file mode 100644 index 0000000..6d62f33 --- /dev/null +++ b/src/main/java/org/onap/a1pesimulator/service/report/RanCellFailureEventCustomizer.java @@ -0,0 +1,227 @@ +/* + * Copyright (C) 2021 Samsung Electronics + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.onap.a1pesimulator.service.report; + +import java.text.MessageFormat; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; + +import org.onap.a1pesimulator.data.ves.VesEvent; +import org.onap.a1pesimulator.data.ves.MeasurementFields.AdditionalMeasurement; +import org.onap.a1pesimulator.service.common.EventCustomizer; +import org.onap.a1pesimulator.service.ue.RanUeHolder; +import org.onap.a1pesimulator.util.Constants; +import org.onap.a1pesimulator.util.JsonUtils; +import org.onap.a1pesimulator.util.RanVesUtils; + +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; + +public class RanCellFailureEventCustomizer implements EventCustomizer { + + private static final String UE_PARAM_TRAFFIC_MODEL_RANGE = "[[50->10]]"; + private final RanUeHolder ranUeHolder; + private final VesEvent event; + + private final Map<Key, Value> additionalMeasurementsValues = new HashMap<>(); + private final ValueFactory valueFactory; + + public RanCellFailureEventCustomizer(VesEvent event, RanUeHolder ranUeHolder) { + this.ranUeHolder = ranUeHolder; + this.event = event; + valueFactory = new ValueFactory(); + collectAdditionalMeasurementValues(event); + } + + @Override + public VesEvent apply(VesEvent t) { + return customizeEvent(JsonUtils.INSTANCE.clone(this.event)); + } + + private void collectAdditionalMeasurementValues(VesEvent event) { + Collection<AdditionalMeasurement> additionalMeasurementsToResolve = + event.getMeasurementFields().getAdditionalMeasurements(); + additionalMeasurementsToResolve.forEach(this::collectAdditionalMeasurementValue); + } + + private void collectAdditionalMeasurementValue(AdditionalMeasurement m) { + for (Entry<String, String> entry : m.getHashMap().entrySet()) { + if (!RanVesUtils.isRange(entry.getValue())) { + continue; + } + additionalMeasurementsValues + .putIfAbsent(new Key(m.getName(), entry.getKey()), valueFactory.getInstance(entry.getValue())); + } + } + + private VesEvent customizeEvent(VesEvent event) { + RanVesUtils.updateHeader(event); + enrichWithUeData(event); + resolveRanges(event); + return event; + } + + private void resolveRanges(VesEvent event) { + List<AdditionalMeasurement> additionalMeasurementsToResolve = + event.getMeasurementFields().getAdditionalMeasurements(); + + additionalMeasurementsToResolve.forEach(this::resolveRanges); + event.getMeasurementFields().setAdditionalMeasurements(additionalMeasurementsToResolve); + } + + private void resolveRanges(AdditionalMeasurement m) { + for (Entry<String, String> entry : m.getHashMap().entrySet()) { + Key key = new Key(m.getName(), entry.getKey()); + if (!additionalMeasurementsValues.containsKey(key)) { + continue; + } + Value value = additionalMeasurementsValues.get(key); + value.current = value.calculateCurrentValue(); + entry.setValue(value.current.toString()); + } + } + + private void enrichWithUeData(VesEvent event) { + + Optional<AdditionalMeasurement> identity = event.getMeasurementFields().getAdditionalMeasurements().stream() + .filter(msrmnt -> Constants.MEASUREMENT_FIELD_IDENTIFIER + .equalsIgnoreCase( + msrmnt.getName())) + .findAny(); + identity.ifPresent(m -> addTrafficModelMeasurement(event)); + } + + private void addTrafficModelMeasurement(VesEvent event) { + AdditionalMeasurement trafficModelMeasurement = + RanVesUtils.buildTrafficModelMeasurement(ranUeHolder, UE_PARAM_TRAFFIC_MODEL_RANGE); + event.getMeasurementFields().getAdditionalMeasurements().add(trafficModelMeasurement); + + collectAdditionalMeasurementValue(trafficModelMeasurement); + } + + // -----------helper classes + + private static class ValueFactory { + + public Value getInstance(String value) { + String[] split; + if (RanVesUtils.isRandomRange(value)) { + split = RanVesUtils.splitRandomRange(value); + return new RandomValue(Integer.valueOf(split[0]), Integer.valueOf(split[1])); + } + if (RanVesUtils.isTrandingRange(value)) { + split = RanVesUtils.splitTrendingRange(value); + Integer start = Integer.valueOf(split[0]); + Integer end = Integer.valueOf(split[1]); + if (start < end) { + return new RaisingValue(start, end); + } else if (start > end) { + return new DecreasingValue(start, end); + } + } + throw new RuntimeException(MessageFormat.format("Cannot instantiate Value from string: {0}", value)); + } + } + + private abstract static class Value { + + protected Integer start; + protected Integer end; + protected Integer current; + + public Value(Integer start, Integer end) { + this.start = start; + this.end = end; + } + + public abstract Integer calculateCurrentValue(); + } + + private static class RaisingValue extends Value { + + private int increment; + + public RaisingValue(Integer start, Integer end) { + super(start, end); + } + + @Override + public Integer calculateCurrentValue() { + if (current == null) { + return start; + } + if (increment == 0) { + increment = 1; + } else { + increment = increment * 2; + } + Integer result = start + increment; + if (result > end) { + increment = 1; + return end; + } + return result; + } + } + + private static class DecreasingValue extends Value { + + private int decrement; + + public DecreasingValue(Integer start, Integer end) { + super(start, end); + } + + @Override + public Integer calculateCurrentValue() { + if (current == null) { + return start; + } + if (decrement == 0) { + decrement = 1; + } else { + decrement = decrement * 2; + } + Integer result = start - decrement; + if (result < end) { + return end; + } + return result; + } + } + + private static class RandomValue extends Value { + + public RandomValue(Integer start, Integer end) { + super(start, end); + } + + @Override + public Integer calculateCurrentValue() { + return RanVesUtils.getRandomNumber(start, end); + } + } + + @AllArgsConstructor + @EqualsAndHashCode + private static class Key { + + private String paramName; + private String mapKey; + } +} diff --git a/src/main/java/org/onap/a1pesimulator/service/report/RanCheckCellIsDeadOnEvent.java b/src/main/java/org/onap/a1pesimulator/service/report/RanCheckCellIsDeadOnEvent.java new file mode 100644 index 0000000..a3fc3da --- /dev/null +++ b/src/main/java/org/onap/a1pesimulator/service/report/RanCheckCellIsDeadOnEvent.java @@ -0,0 +1,126 @@ +/* + * Copyright (C) 2021 Samsung Electronics + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.onap.a1pesimulator.service.report; + +import static org.onap.a1pesimulator.service.cell.RanCellStateService.TOPIC_CELL; + +import java.util.Optional; +import org.onap.a1pesimulator.data.cell.CellDetails; +import org.onap.a1pesimulator.data.cell.state.CellStateEnum; +import org.onap.a1pesimulator.data.ves.VesEvent; +import org.onap.a1pesimulator.data.ves.MeasurementFields; +import org.onap.a1pesimulator.service.cell.RanCellsHolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.messaging.simp.SimpMessagingTemplate; +import org.springframework.stereotype.Service; + +@Service +public class RanCheckCellIsDeadOnEvent implements OnEventAction { + + private static final Logger log = LoggerFactory.getLogger(RanCheckCellIsDeadOnEvent.class); + + private final RanCellsHolder ranCellsHolder; + private final SimpMessagingTemplate messagingTemplate; + + private final Integer failingModeThroughputValue; + private final Integer failingModeLatencyValue; + private final Integer failingCheckoutDelayTimeInSec; + + private static final int TO_MICRO_SEC = 1_000_000; + + public RanCheckCellIsDeadOnEvent(RanCellsHolder ranCellsHolder, SimpMessagingTemplate messagingTemplate, + @Value("${ves.failing.throughput}") Integer failingModeThroughputValue, + @Value("${ves.failing.latency}") Integer failingModeLatencyValue, + @Value("${ves.failing.checkout.delay}") Integer failingCheckoutDelayTimeInSec) { + this.ranCellsHolder = ranCellsHolder; + this.messagingTemplate = messagingTemplate; + + this.failingModeThroughputValue = failingModeThroughputValue; + this.failingModeLatencyValue = failingModeLatencyValue; + this.failingCheckoutDelayTimeInSec = failingCheckoutDelayTimeInSec; + } + + @Override + public void onEvent(VesEvent event) { + Optional<String> cellId = getCellIdentifier(event); + Optional<String> throughput = getCellThroughput(event); + Optional<String> latency = getCellLatency(event); + + if (cellId.isPresent() && throughput.isPresent() && latency.isPresent()) { + checkCell(cellId.get(), Integer.parseInt(throughput.get()), Integer.parseInt(latency.get()), + event.getCommonEventHeader().getLastEpochMicrosec()); + } + } + + private void checkCell(String cellId, Integer throughput, Integer latency, Long lastEpochMicrosec) { + if (throughput <= failingModeThroughputValue && latency >= failingModeLatencyValue) { + log.info("Failure mode detected for cell {}", cellId); + processSleepingMode(cellId, lastEpochMicrosec); + } + } + + private void processSleepingMode(String cellId, Long lastEpochMicrosec) { + CellDetails cell = ranCellsHolder.getCellById(cellId); + if (cell.getCellStateMachine().getState() == CellStateEnum.GOING_TO_SLEEP) { + Optional<RanCellsHolder.CellInFailureMode> cellInFailureModeOpt = + ranCellsHolder.getCellsInFailureMode(cellId); + if (cellInFailureModeOpt.isPresent()) { + RanCellsHolder.CellInFailureMode cellInFailureMode = cellInFailureModeOpt.get(); + if (cellInFailureMode.getSleepingModeDetectedTime() == null) { + cellInFailureMode.setSleepingModeDetectedTime(lastEpochMicrosec); + } else { + long waitingEpochMicrosec = addDelayTime(cellInFailureMode.getSleepingModeDetectedTime()); + if (lastEpochMicrosec >= waitingEpochMicrosec) { + log.info("Cell {} is sleeping!", cellId); + cell.nextState(); + messagingTemplate.convertAndSend(TOPIC_CELL, cell); + } + } + } + } + } + + private Optional<String> getCellIdentifier(VesEvent event) { + return getValueFromAdditionalMeasurement(event, "identifier"); + } + + private Optional<String> getCellThroughput(VesEvent event) { + return getValueFromAdditionalMeasurement(event, "throughput"); + } + + private Optional<String> getCellLatency(VesEvent event) { + return getValueFromAdditionalMeasurement(event, "latency"); + } + + private Optional<String> getValueFromAdditionalMeasurement(VesEvent event, String key) { + Optional<MeasurementFields.AdditionalMeasurement> measurement = getAdditionalMeasurement(event, key); + return measurement.map(this::getValueFromAdditionalMeasurement); + } + + private String getValueFromAdditionalMeasurement(MeasurementFields.AdditionalMeasurement measurement) { + return measurement.getHashMap().get("value"); + } + + private Optional<MeasurementFields.AdditionalMeasurement> getAdditionalMeasurement(VesEvent event, + String additionalMeasurement) { + return event.getMeasurementFields().getAdditionalMeasurements().stream() + .filter(e -> e.getName().equals(additionalMeasurement)).findFirst(); + } + + private long addDelayTime(long epoch) { + return epoch + failingCheckoutDelayTimeInSec * TO_MICRO_SEC; + } +} diff --git a/src/main/java/org/onap/a1pesimulator/service/report/RanEventCustomizerFactory.java b/src/main/java/org/onap/a1pesimulator/service/report/RanEventCustomizerFactory.java new file mode 100644 index 0000000..26ec13a --- /dev/null +++ b/src/main/java/org/onap/a1pesimulator/service/report/RanEventCustomizerFactory.java @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2021 Samsung Electronics + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.onap.a1pesimulator.service.report; + +import java.text.MessageFormat; + +import org.onap.a1pesimulator.data.ves.VesEvent; +import org.onap.a1pesimulator.service.common.EventCustomizer; +import org.onap.a1pesimulator.service.ue.RanUeHolder; +import org.springframework.stereotype.Component; + +@Component +public class RanEventCustomizerFactory { + + private final EventCustomizer regularEventCustomizer; + private final RanUeHolder ranUeHolder; + + public RanEventCustomizerFactory(EventCustomizer regularEventCustomizer, RanUeHolder ranUeHolder) { + this.ranUeHolder = ranUeHolder; + this.regularEventCustomizer = regularEventCustomizer; + } + + public EventCustomizer getEventCustomizer(VesEvent event, Mode mode) { + switch (mode) { + case REGULAR: + return regularEventCustomizer; + case FAILURE: + return new RanCellFailureEventCustomizer(event, ranUeHolder); + default: + throw new RuntimeException( + MessageFormat.format("Cannot construct event customizer for mode: {0}", mode)); + } + } + + public enum Mode { + REGULAR, FAILURE + } +} diff --git a/src/main/java/org/onap/a1pesimulator/service/report/RanReportsBrokerService.java b/src/main/java/org/onap/a1pesimulator/service/report/RanReportsBrokerService.java new file mode 100644 index 0000000..a1af9ac --- /dev/null +++ b/src/main/java/org/onap/a1pesimulator/service/report/RanReportsBrokerService.java @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2021 Samsung Electronics + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.onap.a1pesimulator.service.report; + +import java.util.Collection; +import java.util.Map; +import java.util.Optional; + +import org.onap.a1pesimulator.data.ReportingMethodEnum; +import org.onap.a1pesimulator.data.fileready.RanPeriodicEvent; +import org.onap.a1pesimulator.data.ves.VesEvent; +import org.springframework.http.ResponseEntity; + +public interface RanReportsBrokerService { + + ResponseEntity<String> startSendingReports(String identifier, VesEvent vesEvent, Integer interval, ReportingMethodEnum reportingMethods); + + Optional<RanPeriodicEvent> stopSendingReports(String identifier); + + Map<String, RanPeriodicEvent> getPeriodicEventsCache(); + + Collection<String> getEnabledEventElementIdentifiers(); + + RanPeriodicEvent getPeriodicEvent(String identifier); + + VesEvent startSendingFailureReports(String identifier, ReportingMethodEnum reportingMethods); + + VesEvent getGlobalPmVesStructure(); + + void setGlobalPmVesStructure(VesEvent event); + + Integer getGlobalVesInterval(); + + void setGlobalVesInterval(Integer interval); + + String getGlobalReportingMethod(); + + void setGlobalReportingMethod(String reportingMethod); +} diff --git a/src/main/java/org/onap/a1pesimulator/service/report/RanReportsBrokerServiceImpl.java b/src/main/java/org/onap/a1pesimulator/service/report/RanReportsBrokerServiceImpl.java new file mode 100644 index 0000000..b441164 --- /dev/null +++ b/src/main/java/org/onap/a1pesimulator/service/report/RanReportsBrokerServiceImpl.java @@ -0,0 +1,128 @@ +/* + * Copyright (C) 2021 Samsung Electronics + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.onap.a1pesimulator.service.report; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; + +import org.onap.a1pesimulator.data.ReportingMethodEnum; +import org.onap.a1pesimulator.data.fileready.RanPeriodicEvent; +import org.onap.a1pesimulator.data.ves.VesEvent; +import org.onap.a1pesimulator.data.ves.MeasurementFields.AdditionalMeasurement; +import org.onap.a1pesimulator.util.Constants; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Service; + +@Service +public class RanReportsBrokerServiceImpl implements RanReportsBrokerService { + + private final RanVesDataProvider vesDataProvider; + + private final RanVesHolder vesHolder; + + public RanReportsBrokerServiceImpl(RanVesDataProvider vesDataProvider, RanVesHolder vesHolder) { + this.vesDataProvider = vesDataProvider; + this.vesHolder = vesHolder; + } + + @Override + public Map<String, RanPeriodicEvent> getPeriodicEventsCache() { + return vesHolder.getPeriodicEventsCache(); + } + + @Override + public ResponseEntity<String> startSendingReports(String identifier, VesEvent vesEvent, Integer interval, ReportingMethodEnum reportingMethod) { + enrichWithIdentifier(identifier, vesEvent); + ResponseEntity<String> response = vesHolder.startSendingVesEvents(identifier, vesEvent, interval, reportingMethod); + return ResponseEntity.accepted().body(response.getBody()); + } + + @Override + public VesEvent startSendingFailureReports(String identifier, ReportingMethodEnum reportingMethod) { + + var vesEvent = vesDataProvider.getFailurePmVesEvent(); + + enrichWithIdentifier(identifier, vesEvent); + vesHolder.startSendingFailureVesEvents(identifier, vesEvent, reportingMethod); + return vesEvent; + } + + @Override + public Optional<RanPeriodicEvent> stopSendingReports(String identifier) { + return vesHolder.stopSendingVesEvents(identifier); + } + + @Override + public Collection<String> getEnabledEventElementIdentifiers() { + return vesHolder.getEnabledEventElementIdentifiers(); + } + + @Override + public RanPeriodicEvent getPeriodicEvent(String identifier) { + return vesHolder.getPeriodicEventForCell(identifier); + } + + @Override + public VesEvent getGlobalPmVesStructure() { + return vesDataProvider.getPmVesEvent(); + } + + @Override + public void setGlobalPmVesStructure(VesEvent event) { + vesDataProvider.setPmVesEvent(event); + } + + @Override + public Integer getGlobalVesInterval() { + return vesDataProvider.getRegularVesInterval(); + } + + @Override + public void setGlobalVesInterval(Integer interval) { + vesDataProvider.setInterval(interval); + } + + @Override + public String getGlobalReportingMethod() { + return vesDataProvider.getReportingMethod(); + } + + @Override + public void setGlobalReportingMethod(String reportingMethod) { + vesDataProvider.setReportingMethod(reportingMethod); + } + + private void enrichWithIdentifier(String identifier, VesEvent event) { + if (event.getMeasurementFields() == null || event.getMeasurementFields().getAdditionalMeasurements() == null) { + return; + } + Collection<AdditionalMeasurement> additionalMeasurements = + event.getMeasurementFields().getAdditionalMeasurements(); + Optional<AdditionalMeasurement> identityOpt = additionalMeasurements.stream() + .filter(m -> Constants.MEASUREMENT_FIELD_IDENTIFIER + .equalsIgnoreCase(m.getName())) + .findAny(); + if (identityOpt.isPresent()) { + identityOpt.get().getHashMap().put(Constants.MEASUREMENT_FIELD_IDENTIFIER, identifier); + } else { + var measurement = new AdditionalMeasurement(); + measurement.setName(Constants.MEASUREMENT_FIELD_IDENTIFIER); + measurement.setHashMap(Collections.singletonMap(Constants.MEASUREMENT_FIELD_VALUE, identifier)); + additionalMeasurements.add(measurement); + } + } + +} diff --git a/src/main/java/org/onap/a1pesimulator/service/report/RanSendVesRunnable.java b/src/main/java/org/onap/a1pesimulator/service/report/RanSendVesRunnable.java new file mode 100644 index 0000000..e982417 --- /dev/null +++ b/src/main/java/org/onap/a1pesimulator/service/report/RanSendVesRunnable.java @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2021 Samsung Electronics + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.onap.a1pesimulator.service.report; + +import java.util.Collection; + +import org.onap.a1pesimulator.data.ves.VesEvent; +import org.onap.a1pesimulator.service.common.AbstractRanRunnable; +import org.onap.a1pesimulator.service.common.EventCustomizer; + +public class RanSendVesRunnable extends AbstractRanRunnable { + + private final RanVesSender vesSender; + + public RanSendVesRunnable(RanVesSender vesSender, VesEvent event, EventCustomizer eventCustomizer, + Collection<OnEventAction> onEventActions) { + super(event, eventCustomizer, onEventActions); + this.vesSender = vesSender; + } + + @Override + public void run() { + VesEvent customizedEvent = eventCustomizer.apply(event); + onEventAction.forEach(action -> action.onEvent(customizedEvent)); + vesSender.send(customizedEvent); + } + + @Override + public void updateEvent(VesEvent event) { + this.event = event; + } +} diff --git a/src/main/java/org/onap/a1pesimulator/service/report/RanVesDataProvider.java b/src/main/java/org/onap/a1pesimulator/service/report/RanVesDataProvider.java new file mode 100644 index 0000000..35f2773 --- /dev/null +++ b/src/main/java/org/onap/a1pesimulator/service/report/RanVesDataProvider.java @@ -0,0 +1,108 @@ +/* + * Copyright (C) 2021 Samsung Electronics + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.onap.a1pesimulator.service.report; + +import java.io.IOException; +import java.net.URL; + +import org.onap.a1pesimulator.data.ves.VesEvent; +import org.onap.a1pesimulator.util.JsonUtils; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.cache.annotation.Cacheable; +import org.springframework.core.io.Resource; +import org.springframework.core.io.ResourceLoader; +import org.springframework.stereotype.Service; + +import lombok.Setter; + +@Service +public class RanVesDataProvider { + + private static final String PM_VES_LOCATION = "classpath:pmVes.json"; + private static final String PM_FAILURE_VES_LOCATION = "classpath:failurePmVes.json"; + + @Setter + private VesEvent pmVesEvent; + @Setter + private VesEvent failurePmVesEvent; + @Setter + private Integer interval; + @Setter + private String reportingMethod; + + private final Integer defaultInterval; + private final String defaultReportingMethod; + private final ResourceLoader resourceLoader; + + public RanVesDataProvider(@Value("${ves.defaultInterval}") Integer defaultInterval, + @Value("${ves.defaultReportingMethod}") String defaultReportingMethod, + ResourceLoader resourceLoader) { + this.defaultInterval = defaultInterval; + this.defaultReportingMethod = defaultReportingMethod; + this.resourceLoader = resourceLoader; + } + + @Cacheable("pmVes") + public VesEvent loadPmVesEvent() { + URL resourceUrl = getResourceURL(resourceLoader.getResource(PM_VES_LOCATION)); + return JsonUtils.INSTANCE.deserializeFromFileUrl(resourceUrl, VesEvent.class); + } + + @Cacheable("failurePmVes") + public VesEvent loadFailurePmVesEvent() { + URL resourceUrl = getResourceURL(resourceLoader.getResource(PM_FAILURE_VES_LOCATION)); + return JsonUtils.INSTANCE.deserializeFromFileUrl(resourceUrl, VesEvent.class); + } + + public Integer getRegularVesInterval() { + if (interval == null) { + return defaultInterval; + } + return interval; + } + + public String getReportingMethod() { + if (reportingMethod == null) { + return defaultReportingMethod; + } + + return reportingMethod; + } + + public Integer getFailureVesInterval() { + return defaultInterval; + } + + public VesEvent getPmVesEvent() { + if (pmVesEvent == null) { + return loadPmVesEvent(); + } + return pmVesEvent; + } + + public VesEvent getFailurePmVesEvent() { + if (failurePmVesEvent == null) { + return loadFailurePmVesEvent(); + } + return failurePmVesEvent; + } + + private URL getResourceURL(Resource resource) { + try { + return resource.getURL(); + } catch (IOException e) { + throw new RuntimeException("Cannot get resource URL for: " + resource.getFilename()); + } + } +} diff --git a/src/main/java/org/onap/a1pesimulator/service/report/RanVesHolder.java b/src/main/java/org/onap/a1pesimulator/service/report/RanVesHolder.java new file mode 100644 index 0000000..dba7898 --- /dev/null +++ b/src/main/java/org/onap/a1pesimulator/service/report/RanVesHolder.java @@ -0,0 +1,240 @@ +/* + * Copyright (C) 2021 Samsung Electronics + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.onap.a1pesimulator.service.report; + +import static java.util.Objects.isNull; +import static java.util.Objects.nonNull; + +import java.text.MessageFormat; +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledFuture; +import java.util.function.BiFunction; + +import org.onap.a1pesimulator.data.ReportingMethodEnum; +import org.onap.a1pesimulator.data.RequestParameters; +import org.onap.a1pesimulator.data.fileready.RanPeriodicEvent; +import org.onap.a1pesimulator.data.fileready.RanPeriodicSendReport; +import org.onap.a1pesimulator.data.ves.VesEvent; +import org.onap.a1pesimulator.service.common.AbstractRanRunnable; +import org.onap.a1pesimulator.service.common.EventCustomizer; +import org.onap.a1pesimulator.service.pm.RanFileReadyHolder; +import org.onap.a1pesimulator.service.pm.RanSaveFileReadyRunnable; +import org.onap.a1pesimulator.service.pm.RanSendReportsRunnable; +import org.onap.a1pesimulator.service.report.RanEventCustomizerFactory.Mode; +import org.onap.a1pesimulator.util.VnfConfigReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.ResponseEntity; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.stereotype.Service; + +import lombok.Getter; + +@Service +public class RanVesHolder { + + private static final Logger log = LoggerFactory.getLogger(RanVesHolder.class); + private final Map<String, RanPeriodicEvent> periodicEventsCache = new ConcurrentHashMap<>(); + + private final RanVesDataProvider vesDataProvider; + private final RanEventCustomizerFactory eventCustomizerFactory; + private final ThreadPoolTaskScheduler vesPmThreadPoolTaskScheduler; + private final Collection<OnEventAction> onEventActions; + private final RanFileReadyHolder ranFileReadyHolder; + private final RanVesSender vesSender; + private final VnfConfigReader vnfConfigReader; + private ThreadSendReportFunction threadSendReportFunction; + + public RanVesHolder(ThreadPoolTaskScheduler vesPmThreadPoolTaskScheduler, RanFileReadyHolder ranFileReadyHolder, RanVesSender vesSender, + VnfConfigReader vnfConfigReader, + RanEventCustomizerFactory eventCustomizerFactory, RanVesDataProvider vesDataProvider, + Collection<OnEventAction> onEventActions) { + this.vesPmThreadPoolTaskScheduler = vesPmThreadPoolTaskScheduler; + this.ranFileReadyHolder = ranFileReadyHolder; + this.vesSender = vesSender; + this.vnfConfigReader = vnfConfigReader; + this.eventCustomizerFactory = eventCustomizerFactory; + this.vesDataProvider = vesDataProvider; + this.onEventActions = onEventActions; + } + + /** + * Thread for periodical sending of PM Bulk Files and fileReady Events + */ + private void startSendingReports() { + if (isNull(threadSendReportFunction) || !threadSendReportFunction.isProcessRunning()) { + int repPeriod = vnfConfigReader.getVnfConfig().getRepPeriod(); + threadSendReportFunction = new ThreadSendReportFunction(vesPmThreadPoolTaskScheduler, repPeriod, ranFileReadyHolder); + threadSendReportFunction.startEvent(); + log.info("Start sending reports every {} seconds", repPeriod); + } + } + + /** + * Stops sending the report after the last cell was stopped. It send the last report before stop completely + */ + private void stopSendingReports(String cellId) { + sendLastReport(cellId); + if (nonNull(threadSendReportFunction) && !isAnyEventRunning()) { + threadSendReportFunction.getRanPeriodicVesEvent().getScheduledFuture().cancel(false); + log.info("Stop sending reports every {} seconds", vnfConfigReader.getVnfConfig().getRepPeriod()); + } + } + + /** + * Sends the last report after specific cell was stopped + */ + private void sendLastReport(String cellId) { + if (nonNull(threadSendReportFunction)) { + log.trace("Send last report after stop for cell: {}", cellId); + ranFileReadyHolder.createPMBulkFileAndSendFileReadyMessageForCellId(cellId); + } + } + + Map<String, RanPeriodicEvent> getPeriodicEventsCache() { + return periodicEventsCache; + } + + ResponseEntity<String> startSendingVesEvents(String identifier, VesEvent vesEvent, Integer interval, ReportingMethodEnum reportingMethod) { + + periodicEventsCache.compute(identifier, + new ThreadCacheUpdateFunction(vesPmThreadPoolTaskScheduler, eventCustomizerFactory.getEventCustomizer(vesEvent, Mode.REGULAR), onEventActions, + ranFileReadyHolder, vesSender, RequestParameters.builder() + .vesEvent(vesEvent).identifier(identifier).reportingMethod(reportingMethod).interval(interval).build())); + if (ReportingMethodEnum.FILE_READY.equals(reportingMethod)) { + startSendingReports(); + } + return ResponseEntity.accepted().body("VES Event sending started"); + } + + ResponseEntity<String> startSendingFailureVesEvents(String identifier, VesEvent vesEvent, ReportingMethodEnum reportingMethod) { + + periodicEventsCache.compute(identifier, + new ThreadCacheUpdateFunction(vesPmThreadPoolTaskScheduler, eventCustomizerFactory.getEventCustomizer(vesEvent, Mode.FAILURE), onEventActions, + ranFileReadyHolder, + vesSender, RequestParameters.builder().vesEvent(vesEvent).identifier(identifier).interval(vesDataProvider.getFailureVesInterval()) + .reportingMethod(reportingMethod).build())); + if (ReportingMethodEnum.FILE_READY.equals(reportingMethod)) { + startSendingReports(); + } + return ResponseEntity.accepted().body("Failure VES Event sending started"); + } + + Optional<RanPeriodicEvent> stopSendingVesEvents(String identifier) { + RanPeriodicEvent periodicEvent = periodicEventsCache.remove(identifier); + if (periodicEvent == null) { + return Optional.empty(); + } + periodicEvent.getScheduledFuture().cancel(false); + stopSendingReports(identifier); + return Optional.of(periodicEvent); + } + + Collection<String> getEnabledEventElementIdentifiers() { + return periodicEventsCache.keySet(); + } + + public boolean isEventEnabled(String identifier) { + return periodicEventsCache.containsKey(identifier); + } + + public boolean isAnyEventRunning() { + return !periodicEventsCache.isEmpty(); + } + + RanPeriodicEvent getPeriodicEventForCell(String identifier) { + if (!periodicEventsCache.containsKey(identifier)) { + throw new IllegalArgumentException( + MessageFormat.format("Cannot find event for given source {0}", identifier)); + } + return periodicEventsCache.get(identifier); + } + + private static class ThreadCacheUpdateFunction + implements BiFunction<String, RanPeriodicEvent, RanPeriodicEvent> { + + private final Integer interval; + private final ThreadPoolTaskScheduler vesPmThreadPoolTaskScheduler; + private final VesEvent vesEvent; + private final EventCustomizer eventCustomizer; + private final Collection<OnEventAction> onEventActions; + private final RanFileReadyHolder fileReadyHolder; + private final RanVesSender vesSender; + private final String cellId; + private final ReportingMethodEnum reportingMethod; + + public ThreadCacheUpdateFunction(ThreadPoolTaskScheduler vesPmThreadPoolTaskScheduler, EventCustomizer eventCustomizer, + Collection<OnEventAction> onEventActions, + RanFileReadyHolder fileReadyHolder, RanVesSender vesSender, RequestParameters requestParameters) { + this.vesPmThreadPoolTaskScheduler = vesPmThreadPoolTaskScheduler; + this.vesEvent = requestParameters.getVesEvent(); + this.interval = requestParameters.getInterval(); + this.eventCustomizer = eventCustomizer; + this.onEventActions = onEventActions; + this.fileReadyHolder = fileReadyHolder; + this.vesSender = vesSender; + this.cellId = requestParameters.getIdentifier(); + this.reportingMethod = requestParameters.getReportingMethod(); + } + + @Override + public RanPeriodicEvent apply(String key, RanPeriodicEvent value) { + if (value != null) { + // if thread is registered then cancel it and schedule a new one + value.getScheduledFuture().cancel(false); + } + AbstractRanRunnable ranRunnable = (ReportingMethodEnum.FILE_READY.equals(reportingMethod)) ? + new RanSaveFileReadyRunnable(fileReadyHolder, cellId, vesEvent, eventCustomizer, interval, onEventActions) : + new RanSendVesRunnable(vesSender, vesEvent, eventCustomizer, onEventActions); + + ScheduledFuture<?> scheduledFuture = + vesPmThreadPoolTaskScheduler.scheduleAtFixedRate(ranRunnable, interval * 1000L); + return RanPeriodicEvent.builder().event(vesEvent).interval(interval).reportingMethod(reportingMethod.getValue()).scheduledFuture(scheduledFuture) + .ranRunnable(ranRunnable).build(); + } + + } + + @Getter + private static class ThreadSendReportFunction { + + protected final Integer interval; + protected final ThreadPoolTaskScheduler vesPmThreadPoolTaskScheduler; + protected RanPeriodicSendReport ranPeriodicVesEvent; + protected ScheduledFuture<?> scheduledFuture; + protected final RanFileReadyHolder ranFileReadyHolder; + + public ThreadSendReportFunction(ThreadPoolTaskScheduler vesPmThreadPoolTaskScheduler, Integer interval, RanFileReadyHolder ranFileReadyHolder) { + this.vesPmThreadPoolTaskScheduler = vesPmThreadPoolTaskScheduler; + this.interval = interval; + this.ranFileReadyHolder = ranFileReadyHolder; + } + + public void startEvent() { + RanSendReportsRunnable ranSendReportsRunnable = + new RanSendReportsRunnable(ranFileReadyHolder); + scheduledFuture = vesPmThreadPoolTaskScheduler.scheduleAtFixedRate(ranSendReportsRunnable, interval * 1000L); + this.ranPeriodicVesEvent = RanPeriodicSendReport.builder().interval(interval).scheduledFuture(scheduledFuture) + .ranSendReportsRunnable(ranSendReportsRunnable).build(); + } + + public boolean isProcessRunning() { + return (nonNull(scheduledFuture) && !(scheduledFuture.isCancelled() || scheduledFuture.isDone())); + } + } + +} diff --git a/src/main/java/org/onap/a1pesimulator/service/report/RanVesSender.java b/src/main/java/org/onap/a1pesimulator/service/report/RanVesSender.java new file mode 100644 index 0000000..d12abd8 --- /dev/null +++ b/src/main/java/org/onap/a1pesimulator/service/report/RanVesSender.java @@ -0,0 +1,100 @@ +/* + * Copyright (C) 2021 Samsung Electronics + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package org.onap.a1pesimulator.service.report; + +import static java.util.Objects.nonNull; + +import org.onap.a1pesimulator.data.Event; +import org.onap.a1pesimulator.data.VnfConfig; +import org.onap.a1pesimulator.data.ves.CommonEventHeader; +import org.onap.a1pesimulator.exception.VesBrokerException; +import org.onap.a1pesimulator.util.JsonUtils; +import org.onap.a1pesimulator.util.VnfConfigReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Service; +import org.springframework.web.client.RestTemplate; + +import reactor.core.publisher.Mono; + +@Service +public class RanVesSender { + + private static final Logger log = LoggerFactory.getLogger(RanVesSender.class); + + private RestTemplate restTemplate; + + private String vesCollectorProtocol; + + private String vesCollectorPath; + + private VnfConfigReader vnfConfigReader; + + public RanVesSender(RestTemplate restTemplate, VnfConfigReader vnfConfigReader, + @Value("${ves.collector.protocol}") String vesCollectorProtocol, + @Value("${ves.collector.endpoint}") String vesCollectorPath) { + this.restTemplate = restTemplate; + this.vnfConfigReader = vnfConfigReader; + this.vesCollectorProtocol = vesCollectorProtocol; + this.vesCollectorPath = vesCollectorPath; + } + + public Mono<HttpStatus> send(Event event) { + if (nonNull(event)) { + VnfConfig vnfConfig = vnfConfigReader.getVnfConfig(); + String url = getVesCollectorUrl(vnfConfig); + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + headers.setBasicAuth(vnfConfig.getVesUser(), vnfConfig.getVesPassword()); + + setVnfInfo(event, vnfConfig); + String eventInJson = JsonUtils.INSTANCE.objectToPrettyString(event); + + log.trace("Sending following event: {} ", eventInJson); + + HttpEntity<String> entity = new HttpEntity<>(eventInJson, headers); + ResponseEntity<String> response = restTemplate.exchange(url, HttpMethod.POST, entity, String.class); + + log.debug("Response received: {}", response); + + if (response.getStatusCode() == HttpStatus.OK || response.getStatusCode() == HttpStatus.ACCEPTED) { + return Mono.just(response.getStatusCode()); + } else { + String errorMsg = + "Failed to send VES event to the collector with response status code:" + response.getStatusCode(); + return Mono.error(new VesBrokerException(errorMsg)); + } + } + return Mono.error(new VesBrokerException("There is no event to send to the collector.")); + } + + private String getVesCollectorUrl(VnfConfig vnfConfig) { + return vesCollectorProtocol + "://" + vnfConfig.getVesHost() + ":" + vnfConfig.getVesPort() + vesCollectorPath; + } + + private void setVnfInfo(Event vesEvent, VnfConfig vnfConfig) { + CommonEventHeader header = vesEvent.getCommonEventHeader(); + header.setSourceId(vnfConfig.getVnfId()); + header.setSourceName(vnfConfig.getVnfName()); + vesEvent.setCommonEventHeader(header); + } + +} |