From 25d78500922291a2ade59799654c8b05a5ee7640 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Wed, 28 Apr 2021 08:42:13 +0200 Subject: PMS Persistent storage of service definitions - A1 Istanbul Storing or registerred services to survice a restart. Change-Id: If1b41d0a3c995b51bb93000caca5ecff9da6fbc1 Issue-ID: CCSDK-3256 Signed-off-by: PatrikBuhr --- .../a1policymanagementservice/BeanFactory.java | 4 +- .../controllers/ServiceCallbacks.java | 23 +++--- .../repository/PolicyTypes.java | 4 +- .../repository/Service.java | 41 +++++++++++ .../repository/Services.java | 82 +++++++++++++++++++++- .../tasks/RefreshConfigTask.java | 16 +++-- .../tasks/RicSynchronizationTask.java | 16 ++--- 7 files changed, 153 insertions(+), 33 deletions(-) (limited to 'a1-policy-management/src/main/java/org') diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/BeanFactory.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/BeanFactory.java index 4b5c2f09..93fd2f77 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/BeanFactory.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/BeanFactory.java @@ -51,8 +51,8 @@ public class BeanFactory { } @Bean - public Services getServices() { - return new Services(); + public Services getServices(@Autowired ApplicationConfig applicationConfig) { + return new Services(applicationConfig); } @Bean diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/ServiceCallbacks.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/ServiceCallbacks.java index f9e446e4..b028cd63 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/ServiceCallbacks.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/ServiceCallbacks.java @@ -51,35 +51,28 @@ public class ServiceCallbacks { this.restClient = restClientFactory.createRestClientNoHttpProxy(""); } - public void notifyServicesRicSynchronized(Ric ric, Services services) { - createTask(ric, services).subscribe(numberOfServices -> logger.debug("Services {} notified", numberOfServices), - throwable -> logger.error("Service notification failed, cause: {}", throwable.getMessage()), - () -> logger.debug("All services notified")); - - } - - private Mono createTask(Ric ric, Services services) { + public Flux notifyServicesRicAvailable(Ric ric, Services services) { + final int CONCURRENCY = 10; return Flux.fromIterable(services.getAll()) // - .flatMap(service -> notifyServiceRicSynchronized(ric, service)) // - .collectList() // - .flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); // + .flatMap(service -> notifyService(ric, service, ServiceCallbackInfo.EventType.AVAILABLE), CONCURRENCY); // } - private Mono notifyServiceRicSynchronized(Ric ric, Service service) { + private Mono notifyService(Ric ric, Service service, ServiceCallbackInfo.EventType eventType) { if (service.getCallbackUrl().isEmpty()) { return Mono.empty(); } - ServiceCallbackInfo request = new ServiceCallbackInfo(ric.id(), ServiceCallbackInfo.EventType.AVAILABLE); + ServiceCallbackInfo request = new ServiceCallbackInfo(ric.id(), eventType); String body = gson.toJson(request); return restClient.post(service.getCallbackUrl(), body) .doOnNext(resp -> logger.debug("Invoking service {} callback, ric: {}", service.getName(), ric.id())) .onErrorResume(throwable -> { - logger.error("Service: {}, callback: {} failed: {}", service.getName(), service.getCallbackUrl(), + logger.warn("Service: {}, callback: {} failed: {}", service.getName(), service.getCallbackUrl(), throwable.toString()); return Mono.empty(); - }); + }) // + .flatMap(resp -> Mono.just(service)); } } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java index ad3270cb..3248f214 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java @@ -120,10 +120,10 @@ public class PolicyTypes { this.types.put(type.getId(), type); } logger.debug("Restored type database,no of types: {}", this.types.size()); - } catch (IOException e) { - logger.warn("Could not restore policy type database : {}", e.getMessage()); } catch (ServiceException e) { logger.debug("Could not restore policy type database : {}", e.getMessage()); + } catch (Exception e) { + logger.warn("Could not restore policy type database : {}", e.getMessage()); } } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Service.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Service.java index 3c7c53a1..90ae28c5 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Service.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Service.java @@ -20,6 +20,13 @@ package org.onap.ccsdk.oran.a1policymanagementservice.repository; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.TypeAdapter; +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.JsonWriter; + +import java.io.IOException; import java.time.Duration; import java.time.Instant; @@ -27,6 +34,40 @@ import lombok.Getter; import lombok.Setter; public class Service { + + static class InstantAdapter extends TypeAdapter { + @Override + public Instant read(JsonReader reader) throws IOException { + reader.skipValue(); + return Instant.now(); // Pretend that the last ping was now (after a restart) + } + + @Override + public void write(JsonWriter writer, Instant value) throws IOException { + writer.value(value.toString()); + } + } + + static class DurationAdapter extends TypeAdapter { + @Override + public Duration read(JsonReader reader) throws IOException { + long value = reader.nextLong(); + return Duration.ofNanos(value); + } + + @Override + public void write(JsonWriter writer, Duration value) throws IOException { + writer.value(value.toNanos()); + } + } + + public static Gson createGson() { + return new GsonBuilder() // + .registerTypeAdapter(Instant.class, new Service.InstantAdapter()) // + .registerTypeAdapter(Duration.class, new Service.DurationAdapter()) // + .create(); + } + @Getter private final String name; diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Services.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Services.java index 63633f67..2d2f6364 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Services.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Services.java @@ -20,19 +20,37 @@ package org.onap.ccsdk.oran.a1policymanagementservice.repository; +import com.google.gson.Gson; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.HashMap; import java.util.Map; import java.util.Vector; +import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.util.FileSystemUtils; public class Services { private static final Logger logger = LoggerFactory.getLogger(Services.class); + private static Gson gson = Service.createGson(); + private final ApplicationConfig appConfig; private Map registeredServices = new HashMap<>(); + public Services(@Autowired ApplicationConfig appConfig) { + this.appConfig = appConfig; + restoreFromDatabase(); + } + public synchronized Service getService(String name) throws ServiceException { Service s = registeredServices.get(name); if (s == null) { @@ -49,6 +67,7 @@ public class Services { logger.debug("Put service: {}", service.getName()); service.keepAlive(); registeredServices.put(service.getName(), service); + store(service); } public synchronized Iterable getAll() { @@ -56,7 +75,14 @@ public class Services { } public synchronized void remove(String name) { - registeredServices.remove(name); + Service service = registeredServices.remove(name); + if (service != null) { + try { + Files.delete(getPath(service)); + } catch (Exception e) { + + } + } } public synchronized int size() { @@ -65,5 +91,59 @@ public class Services { public synchronized void clear() { registeredServices.clear(); + try { + FileSystemUtils.deleteRecursively(getDatabasePath()); + } catch (Exception e) { + logger.warn("Could not delete services database : {}", e.getMessage()); + } + } + + public void store(Service service) { + try { + Files.createDirectories(getDatabasePath()); + try (PrintStream out = new PrintStream(new FileOutputStream(getFile(service)))) { + String str = gson.toJson(service); + out.print(str); + } + } catch (ServiceException e) { + logger.debug("Could not store service: {} {}", service.getName(), e.getMessage()); + } catch (IOException e) { + logger.warn("Could not store pservice: {} {}", service.getName(), e.getMessage()); + } + } + + private File getFile(Service service) throws ServiceException { + return getPath(service).toFile(); + } + + private Path getPath(Service service) throws ServiceException { + return Path.of(getDatabaseDirectory(), service.getName() + ".json"); + } + + void restoreFromDatabase() { + try { + Files.createDirectories(getDatabasePath()); + for (File file : getDatabasePath().toFile().listFiles()) { + String json = Files.readString(file.toPath()); + Service service = gson.fromJson(json, Service.class); + this.registeredServices.put(service.getName(), service); + } + logger.debug("Restored type database,no of services: {}", this.registeredServices.size()); + } catch (ServiceException e) { + logger.debug("Could not restore services database : {}", e.getMessage()); + } catch (Exception e) { + logger.warn("Could not restore services database : {}", e.getMessage()); + } + } + + private String getDatabaseDirectory() throws ServiceException { + if (appConfig.getVardataDirectory() == null) { + throw new ServiceException("No storage provided"); + } + return appConfig.getVardataDirectory() + "/database/services"; + } + + private Path getDatabasePath() throws ServiceException { + return Path.of(getDatabaseDirectory()); } } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java index c733cb0d..faef863e 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java @@ -35,6 +35,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationCo import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig.RicConfigUpdate; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfigParser; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ConfigurationFile; +import org.onap.ccsdk.oran.a1policymanagementservice.controllers.ServiceCallbacks; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies; import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric; @@ -233,9 +234,11 @@ public class RefreshConfigTask { RicConfigUpdate.Type event = updatedInfo.getType(); if (event == RicConfigUpdate.Type.ADDED) { logger.debug("RIC added {}", ricId); - Ric ric = new Ric(updatedInfo.getRicConfig()); - return trySyncronizeSupportedTypes(ric) // + + return trySyncronizeSupportedTypes(new Ric(updatedInfo.getRicConfig())) // .flatMap(this::addRic) // + .flatMap(this::notifyServicesRicAvailable) // + .doOnNext(ric -> ric.setState(RicState.AVAILABLE)) // .flatMap(notUsed -> Mono.just(event)); } else if (event == RicConfigUpdate.Type.REMOVED) { logger.debug("RIC removed {}", ricId); @@ -263,11 +266,16 @@ public class RefreshConfigTask { } logger.debug("Added RIC: {}", ric.id()); - ric.setState(RicState.AVAILABLE); - return Mono.just(ric); } + private Mono notifyServicesRicAvailable(Ric ric) { + ServiceCallbacks callbacks = new ServiceCallbacks(this.restClientFactory); + return callbacks.notifyServicesRicAvailable(ric, services) // + .collectList() // + .flatMap(list -> Mono.just(ric)); + } + /** * Reads the configuration from file. */ diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java index 6ac104ca..0ccccb73 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java @@ -108,7 +108,7 @@ public class RicSynchronizationTask { logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(), t.getMessage()); // ric.setState(RicState.UNAVAILABLE); // }) // - .doOnNext(notUsed -> onSynchronizationComplete(ric)) // + .flatMap(notUsed -> onSynchronizationComplete(ric)) // .onErrorResume(t -> Mono.just(ric)); } @@ -142,14 +142,17 @@ public class RicSynchronizationTask { return Flux.concat(synchronizedTypes, policiesDeletedInRic, policiesRecreatedInRic); } - private void onSynchronizationComplete(Ric ric) { + private Mono onSynchronizationComplete(Ric ric) { if (this.rics.get(ric.id()) == null) { logger.debug("Policies removed in removed ric: {}", ric.id()); - return; + return Mono.empty(); } logger.debug("Synchronization completed for: {}", ric.id()); ric.setState(RicState.AVAILABLE); - notifyServices(ric); + ServiceCallbacks callbacks = new ServiceCallbacks(this.restClientFactory); + return callbacks.notifyServicesRicAvailable(ric, services) // + .collectList() // + .flatMap(list -> Mono.just(ric)); } private Flux deleteAllPolicyInstances(Ric ric, Throwable t) { @@ -165,11 +168,6 @@ public class RicSynchronizationTask { return Flux.concat(synchronizedTypes, deletePoliciesInRic); } - void notifyServices(Ric ric) { - ServiceCallbacks callbacks = new ServiceCallbacks(this.restClientFactory); - callbacks.notifyServicesRicSynchronized(ric, services); - } - private Mono getPolicyType(String policyTypeId, A1Client a1Client) { if (policyTypes.contains(policyTypeId)) { return Mono.just(policyTypes.get(policyTypeId)); -- cgit 1.2.3-korg