aboutsummaryrefslogtreecommitdiffstats
path: root/a1-policy-management/src/main/java/org
diff options
context:
space:
mode:
Diffstat (limited to 'a1-policy-management/src/main/java/org')
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/BeanFactory.java4
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/ServiceCallbacks.java23
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java4
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Service.java41
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Services.java82
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java16
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java16
7 files changed, 153 insertions, 33 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/BeanFactory.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/BeanFactory.java
index 4b5c2f09..93fd2f77 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/BeanFactory.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/BeanFactory.java
@@ -51,8 +51,8 @@ public class BeanFactory {
}
@Bean
- public Services getServices() {
- return new Services();
+ public Services getServices(@Autowired ApplicationConfig applicationConfig) {
+ return new Services(applicationConfig);
}
@Bean
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/ServiceCallbacks.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/ServiceCallbacks.java
index f9e446e4..b028cd63 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/ServiceCallbacks.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/ServiceCallbacks.java
@@ -51,35 +51,28 @@ public class ServiceCallbacks {
this.restClient = restClientFactory.createRestClientNoHttpProxy("");
}
- public void notifyServicesRicSynchronized(Ric ric, Services services) {
- createTask(ric, services).subscribe(numberOfServices -> logger.debug("Services {} notified", numberOfServices),
- throwable -> logger.error("Service notification failed, cause: {}", throwable.getMessage()),
- () -> logger.debug("All services notified"));
-
- }
-
- private Mono<Integer> createTask(Ric ric, Services services) {
+ public Flux<Service> notifyServicesRicAvailable(Ric ric, Services services) {
+ final int CONCURRENCY = 10;
return Flux.fromIterable(services.getAll()) //
- .flatMap(service -> notifyServiceRicSynchronized(ric, service)) //
- .collectList() //
- .flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); //
+ .flatMap(service -> notifyService(ric, service, ServiceCallbackInfo.EventType.AVAILABLE), CONCURRENCY); //
}
- private Mono<String> notifyServiceRicSynchronized(Ric ric, Service service) {
+ private Mono<Service> notifyService(Ric ric, Service service, ServiceCallbackInfo.EventType eventType) {
if (service.getCallbackUrl().isEmpty()) {
return Mono.empty();
}
- ServiceCallbackInfo request = new ServiceCallbackInfo(ric.id(), ServiceCallbackInfo.EventType.AVAILABLE);
+ ServiceCallbackInfo request = new ServiceCallbackInfo(ric.id(), eventType);
String body = gson.toJson(request);
return restClient.post(service.getCallbackUrl(), body)
.doOnNext(resp -> logger.debug("Invoking service {} callback, ric: {}", service.getName(), ric.id()))
.onErrorResume(throwable -> {
- logger.error("Service: {}, callback: {} failed: {}", service.getName(), service.getCallbackUrl(),
+ logger.warn("Service: {}, callback: {} failed: {}", service.getName(), service.getCallbackUrl(),
throwable.toString());
return Mono.empty();
- });
+ }) //
+ .flatMap(resp -> Mono.just(service));
}
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java
index ad3270cb..3248f214 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java
@@ -120,10 +120,10 @@ public class PolicyTypes {
this.types.put(type.getId(), type);
}
logger.debug("Restored type database,no of types: {}", this.types.size());
- } catch (IOException e) {
- logger.warn("Could not restore policy type database : {}", e.getMessage());
} catch (ServiceException e) {
logger.debug("Could not restore policy type database : {}", e.getMessage());
+ } catch (Exception e) {
+ logger.warn("Could not restore policy type database : {}", e.getMessage());
}
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Service.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Service.java
index 3c7c53a1..90ae28c5 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Service.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Service.java
@@ -20,6 +20,13 @@
package org.onap.ccsdk.oran.a1policymanagementservice.repository;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+
+import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
@@ -27,6 +34,40 @@ import lombok.Getter;
import lombok.Setter;
public class Service {
+
+ static class InstantAdapter extends TypeAdapter<Instant> {
+ @Override
+ public Instant read(JsonReader reader) throws IOException {
+ reader.skipValue();
+ return Instant.now(); // Pretend that the last ping was now (after a restart)
+ }
+
+ @Override
+ public void write(JsonWriter writer, Instant value) throws IOException {
+ writer.value(value.toString());
+ }
+ }
+
+ static class DurationAdapter extends TypeAdapter<Duration> {
+ @Override
+ public Duration read(JsonReader reader) throws IOException {
+ long value = reader.nextLong();
+ return Duration.ofNanos(value);
+ }
+
+ @Override
+ public void write(JsonWriter writer, Duration value) throws IOException {
+ writer.value(value.toNanos());
+ }
+ }
+
+ public static Gson createGson() {
+ return new GsonBuilder() //
+ .registerTypeAdapter(Instant.class, new Service.InstantAdapter()) //
+ .registerTypeAdapter(Duration.class, new Service.DurationAdapter()) //
+ .create();
+ }
+
@Getter
private final String name;
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Services.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Services.java
index 63633f67..2d2f6364 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Services.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Services.java
@@ -20,19 +20,37 @@
package org.onap.ccsdk.oran.a1policymanagementservice.repository;
+import com.google.gson.Gson;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Vector;
+import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.util.FileSystemUtils;
public class Services {
private static final Logger logger = LoggerFactory.getLogger(Services.class);
+ private static Gson gson = Service.createGson();
+ private final ApplicationConfig appConfig;
private Map<String, Service> registeredServices = new HashMap<>();
+ public Services(@Autowired ApplicationConfig appConfig) {
+ this.appConfig = appConfig;
+ restoreFromDatabase();
+ }
+
public synchronized Service getService(String name) throws ServiceException {
Service s = registeredServices.get(name);
if (s == null) {
@@ -49,6 +67,7 @@ public class Services {
logger.debug("Put service: {}", service.getName());
service.keepAlive();
registeredServices.put(service.getName(), service);
+ store(service);
}
public synchronized Iterable<Service> getAll() {
@@ -56,7 +75,14 @@ public class Services {
}
public synchronized void remove(String name) {
- registeredServices.remove(name);
+ Service service = registeredServices.remove(name);
+ if (service != null) {
+ try {
+ Files.delete(getPath(service));
+ } catch (Exception e) {
+
+ }
+ }
}
public synchronized int size() {
@@ -65,5 +91,59 @@ public class Services {
public synchronized void clear() {
registeredServices.clear();
+ try {
+ FileSystemUtils.deleteRecursively(getDatabasePath());
+ } catch (Exception e) {
+ logger.warn("Could not delete services database : {}", e.getMessage());
+ }
+ }
+
+ public void store(Service service) {
+ try {
+ Files.createDirectories(getDatabasePath());
+ try (PrintStream out = new PrintStream(new FileOutputStream(getFile(service)))) {
+ String str = gson.toJson(service);
+ out.print(str);
+ }
+ } catch (ServiceException e) {
+ logger.debug("Could not store service: {} {}", service.getName(), e.getMessage());
+ } catch (IOException e) {
+ logger.warn("Could not store pservice: {} {}", service.getName(), e.getMessage());
+ }
+ }
+
+ private File getFile(Service service) throws ServiceException {
+ return getPath(service).toFile();
+ }
+
+ private Path getPath(Service service) throws ServiceException {
+ return Path.of(getDatabaseDirectory(), service.getName() + ".json");
+ }
+
+ void restoreFromDatabase() {
+ try {
+ Files.createDirectories(getDatabasePath());
+ for (File file : getDatabasePath().toFile().listFiles()) {
+ String json = Files.readString(file.toPath());
+ Service service = gson.fromJson(json, Service.class);
+ this.registeredServices.put(service.getName(), service);
+ }
+ logger.debug("Restored type database,no of services: {}", this.registeredServices.size());
+ } catch (ServiceException e) {
+ logger.debug("Could not restore services database : {}", e.getMessage());
+ } catch (Exception e) {
+ logger.warn("Could not restore services database : {}", e.getMessage());
+ }
+ }
+
+ private String getDatabaseDirectory() throws ServiceException {
+ if (appConfig.getVardataDirectory() == null) {
+ throw new ServiceException("No storage provided");
+ }
+ return appConfig.getVardataDirectory() + "/database/services";
+ }
+
+ private Path getDatabasePath() throws ServiceException {
+ return Path.of(getDatabaseDirectory());
}
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
index c733cb0d..faef863e 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
@@ -35,6 +35,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationCo
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig.RicConfigUpdate;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfigParser;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ConfigurationFile;
+import org.onap.ccsdk.oran.a1policymanagementservice.controllers.ServiceCallbacks;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric;
@@ -233,9 +234,11 @@ public class RefreshConfigTask {
RicConfigUpdate.Type event = updatedInfo.getType();
if (event == RicConfigUpdate.Type.ADDED) {
logger.debug("RIC added {}", ricId);
- Ric ric = new Ric(updatedInfo.getRicConfig());
- return trySyncronizeSupportedTypes(ric) //
+
+ return trySyncronizeSupportedTypes(new Ric(updatedInfo.getRicConfig())) //
.flatMap(this::addRic) //
+ .flatMap(this::notifyServicesRicAvailable) //
+ .doOnNext(ric -> ric.setState(RicState.AVAILABLE)) //
.flatMap(notUsed -> Mono.just(event));
} else if (event == RicConfigUpdate.Type.REMOVED) {
logger.debug("RIC removed {}", ricId);
@@ -263,11 +266,16 @@ public class RefreshConfigTask {
}
logger.debug("Added RIC: {}", ric.id());
- ric.setState(RicState.AVAILABLE);
-
return Mono.just(ric);
}
+ private Mono<Ric> notifyServicesRicAvailable(Ric ric) {
+ ServiceCallbacks callbacks = new ServiceCallbacks(this.restClientFactory);
+ return callbacks.notifyServicesRicAvailable(ric, services) //
+ .collectList() //
+ .flatMap(list -> Mono.just(ric));
+ }
+
/**
* Reads the configuration from file.
*/
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
index 6ac104ca..0ccccb73 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
@@ -108,7 +108,7 @@ public class RicSynchronizationTask {
logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(), t.getMessage()); //
ric.setState(RicState.UNAVAILABLE); //
}) //
- .doOnNext(notUsed -> onSynchronizationComplete(ric)) //
+ .flatMap(notUsed -> onSynchronizationComplete(ric)) //
.onErrorResume(t -> Mono.just(ric));
}
@@ -142,14 +142,17 @@ public class RicSynchronizationTask {
return Flux.concat(synchronizedTypes, policiesDeletedInRic, policiesRecreatedInRic);
}
- private void onSynchronizationComplete(Ric ric) {
+ private Mono<Ric> onSynchronizationComplete(Ric ric) {
if (this.rics.get(ric.id()) == null) {
logger.debug("Policies removed in removed ric: {}", ric.id());
- return;
+ return Mono.empty();
}
logger.debug("Synchronization completed for: {}", ric.id());
ric.setState(RicState.AVAILABLE);
- notifyServices(ric);
+ ServiceCallbacks callbacks = new ServiceCallbacks(this.restClientFactory);
+ return callbacks.notifyServicesRicAvailable(ric, services) //
+ .collectList() //
+ .flatMap(list -> Mono.just(ric));
}
private Flux<Object> deleteAllPolicyInstances(Ric ric, Throwable t) {
@@ -165,11 +168,6 @@ public class RicSynchronizationTask {
return Flux.concat(synchronizedTypes, deletePoliciesInRic);
}
- void notifyServices(Ric ric) {
- ServiceCallbacks callbacks = new ServiceCallbacks(this.restClientFactory);
- callbacks.notifyServicesRicSynchronized(ric, services);
- }
-
private Mono<PolicyType> getPolicyType(String policyTypeId, A1Client a1Client) {
if (policyTypes.contains(policyTypeId)) {
return Mono.just(policyTypes.get(policyTypeId));