summaryrefslogtreecommitdiffstats
path: root/a1-policy-management/src
diff options
context:
space:
mode:
authorPatrikBuhr <patrik.buhr@est.tech>2022-10-14 11:38:22 +0200
committerPatrikBuhr <patrik.buhr@est.tech>2022-11-11 15:08:52 +0100
commite3693cfdaf24a9ecaa341c938eb8b1d61a61cbf6 (patch)
treea2e2a791c5ef0e710056a3ebf0a3e2ad2ab12a95 /a1-policy-management/src
parent7e311a7dd9490ba90decde4b98460bb965567c08 (diff)
Support for using Amazon S3 - Cloud Object Storage
Introduce using Amazon S3 - Cloud Object Storage - AWS for storing of data. Change-Id: I68365c24c63544b5ad8e958a98f48d95f83e3084 Issue-ID: CCSDK-3810 Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Diffstat (limited to 'a1-policy-management/src')
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/BeanFactory.java25
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java23
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/Meters.java (renamed from a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshCounterTask.java)38
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java4
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/DataStore.java55
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/FileStore.java158
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/NullStore.java68
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/S3ObjectStore.java227
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java130
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java77
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Services.java86
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java5
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java2
-rw-r--r--a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/MetersTest.java (renamed from a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshCounterTaskTest.java)84
-rw-r--r--a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java39
-rw-r--r--a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java1
16 files changed, 738 insertions, 284 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/BeanFactory.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/BeanFactory.java
index 8fc8bc8c..0d93eae2 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/BeanFactory.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/BeanFactory.java
@@ -20,11 +20,13 @@
package org.onap.ccsdk.oran.a1policymanagementservice;
-import com.fasterxml.jackson.databind.ObjectMapper;
+
import org.apache.catalina.connector.Connector;
import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1ClientFactory;
import org.onap.ccsdk.oran.a1policymanagementservice.clients.SecurityContext;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
import org.springframework.beans.factory.annotation.Autowired;
@@ -52,18 +54,27 @@ public class BeanFactory {
@Bean
public Services getServices(@Autowired ApplicationConfig applicationConfig) {
- return new Services(applicationConfig);
+ Services services = new Services(applicationConfig);
+ services.restoreFromDatabase().subscribe();
+ return services;
}
@Bean
- public A1ClientFactory getA1ClientFactory(@Autowired ApplicationConfig applicationConfig,
- @Autowired SecurityContext securityContext) {
- return new A1ClientFactory(applicationConfig, securityContext);
+ public PolicyTypes getPolicyTypes(@Autowired ApplicationConfig applicationConfig) {
+ PolicyTypes types = new PolicyTypes(applicationConfig);
+ types.restoreFromDatabase().blockLast();
+ return types;
+ }
+
+ @Bean
+ public Policies getPolicies(@Autowired ApplicationConfig applicationConfig) {
+ return new Policies(applicationConfig);
}
@Bean
- public ObjectMapper mapper() {
- return new ObjectMapper();
+ public A1ClientFactory getA1ClientFactory(@Autowired ApplicationConfig applicationConfig,
+ @Autowired SecurityContext securityContext) {
+ return new A1ClientFactory(applicationConfig, securityContext);
}
@Bean
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java
index 0e2294fc..6dbf318d 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java
@@ -20,6 +20,8 @@
package org.onap.ccsdk.oran.a1policymanagementservice.configuration;
+import com.google.common.base.Strings;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -82,6 +84,22 @@ public class ApplicationConfig {
@Value("${app.webclient.http.proxy-type:HTTP}")
private String httpProxyType = "HTTP";
+ @Getter
+ @Value("${app.s3.endpointOverride:}")
+ private String s3EndpointOverride;
+
+ @Getter
+ @Value("${app.s3.accessKeyId:}")
+ private String s3AccessKeyId;
+
+ @Getter
+ @Value("${app.s3.secretAccessKey:}")
+ private String s3SecretAccessKey;
+
+ @Getter
+ @Value("${app.s3.bucket:}")
+ private String s3Bucket;
+
private Map<String, RicConfig> ricConfigs = new HashMap<>();
@Getter
@@ -182,4 +200,9 @@ public class ApplicationConfig {
return Flux.fromIterable(modifications);
}
+
+ public boolean isS3Enabled() {
+ return !(Strings.isNullOrEmpty(s3EndpointOverride) || Strings.isNullOrEmpty(s3Bucket));
+ }
+
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshCounterTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/Meters.java
index 95999a04..203bfacc 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshCounterTask.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/Meters.java
@@ -18,52 +18,28 @@
* ========================LICENSE_END===================================
*/
-package org.onap.ccsdk.oran.a1policymanagementservice.tasks;
+package org.onap.ccsdk.oran.a1policymanagementservice.configuration;
import io.micrometer.core.instrument.MeterRegistry;
-import java.lang.invoke.MethodHandles;
-import lombok.AccessLevel;
-import lombok.Getter;
+
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
- * The aim is to collect statistical values from the A1 Policy Management Service.
+ * The aim is to collect statistical values from the A1 Policy Management
+ * Service.
+ * The counters are being updated every minute.
*/
@Component
-public class RefreshCounterTask {
-
- private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
- @Autowired
- private final Rics rics;
-
- @Autowired
- private final PolicyTypes policyTypes;
+public class Meters {
@Autowired
- private final Policies policies;
-
- @Autowired
- @Getter(AccessLevel.PUBLIC)
- private final MeterRegistry meterRegistry;
-
- @Autowired
- public RefreshCounterTask(Rics rics, PolicyTypes policyTypes, Policies policies, MeterRegistry meterRegistry) {
- this.rics = rics;
- this.policyTypes = policyTypes;
- this.policies = policies;
- this.meterRegistry = meterRegistry;
-
- logger.trace("Counters have been initialized.");
+ public Meters(Rics rics, PolicyTypes policyTypes, Policies policies, MeterRegistry meterRegistry) {
meterRegistry.gauge("total_ric_count", rics, Rics::size);
meterRegistry.gauge("total_policy_type_count", policyTypes, PolicyTypes::size);
meterRegistry.gauge("total_policy_count", policies, Policies::size);
}
-
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java
index ee2cae58..500ddd2a 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java
@@ -22,6 +22,7 @@ package org.onap.ccsdk.oran.a1policymanagementservice.controllers.v2;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
+
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
@@ -29,12 +30,15 @@ import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.responses.ApiResponses;
import io.swagger.v3.oas.annotations.tags.Tag;
+
import java.lang.invoke.MethodHandles;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+
import lombok.Getter;
+
import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1ClientFactory;
import org.onap.ccsdk.oran.a1policymanagementservice.controllers.VoidResponse;
import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.EntityNotFoundException;
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/DataStore.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/DataStore.java
new file mode 100644
index 00000000..51b57dee
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/DataStore.java
@@ -0,0 +1,55 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.datastore;
+
+import com.google.common.base.Strings;
+
+import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public interface DataStore {
+
+ public Flux<String> listObjects(String prefix);
+
+ public Mono<byte[]> readObject(String name);
+
+ public Mono<byte[]> writeObject(String name, byte[] fileData);
+
+ public Mono<Boolean> deleteObject(String name);
+
+ public Mono<String> createDataStore();
+
+ public Mono<String> deleteAllObjects();
+
+ public static DataStore create(ApplicationConfig appConfig, String location) {
+ if (appConfig.isS3Enabled()) {
+ return new S3ObjectStore(appConfig, location);
+ } else if (!Strings.isNullOrEmpty(appConfig.getVardataDirectory())) {
+ return new FileStore(appConfig, location);
+ } else {
+ return new NullStore(location);
+ }
+
+ }
+
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/FileStore.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/FileStore.java
new file mode 100644
index 00000000..565120e1
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/FileStore.java
@@ -0,0 +1,158 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+
+package org.onap.ccsdk.oran.a1policymanagementservice.datastore;
+
+import com.google.common.base.Strings;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Stream;
+
+import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+class FileStore implements DataStore {
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ ApplicationConfig applicationConfig;
+ private final String location;
+
+ public FileStore(ApplicationConfig applicationConfig, String location) {
+ this.applicationConfig = applicationConfig;
+ this.location = location;
+ }
+
+ @Override
+ public Flux<String> listObjects(String prefix) {
+ Path root = Path.of(path().toString(), prefix);
+ if (!root.toFile().exists()) {
+ root = root.getParent();
+ }
+
+ logger.debug("Listing files in: {}", root);
+
+ List<String> result = new ArrayList<>();
+ try (Stream<Path> stream = Files.walk(root, Integer.MAX_VALUE)) {
+
+ stream.forEach(path -> filterListFiles(path, prefix, result));
+
+ return Flux.fromIterable(result);
+ } catch (Exception e) {
+ logger.warn("Could not list filed in {}, reason; {}", root, e.getMessage());
+ return Flux.error(e);
+ }
+ }
+
+ private void filterListFiles(Path path, String prefix, List<String> result) {
+ if (path.toFile().isFile() && externalName(path).startsWith(prefix)) {
+ result.add(externalName(path));
+ } else {
+ logger.trace("Ignoring file/directory {}, prefix: {}", path, prefix);
+ }
+ }
+
+ private String externalName(Path path) {
+ String fullName = path.toString();
+ String externalName = fullName.substring(path().toString().length());
+ if (externalName.startsWith("/")) {
+ externalName = externalName.substring(1);
+ }
+ return externalName;
+ }
+
+ @Override
+ public Mono<byte[]> readObject(String fileName) {
+ try {
+ byte[] contents = Files.readAllBytes(path(fileName));
+ return Mono.just(contents);
+ } catch (Exception e) {
+ return Mono.error(e);
+ }
+ }
+
+ @Override
+ public Mono<Boolean> deleteObject(String name) {
+ try {
+ Files.delete(path(name));
+ return Mono.just(true);
+ } catch (Exception e) {
+ logger.debug("Could not delete file: {}, reason: {}", path(name), e.getMessage());
+ return Mono.just(false);
+ }
+ }
+
+ @Override
+ public Mono<String> createDataStore() {
+ try {
+ if (!Strings.isNullOrEmpty(applicationConfig.getVardataDirectory())) {
+ Files.createDirectories(path());
+ }
+ } catch (IOException e) {
+ logger.error("Could not create directory: {}, reason: {}", path(), e.getMessage());
+ }
+ return Mono.just("OK");
+ }
+
+ private Path path(String name) {
+ return Path.of(path().toString(), name);
+ }
+
+ private Path path() {
+ return Path.of(applicationConfig.getVardataDirectory(), "database", this.location);
+ }
+
+ @Override
+ public Mono<String> deleteAllObjects() {
+ return listObjects("") //
+ .flatMap(this::deleteObject) //
+ .collectList() //
+ .map(o -> "OK");
+ }
+
+ @Override
+ public Mono<byte[]> writeObject(String fileName, byte[] fileData) {
+ try {
+ if (!Strings.isNullOrEmpty(applicationConfig.getVardataDirectory())) {
+ Files.createDirectories(path(fileName).getParent());
+ }
+ File outputFile = path(fileName).toFile();
+
+ try (FileOutputStream outputStream = new FileOutputStream(outputFile)) {
+ outputStream.write(fileData);
+ }
+ } catch (IOException e) {
+ logger.warn("Could not write file: {}, reason; {}", path(fileName), e.getMessage());
+ }
+ return Mono.just(fileData);
+ }
+
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/NullStore.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/NullStore.java
new file mode 100644
index 00000000..1ecd8f64
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/NullStore.java
@@ -0,0 +1,68 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.datastore;
+
+import java.lang.invoke.MethodHandles;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+class NullStore implements DataStore {
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ public NullStore(String location) {
+ logger.warn("No storage defined for: {}", location);
+ }
+
+ @Override
+ public Flux<String> listObjects(String prefix) {
+ return Flux.empty();
+ }
+
+ @Override
+ public Mono<byte[]> readObject(String name) {
+ return Mono.just(new byte[0]);
+ }
+
+ @Override
+ public Mono<byte[]> writeObject(String name, byte[] fileData) {
+ return Mono.just(new byte[0]);
+ }
+
+ @Override
+ public Mono<Boolean> deleteObject(String name) {
+ return Mono.just(false);
+ }
+
+ @Override
+ public Mono<String> createDataStore() {
+ return Mono.just("");
+ }
+
+ @Override
+ public Mono<String> deleteAllObjects() {
+ return Mono.just("");
+ }
+
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/S3ObjectStore.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/S3ObjectStore.java
new file mode 100644
index 00000000..4c7c3c3e
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/S3ObjectStore.java
@@ -0,0 +1,227 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.datastore;
+
+import java.net.URI;
+import java.util.concurrent.CompletableFuture;
+
+import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.BytesWrapper;
+import software.amazon.awssdk.core.ResponseBytes;
+import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.core.async.AsyncResponseTransformer;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
+import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
+import software.amazon.awssdk.services.s3.model.DeleteBucketResponse;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.PutObjectResponse;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+class S3ObjectStore implements DataStore {
+ private static final Logger logger = LoggerFactory.getLogger(S3ObjectStore.class);
+ private final ApplicationConfig applicationConfig;
+
+ private static S3AsyncClient s3AsynchClient;
+ private final String location;
+
+ public S3ObjectStore(ApplicationConfig applicationConfig, String location) {
+ this.applicationConfig = applicationConfig;
+ this.location = location;
+
+ getS3AsynchClient(applicationConfig);
+ }
+
+ private static synchronized S3AsyncClient getS3AsynchClient(ApplicationConfig applicationConfig) {
+ if (applicationConfig.isS3Enabled() && s3AsynchClient == null) {
+ s3AsynchClient = getS3AsyncClientBuilder(applicationConfig).build();
+ }
+ return s3AsynchClient;
+ }
+
+ private static S3AsyncClientBuilder getS3AsyncClientBuilder(ApplicationConfig applicationConfig) {
+ URI uri = URI.create(applicationConfig.getS3EndpointOverride());
+ return S3AsyncClient.builder() //
+ .region(Region.US_EAST_1) //
+ .endpointOverride(uri) //
+ .credentialsProvider(StaticCredentialsProvider.create( //
+ AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), //
+ applicationConfig.getS3SecretAccessKey())));
+ }
+
+ @Override
+ public Flux<String> listObjects(String prefix) {
+ return listObjectsInBucket(bucket(), location + "/" + prefix).map(S3Object::key) //
+ .map(this::externalName);
+ }
+
+ @Override
+ public Mono<Boolean> deleteObject(String name) {
+ DeleteObjectRequest request = DeleteObjectRequest.builder() //
+ .bucket(bucket()) //
+ .key(key(name)) //
+ .build();
+
+ CompletableFuture<DeleteObjectResponse> future = s3AsynchClient.deleteObject(request);
+
+ return Mono.fromFuture(future).map(resp -> true);
+ }
+
+ @Override
+ public Mono<byte[]> readObject(String name) {
+ return getDataFromS3Object(bucket(), name);
+ }
+
+ @Override
+ public Mono<byte[]> writeObject(String name, byte[] fileData) {
+
+ PutObjectRequest request = PutObjectRequest.builder() //
+ .bucket(bucket()) //
+ .key(key(name)) //
+ .build();
+
+ AsyncRequestBody body = AsyncRequestBody.fromBytes(fileData);
+
+ CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body);
+
+ return Mono.fromFuture(future) //
+ .map(putObjectResponse -> fileData) //
+ .doOnError(t -> logger.error("Failed to store object '{}' in S3 {}", key(name), t.getMessage()));
+ }
+
+ @Override
+ public Mono<String> createDataStore() {
+ return createS3Bucket(bucket());
+ }
+
+ private Mono<String> createS3Bucket(String s3Bucket) {
+
+ CreateBucketRequest request = CreateBucketRequest.builder() //
+ .bucket(s3Bucket) //
+ .build();
+
+ CompletableFuture<CreateBucketResponse> future = s3AsynchClient.createBucket(request);
+
+ return Mono.fromFuture(future) //
+ .map(f -> s3Bucket) //
+ .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage()))
+ .onErrorResume(t -> Mono.just(s3Bucket));
+ }
+
+ @Override
+ public Mono<String> deleteAllObjects() {
+ return listObjects("") //
+ .flatMap(this::deleteObject) //
+ .collectList() //
+ .map(resp -> "OK").onErrorResume(t -> Mono.just("NOK"));
+ }
+
+ public Mono<String> deleteBucket() {
+ DeleteBucketRequest request = DeleteBucketRequest.builder() //
+ .bucket(bucket()) //
+ .build();
+
+ CompletableFuture<DeleteBucketResponse> future = s3AsynchClient.deleteBucket(request);
+
+ return Mono.fromFuture(future) //
+ .doOnError(t -> logger.warn("Could not delete bucket: {}, reason: {}", bucket(), t.getMessage()))
+ .map(resp -> bucket()) //
+ .doOnNext(resp -> logger.debug("Deleted bucket: {}", bucket())).onErrorResume(t -> Mono.just("NOK"));
+ }
+
+ private String bucket() {
+ return applicationConfig.getS3Bucket();
+ }
+
+ private Flux<S3Object> listObjectsInBucket(String bucket, String prefix) {
+
+ return listObjectsRequest(bucket, prefix, null) //
+ .expand(response -> listObjectsRequest(bucket, prefix, response)) //
+ .map(ListObjectsResponse::contents) //
+ .doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) //
+ .doOnError(t -> logger.warn("Error fromlist objects: {}", t.getMessage())) //
+ .flatMap(Flux::fromIterable) //
+ .doOnNext(obj -> logger.debug("Found object: {}", obj.key()));
+ }
+
+ private Mono<ListObjectsResponse> listObjectsRequest(String bucket, String prefix,
+ ListObjectsResponse prevResponse) {
+ ListObjectsRequest.Builder builder = ListObjectsRequest.builder() //
+ .bucket(bucket) //
+ .maxKeys(1000) //
+ .prefix(prefix);
+
+ if (prevResponse != null) {
+ if (Boolean.TRUE.equals(prevResponse.isTruncated())) {
+ builder.marker(prevResponse.nextMarker());
+ } else {
+ return Mono.empty();
+ }
+ }
+
+ ListObjectsRequest listObjectsRequest = builder.build();
+ CompletableFuture<ListObjectsResponse> future = s3AsynchClient.listObjects(listObjectsRequest);
+ return Mono.fromFuture(future);
+ }
+
+ private Mono<byte[]> getDataFromS3Object(String bucket, String name) {
+
+ GetObjectRequest request = GetObjectRequest.builder() //
+ .bucket(bucket) //
+ .key(key(name)) //
+ .build();
+
+ CompletableFuture<ResponseBytes<GetObjectResponse>> future =
+ s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes());
+
+ return Mono.fromFuture(future) //
+ .map(BytesWrapper::asByteArray) //
+ .doOnError(t -> logger.error("Failed to get file from S3, key:{}, bucket: {}, {}", key(name), bucket,
+ t.getMessage())) //
+ .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key(name))) //
+ .onErrorResume(t -> Mono.empty());
+ }
+
+ private String key(String name) {
+ return location + "/" + name;
+ }
+
+ private String externalName(String internalName) {
+ return internalName.substring(key("").length());
+ }
+
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java
index ef924748..d808b57d 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java
@@ -2,7 +2,7 @@
* ========================LICENSE_START=================================
* ONAP : ccsdk oran
* ======================================================================
- * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2019-2022 Nordix Foundation. All rights reserved.
* ======================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,18 +23,14 @@ package org.onap.ccsdk.oran.a1policymanagementservice.repository;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.PrintStream;
import java.lang.invoke.MethodHandles;
-import java.nio.file.Files;
-import java.nio.file.Path;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.Vector;
@@ -42,17 +38,17 @@ import lombok.Builder;
import lombok.Getter;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
+import org.onap.ccsdk.oran.a1policymanagementservice.datastore.DataStore;
import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.EntityNotFoundException;
-import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.util.FileSystemUtils;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-@Configuration
public class Policies {
@Getter
@@ -73,12 +69,31 @@ public class Policies {
private MultiMap<Policy> policiesRic = new MultiMap<>();
private MultiMap<Policy> policiesService = new MultiMap<>();
private MultiMap<Policy> policiesType = new MultiMap<>();
+ private final DataStore dataStore;
private final ApplicationConfig appConfig;
private static Gson gson = new GsonBuilder().create();
public Policies(@Autowired ApplicationConfig appConfig) {
this.appConfig = appConfig;
+ this.dataStore = DataStore.create(appConfig, "policies");
+ }
+
+ public Flux<Policy> restoreFromDatabase(Ric ric, PolicyTypes types) {
+ return dataStore.createDataStore() //
+ .flatMapMany(x -> dataStore.listObjects(getPath(ric))) //
+ .flatMap(dataStore::readObject) //
+ .map(String::new) //
+ .map(json -> gson.fromJson(json, PersistentPolicyInfo.class)) //
+ .map(policyInfo -> toPolicy(policyInfo, ric, types)) //
+ .doOnNext(this::put) //
+ .filter(Objects::nonNull) //
+ .doOnError(t -> logger.warn("Could not restore policy database for RIC: {}, reason : {}", ric.id(),
+ t.getMessage())) //
+ .doFinally(sig -> logger.debug("Restored policy database for RIC: {}, number of policies: {}", ric.id(),
+ this.policiesRic.get(ric.id()).size())) //
+ .onErrorResume(t -> Flux.empty()) //
+ ;
}
public synchronized void put(Policy policy) {
@@ -133,11 +148,7 @@ public class Policies {
public synchronized void remove(Policy policy) {
if (!policy.isTransient()) {
- try {
- Files.delete(getPath(policy));
- } catch (Exception e) {
- logger.debug("Could not delete policy from database: {}", e.getMessage());
- }
+ dataStore.deleteObject(getPath(policy)).subscribe();
}
policiesId.remove(policy.getId());
policiesRic.remove(policy.getRic().id(), policy.getId());
@@ -175,24 +186,15 @@ public class Policies {
Set<String> keys = policiesId.keySet();
removeId(keys.iterator().next());
}
- try {
- if (this.appConfig.getVardataDirectory() != null) {
- FileSystemUtils.deleteRecursively(getDatabasePath());
- }
- } catch (Exception e) {
- logger.warn("Could not delete policy database : {}", e.getMessage());
- }
+ dataStore.deleteAllObjects().onErrorResume(t -> Mono.empty()).subscribe();
}
public void store(Policy policy) {
- try {
- Files.createDirectories(getDatabasePath(policy.getRic()));
- try (PrintStream out = new PrintStream(new FileOutputStream(getFile(policy)))) {
- out.print(gson.toJson(toStorageObject(policy)));
- }
- } catch (Exception e) {
- logger.warn("Could not store policy: {} {}", policy.getId(), e.getMessage());
- }
+
+ byte[] bytes = gson.toJson(toStorageObject(policy)).getBytes();
+ this.dataStore.writeObject(this.getPath(policy), bytes) //
+ .doOnError(t -> logger.error("Could not store job in S3, reason: {}", t.getMessage())) //
+ .subscribe();
}
private boolean isMatch(String filterValue, String actualValue) {
@@ -218,29 +220,6 @@ public class Policies {
return filtered;
}
- private File getFile(Policy policy) throws ServiceException {
- return getPath(policy).toFile();
- }
-
- private Path getPath(Policy policy) throws ServiceException {
- return Path.of(getDatabaseDirectory(policy.getRic()), policy.getId() + ".json");
- }
-
- public synchronized void restoreFromDatabase(Ric ric, PolicyTypes types) {
- try {
- Files.createDirectories(getDatabasePath(ric));
- for (File file : getDatabasePath(ric).toFile().listFiles()) {
- String json = Files.readString(file.toPath());
- PersistentPolicyInfo policyStorage = gson.fromJson(json, PersistentPolicyInfo.class);
- this.put(toPolicy(policyStorage, ric, types));
- }
- logger.debug("Restored policy database for RIC: {}, number of policies: {}", ric.id(),
- this.policiesRic.get(ric.id()).size());
- } catch (Exception e) {
- logger.warn("Could not restore policy database for RIC: {}, reason : {}", ric.id(), e.getMessage());
- }
- }
-
private PersistentPolicyInfo toStorageObject(Policy p) {
return PersistentPolicyInfo.builder() //
.id(p.getId()) //
@@ -254,35 +233,30 @@ public class Policies {
.build();
}
- Policy toPolicy(PersistentPolicyInfo p, Ric ric, PolicyTypes types) throws EntityNotFoundException {
- return Policy.builder() //
- .id(p.getId()) //
- .isTransient(p.isTransient()) //
- .json(p.getJson()) //
- .lastModified(Instant.parse(p.lastModified)) //
- .ownerServiceId(p.getOwnerServiceId()) //
- .ric(ric) //
- .statusNotificationUri(p.getStatusNotificationUri()) //
- .type(types.getType(p.getTypeId())) //
- .build();
- }
-
- private Path getDatabasePath(Ric ric) throws ServiceException {
- return Path.of(getDatabaseDirectory(ric));
+ private Policy toPolicy(PersistentPolicyInfo p, Ric ric, PolicyTypes types) {
+ try {
+ return Policy.builder() //
+ .id(p.getId()) //
+ .isTransient(p.isTransient()) //
+ .json(p.getJson()) //
+ .lastModified(Instant.parse(p.lastModified)) //
+ .ownerServiceId(p.getOwnerServiceId()) //
+ .ric(ric) //
+ .statusNotificationUri(p.getStatusNotificationUri()) //
+ .type(types.getType(p.getTypeId())) //
+ .build();
+ } catch (EntityNotFoundException e) {
+ logger.warn("Not found: {}", e.getMessage());
+ return null;
+ }
}
- private String getDatabaseDirectory(Ric ric) throws ServiceException {
- return getDatabaseDirectory() + "/" + ric.id();
+ private String getPath(Policy policy) {
+ return getPath(policy.getRic()) + "/" + policy.getId() + ".json";
}
- private String getDatabaseDirectory() throws ServiceException {
- if (appConfig.getVardataDirectory() == null) {
- throw new ServiceException("No database storage provided");
- }
- return appConfig.getVardataDirectory() + "/database/policyInstances";
+ private String getPath(Ric ric) {
+ return ric.id();
}
- private Path getDatabasePath() throws ServiceException {
- return Path.of(getDatabaseDirectory());
- }
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java
index 327dee60..b4e9c654 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java
@@ -23,13 +23,7 @@ package org.onap.ccsdk.oran.a1policymanagementservice.repository;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
import java.lang.invoke.MethodHandles;
-import java.nio.file.Files;
-import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -38,25 +32,25 @@ import java.util.Map;
import java.util.Vector;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
+import org.onap.ccsdk.oran.a1policymanagementservice.datastore.DataStore;
import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.EntityNotFoundException;
import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.annotation.Configuration;
import org.springframework.lang.Nullable;
-import org.springframework.util.FileSystemUtils;
-@Configuration
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
public class PolicyTypes {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private Map<String, PolicyType> types = new HashMap<>();
- private final ApplicationConfig appConfig;
private static Gson gson = new GsonBuilder().create();
+ private final DataStore dataStore;
public PolicyTypes(@Autowired ApplicationConfig appConfig) {
- this.appConfig = appConfig;
- restoreFromDatabase();
+ this.dataStore = DataStore.create(appConfig, "policytypes");
}
public synchronized PolicyType getType(String name) throws EntityNotFoundException {
@@ -113,55 +107,26 @@ public class PolicyTypes {
public synchronized void clear() {
this.types.clear();
- try {
- FileSystemUtils.deleteRecursively(getDatabasePath());
- } catch (IOException | ServiceException e) {
- logger.warn("Could not delete policy type database : {}", e.getMessage());
- }
+ dataStore.deleteAllObjects().onErrorResume(t -> Mono.empty()).subscribe();
}
public void store(PolicyType type) {
- try {
- Files.createDirectories(getDatabasePath());
- try (PrintStream out = new PrintStream(new FileOutputStream(getFile(type)))) {
- out.print(gson.toJson(type));
- }
- } catch (ServiceException e) {
- logger.debug("Could not store policy type: {} {}", type.getId(), e.getMessage());
- } catch (IOException e) {
- logger.warn("Could not store policy type: {} {}", type.getId(), e.getMessage());
- }
+ byte[] bytes = gson.toJson(type).getBytes();
+ dataStore.writeObject(getPath(type), bytes) //
+ .doOnError(t -> logger.warn("Could not store policy type: {} {}", type.getId(), t.getMessage()))
+ .subscribe();
}
- private File getFile(PolicyType type) throws ServiceException {
- return Path.of(getDatabaseDirectory(), type.getId() + ".json").toFile();
- }
+ public Flux<PolicyType> restoreFromDatabase() {
- void restoreFromDatabase() {
- try {
- Files.createDirectories(getDatabasePath());
- for (File file : getDatabasePath().toFile().listFiles()) {
- String json = Files.readString(file.toPath());
- PolicyType type = gson.fromJson(json, PolicyType.class);
- this.types.put(type.getId(), type);
- }
- logger.debug("Restored type database,no of types: {}", this.types.size());
- } catch (ServiceException e) {
- logger.debug("Could not restore policy type database : {}", e.getMessage());
- } catch (Exception e) {
- logger.warn("Could not restore policy type database : {}", e.getMessage());
- }
- }
-
- private String getDatabaseDirectory() throws ServiceException {
- if (appConfig.getVardataDirectory() == null) {
- throw new ServiceException("No policy type storage provided");
- }
- return appConfig.getVardataDirectory() + "/database/policyTypes";
- }
+ return this.dataStore.createDataStore().flatMapMany(x -> dataStore.listObjects("")) //
+ .flatMap(dataStore::readObject) //
+ .map(String::new) //
+ .map(json -> gson.fromJson(json, PolicyType.class)).doOnNext(type -> this.types.put(type.getId(), type)) //
+ .doOnError(t -> logger.warn("Could not restore policy type database : {}", t.getMessage())) //
+ .doFinally(sig -> logger.debug("Restored type database,no of types: {}", this.types.size()))
+ .onErrorResume(t -> Flux.empty()); //
- private Path getDatabasePath() throws ServiceException {
- return Path.of(getDatabaseDirectory());
}
private static Collection<PolicyType> filterTypeName(Collection<PolicyType> types, String typeName) {
@@ -188,4 +153,8 @@ public class PolicyTypes {
return result;
}
+ private String getPath(PolicyType type) {
+ return type.getId() + ".json";
+ }
+
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Services.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Services.java
index 68a8d7dd..9c2846a9 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Services.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Services.java
@@ -2,7 +2,7 @@
* ========================LICENSE_START=================================
* ONAP : ccsdk oran
* ======================================================================
- * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * Copyright (C) 2019-2022 Nordix Foundation. All rights reserved.
* ======================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,33 +22,29 @@ package org.onap.ccsdk.oran.a1policymanagementservice.repository;
import com.google.gson.Gson;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Vector;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
+import org.onap.ccsdk.oran.a1policymanagementservice.datastore.DataStore;
import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.util.FileSystemUtils;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
public class Services {
private static final Logger logger = LoggerFactory.getLogger(Services.class);
private static Gson gson = Service.createGson();
- private final ApplicationConfig appConfig;
+ private final DataStore dataStore;
private Map<String, Service> registeredServices = new HashMap<>();
public Services(@Autowired ApplicationConfig appConfig) {
- this.appConfig = appConfig;
- restoreFromDatabase();
+ this.dataStore = DataStore.create(appConfig, "services");
}
public synchronized Service getService(String name) throws ServiceException {
@@ -77,11 +73,7 @@ public class Services {
public synchronized void remove(String name) {
Service service = registeredServices.remove(name);
if (service != null) {
- try {
- Files.delete(getPath(service));
- } catch (Exception e) {
- // Doesn't matter.
- }
+ dataStore.deleteObject(getPath(service)).subscribe();
}
}
@@ -91,59 +83,29 @@ public class Services {
public synchronized void clear() {
registeredServices.clear();
- try {
- FileSystemUtils.deleteRecursively(getDatabasePath());
- } catch (Exception e) {
- logger.warn("Could not delete services database : {}", e.getMessage());
- }
+ dataStore.deleteAllObjects().onErrorResume(t -> Mono.empty()).subscribe();
}
public void store(Service service) {
- try {
- Files.createDirectories(getDatabasePath());
- try (PrintStream out = new PrintStream(new FileOutputStream(getFile(service)))) {
- String str = gson.toJson(service);
- out.print(str);
- }
- } catch (ServiceException e) {
- logger.debug("Could not store service: {} {}", service.getName(), e.getMessage());
- } catch (IOException e) {
- logger.warn("Could not store pservice: {} {}", service.getName(), e.getMessage());
- }
- }
-
- private File getFile(Service service) throws ServiceException {
- return getPath(service).toFile();
+ byte[] bytes = gson.toJson(service).getBytes();
+ dataStore.writeObject(getPath(service), bytes) //
+ .doOnError(t -> logger.warn("Could not service: {} {}", service.getName(), t.getMessage())).subscribe();
}
- private Path getPath(Service service) throws ServiceException {
- return Path.of(getDatabaseDirectory(), service.getName() + ".json");
+ public Flux<Service> restoreFromDatabase() {
+ return dataStore.createDataStore().flatMapMany(ds -> dataStore.listObjects("")) //
+ .flatMap(dataStore::readObject, 1) //
+ .map(String::new) //
+ .map(json -> gson.fromJson(json, Service.class))
+ .doOnNext(service -> this.registeredServices.put(service.getName(), service))
+ .doOnError(t -> logger.warn("Could not restore services database : {}", t.getMessage()))
+ .doFinally(sig -> logger.debug("Restored type database,no of services: {}",
+ this.registeredServices.size())) //
+ .onErrorResume(t -> Flux.empty()); //
}
- void restoreFromDatabase() {
- try {
- Files.createDirectories(getDatabasePath());
- for (File file : getDatabasePath().toFile().listFiles()) {
- String json = Files.readString(file.toPath());
- Service service = gson.fromJson(json, Service.class);
- this.registeredServices.put(service.getName(), service);
- }
- logger.debug("Restored type database,no of services: {}", this.registeredServices.size());
- } catch (ServiceException e) {
- logger.debug("Could not restore services database : {}", e.getMessage());
- } catch (Exception e) {
- logger.warn("Could not restore services database : {}", e.getMessage());
- }
+ private String getPath(Service service) {
+ return service.getName() + ".json";
}
- private String getDatabaseDirectory() throws ServiceException {
- if (appConfig.getVardataDirectory() == null) {
- throw new ServiceException("No storage provided");
- }
- return appConfig.getVardataDirectory() + "/database/services";
- }
-
- private Path getDatabasePath() throws ServiceException {
- return Path.of(getDatabaseDirectory());
- }
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
index 983e92e6..567bb8d2 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
@@ -105,7 +105,7 @@ public class RefreshConfigTask {
refreshTask = createRefreshTask() //
.subscribe(
notUsed -> logger.debug("Refreshed configuration data"), throwable -> logger
- .error("Configuration refresh terminated due to exception {}", throwable.toString()),
+ .error("Configuration refresh terminated due to exception {}", throwable.getMessage()),
() -> logger.error("Configuration refresh terminated"));
}
@@ -128,6 +128,7 @@ public class RefreshConfigTask {
.flatMap(this::parseConfiguration) //
.flatMap(this::updateConfig, CONCURRENCY) //
.flatMap(this::handleUpdatedRicConfig) //
+ .doOnError(t -> logger.error("Cannot update config {}", t.getMessage()))
.doFinally(signal -> logger.error("Configuration refresh task is terminated: {}", signal));
}
@@ -200,7 +201,7 @@ public class RefreshConfigTask {
void addRic(Ric ric) {
this.rics.put(ric);
if (this.appConfig.getVardataDirectory() != null) {
- this.policies.restoreFromDatabase(ric, this.policyTypes);
+ this.policies.restoreFromDatabase(ric, this.policyTypes).subscribe();
}
logger.debug("Added RIC: {}", ric.id());
}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
index 7a5f73d4..fdeb47e2 100644
--- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
@@ -21,6 +21,7 @@
package org.onap.ccsdk.oran.a1policymanagementservice.tasks;
import java.util.Collection;
+
import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1Client;
import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1ClientFactory;
import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClientFactory;
@@ -40,6 +41,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshCounterTaskTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/MetersTest.java
index bd315c26..5ab3d5a3 100644
--- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshCounterTaskTest.java
+++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/MetersTest.java
@@ -18,23 +18,21 @@
* ========================LICENSE_END===================================
*/
-package org.onap.ccsdk.oran.a1policymanagementservice.tasks;
+package org.onap.ccsdk.oran.a1policymanagementservice.configuration;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.spy;
-import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
+
import java.time.Instant;
import java.util.Arrays;
import java.util.Vector;
+
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
-import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
-import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyType;
@@ -43,7 +41,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
@ExtendWith(MockitoExtension.class)
-class RefreshCounterTaskTest {
+public class MetersTest {
private static final String POLICY_TYPE_1_NAME = "type1";
private static final PolicyType POLICY_TYPE_1 = PolicyType.builder().id(POLICY_TYPE_1_NAME).schema("").build();
@@ -57,21 +55,28 @@ class RefreshCounterTaskTest {
.ric(RIC_1).type(POLICY_TYPE_1).lastModified(Instant.now()).isTransient(false)
.statusNotificationUri("statusNotificationUri").build();
- private final PrometheusMeterRegistry prometheusMeterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
+ private PrometheusMeterRegistry prometheusMeterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
private final ApplicationConfig appConfig = new ApplicationConfig();
- private PolicyTypes policyTypes;
+ private PolicyTypes types;
private Policies policies;
private Rics rics = new Rics();
+ Meters testObject;
+
@BeforeEach
void init() {
- policyTypes = new PolicyTypes(appConfig);
+ types = new PolicyTypes(appConfig);
policies = new Policies(appConfig);
rics.clear();
+ policies.clear();
+ types.clear();
+
RIC_1.setState(Ric.RicState.AVAILABLE);
RIC_1.clearSupportedPolicyTypes();
+
+ this.testObject = createMeters();
}
@Test
@@ -80,16 +85,15 @@ class RefreshCounterTaskTest {
RIC_1.addSupportedPolicyType(POLICY_TYPE_1);
rics.put(RIC_1);
- policyTypes.put(POLICY_TYPE_1);
+ types.put(POLICY_TYPE_1);
policies.put(POLICY_1);
- RefreshCounterTask spy = spy(createRefreshCounterTask()); // instantiate RefreshCounterTask
- MeterRegistry meterRegistry = spy.getMeterRegistry();
+ createMeters();
- assertThat(meterRegistry.get("total_ric_count").gauge().value()).isEqualTo(1);
- assertThat(meterRegistry.get("total_policy_type_count").gauge().value()).isEqualTo(1);
- assertThat(meterRegistry.get("total_policy_count").gauge().value()).isEqualTo(1);
+ assertThat(prometheusMeterRegistry.get("total_ric_count").gauge().value()).isEqualTo(1);
+ assertThat(prometheusMeterRegistry.get("total_policy_type_count").gauge().value()).isEqualTo(1);
+ assertThat(prometheusMeterRegistry.get("total_policy_count").gauge().value()).isEqualTo(1);
}
@Test
@@ -98,30 +102,27 @@ class RefreshCounterTaskTest {
RIC_1.addSupportedPolicyType(POLICY_TYPE_1);
rics.put(RIC_1);
- policyTypes.put(POLICY_TYPE_1);
+ types.put(POLICY_TYPE_1);
policies.put(POLICY_1);
String POLICY_2_ID = "policyId2";
- Policy POLICY_2 = Policy.builder()
- .id(POLICY_2_ID)
- .json("")
- .ownerServiceId("service")
- .ric(RIC_1)
- .type(POLICY_TYPE_1)
- .lastModified(Instant.now())
+ Policy POLICY_2 = Policy.builder() //
+ .id(POLICY_2_ID) //
+ .json("") //
+ .ownerServiceId("service") //
+ .ric(RIC_1) //
+ .type(POLICY_TYPE_1) //
+ .lastModified(Instant.now()) //
.isTransient(false) //
- .statusNotificationUri("statusNotificationUri")
+ .statusNotificationUri("statusNotificationUri") //
.build();
policies.put(POLICY_2);
- RefreshCounterTask spy = spy(createRefreshCounterTask()); // instantiate RefreshCounterTask
- MeterRegistry meterRegistry = spy.getMeterRegistry();
-
- assertThat(meterRegistry.get("total_ric_count").gauge().value()).isEqualTo(1);
- assertThat(meterRegistry.get("total_policy_type_count").gauge().value()).isEqualTo(1);
- assertThat(meterRegistry.get("total_policy_count").gauge().value()).isEqualTo(2);
+ assertThat(prometheusMeterRegistry.get("total_ric_count").gauge().value()).isEqualTo(1);
+ assertThat(prometheusMeterRegistry.get("total_policy_type_count").gauge().value()).isEqualTo(1);
+ assertThat(prometheusMeterRegistry.get("total_policy_count").gauge().value()).isEqualTo(2);
}
@Test
@@ -129,29 +130,26 @@ class RefreshCounterTaskTest {
RIC_1.setState(Ric.RicState.AVAILABLE);
String POLICY_TYPE_2_NAME = "type2";
- PolicyType POLICY_TYPE_2 = PolicyType.builder()
- .id(POLICY_TYPE_2_NAME)
- .schema("")
+ PolicyType POLICY_TYPE_2 = PolicyType.builder() //
+ .id(POLICY_TYPE_2_NAME) //
+ .schema("") //
.build();
RIC_1.addSupportedPolicyType(POLICY_TYPE_1);
RIC_1.addSupportedPolicyType(POLICY_TYPE_2);
rics.put(RIC_1);
- policyTypes.put(POLICY_TYPE_1);
- policyTypes.put(POLICY_TYPE_2);
+ types.put(POLICY_TYPE_1);
+ types.put(POLICY_TYPE_2);
policies.put(POLICY_1);
- RefreshCounterTask spy = spy(createRefreshCounterTask()); // instantiate RefreshCounterTask
- MeterRegistry meterRegistry = spy.getMeterRegistry();
-
- assertThat(meterRegistry.get("total_ric_count").gauge().value()).isEqualTo(1);
- assertThat(meterRegistry.get("total_policy_type_count").gauge().value()).isEqualTo(2);
- assertThat(meterRegistry.get("total_policy_count").gauge().value()).isEqualTo(1);
+ assertThat(prometheusMeterRegistry.get("total_ric_count").gauge().value()).isEqualTo(1);
+ assertThat(prometheusMeterRegistry.get("total_policy_type_count").gauge().value()).isEqualTo(2);
+ assertThat(prometheusMeterRegistry.get("total_policy_count").gauge().value()).isEqualTo(1);
}
- private RefreshCounterTask createRefreshCounterTask() {
- return new RefreshCounterTask(rics, policyTypes, policies, prometheusMeterRegistry);
+ private Meters createMeters() {
+ return new Meters(rics, types, policies, prometheusMeterRegistry);
}
}
diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java
index ab58027a..76838bb8 100644
--- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java
+++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java
@@ -44,6 +44,7 @@ import java.util.Collections;
import java.util.List;
import org.json.JSONObject;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1ClientFactory;
@@ -87,6 +88,7 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.TestPropertySource;
+import org.springframework.util.FileSystemUtils;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Mono;
@@ -98,8 +100,9 @@ import reactor.util.annotation.Nullable;
"server.ssl.key-store=./config/keystore.jks", //
"app.webclient.trust-store=./config/truststore.jks", //
"app.webclient.trust-store-used=true", //
- "app.vardata-directory=./target/testdata", //
- "app.filepath=" //
+ "app.vardata-directory=/tmp/pmstest", //
+ "app.filepath=", //
+ "app.s3.bucket=" // If this is set, S3 will be used to store data.
})
class ApplicationTest {
private static final Logger logger = LoggerFactory.getLogger(ApplicationTest.class);
@@ -178,6 +181,15 @@ class ApplicationTest {
this.securityContext.setAuthTokenFilePath(null);
}
+ @AfterAll
+ static void clearTestDir() {
+ try {
+ FileSystemUtils.deleteRecursively(Path.of("/tmp/pmstest"));
+ } catch (Exception e) {
+ logger.warn("Could test directory : {}", e.getMessage());
+ }
+ }
+
@AfterEach
void verifyNoRicLocks() {
for (Ric ric : this.rics.getRics()) {
@@ -205,7 +217,7 @@ class ApplicationTest {
}
@Test
- void testPersistencyPolicies() throws ServiceException {
+ void testPersistencyPolicies() throws Exception {
Ric ric = this.addRic("ric1");
PolicyType type = this.addPolicyType("type1", ric.id());
@@ -213,37 +225,50 @@ class ApplicationTest {
for (int i = 0; i < noOfPolicies; ++i) {
addPolicy("id" + i, type.getId(), "service", ric.id());
}
+ waitforS3();
{
Policies policies = new Policies(this.applicationConfig);
- policies.restoreFromDatabase(ric, this.policyTypes);
+ policies.restoreFromDatabase(ric, this.policyTypes).blockLast();
assertThat(policies.size()).isEqualTo(noOfPolicies);
}
{
restClient().delete("/policies/id2").block();
Policies policies = new Policies(this.applicationConfig);
- policies.restoreFromDatabase(ric, this.policyTypes);
+ policies.restoreFromDatabase(ric, this.policyTypes).blockLast();
assertThat(policies.size()).isEqualTo(noOfPolicies - 1);
}
}
@Test
- void testPersistencyPolicyTypes() throws ServiceException {
+ void testPersistencyPolicyTypes() throws Exception {
Ric ric = this.addRic("ric1");
this.addPolicyType("type1", ric.id());
+ waitforS3();
+
PolicyTypes types = new PolicyTypes(this.applicationConfig);
+ types.restoreFromDatabase().blockLast();
assertThat(types.size()).isEqualTo(1);
}
+ @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
+ private void waitforS3() throws Exception {
+ if (applicationConfig.isS3Enabled()) {
+ Thread.sleep(1000);
+ }
+ }
+
@Test
- void testPersistencyService() throws ServiceException {
+ void testPersistencyService() throws Exception {
final String SERVICE = "serviceName";
putService(SERVICE, 1234, HttpStatus.CREATED);
assertThat(this.services.size()).isEqualTo(1);
Service service = this.services.getService(SERVICE);
+ waitforS3();
Services servicesRestored = new Services(this.applicationConfig);
+ servicesRestored.restoreFromDatabase().blockLast();
Service serviceRestored = servicesRestored.getService(SERVICE);
assertThat(servicesRestored.size()).isEqualTo(1);
assertThat(serviceRestored.getCallbackUrl()).isEqualTo(service.getCallbackUrl());
diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java
index d9b36d9c..b2bf58e4 100644
--- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java
+++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java
@@ -34,6 +34,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Vector;
+
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;