diff options
Diffstat (limited to 'src/main/java/org/onap/a1pesimulator/service/ves')
11 files changed, 259 insertions, 152 deletions
diff --git a/src/main/java/org/onap/a1pesimulator/service/ves/OnEventAction.java b/src/main/java/org/onap/a1pesimulator/service/ves/OnEventAction.java index b34594a..ba1ddab 100644 --- a/src/main/java/org/onap/a1pesimulator/service/ves/OnEventAction.java +++ b/src/main/java/org/onap/a1pesimulator/service/ves/OnEventAction.java @@ -13,10 +13,10 @@ package org.onap.a1pesimulator.service.ves; -import org.onap.a1pesimulator.data.ves.Event; +import org.onap.a1pesimulator.data.ves.VesEvent; @FunctionalInterface public interface OnEventAction { - void onEvent(Event event); + void onEvent(VesEvent event); } diff --git a/src/main/java/org/onap/a1pesimulator/service/ves/RanCellEventCustomizer.java b/src/main/java/org/onap/a1pesimulator/service/ves/RanCellEventCustomizer.java index 9922329..35c9215 100644 --- a/src/main/java/org/onap/a1pesimulator/service/ves/RanCellEventCustomizer.java +++ b/src/main/java/org/onap/a1pesimulator/service/ves/RanCellEventCustomizer.java @@ -15,10 +15,11 @@ package org.onap.a1pesimulator.service.ves; import java.util.List; import java.util.Optional; -import org.onap.a1pesimulator.data.ves.Event; + +import org.onap.a1pesimulator.data.ves.VesEvent; import org.onap.a1pesimulator.data.ves.MeasurementFields.AdditionalMeasurement; +import org.onap.a1pesimulator.service.common.EventCustomizer; import org.onap.a1pesimulator.service.ue.RanUeHolder; -import org.onap.a1pesimulator.service.ves.RanSendVesRunnable.EventCustomizer; import org.onap.a1pesimulator.util.Constants; import org.onap.a1pesimulator.util.JsonUtils; import org.onap.a1pesimulator.util.RanVesUtils; @@ -35,36 +36,36 @@ public class RanCellEventCustomizer implements EventCustomizer { } @Override - public Event apply(Event t) { - Event event = JsonUtils.INSTANCE.clone(t); + public VesEvent apply(VesEvent t) { + VesEvent event = JsonUtils.INSTANCE.clone(t); return customizeEvent(event); } - private Event customizeEvent(Event event) { + private VesEvent customizeEvent(VesEvent event) { RanVesUtils.updateHeader(event); enrichWithUeData(event); randomizeEvent(event); return event; } - private void randomizeEvent(Event event) { + private void randomizeEvent(VesEvent event) { List<AdditionalMeasurement> additionalMeasurementsToRandomize = event.getMeasurementFields().getAdditionalMeasurements(); event.getMeasurementFields().setAdditionalMeasurements( RanVesUtils.randomizeAdditionalMeasurements(additionalMeasurementsToRandomize)); } - private void enrichWithUeData(Event event) { + private void enrichWithUeData(VesEvent event) { Optional<AdditionalMeasurement> identity = event.getMeasurementFields().getAdditionalMeasurements().stream() - .filter(msrmnt -> Constants.MEASUREMENT_FIELD_IDENTIFIER - .equalsIgnoreCase( - msrmnt.getName())) - .findAny(); + .filter(msrmnt -> Constants.MEASUREMENT_FIELD_IDENTIFIER + .equalsIgnoreCase( + msrmnt.getName())) + .findAny(); identity.ifPresent(m -> addTrafficModelMeasurement(event, m)); } - private void addTrafficModelMeasurement(Event event, AdditionalMeasurement identity) { + private void addTrafficModelMeasurement(VesEvent event, AdditionalMeasurement identity) { AdditionalMeasurement trafficModelMeasurement = RanVesUtils.buildTrafficModelMeasurement(identity, ranUeHolder, UE_PARAM_TRAFFIC_MODEL_RANGE); event.getMeasurementFields().getAdditionalMeasurements().add(trafficModelMeasurement); diff --git a/src/main/java/org/onap/a1pesimulator/service/ves/RanCellFailureEventCustomizer.java b/src/main/java/org/onap/a1pesimulator/service/ves/RanCellFailureEventCustomizer.java index ac2c4fc..c45c717 100644 --- a/src/main/java/org/onap/a1pesimulator/service/ves/RanCellFailureEventCustomizer.java +++ b/src/main/java/org/onap/a1pesimulator/service/ves/RanCellFailureEventCustomizer.java @@ -20,26 +20,28 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; -import lombok.AllArgsConstructor; -import lombok.EqualsAndHashCode; -import org.onap.a1pesimulator.data.ves.Event; + +import org.onap.a1pesimulator.data.ves.VesEvent; import org.onap.a1pesimulator.data.ves.MeasurementFields.AdditionalMeasurement; +import org.onap.a1pesimulator.service.common.EventCustomizer; import org.onap.a1pesimulator.service.ue.RanUeHolder; -import org.onap.a1pesimulator.service.ves.RanSendVesRunnable.EventCustomizer; import org.onap.a1pesimulator.util.Constants; import org.onap.a1pesimulator.util.JsonUtils; import org.onap.a1pesimulator.util.RanVesUtils; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; + public class RanCellFailureEventCustomizer implements EventCustomizer { private static final String UE_PARAM_TRAFFIC_MODEL_RANGE = "[[50->10]]"; private final RanUeHolder ranUeHolder; - private final Event event; + private final VesEvent event; private final Map<Key, Value> additionalMeasurementsValues = new HashMap<>(); private final ValueFactory valueFactory; - public RanCellFailureEventCustomizer(Event event, RanUeHolder ranUeHolder) { + public RanCellFailureEventCustomizer(VesEvent event, RanUeHolder ranUeHolder) { this.ranUeHolder = ranUeHolder; this.event = event; valueFactory = new ValueFactory(); @@ -47,11 +49,11 @@ public class RanCellFailureEventCustomizer implements EventCustomizer { } @Override - public Event apply(Event t) { + public VesEvent apply(VesEvent t) { return customizeEvent(JsonUtils.INSTANCE.clone(this.event)); } - private void collectAdditionalMeasurementValues(Event event) { + private void collectAdditionalMeasurementValues(VesEvent event) { Collection<AdditionalMeasurement> additionalMeasurementsToResolve = event.getMeasurementFields().getAdditionalMeasurements(); additionalMeasurementsToResolve.forEach(this::collectAdditionalMeasurementValue); @@ -67,14 +69,14 @@ public class RanCellFailureEventCustomizer implements EventCustomizer { } } - private Event customizeEvent(Event event) { + private VesEvent customizeEvent(VesEvent event) { RanVesUtils.updateHeader(event); enrichWithUeData(event); resolveRanges(event); return event; } - private void resolveRanges(Event event) { + private void resolveRanges(VesEvent event) { List<AdditionalMeasurement> additionalMeasurementsToResolve = event.getMeasurementFields().getAdditionalMeasurements(); @@ -94,17 +96,17 @@ public class RanCellFailureEventCustomizer implements EventCustomizer { } } - private void enrichWithUeData(Event event) { + private void enrichWithUeData(VesEvent event) { Optional<AdditionalMeasurement> identity = event.getMeasurementFields().getAdditionalMeasurements().stream() - .filter(msrmnt -> Constants.MEASUREMENT_FIELD_IDENTIFIER - .equalsIgnoreCase( - msrmnt.getName())) - .findAny(); + .filter(msrmnt -> Constants.MEASUREMENT_FIELD_IDENTIFIER + .equalsIgnoreCase( + msrmnt.getName())) + .findAny(); identity.ifPresent(m -> addTrafficModelMeasurement(event, m)); } - private void addTrafficModelMeasurement(Event event, AdditionalMeasurement identity) { + private void addTrafficModelMeasurement(VesEvent event, AdditionalMeasurement identity) { AdditionalMeasurement trafficModelMeasurement = RanVesUtils.buildTrafficModelMeasurement(identity, ranUeHolder, UE_PARAM_TRAFFIC_MODEL_RANGE); event.getMeasurementFields().getAdditionalMeasurements().add(trafficModelMeasurement); diff --git a/src/main/java/org/onap/a1pesimulator/service/ves/RanCheckCellIsDeadOnEvent.java b/src/main/java/org/onap/a1pesimulator/service/ves/RanCheckCellIsDeadOnEvent.java index 1330e04..3d0c400 100644 --- a/src/main/java/org/onap/a1pesimulator/service/ves/RanCheckCellIsDeadOnEvent.java +++ b/src/main/java/org/onap/a1pesimulator/service/ves/RanCheckCellIsDeadOnEvent.java @@ -18,7 +18,7 @@ import static org.onap.a1pesimulator.service.cell.RanCellStateService.TOPIC_CELL import java.util.Optional; import org.onap.a1pesimulator.data.cell.CellDetails; import org.onap.a1pesimulator.data.cell.state.CellStateEnum; -import org.onap.a1pesimulator.data.ves.Event; +import org.onap.a1pesimulator.data.ves.VesEvent; import org.onap.a1pesimulator.data.ves.MeasurementFields; import org.onap.a1pesimulator.service.cell.RanCellsHolder; import org.slf4j.Logger; @@ -54,7 +54,7 @@ public class RanCheckCellIsDeadOnEvent implements OnEventAction { } @Override - public void onEvent(Event event) { + public void onEvent(VesEvent event) { Optional<String> cellId = getCellIdentifier(event); Optional<String> throughput = getCellThroughput(event); Optional<String> latency = getCellLatency(event); @@ -93,19 +93,19 @@ public class RanCheckCellIsDeadOnEvent implements OnEventAction { } } - private Optional<String> getCellIdentifier(Event event) { + private Optional<String> getCellIdentifier(VesEvent event) { return getValueFromAdditionalMeasurement(event, "identifier"); } - private Optional<String> getCellThroughput(Event event) { + private Optional<String> getCellThroughput(VesEvent event) { return getValueFromAdditionalMeasurement(event, "throughput"); } - private Optional<String> getCellLatency(Event event) { + private Optional<String> getCellLatency(VesEvent event) { return getValueFromAdditionalMeasurement(event, "latency"); } - private Optional<String> getValueFromAdditionalMeasurement(Event event, String key) { + private Optional<String> getValueFromAdditionalMeasurement(VesEvent event, String key) { Optional<MeasurementFields.AdditionalMeasurement> measurement = getAdditionalMeasurement(event, key); return measurement.map(this::getValueFromAdditionalMeasurement); } @@ -114,10 +114,10 @@ public class RanCheckCellIsDeadOnEvent implements OnEventAction { return measurement.getHashMap().get("value"); } - private Optional<MeasurementFields.AdditionalMeasurement> getAdditionalMeasurement(Event event, + private Optional<MeasurementFields.AdditionalMeasurement> getAdditionalMeasurement(VesEvent event, String additionalMeasurement) { return event.getMeasurementFields().getAdditionalMeasurements().stream() - .filter(e -> e.getName().equals(additionalMeasurement)).findFirst(); + .filter(e -> e.getName().equals(additionalMeasurement)).findFirst(); } private long addDelayTime(long epoch) { diff --git a/src/main/java/org/onap/a1pesimulator/service/ves/RanEventCustomizerFactory.java b/src/main/java/org/onap/a1pesimulator/service/ves/RanEventCustomizerFactory.java index 3fbeda9..c7c2ee9 100644 --- a/src/main/java/org/onap/a1pesimulator/service/ves/RanEventCustomizerFactory.java +++ b/src/main/java/org/onap/a1pesimulator/service/ves/RanEventCustomizerFactory.java @@ -14,9 +14,10 @@ package org.onap.a1pesimulator.service.ves; import java.text.MessageFormat; -import org.onap.a1pesimulator.data.ves.Event; + +import org.onap.a1pesimulator.data.ves.VesEvent; +import org.onap.a1pesimulator.service.common.EventCustomizer; import org.onap.a1pesimulator.service.ue.RanUeHolder; -import org.onap.a1pesimulator.service.ves.RanSendVesRunnable.EventCustomizer; import org.springframework.stereotype.Component; @Component @@ -30,7 +31,7 @@ public class RanEventCustomizerFactory { this.regularEventCustomizer = regularEventCustomizer; } - public EventCustomizer getEventCustomizer(Event event, Mode mode) { + public EventCustomizer getEventCustomizer(VesEvent event, Mode mode) { switch (mode) { case REGULAR: return regularEventCustomizer; diff --git a/src/main/java/org/onap/a1pesimulator/service/ves/RanSendVesRunnable.java b/src/main/java/org/onap/a1pesimulator/service/ves/RanSendVesRunnable.java index 7378bc0..c537a5f 100644 --- a/src/main/java/org/onap/a1pesimulator/service/ves/RanSendVesRunnable.java +++ b/src/main/java/org/onap/a1pesimulator/service/ves/RanSendVesRunnable.java @@ -14,44 +14,30 @@ package org.onap.a1pesimulator.service.ves; import java.util.Collection; -import java.util.function.Function; -import org.onap.a1pesimulator.data.ves.Event; -import org.onap.a1pesimulator.exception.VesBrokerException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -public class RanSendVesRunnable implements Runnable { +import org.onap.a1pesimulator.data.ves.VesEvent; +import org.onap.a1pesimulator.service.common.AbstractRanRunnable; +import org.onap.a1pesimulator.service.common.EventCustomizer; - private static final Logger log = LoggerFactory.getLogger(RanSendVesRunnable.class); +public class RanSendVesRunnable extends AbstractRanRunnable { private final RanVesSender vesSender; - private Event event; - private final EventCustomizer eventCustomizer; - private final Collection<OnEventAction> onEventAction; - public RanSendVesRunnable(RanVesSender vesSender, Event event, EventCustomizer eventCustomizer, + public RanSendVesRunnable(RanVesSender vesSender, VesEvent event, EventCustomizer eventCustomizer, Collection<OnEventAction> onEventActions) { + super(event, eventCustomizer, onEventActions); this.vesSender = vesSender; - this.event = event; - this.eventCustomizer = eventCustomizer; - this.onEventAction = onEventActions; } @Override public void run() { - try { - Event customizedEvent = eventCustomizer.apply(event); - onEventAction.forEach(action -> action.onEvent(customizedEvent)); - vesSender.send(customizedEvent); - } catch (VesBrokerException e) { - log.error("Sending scheduled event failed: {}", e.getMessage()); - } + VesEvent customizedEvent = eventCustomizer.apply(event); + onEventAction.forEach(action -> action.onEvent(customizedEvent)); + vesSender.send(customizedEvent); } - public void updateEvent(Event event) { + @Override + public void updateEvent(VesEvent event) { this.event = event; } - - @FunctionalInterface - public interface EventCustomizer extends Function<Event, Event> { } } diff --git a/src/main/java/org/onap/a1pesimulator/service/ves/RanVesBrokerService.java b/src/main/java/org/onap/a1pesimulator/service/ves/RanVesBrokerService.java index 8a90d46..8767251 100644 --- a/src/main/java/org/onap/a1pesimulator/service/ves/RanVesBrokerService.java +++ b/src/main/java/org/onap/a1pesimulator/service/ves/RanVesBrokerService.java @@ -16,27 +16,29 @@ package org.onap.a1pesimulator.service.ves; import java.util.Collection; import java.util.Map; import java.util.Optional; -import org.onap.a1pesimulator.data.ves.Event; -import org.onap.a1pesimulator.data.ves.RanPeriodicVesEvent; + +import org.onap.a1pesimulator.data.ReportingMethodEnum; +import org.onap.a1pesimulator.data.fileready.RanPeriodicEvent; +import org.onap.a1pesimulator.data.ves.VesEvent; import org.springframework.http.ResponseEntity; public interface RanVesBrokerService { - ResponseEntity<String> startSendingVesEvents(String identifier, Event vesEvent, Integer interval); + ResponseEntity<String> startSendingVesEvents(String identifier, VesEvent vesEvent, Integer interval, ReportingMethodEnum reportingMethods); - Optional<RanPeriodicVesEvent> stopSendingVesEvents(String identifier); + Optional<RanPeriodicEvent> stopSendingVesEvents(String identifier); - Map<String, RanPeriodicVesEvent> getPeriodicEventsCache(); + Map<String, RanPeriodicEvent> getPeriodicEventsCache(); Collection<String> getEnabledEventElementIdentifiers(); - Event getEventStructure(String identifier); + VesEvent getEventStructure(String identifier); - Event startSendingFailureVesEvents(String identifier); + VesEvent startSendingFailureVesEvents(String identifier, ReportingMethodEnum reportingMethods); - Event getGlobalPmVesStructure(); + VesEvent getGlobalPmVesStructure(); - void setGlobalPmVesStructure(Event event); + void setGlobalPmVesStructure(VesEvent event); Integer getGlobalVesInterval(); diff --git a/src/main/java/org/onap/a1pesimulator/service/ves/RanVesBrokerServiceImpl.java b/src/main/java/org/onap/a1pesimulator/service/ves/RanVesBrokerServiceImpl.java index 861bd36..4417212 100644 --- a/src/main/java/org/onap/a1pesimulator/service/ves/RanVesBrokerServiceImpl.java +++ b/src/main/java/org/onap/a1pesimulator/service/ves/RanVesBrokerServiceImpl.java @@ -17,9 +17,11 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.Optional; -import org.onap.a1pesimulator.data.ves.Event; + +import org.onap.a1pesimulator.data.ReportingMethodEnum; +import org.onap.a1pesimulator.data.fileready.RanPeriodicEvent; +import org.onap.a1pesimulator.data.ves.VesEvent; import org.onap.a1pesimulator.data.ves.MeasurementFields.AdditionalMeasurement; -import org.onap.a1pesimulator.data.ves.RanPeriodicVesEvent; import org.onap.a1pesimulator.util.Constants; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; @@ -37,31 +39,29 @@ public class RanVesBrokerServiceImpl implements RanVesBrokerService { } @Override - public Map<String, RanPeriodicVesEvent> getPeriodicEventsCache() { + public Map<String, RanPeriodicEvent> getPeriodicEventsCache() { return vesHolder.getPeriodicEventsCache(); } @Override - public ResponseEntity<String> startSendingVesEvents(String identifier, Event vesEvent, Integer interval) { - + public ResponseEntity<String> startSendingVesEvents(String identifier, VesEvent vesEvent, Integer interval, ReportingMethodEnum reportingMethod) { enrichWithIdentifier(identifier, vesEvent); - vesHolder.startSendingVesEvents(identifier, vesEvent, interval); - - return ResponseEntity.accepted().body("VES Event sending started"); + ResponseEntity<String> response = vesHolder.startSendingVesEvents(identifier, vesEvent, interval, reportingMethod); + return ResponseEntity.accepted().body(response.getBody()); } @Override - public Event startSendingFailureVesEvents(String identifier) { + public VesEvent startSendingFailureVesEvents(String identifier, ReportingMethodEnum reportingMethod) { - Event vesEvent = vesDataProvider.getFailurePmVesEvent(); + var vesEvent = vesDataProvider.getFailurePmVesEvent(); enrichWithIdentifier(identifier, vesEvent); - vesHolder.startSendingFailureVesEvents(identifier, vesEvent); + vesHolder.startSendingFailureVesEvents(identifier, vesEvent, reportingMethod); return vesEvent; } @Override - public Optional<RanPeriodicVesEvent> stopSendingVesEvents(String identifier) { + public Optional<RanPeriodicEvent> stopSendingVesEvents(String identifier) { return vesHolder.stopSendingVesEvents(identifier); } @@ -71,17 +71,17 @@ public class RanVesBrokerServiceImpl implements RanVesBrokerService { } @Override - public Event getEventStructure(String identifier) { + public VesEvent getEventStructure(String identifier) { return vesHolder.getEventStructure(identifier); } @Override - public Event getGlobalPmVesStructure() { + public VesEvent getGlobalPmVesStructure() { return vesDataProvider.getPmVesEvent(); } @Override - public void setGlobalPmVesStructure(Event event) { + public void setGlobalPmVesStructure(VesEvent event) { vesDataProvider.setPmVesEvent(event); } @@ -95,20 +95,20 @@ public class RanVesBrokerServiceImpl implements RanVesBrokerService { vesDataProvider.setInterval(interval); } - private void enrichWithIdentifier(String identifier, Event event) { + private void enrichWithIdentifier(String identifier, VesEvent event) { if (event.getMeasurementFields() == null || event.getMeasurementFields().getAdditionalMeasurements() == null) { return; } Collection<AdditionalMeasurement> additionalMeasurements = event.getMeasurementFields().getAdditionalMeasurements(); Optional<AdditionalMeasurement> identityOpt = additionalMeasurements.stream() - .filter(m -> Constants.MEASUREMENT_FIELD_IDENTIFIER - .equalsIgnoreCase(m.getName())) - .findAny(); + .filter(m -> Constants.MEASUREMENT_FIELD_IDENTIFIER + .equalsIgnoreCase(m.getName())) + .findAny(); if (identityOpt.isPresent()) { identityOpt.get().getHashMap().put(Constants.MEASUREMENT_FIELD_IDENTIFIER, identifier); } else { - AdditionalMeasurement measurement = new AdditionalMeasurement(); + var measurement = new AdditionalMeasurement(); measurement.setName(Constants.MEASUREMENT_FIELD_IDENTIFIER); measurement.setHashMap(Collections.singletonMap(Constants.MEASUREMENT_FIELD_VALUE, identifier)); additionalMeasurements.add(measurement); diff --git a/src/main/java/org/onap/a1pesimulator/service/ves/RanVesDataProvider.java b/src/main/java/org/onap/a1pesimulator/service/ves/RanVesDataProvider.java index 95743f3..9a9a2f6 100644 --- a/src/main/java/org/onap/a1pesimulator/service/ves/RanVesDataProvider.java +++ b/src/main/java/org/onap/a1pesimulator/service/ves/RanVesDataProvider.java @@ -15,8 +15,8 @@ package org.onap.a1pesimulator.service.ves; import java.io.IOException; import java.net.URL; -import lombok.Setter; -import org.onap.a1pesimulator.data.ves.Event; + +import org.onap.a1pesimulator.data.ves.VesEvent; import org.onap.a1pesimulator.util.JsonUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.cache.annotation.Cacheable; @@ -24,6 +24,8 @@ import org.springframework.core.io.Resource; import org.springframework.core.io.ResourceLoader; import org.springframework.stereotype.Service; +import lombok.Setter; + @Service public class RanVesDataProvider { @@ -31,9 +33,9 @@ public class RanVesDataProvider { private static final String PM_FAILURE_VES_LOCATION = "classpath:failurePmVes.json"; @Setter - private Event pmVesEvent; + private VesEvent pmVesEvent; @Setter - private Event failurePmVesEvent; + private VesEvent failurePmVesEvent; @Setter private Integer interval; @@ -46,15 +48,15 @@ public class RanVesDataProvider { } @Cacheable("pmVes") - public Event loadPmVesEvent() { + public VesEvent loadPmVesEvent() { URL resourceUrl = getResourceURL(resourceLoader.getResource(PM_VES_LOCATION)); - return JsonUtils.INSTANCE.deserializeFromFileUrl(resourceUrl, Event.class); + return JsonUtils.INSTANCE.deserializeFromFileUrl(resourceUrl, VesEvent.class); } @Cacheable("failurePmVes") - public Event loadFailurePmVesEvent() { + public VesEvent loadFailurePmVesEvent() { URL resourceUrl = getResourceURL(resourceLoader.getResource(PM_FAILURE_VES_LOCATION)); - return JsonUtils.INSTANCE.deserializeFromFileUrl(resourceUrl, Event.class); + return JsonUtils.INSTANCE.deserializeFromFileUrl(resourceUrl, VesEvent.class); } public Integer getRegularVesInterval() { @@ -68,14 +70,14 @@ public class RanVesDataProvider { return defaultInterval; } - public Event getPmVesEvent() { + public VesEvent getPmVesEvent() { if (pmVesEvent == null) { return loadPmVesEvent(); } return pmVesEvent; } - public Event getFailurePmVesEvent() { + public VesEvent getFailurePmVesEvent() { if (failurePmVesEvent == null) { return loadFailurePmVesEvent(); } diff --git a/src/main/java/org/onap/a1pesimulator/service/ves/RanVesHolder.java b/src/main/java/org/onap/a1pesimulator/service/ves/RanVesHolder.java index d53d8dd..f711347 100644 --- a/src/main/java/org/onap/a1pesimulator/service/ves/RanVesHolder.java +++ b/src/main/java/org/onap/a1pesimulator/service/ves/RanVesHolder.java @@ -13,6 +13,9 @@ package org.onap.a1pesimulator.service.ves; +import static java.util.Objects.isNull; +import static java.util.Objects.nonNull; + import java.text.MessageFormat; import java.util.Collection; import java.util.Map; @@ -20,61 +23,122 @@ import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; import java.util.function.BiFunction; -import org.onap.a1pesimulator.data.ves.Event; -import org.onap.a1pesimulator.data.ves.RanPeriodicVesEvent; + +import org.onap.a1pesimulator.data.ReportingMethodEnum; +import org.onap.a1pesimulator.data.RequestParameters; +import org.onap.a1pesimulator.data.fileready.RanPeriodicEvent; +import org.onap.a1pesimulator.data.fileready.RanPeriodicSendReport; +import org.onap.a1pesimulator.data.ves.VesEvent; +import org.onap.a1pesimulator.service.common.AbstractRanRunnable; +import org.onap.a1pesimulator.service.common.EventCustomizer; +import org.onap.a1pesimulator.service.fileready.RanFileReadyHolder; +import org.onap.a1pesimulator.service.fileready.RanSaveFileReadyRunnable; +import org.onap.a1pesimulator.service.fileready.RanSendReportsRunnable; import org.onap.a1pesimulator.service.ves.RanEventCustomizerFactory.Mode; -import org.onap.a1pesimulator.service.ves.RanSendVesRunnable.EventCustomizer; +import org.onap.a1pesimulator.util.VnfConfigReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.http.ResponseEntity; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.stereotype.Service; +import lombok.Getter; + @Service public class RanVesHolder { - private final Map<String, RanPeriodicVesEvent> periodicEventsCache = new ConcurrentHashMap<>(); + private static final Logger log = LoggerFactory.getLogger(RanVesHolder.class); + private final Map<String, RanPeriodicEvent> periodicEventsCache = new ConcurrentHashMap<>(); private final RanVesDataProvider vesDataProvider; private final RanEventCustomizerFactory eventCustomizerFactory; private final ThreadPoolTaskScheduler vesPmThreadPoolTaskScheduler; private final Collection<OnEventAction> onEventActions; + private final RanFileReadyHolder ranFileReadyHolder; private final RanVesSender vesSender; + private final VnfConfigReader vnfConfigReader; + private ThreadSendReportFunction threadSendReportFunction; - public RanVesHolder(ThreadPoolTaskScheduler vesPmThreadPoolTaskScheduler, RanVesSender vesSender, + public RanVesHolder(ThreadPoolTaskScheduler vesPmThreadPoolTaskScheduler, RanFileReadyHolder ranFileReadyHolder, RanVesSender vesSender, + VnfConfigReader vnfConfigReader, RanEventCustomizerFactory eventCustomizerFactory, RanVesDataProvider vesDataProvider, Collection<OnEventAction> onEventActions) { this.vesPmThreadPoolTaskScheduler = vesPmThreadPoolTaskScheduler; + this.ranFileReadyHolder = ranFileReadyHolder; this.vesSender = vesSender; + this.vnfConfigReader = vnfConfigReader; this.eventCustomizerFactory = eventCustomizerFactory; this.vesDataProvider = vesDataProvider; this.onEventActions = onEventActions; } - Map<String, RanPeriodicVesEvent> getPeriodicEventsCache() { + /** + * Thread for periodical sending of PM Bulk Files and fileReady Events + */ + private void startSendingReports() { + if (isNull(threadSendReportFunction) || !threadSendReportFunction.isProcessRunning()) { + int repPeriod = vnfConfigReader.getVnfConfig().getRepPeriod(); + threadSendReportFunction = new ThreadSendReportFunction(vesPmThreadPoolTaskScheduler, repPeriod, ranFileReadyHolder); + threadSendReportFunction.startEvent(); + log.info("Start sending reports every {} seconds", repPeriod); + } + } + + /** + * Stops sending the report after the last cell was stopped. It send the last report before stop completely + */ + private void stopSendingReports() { + if (nonNull(threadSendReportFunction) && !isAnyEventRunning()) { + threadSendReportFunction.getRanPeriodicVesEvent().getScheduledFuture().cancel(false); + sendLastReportAfterCancel(); + log.info("Stop sending reports every {} seconds", vnfConfigReader.getVnfConfig().getRepPeriod()); + } + } + + /** + * Sends the last report after all threads were stopped + */ + private void sendLastReportAfterCancel() { + log.trace("Send last report after report thread was canceled"); + ranFileReadyHolder.createPMBulkFileAndSendFileReadyMessage(); + } + + Map<String, RanPeriodicEvent> getPeriodicEventsCache() { return periodicEventsCache; } - ResponseEntity<String> startSendingVesEvents(String identifier, Event vesEvent, Integer interval) { + ResponseEntity<String> startSendingVesEvents(String identifier, VesEvent vesEvent, Integer interval, ReportingMethodEnum reportingMethod) { periodicEventsCache.compute(identifier, - new ThreadCacheUpdateFunction(vesPmThreadPoolTaskScheduler, vesEvent, interval, - eventCustomizerFactory.getEventCustomizer(vesEvent, Mode.REGULAR), onEventActions, vesSender)); + new ThreadCacheUpdateFunction(vesPmThreadPoolTaskScheduler, eventCustomizerFactory.getEventCustomizer(vesEvent, Mode.REGULAR), onEventActions, + ranFileReadyHolder, vesSender, RequestParameters.builder() + .vesEvent(vesEvent).identifier(identifier).reportingMethod(reportingMethod).interval(interval).build())); + if (ReportingMethodEnum.FILE_READY.equals(reportingMethod)) { + startSendingReports(); + } return ResponseEntity.accepted().body("VES Event sending started"); } - ResponseEntity<String> startSendingFailureVesEvents(String identifier, Event vesEvent) { + ResponseEntity<String> startSendingFailureVesEvents(String identifier, VesEvent vesEvent, ReportingMethodEnum reportingMethod) { - periodicEventsCache.compute(identifier, new ThreadCacheUpdateFunction(vesPmThreadPoolTaskScheduler, vesEvent, - vesDataProvider.getFailureVesInterval(), - eventCustomizerFactory.getEventCustomizer(vesEvent, Mode.FAILURE), onEventActions, vesSender)); + periodicEventsCache.compute(identifier, + new ThreadCacheUpdateFunction(vesPmThreadPoolTaskScheduler, eventCustomizerFactory.getEventCustomizer(vesEvent, Mode.FAILURE), onEventActions, + ranFileReadyHolder, + vesSender, RequestParameters.builder().vesEvent(vesEvent).identifier(identifier).interval(vesDataProvider.getFailureVesInterval()) + .reportingMethod(reportingMethod).build())); + if (ReportingMethodEnum.FILE_READY.equals(reportingMethod)) { + startSendingReports(); + } return ResponseEntity.accepted().body("Failure VES Event sending started"); } - Optional<RanPeriodicVesEvent> stopSendingVesEvents(String identifier) { - RanPeriodicVesEvent periodicEvent = periodicEventsCache.remove(identifier); + Optional<RanPeriodicEvent> stopSendingVesEvents(String identifier) { + RanPeriodicEvent periodicEvent = periodicEventsCache.remove(identifier); if (periodicEvent == null) { return Optional.empty(); } periodicEvent.getScheduledFuture().cancel(false); + stopSendingReports(); return Optional.of(periodicEvent); } @@ -86,7 +150,11 @@ public class RanVesHolder { return periodicEventsCache.containsKey(identifier); } - Event getEventStructure(String identifier) { + public boolean isAnyEventRunning() { + return !periodicEventsCache.isEmpty(); + } + + VesEvent getEventStructure(String identifier) { if (!periodicEventsCache.containsKey(identifier)) { throw new IllegalArgumentException( MessageFormat.format("Cannot find event for given source {0}", identifier)); @@ -95,39 +163,76 @@ public class RanVesHolder { } private static class ThreadCacheUpdateFunction - implements BiFunction<String, RanPeriodicVesEvent, RanPeriodicVesEvent> { + implements BiFunction<String, RanPeriodicEvent, RanPeriodicEvent> { private final Integer interval; private final ThreadPoolTaskScheduler vesPmThreadPoolTaskScheduler; - private final Event vesEvent; + private final VesEvent vesEvent; private final EventCustomizer eventCustomizer; private final Collection<OnEventAction> onEventActions; + private final RanFileReadyHolder fileReadyHolder; private final RanVesSender vesSender; + private final String cellId; + private final ReportingMethodEnum reportingMethod; - public ThreadCacheUpdateFunction(ThreadPoolTaskScheduler vesPmThreadPoolTaskScheduler, Event vesEvent, - Integer interval, EventCustomizer eventCustomizer, Collection<OnEventAction> onEventActions, - RanVesSender vesSender) { + public ThreadCacheUpdateFunction(ThreadPoolTaskScheduler vesPmThreadPoolTaskScheduler, EventCustomizer eventCustomizer, + Collection<OnEventAction> onEventActions, + RanFileReadyHolder fileReadyHolder, RanVesSender vesSender, RequestParameters requestParameters) { this.vesPmThreadPoolTaskScheduler = vesPmThreadPoolTaskScheduler; - this.vesEvent = vesEvent; - this.interval = interval; + this.vesEvent = requestParameters.getVesEvent(); + this.interval = requestParameters.getInterval(); this.eventCustomizer = eventCustomizer; this.onEventActions = onEventActions; + this.fileReadyHolder = fileReadyHolder; this.vesSender = vesSender; + this.cellId = requestParameters.getIdentifier(); + this.reportingMethod = requestParameters.getReportingMethod(); } @Override - public RanPeriodicVesEvent apply(String key, RanPeriodicVesEvent value) { + public RanPeriodicEvent apply(String key, RanPeriodicEvent value) { if (value != null) { // if thread is registered then cancel it and schedule a new one value.getScheduledFuture().cancel(false); } - RanSendVesRunnable sendVesRunnable = + AbstractRanRunnable ranRunnable = (ReportingMethodEnum.FILE_READY.equals(reportingMethod)) ? + new RanSaveFileReadyRunnable(fileReadyHolder, cellId, vesEvent, eventCustomizer, interval, onEventActions) : new RanSendVesRunnable(vesSender, vesEvent, eventCustomizer, onEventActions); + ScheduledFuture<?> scheduledFuture = - vesPmThreadPoolTaskScheduler.scheduleAtFixedRate(sendVesRunnable, interval * 1000L); - return RanPeriodicVesEvent.builder().event(vesEvent).interval(interval).scheduledFuture(scheduledFuture) - .sendVesRunnable(sendVesRunnable).build(); + vesPmThreadPoolTaskScheduler.scheduleAtFixedRate(ranRunnable, interval * 1000L); + return RanPeriodicEvent.builder().event(vesEvent).interval(interval).scheduledFuture(scheduledFuture) + .ranRunnable(ranRunnable).build(); } } + + @Getter + private static class ThreadSendReportFunction { + + protected final Integer interval; + protected final ThreadPoolTaskScheduler vesPmThreadPoolTaskScheduler; + protected RanPeriodicSendReport ranPeriodicVesEvent; + protected ScheduledFuture<?> scheduledFuture; + protected final RanFileReadyHolder ranFileReadyHolder; + + public ThreadSendReportFunction(ThreadPoolTaskScheduler vesPmThreadPoolTaskScheduler, Integer interval, RanFileReadyHolder ranFileReadyHolder) { + this.vesPmThreadPoolTaskScheduler = vesPmThreadPoolTaskScheduler; + this.interval = interval; + this.ranFileReadyHolder = ranFileReadyHolder; + } + + public void startEvent() { + RanSendReportsRunnable ranSendReportsRunnable = + new RanSendReportsRunnable(ranFileReadyHolder); + scheduledFuture = vesPmThreadPoolTaskScheduler.scheduleAtFixedRate(ranSendReportsRunnable, interval * 1000L); + this.ranPeriodicVesEvent = RanPeriodicSendReport.builder().interval(interval).scheduledFuture(scheduledFuture) + .ranSendReportsRunnable(ranSendReportsRunnable).build(); + } + + public boolean isProcessRunning() { + return (nonNull(scheduledFuture) && !(scheduledFuture.isCancelled() || scheduledFuture.isDone())); + } + } + } diff --git a/src/main/java/org/onap/a1pesimulator/service/ves/RanVesSender.java b/src/main/java/org/onap/a1pesimulator/service/ves/RanVesSender.java index 9c50197..85bccbb 100644 --- a/src/main/java/org/onap/a1pesimulator/service/ves/RanVesSender.java +++ b/src/main/java/org/onap/a1pesimulator/service/ves/RanVesSender.java @@ -13,9 +13,11 @@ package org.onap.a1pesimulator.service.ves; +import static java.util.Objects.nonNull; + +import org.onap.a1pesimulator.data.Event; import org.onap.a1pesimulator.data.VnfConfig; import org.onap.a1pesimulator.data.ves.CommonEventHeader; -import org.onap.a1pesimulator.data.ves.Event; import org.onap.a1pesimulator.exception.VesBrokerException; import org.onap.a1pesimulator.util.JsonUtils; import org.onap.a1pesimulator.util.VnfConfigReader; @@ -31,6 +33,8 @@ import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; +import reactor.core.publisher.Mono; + @Service public class RanVesSender { @@ -53,30 +57,33 @@ public class RanVesSender { this.vesCollectorPath = vesCollectorPath; } - public ResponseEntity<String> send(Event vesEvent) throws VesBrokerException { - VnfConfig vnfConfig = vnfConfigReader.getVnfConfig(); - String url = getVesCollectorUrl(vnfConfig); - HttpHeaders headers = new HttpHeaders(); - headers.setContentType(MediaType.APPLICATION_JSON); - headers.setBasicAuth(vnfConfig.getVesUser(), vnfConfig.getVesPassword()); + public Mono<HttpStatus> send(Event event) { + if (nonNull(event)) { + VnfConfig vnfConfig = vnfConfigReader.getVnfConfig(); + String url = getVesCollectorUrl(vnfConfig); + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON); + headers.setBasicAuth(vnfConfig.getVesUser(), vnfConfig.getVesPassword()); - setVnfInfo(vesEvent, vnfConfig); - String event = JsonUtils.INSTANCE.objectToPrettyString(vesEvent); + setVnfInfo(event, vnfConfig); + String eventInJson = JsonUtils.INSTANCE.objectToPrettyString(event); - log.info("Sending following VES event: {}", event); + log.trace("Sending following event: {} ", eventInJson); - HttpEntity<String> entity = new HttpEntity<>(event, headers); - ResponseEntity<String> response = restTemplate.exchange(url, HttpMethod.POST, entity, String.class); + HttpEntity<String> entity = new HttpEntity<>(eventInJson, headers); + ResponseEntity<String> response = restTemplate.exchange(url, HttpMethod.POST, entity, String.class); - log.debug("Response received: {}", response); + log.debug("Response received: {}", response); - if (response.getStatusCode() == HttpStatus.OK || response.getStatusCode() == HttpStatus.ACCEPTED) { - return response; - } else { - String errorMsg = - "Failed to send VES event to the collector with response status code:" + response.getStatusCode(); - throw new VesBrokerException(errorMsg); + if (response.getStatusCode() == HttpStatus.OK || response.getStatusCode() == HttpStatus.ACCEPTED) { + return Mono.just(response.getStatusCode()); + } else { + String errorMsg = + "Failed to send VES event to the collector with response status code:" + response.getStatusCode(); + return Mono.error(new VesBrokerException(errorMsg)); + } } + return Mono.error(new VesBrokerException("There is no event to send to the collector.")); } private String getVesCollectorUrl(VnfConfig vnfConfig) { @@ -89,4 +96,5 @@ public class RanVesSender { header.setSourceName(vnfConfig.getVnfName()); vesEvent.setCommonEventHeader(header); } + } |