diff options
author | Dan Timoney <dtimoney@att.com> | 2022-11-11 17:10:37 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2022-11-11 17:10:37 +0000 |
commit | 23189ffaa1099cfd83fd54fd771c4f40ccb95454 (patch) | |
tree | 7b62ce472a79b360e0269bc241e36b2aa71d0f75 /a1-policy-management | |
parent | 7e8f1778e5d5b10c476368e18709c7713d60d0e6 (diff) | |
parent | e3693cfdaf24a9ecaa341c938eb8b1d61a61cbf6 (diff) |
Merge "Support for using Amazon S3 - Cloud Object Storage"
Diffstat (limited to 'a1-policy-management')
18 files changed, 752 insertions, 288 deletions
diff --git a/a1-policy-management/config/application.yaml b/a1-policy-management/config/application.yaml index 80bb1568..2bd8d83d 100644 --- a/a1-policy-management/config/application.yaml +++ b/a1-policy-management/config/application.yaml @@ -77,7 +77,7 @@ app: http.proxy-host: http.proxy-port: 0 http.proxy-type: HTTP - # path where the service can store data + # path where the service can store data. This parameter is not relevant if S3 Object store is configured. vardata-directory: /var/policy-management-service # the config-file-schema-path referres to a location in the jar file. If this property is empty or missing, # no schema validation will be executed. @@ -85,3 +85,9 @@ app: # A file containing an authorization token, which shall be inserted in each HTTP header (authorization). # If the file name is empty, no authorization token is sent. auth-token-file: + # S3 object store usage is enabled by defining the bucket to use. This will override the vardata-directory parameter. + s3: + endpointOverride: http://localhost:9000 + accessKeyId: minio + secretAccessKey: miniostorage + bucket: diff --git a/a1-policy-management/pom.xml b/a1-policy-management/pom.xml index 436937a3..80a1a449 100644 --- a/a1-policy-management/pom.xml +++ b/a1-policy-management/pom.xml @@ -44,12 +44,11 @@ <guava.version>31.0.1-jre</guava.version> <docker-maven-plugin>0.30.0</docker-maven-plugin> <surefire-maven-plugin.version>3.0.0-M5</surefire-maven-plugin.version> - <snakeyaml.version>1.32</snakeyaml.version><!-- overrides version included via spring-boot-starter:jar:2.6.11 to address CVE-2022-38752. Remove later if possible --> - <!-- Version must be higher than version 2.19.1 that is defined in the parent pom for JUnit 5 tests to be run. Do not remove! --> <jacoco-maven-plugin.version>0.8.6</jacoco-maven-plugin.version> <swagger-codegen-maven-plugin.version>3.0.11</swagger-codegen-maven-plugin.version> <exec.skip>true</exec.skip> <ccsdk.project.version>${project.version}</ccsdk.project.version> + <software.amazon.awssdk.version>2.17.292</software.amazon.awssdk.version> </properties> <dependencies> <dependency> @@ -145,6 +144,11 @@ <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>s3</artifactId> + <version>${software.amazon.awssdk.version}</version> + </dependency> <!--REQUIRED TO GENERATE DOCUMENTATION --> <dependency> <groupId>io.springfox</groupId> @@ -432,4 +436,4 @@ </plugin> </plugins> </build> -</project> +</project>
\ No newline at end of file diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/BeanFactory.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/BeanFactory.java index 8fc8bc8c..0d93eae2 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/BeanFactory.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/BeanFactory.java @@ -20,11 +20,13 @@ package org.onap.ccsdk.oran.a1policymanagementservice; -import com.fasterxml.jackson.databind.ObjectMapper; + import org.apache.catalina.connector.Connector; import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1ClientFactory; import org.onap.ccsdk.oran.a1policymanagementservice.clients.SecurityContext; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; +import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies; +import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services; import org.springframework.beans.factory.annotation.Autowired; @@ -52,18 +54,27 @@ public class BeanFactory { @Bean public Services getServices(@Autowired ApplicationConfig applicationConfig) { - return new Services(applicationConfig); + Services services = new Services(applicationConfig); + services.restoreFromDatabase().subscribe(); + return services; } @Bean - public A1ClientFactory getA1ClientFactory(@Autowired ApplicationConfig applicationConfig, - @Autowired SecurityContext securityContext) { - return new A1ClientFactory(applicationConfig, securityContext); + public PolicyTypes getPolicyTypes(@Autowired ApplicationConfig applicationConfig) { + PolicyTypes types = new PolicyTypes(applicationConfig); + types.restoreFromDatabase().blockLast(); + return types; + } + + @Bean + public Policies getPolicies(@Autowired ApplicationConfig applicationConfig) { + return new Policies(applicationConfig); } @Bean - public ObjectMapper mapper() { - return new ObjectMapper(); + public A1ClientFactory getA1ClientFactory(@Autowired ApplicationConfig applicationConfig, + @Autowired SecurityContext securityContext) { + return new A1ClientFactory(applicationConfig, securityContext); } @Bean diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java index 0e2294fc..6dbf318d 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java @@ -20,6 +20,8 @@ package org.onap.ccsdk.oran.a1policymanagementservice.configuration; +import com.google.common.base.Strings; + import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -82,6 +84,22 @@ public class ApplicationConfig { @Value("${app.webclient.http.proxy-type:HTTP}") private String httpProxyType = "HTTP"; + @Getter + @Value("${app.s3.endpointOverride:}") + private String s3EndpointOverride; + + @Getter + @Value("${app.s3.accessKeyId:}") + private String s3AccessKeyId; + + @Getter + @Value("${app.s3.secretAccessKey:}") + private String s3SecretAccessKey; + + @Getter + @Value("${app.s3.bucket:}") + private String s3Bucket; + private Map<String, RicConfig> ricConfigs = new HashMap<>(); @Getter @@ -182,4 +200,9 @@ public class ApplicationConfig { return Flux.fromIterable(modifications); } + + public boolean isS3Enabled() { + return !(Strings.isNullOrEmpty(s3EndpointOverride) || Strings.isNullOrEmpty(s3Bucket)); + } + } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshCounterTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/Meters.java index 95999a04..203bfacc 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshCounterTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/Meters.java @@ -18,52 +18,28 @@ * ========================LICENSE_END=================================== */ -package org.onap.ccsdk.oran.a1policymanagementservice.tasks; +package org.onap.ccsdk.oran.a1policymanagementservice.configuration; import io.micrometer.core.instrument.MeterRegistry; -import java.lang.invoke.MethodHandles; -import lombok.AccessLevel; -import lombok.Getter; + import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies; import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** - * The aim is to collect statistical values from the A1 Policy Management Service. + * The aim is to collect statistical values from the A1 Policy Management + * Service. + * The counters are being updated every minute. */ @Component -public class RefreshCounterTask { - - private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - @Autowired - private final Rics rics; - - @Autowired - private final PolicyTypes policyTypes; +public class Meters { @Autowired - private final Policies policies; - - @Autowired - @Getter(AccessLevel.PUBLIC) - private final MeterRegistry meterRegistry; - - @Autowired - public RefreshCounterTask(Rics rics, PolicyTypes policyTypes, Policies policies, MeterRegistry meterRegistry) { - this.rics = rics; - this.policyTypes = policyTypes; - this.policies = policies; - this.meterRegistry = meterRegistry; - - logger.trace("Counters have been initialized."); + public Meters(Rics rics, PolicyTypes policyTypes, Policies policies, MeterRegistry meterRegistry) { meterRegistry.gauge("total_ric_count", rics, Rics::size); meterRegistry.gauge("total_policy_type_count", policyTypes, PolicyTypes::size); meterRegistry.gauge("total_policy_count", policies, Policies::size); } - } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java index ee2cae58..500ddd2a 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java @@ -22,6 +22,7 @@ package org.onap.ccsdk.oran.a1policymanagementservice.controllers.v2; import com.google.gson.Gson; import com.google.gson.GsonBuilder; + import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.media.Content; @@ -29,12 +30,15 @@ import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.responses.ApiResponses; import io.swagger.v3.oas.annotations.tags.Tag; + import java.lang.invoke.MethodHandles; import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.List; + import lombok.Getter; + import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1ClientFactory; import org.onap.ccsdk.oran.a1policymanagementservice.controllers.VoidResponse; import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.EntityNotFoundException; diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/DataStore.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/DataStore.java new file mode 100644 index 00000000..51b57dee --- /dev/null +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/DataStore.java @@ -0,0 +1,55 @@ +/*- + * ========================LICENSE_START================================= + * ONAP : ccsdk oran + * ====================================================================== + * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * ====================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.onap.ccsdk.oran.a1policymanagementservice.datastore; + +import com.google.common.base.Strings; + +import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public interface DataStore { + + public Flux<String> listObjects(String prefix); + + public Mono<byte[]> readObject(String name); + + public Mono<byte[]> writeObject(String name, byte[] fileData); + + public Mono<Boolean> deleteObject(String name); + + public Mono<String> createDataStore(); + + public Mono<String> deleteAllObjects(); + + public static DataStore create(ApplicationConfig appConfig, String location) { + if (appConfig.isS3Enabled()) { + return new S3ObjectStore(appConfig, location); + } else if (!Strings.isNullOrEmpty(appConfig.getVardataDirectory())) { + return new FileStore(appConfig, location); + } else { + return new NullStore(location); + } + + } + +} diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/FileStore.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/FileStore.java new file mode 100644 index 00000000..565120e1 --- /dev/null +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/FileStore.java @@ -0,0 +1,158 @@ +/*- + * ========================LICENSE_START================================= + * ONAP : ccsdk oran + * ====================================================================== + * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * ====================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + + +package org.onap.ccsdk.oran.a1policymanagementservice.datastore; + +import com.google.common.base.Strings; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Stream; + +import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +class FileStore implements DataStore { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + ApplicationConfig applicationConfig; + private final String location; + + public FileStore(ApplicationConfig applicationConfig, String location) { + this.applicationConfig = applicationConfig; + this.location = location; + } + + @Override + public Flux<String> listObjects(String prefix) { + Path root = Path.of(path().toString(), prefix); + if (!root.toFile().exists()) { + root = root.getParent(); + } + + logger.debug("Listing files in: {}", root); + + List<String> result = new ArrayList<>(); + try (Stream<Path> stream = Files.walk(root, Integer.MAX_VALUE)) { + + stream.forEach(path -> filterListFiles(path, prefix, result)); + + return Flux.fromIterable(result); + } catch (Exception e) { + logger.warn("Could not list filed in {}, reason; {}", root, e.getMessage()); + return Flux.error(e); + } + } + + private void filterListFiles(Path path, String prefix, List<String> result) { + if (path.toFile().isFile() && externalName(path).startsWith(prefix)) { + result.add(externalName(path)); + } else { + logger.trace("Ignoring file/directory {}, prefix: {}", path, prefix); + } + } + + private String externalName(Path path) { + String fullName = path.toString(); + String externalName = fullName.substring(path().toString().length()); + if (externalName.startsWith("/")) { + externalName = externalName.substring(1); + } + return externalName; + } + + @Override + public Mono<byte[]> readObject(String fileName) { + try { + byte[] contents = Files.readAllBytes(path(fileName)); + return Mono.just(contents); + } catch (Exception e) { + return Mono.error(e); + } + } + + @Override + public Mono<Boolean> deleteObject(String name) { + try { + Files.delete(path(name)); + return Mono.just(true); + } catch (Exception e) { + logger.debug("Could not delete file: {}, reason: {}", path(name), e.getMessage()); + return Mono.just(false); + } + } + + @Override + public Mono<String> createDataStore() { + try { + if (!Strings.isNullOrEmpty(applicationConfig.getVardataDirectory())) { + Files.createDirectories(path()); + } + } catch (IOException e) { + logger.error("Could not create directory: {}, reason: {}", path(), e.getMessage()); + } + return Mono.just("OK"); + } + + private Path path(String name) { + return Path.of(path().toString(), name); + } + + private Path path() { + return Path.of(applicationConfig.getVardataDirectory(), "database", this.location); + } + + @Override + public Mono<String> deleteAllObjects() { + return listObjects("") // + .flatMap(this::deleteObject) // + .collectList() // + .map(o -> "OK"); + } + + @Override + public Mono<byte[]> writeObject(String fileName, byte[] fileData) { + try { + if (!Strings.isNullOrEmpty(applicationConfig.getVardataDirectory())) { + Files.createDirectories(path(fileName).getParent()); + } + File outputFile = path(fileName).toFile(); + + try (FileOutputStream outputStream = new FileOutputStream(outputFile)) { + outputStream.write(fileData); + } + } catch (IOException e) { + logger.warn("Could not write file: {}, reason; {}", path(fileName), e.getMessage()); + } + return Mono.just(fileData); + } + +} diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/NullStore.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/NullStore.java new file mode 100644 index 00000000..1ecd8f64 --- /dev/null +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/NullStore.java @@ -0,0 +1,68 @@ +/*- + * ========================LICENSE_START================================= + * ONAP : ccsdk oran + * ====================================================================== + * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * ====================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.onap.ccsdk.oran.a1policymanagementservice.datastore; + +import java.lang.invoke.MethodHandles; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +class NullStore implements DataStore { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public NullStore(String location) { + logger.warn("No storage defined for: {}", location); + } + + @Override + public Flux<String> listObjects(String prefix) { + return Flux.empty(); + } + + @Override + public Mono<byte[]> readObject(String name) { + return Mono.just(new byte[0]); + } + + @Override + public Mono<byte[]> writeObject(String name, byte[] fileData) { + return Mono.just(new byte[0]); + } + + @Override + public Mono<Boolean> deleteObject(String name) { + return Mono.just(false); + } + + @Override + public Mono<String> createDataStore() { + return Mono.just(""); + } + + @Override + public Mono<String> deleteAllObjects() { + return Mono.just(""); + } + +} diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/S3ObjectStore.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/S3ObjectStore.java new file mode 100644 index 00000000..4c7c3c3e --- /dev/null +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/S3ObjectStore.java @@ -0,0 +1,227 @@ +/*- + * ========================LICENSE_START================================= + * ONAP : ccsdk oran + * ====================================================================== + * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * ====================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.onap.ccsdk.oran.a1policymanagementservice.datastore; + +import java.net.URI; +import java.util.concurrent.CompletableFuture; + +import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.BytesWrapper; +import software.amazon.awssdk.core.ResponseBytes; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.CreateBucketResponse; +import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; +import software.amazon.awssdk.services.s3.model.DeleteBucketResponse; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectResponse; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.ListObjectsRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsResponse; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.services.s3.model.S3Object; + +class S3ObjectStore implements DataStore { + private static final Logger logger = LoggerFactory.getLogger(S3ObjectStore.class); + private final ApplicationConfig applicationConfig; + + private static S3AsyncClient s3AsynchClient; + private final String location; + + public S3ObjectStore(ApplicationConfig applicationConfig, String location) { + this.applicationConfig = applicationConfig; + this.location = location; + + getS3AsynchClient(applicationConfig); + } + + private static synchronized S3AsyncClient getS3AsynchClient(ApplicationConfig applicationConfig) { + if (applicationConfig.isS3Enabled() && s3AsynchClient == null) { + s3AsynchClient = getS3AsyncClientBuilder(applicationConfig).build(); + } + return s3AsynchClient; + } + + private static S3AsyncClientBuilder getS3AsyncClientBuilder(ApplicationConfig applicationConfig) { + URI uri = URI.create(applicationConfig.getS3EndpointOverride()); + return S3AsyncClient.builder() // + .region(Region.US_EAST_1) // + .endpointOverride(uri) // + .credentialsProvider(StaticCredentialsProvider.create( // + AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), // + applicationConfig.getS3SecretAccessKey()))); + } + + @Override + public Flux<String> listObjects(String prefix) { + return listObjectsInBucket(bucket(), location + "/" + prefix).map(S3Object::key) // + .map(this::externalName); + } + + @Override + public Mono<Boolean> deleteObject(String name) { + DeleteObjectRequest request = DeleteObjectRequest.builder() // + .bucket(bucket()) // + .key(key(name)) // + .build(); + + CompletableFuture<DeleteObjectResponse> future = s3AsynchClient.deleteObject(request); + + return Mono.fromFuture(future).map(resp -> true); + } + + @Override + public Mono<byte[]> readObject(String name) { + return getDataFromS3Object(bucket(), name); + } + + @Override + public Mono<byte[]> writeObject(String name, byte[] fileData) { + + PutObjectRequest request = PutObjectRequest.builder() // + .bucket(bucket()) // + .key(key(name)) // + .build(); + + AsyncRequestBody body = AsyncRequestBody.fromBytes(fileData); + + CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body); + + return Mono.fromFuture(future) // + .map(putObjectResponse -> fileData) // + .doOnError(t -> logger.error("Failed to store object '{}' in S3 {}", key(name), t.getMessage())); + } + + @Override + public Mono<String> createDataStore() { + return createS3Bucket(bucket()); + } + + private Mono<String> createS3Bucket(String s3Bucket) { + + CreateBucketRequest request = CreateBucketRequest.builder() // + .bucket(s3Bucket) // + .build(); + + CompletableFuture<CreateBucketResponse> future = s3AsynchClient.createBucket(request); + + return Mono.fromFuture(future) // + .map(f -> s3Bucket) // + .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage())) + .onErrorResume(t -> Mono.just(s3Bucket)); + } + + @Override + public Mono<String> deleteAllObjects() { + return listObjects("") // + .flatMap(this::deleteObject) // + .collectList() // + .map(resp -> "OK").onErrorResume(t -> Mono.just("NOK")); + } + + public Mono<String> deleteBucket() { + DeleteBucketRequest request = DeleteBucketRequest.builder() // + .bucket(bucket()) // + .build(); + + CompletableFuture<DeleteBucketResponse> future = s3AsynchClient.deleteBucket(request); + + return Mono.fromFuture(future) // + .doOnError(t -> logger.warn("Could not delete bucket: {}, reason: {}", bucket(), t.getMessage())) + .map(resp -> bucket()) // + .doOnNext(resp -> logger.debug("Deleted bucket: {}", bucket())).onErrorResume(t -> Mono.just("NOK")); + } + + private String bucket() { + return applicationConfig.getS3Bucket(); + } + + private Flux<S3Object> listObjectsInBucket(String bucket, String prefix) { + + return listObjectsRequest(bucket, prefix, null) // + .expand(response -> listObjectsRequest(bucket, prefix, response)) // + .map(ListObjectsResponse::contents) // + .doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) // + .doOnError(t -> logger.warn("Error fromlist objects: {}", t.getMessage())) // + .flatMap(Flux::fromIterable) // + .doOnNext(obj -> logger.debug("Found object: {}", obj.key())); + } + + private Mono<ListObjectsResponse> listObjectsRequest(String bucket, String prefix, + ListObjectsResponse prevResponse) { + ListObjectsRequest.Builder builder = ListObjectsRequest.builder() // + .bucket(bucket) // + .maxKeys(1000) // + .prefix(prefix); + + if (prevResponse != null) { + if (Boolean.TRUE.equals(prevResponse.isTruncated())) { + builder.marker(prevResponse.nextMarker()); + } else { + return Mono.empty(); + } + } + + ListObjectsRequest listObjectsRequest = builder.build(); + CompletableFuture<ListObjectsResponse> future = s3AsynchClient.listObjects(listObjectsRequest); + return Mono.fromFuture(future); + } + + private Mono<byte[]> getDataFromS3Object(String bucket, String name) { + + GetObjectRequest request = GetObjectRequest.builder() // + .bucket(bucket) // + .key(key(name)) // + .build(); + + CompletableFuture<ResponseBytes<GetObjectResponse>> future = + s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes()); + + return Mono.fromFuture(future) // + .map(BytesWrapper::asByteArray) // + .doOnError(t -> logger.error("Failed to get file from S3, key:{}, bucket: {}, {}", key(name), bucket, + t.getMessage())) // + .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key(name))) // + .onErrorResume(t -> Mono.empty()); + } + + private String key(String name) { + return location + "/" + name; + } + + private String externalName(String internalName) { + return internalName.substring(key("").length()); + } + +} diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java index ef924748..d808b57d 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java @@ -2,7 +2,7 @@ * ========================LICENSE_START================================= * ONAP : ccsdk oran * ====================================================================== - * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved. + * Copyright (C) 2019-2022 Nordix Foundation. All rights reserved. * ====================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,18 +23,14 @@ package org.onap.ccsdk.oran.a1policymanagementservice.repository; import com.google.gson.Gson; import com.google.gson.GsonBuilder; -import java.io.File; -import java.io.FileOutputStream; -import java.io.PrintStream; import java.lang.invoke.MethodHandles; -import java.nio.file.Files; -import java.nio.file.Path; import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.Vector; @@ -42,17 +38,17 @@ import lombok.Builder; import lombok.Getter; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; +import org.onap.ccsdk.oran.a1policymanagementservice.datastore.DataStore; import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.EntityNotFoundException; -import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Configuration; -import org.springframework.util.FileSystemUtils; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.util.annotation.Nullable; @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally -@Configuration public class Policies { @Getter @@ -73,12 +69,31 @@ public class Policies { private MultiMap<Policy> policiesRic = new MultiMap<>(); private MultiMap<Policy> policiesService = new MultiMap<>(); private MultiMap<Policy> policiesType = new MultiMap<>(); + private final DataStore dataStore; private final ApplicationConfig appConfig; private static Gson gson = new GsonBuilder().create(); public Policies(@Autowired ApplicationConfig appConfig) { this.appConfig = appConfig; + this.dataStore = DataStore.create(appConfig, "policies"); + } + + public Flux<Policy> restoreFromDatabase(Ric ric, PolicyTypes types) { + return dataStore.createDataStore() // + .flatMapMany(x -> dataStore.listObjects(getPath(ric))) // + .flatMap(dataStore::readObject) // + .map(String::new) // + .map(json -> gson.fromJson(json, PersistentPolicyInfo.class)) // + .map(policyInfo -> toPolicy(policyInfo, ric, types)) // + .doOnNext(this::put) // + .filter(Objects::nonNull) // + .doOnError(t -> logger.warn("Could not restore policy database for RIC: {}, reason : {}", ric.id(), + t.getMessage())) // + .doFinally(sig -> logger.debug("Restored policy database for RIC: {}, number of policies: {}", ric.id(), + this.policiesRic.get(ric.id()).size())) // + .onErrorResume(t -> Flux.empty()) // + ; } public synchronized void put(Policy policy) { @@ -133,11 +148,7 @@ public class Policies { public synchronized void remove(Policy policy) { if (!policy.isTransient()) { - try { - Files.delete(getPath(policy)); - } catch (Exception e) { - logger.debug("Could not delete policy from database: {}", e.getMessage()); - } + dataStore.deleteObject(getPath(policy)).subscribe(); } policiesId.remove(policy.getId()); policiesRic.remove(policy.getRic().id(), policy.getId()); @@ -175,24 +186,15 @@ public class Policies { Set<String> keys = policiesId.keySet(); removeId(keys.iterator().next()); } - try { - if (this.appConfig.getVardataDirectory() != null) { - FileSystemUtils.deleteRecursively(getDatabasePath()); - } - } catch (Exception e) { - logger.warn("Could not delete policy database : {}", e.getMessage()); - } + dataStore.deleteAllObjects().onErrorResume(t -> Mono.empty()).subscribe(); } public void store(Policy policy) { - try { - Files.createDirectories(getDatabasePath(policy.getRic())); - try (PrintStream out = new PrintStream(new FileOutputStream(getFile(policy)))) { - out.print(gson.toJson(toStorageObject(policy))); - } - } catch (Exception e) { - logger.warn("Could not store policy: {} {}", policy.getId(), e.getMessage()); - } + + byte[] bytes = gson.toJson(toStorageObject(policy)).getBytes(); + this.dataStore.writeObject(this.getPath(policy), bytes) // + .doOnError(t -> logger.error("Could not store job in S3, reason: {}", t.getMessage())) // + .subscribe(); } private boolean isMatch(String filterValue, String actualValue) { @@ -218,29 +220,6 @@ public class Policies { return filtered; } - private File getFile(Policy policy) throws ServiceException { - return getPath(policy).toFile(); - } - - private Path getPath(Policy policy) throws ServiceException { - return Path.of(getDatabaseDirectory(policy.getRic()), policy.getId() + ".json"); - } - - public synchronized void restoreFromDatabase(Ric ric, PolicyTypes types) { - try { - Files.createDirectories(getDatabasePath(ric)); - for (File file : getDatabasePath(ric).toFile().listFiles()) { - String json = Files.readString(file.toPath()); - PersistentPolicyInfo policyStorage = gson.fromJson(json, PersistentPolicyInfo.class); - this.put(toPolicy(policyStorage, ric, types)); - } - logger.debug("Restored policy database for RIC: {}, number of policies: {}", ric.id(), - this.policiesRic.get(ric.id()).size()); - } catch (Exception e) { - logger.warn("Could not restore policy database for RIC: {}, reason : {}", ric.id(), e.getMessage()); - } - } - private PersistentPolicyInfo toStorageObject(Policy p) { return PersistentPolicyInfo.builder() // .id(p.getId()) // @@ -254,35 +233,30 @@ public class Policies { .build(); } - Policy toPolicy(PersistentPolicyInfo p, Ric ric, PolicyTypes types) throws EntityNotFoundException { - return Policy.builder() // - .id(p.getId()) // - .isTransient(p.isTransient()) // - .json(p.getJson()) // - .lastModified(Instant.parse(p.lastModified)) // - .ownerServiceId(p.getOwnerServiceId()) // - .ric(ric) // - .statusNotificationUri(p.getStatusNotificationUri()) // - .type(types.getType(p.getTypeId())) // - .build(); - } - - private Path getDatabasePath(Ric ric) throws ServiceException { - return Path.of(getDatabaseDirectory(ric)); + private Policy toPolicy(PersistentPolicyInfo p, Ric ric, PolicyTypes types) { + try { + return Policy.builder() // + .id(p.getId()) // + .isTransient(p.isTransient()) // + .json(p.getJson()) // + .lastModified(Instant.parse(p.lastModified)) // + .ownerServiceId(p.getOwnerServiceId()) // + .ric(ric) // + .statusNotificationUri(p.getStatusNotificationUri()) // + .type(types.getType(p.getTypeId())) // + .build(); + } catch (EntityNotFoundException e) { + logger.warn("Not found: {}", e.getMessage()); + return null; + } } - private String getDatabaseDirectory(Ric ric) throws ServiceException { - return getDatabaseDirectory() + "/" + ric.id(); + private String getPath(Policy policy) { + return getPath(policy.getRic()) + "/" + policy.getId() + ".json"; } - private String getDatabaseDirectory() throws ServiceException { - if (appConfig.getVardataDirectory() == null) { - throw new ServiceException("No database storage provided"); - } - return appConfig.getVardataDirectory() + "/database/policyInstances"; + private String getPath(Ric ric) { + return ric.id(); } - private Path getDatabasePath() throws ServiceException { - return Path.of(getDatabaseDirectory()); - } } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java index 327dee60..b4e9c654 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java @@ -23,13 +23,7 @@ package org.onap.ccsdk.oran.a1policymanagementservice.repository; import com.google.gson.Gson; import com.google.gson.GsonBuilder; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.PrintStream; import java.lang.invoke.MethodHandles; -import java.nio.file.Files; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -38,25 +32,25 @@ import java.util.Map; import java.util.Vector; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; +import org.onap.ccsdk.oran.a1policymanagementservice.datastore.DataStore; import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.EntityNotFoundException; import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Configuration; import org.springframework.lang.Nullable; -import org.springframework.util.FileSystemUtils; -@Configuration +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + public class PolicyTypes { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private Map<String, PolicyType> types = new HashMap<>(); - private final ApplicationConfig appConfig; private static Gson gson = new GsonBuilder().create(); + private final DataStore dataStore; public PolicyTypes(@Autowired ApplicationConfig appConfig) { - this.appConfig = appConfig; - restoreFromDatabase(); + this.dataStore = DataStore.create(appConfig, "policytypes"); } public synchronized PolicyType getType(String name) throws EntityNotFoundException { @@ -113,55 +107,26 @@ public class PolicyTypes { public synchronized void clear() { this.types.clear(); - try { - FileSystemUtils.deleteRecursively(getDatabasePath()); - } catch (IOException | ServiceException e) { - logger.warn("Could not delete policy type database : {}", e.getMessage()); - } + dataStore.deleteAllObjects().onErrorResume(t -> Mono.empty()).subscribe(); } public void store(PolicyType type) { - try { - Files.createDirectories(getDatabasePath()); - try (PrintStream out = new PrintStream(new FileOutputStream(getFile(type)))) { - out.print(gson.toJson(type)); - } - } catch (ServiceException e) { - logger.debug("Could not store policy type: {} {}", type.getId(), e.getMessage()); - } catch (IOException e) { - logger.warn("Could not store policy type: {} {}", type.getId(), e.getMessage()); - } + byte[] bytes = gson.toJson(type).getBytes(); + dataStore.writeObject(getPath(type), bytes) // + .doOnError(t -> logger.warn("Could not store policy type: {} {}", type.getId(), t.getMessage())) + .subscribe(); } - private File getFile(PolicyType type) throws ServiceException { - return Path.of(getDatabaseDirectory(), type.getId() + ".json").toFile(); - } + public Flux<PolicyType> restoreFromDatabase() { - void restoreFromDatabase() { - try { - Files.createDirectories(getDatabasePath()); - for (File file : getDatabasePath().toFile().listFiles()) { - String json = Files.readString(file.toPath()); - PolicyType type = gson.fromJson(json, PolicyType.class); - this.types.put(type.getId(), type); - } - logger.debug("Restored type database,no of types: {}", this.types.size()); - } catch (ServiceException e) { - logger.debug("Could not restore policy type database : {}", e.getMessage()); - } catch (Exception e) { - logger.warn("Could not restore policy type database : {}", e.getMessage()); - } - } - - private String getDatabaseDirectory() throws ServiceException { - if (appConfig.getVardataDirectory() == null) { - throw new ServiceException("No policy type storage provided"); - } - return appConfig.getVardataDirectory() + "/database/policyTypes"; - } + return this.dataStore.createDataStore().flatMapMany(x -> dataStore.listObjects("")) // + .flatMap(dataStore::readObject) // + .map(String::new) // + .map(json -> gson.fromJson(json, PolicyType.class)).doOnNext(type -> this.types.put(type.getId(), type)) // + .doOnError(t -> logger.warn("Could not restore policy type database : {}", t.getMessage())) // + .doFinally(sig -> logger.debug("Restored type database,no of types: {}", this.types.size())) + .onErrorResume(t -> Flux.empty()); // - private Path getDatabasePath() throws ServiceException { - return Path.of(getDatabaseDirectory()); } private static Collection<PolicyType> filterTypeName(Collection<PolicyType> types, String typeName) { @@ -188,4 +153,8 @@ public class PolicyTypes { return result; } + private String getPath(PolicyType type) { + return type.getId() + ".json"; + } + } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Services.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Services.java index 68a8d7dd..9c2846a9 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Services.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Services.java @@ -2,7 +2,7 @@ * ========================LICENSE_START================================= * ONAP : ccsdk oran * ====================================================================== - * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved. + * Copyright (C) 2019-2022 Nordix Foundation. All rights reserved. * ====================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,33 +22,29 @@ package org.onap.ccsdk.oran.a1policymanagementservice.repository; import com.google.gson.Gson; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.PrintStream; -import java.nio.file.Files; -import java.nio.file.Path; import java.util.HashMap; import java.util.Map; import java.util.Vector; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; +import org.onap.ccsdk.oran.a1policymanagementservice.datastore.DataStore; import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.util.FileSystemUtils; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; public class Services { private static final Logger logger = LoggerFactory.getLogger(Services.class); private static Gson gson = Service.createGson(); - private final ApplicationConfig appConfig; + private final DataStore dataStore; private Map<String, Service> registeredServices = new HashMap<>(); public Services(@Autowired ApplicationConfig appConfig) { - this.appConfig = appConfig; - restoreFromDatabase(); + this.dataStore = DataStore.create(appConfig, "services"); } public synchronized Service getService(String name) throws ServiceException { @@ -77,11 +73,7 @@ public class Services { public synchronized void remove(String name) { Service service = registeredServices.remove(name); if (service != null) { - try { - Files.delete(getPath(service)); - } catch (Exception e) { - // Doesn't matter. - } + dataStore.deleteObject(getPath(service)).subscribe(); } } @@ -91,59 +83,29 @@ public class Services { public synchronized void clear() { registeredServices.clear(); - try { - FileSystemUtils.deleteRecursively(getDatabasePath()); - } catch (Exception e) { - logger.warn("Could not delete services database : {}", e.getMessage()); - } + dataStore.deleteAllObjects().onErrorResume(t -> Mono.empty()).subscribe(); } public void store(Service service) { - try { - Files.createDirectories(getDatabasePath()); - try (PrintStream out = new PrintStream(new FileOutputStream(getFile(service)))) { - String str = gson.toJson(service); - out.print(str); - } - } catch (ServiceException e) { - logger.debug("Could not store service: {} {}", service.getName(), e.getMessage()); - } catch (IOException e) { - logger.warn("Could not store pservice: {} {}", service.getName(), e.getMessage()); - } - } - - private File getFile(Service service) throws ServiceException { - return getPath(service).toFile(); + byte[] bytes = gson.toJson(service).getBytes(); + dataStore.writeObject(getPath(service), bytes) // + .doOnError(t -> logger.warn("Could not service: {} {}", service.getName(), t.getMessage())).subscribe(); } - private Path getPath(Service service) throws ServiceException { - return Path.of(getDatabaseDirectory(), service.getName() + ".json"); + public Flux<Service> restoreFromDatabase() { + return dataStore.createDataStore().flatMapMany(ds -> dataStore.listObjects("")) // + .flatMap(dataStore::readObject, 1) // + .map(String::new) // + .map(json -> gson.fromJson(json, Service.class)) + .doOnNext(service -> this.registeredServices.put(service.getName(), service)) + .doOnError(t -> logger.warn("Could not restore services database : {}", t.getMessage())) + .doFinally(sig -> logger.debug("Restored type database,no of services: {}", + this.registeredServices.size())) // + .onErrorResume(t -> Flux.empty()); // } - void restoreFromDatabase() { - try { - Files.createDirectories(getDatabasePath()); - for (File file : getDatabasePath().toFile().listFiles()) { - String json = Files.readString(file.toPath()); - Service service = gson.fromJson(json, Service.class); - this.registeredServices.put(service.getName(), service); - } - logger.debug("Restored type database,no of services: {}", this.registeredServices.size()); - } catch (ServiceException e) { - logger.debug("Could not restore services database : {}", e.getMessage()); - } catch (Exception e) { - logger.warn("Could not restore services database : {}", e.getMessage()); - } + private String getPath(Service service) { + return service.getName() + ".json"; } - private String getDatabaseDirectory() throws ServiceException { - if (appConfig.getVardataDirectory() == null) { - throw new ServiceException("No storage provided"); - } - return appConfig.getVardataDirectory() + "/database/services"; - } - - private Path getDatabasePath() throws ServiceException { - return Path.of(getDatabaseDirectory()); - } } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java index 983e92e6..567bb8d2 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java @@ -105,7 +105,7 @@ public class RefreshConfigTask { refreshTask = createRefreshTask() // .subscribe( notUsed -> logger.debug("Refreshed configuration data"), throwable -> logger - .error("Configuration refresh terminated due to exception {}", throwable.toString()), + .error("Configuration refresh terminated due to exception {}", throwable.getMessage()), () -> logger.error("Configuration refresh terminated")); } @@ -128,6 +128,7 @@ public class RefreshConfigTask { .flatMap(this::parseConfiguration) // .flatMap(this::updateConfig, CONCURRENCY) // .flatMap(this::handleUpdatedRicConfig) // + .doOnError(t -> logger.error("Cannot update config {}", t.getMessage())) .doFinally(signal -> logger.error("Configuration refresh task is terminated: {}", signal)); } @@ -200,7 +201,7 @@ public class RefreshConfigTask { void addRic(Ric ric) { this.rics.put(ric); if (this.appConfig.getVardataDirectory() != null) { - this.policies.restoreFromDatabase(ric, this.policyTypes); + this.policies.restoreFromDatabase(ric, this.policyTypes).subscribe(); } logger.debug("Added RIC: {}", ric.id()); } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java index 7a5f73d4..fdeb47e2 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java @@ -21,6 +21,7 @@ package org.onap.ccsdk.oran.a1policymanagementservice.tasks; import java.util.Collection; + import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1Client; import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1ClientFactory; import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClientFactory; @@ -40,6 +41,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshCounterTaskTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/MetersTest.java index bd315c26..5ab3d5a3 100644 --- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshCounterTaskTest.java +++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/MetersTest.java @@ -18,23 +18,21 @@ * ========================LICENSE_END=================================== */ -package org.onap.ccsdk.oran.a1policymanagementservice.tasks; +package org.onap.ccsdk.oran.a1policymanagementservice.configuration; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.spy; -import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.prometheus.PrometheusConfig; import io.micrometer.prometheus.PrometheusMeterRegistry; + import java.time.Instant; import java.util.Arrays; import java.util.Vector; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; -import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; -import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy; import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyType; @@ -43,7 +41,7 @@ import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics; @ExtendWith(MockitoExtension.class) -class RefreshCounterTaskTest { +public class MetersTest { private static final String POLICY_TYPE_1_NAME = "type1"; private static final PolicyType POLICY_TYPE_1 = PolicyType.builder().id(POLICY_TYPE_1_NAME).schema("").build(); @@ -57,21 +55,28 @@ class RefreshCounterTaskTest { .ric(RIC_1).type(POLICY_TYPE_1).lastModified(Instant.now()).isTransient(false) .statusNotificationUri("statusNotificationUri").build(); - private final PrometheusMeterRegistry prometheusMeterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT); + private PrometheusMeterRegistry prometheusMeterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT); private final ApplicationConfig appConfig = new ApplicationConfig(); - private PolicyTypes policyTypes; + private PolicyTypes types; private Policies policies; private Rics rics = new Rics(); + Meters testObject; + @BeforeEach void init() { - policyTypes = new PolicyTypes(appConfig); + types = new PolicyTypes(appConfig); policies = new Policies(appConfig); rics.clear(); + policies.clear(); + types.clear(); + RIC_1.setState(Ric.RicState.AVAILABLE); RIC_1.clearSupportedPolicyTypes(); + + this.testObject = createMeters(); } @Test @@ -80,16 +85,15 @@ class RefreshCounterTaskTest { RIC_1.addSupportedPolicyType(POLICY_TYPE_1); rics.put(RIC_1); - policyTypes.put(POLICY_TYPE_1); + types.put(POLICY_TYPE_1); policies.put(POLICY_1); - RefreshCounterTask spy = spy(createRefreshCounterTask()); // instantiate RefreshCounterTask - MeterRegistry meterRegistry = spy.getMeterRegistry(); + createMeters(); - assertThat(meterRegistry.get("total_ric_count").gauge().value()).isEqualTo(1); - assertThat(meterRegistry.get("total_policy_type_count").gauge().value()).isEqualTo(1); - assertThat(meterRegistry.get("total_policy_count").gauge().value()).isEqualTo(1); + assertThat(prometheusMeterRegistry.get("total_ric_count").gauge().value()).isEqualTo(1); + assertThat(prometheusMeterRegistry.get("total_policy_type_count").gauge().value()).isEqualTo(1); + assertThat(prometheusMeterRegistry.get("total_policy_count").gauge().value()).isEqualTo(1); } @Test @@ -98,30 +102,27 @@ class RefreshCounterTaskTest { RIC_1.addSupportedPolicyType(POLICY_TYPE_1); rics.put(RIC_1); - policyTypes.put(POLICY_TYPE_1); + types.put(POLICY_TYPE_1); policies.put(POLICY_1); String POLICY_2_ID = "policyId2"; - Policy POLICY_2 = Policy.builder() - .id(POLICY_2_ID) - .json("") - .ownerServiceId("service") - .ric(RIC_1) - .type(POLICY_TYPE_1) - .lastModified(Instant.now()) + Policy POLICY_2 = Policy.builder() // + .id(POLICY_2_ID) // + .json("") // + .ownerServiceId("service") // + .ric(RIC_1) // + .type(POLICY_TYPE_1) // + .lastModified(Instant.now()) // .isTransient(false) // - .statusNotificationUri("statusNotificationUri") + .statusNotificationUri("statusNotificationUri") // .build(); policies.put(POLICY_2); - RefreshCounterTask spy = spy(createRefreshCounterTask()); // instantiate RefreshCounterTask - MeterRegistry meterRegistry = spy.getMeterRegistry(); - - assertThat(meterRegistry.get("total_ric_count").gauge().value()).isEqualTo(1); - assertThat(meterRegistry.get("total_policy_type_count").gauge().value()).isEqualTo(1); - assertThat(meterRegistry.get("total_policy_count").gauge().value()).isEqualTo(2); + assertThat(prometheusMeterRegistry.get("total_ric_count").gauge().value()).isEqualTo(1); + assertThat(prometheusMeterRegistry.get("total_policy_type_count").gauge().value()).isEqualTo(1); + assertThat(prometheusMeterRegistry.get("total_policy_count").gauge().value()).isEqualTo(2); } @Test @@ -129,29 +130,26 @@ class RefreshCounterTaskTest { RIC_1.setState(Ric.RicState.AVAILABLE); String POLICY_TYPE_2_NAME = "type2"; - PolicyType POLICY_TYPE_2 = PolicyType.builder() - .id(POLICY_TYPE_2_NAME) - .schema("") + PolicyType POLICY_TYPE_2 = PolicyType.builder() // + .id(POLICY_TYPE_2_NAME) // + .schema("") // .build(); RIC_1.addSupportedPolicyType(POLICY_TYPE_1); RIC_1.addSupportedPolicyType(POLICY_TYPE_2); rics.put(RIC_1); - policyTypes.put(POLICY_TYPE_1); - policyTypes.put(POLICY_TYPE_2); + types.put(POLICY_TYPE_1); + types.put(POLICY_TYPE_2); policies.put(POLICY_1); - RefreshCounterTask spy = spy(createRefreshCounterTask()); // instantiate RefreshCounterTask - MeterRegistry meterRegistry = spy.getMeterRegistry(); - - assertThat(meterRegistry.get("total_ric_count").gauge().value()).isEqualTo(1); - assertThat(meterRegistry.get("total_policy_type_count").gauge().value()).isEqualTo(2); - assertThat(meterRegistry.get("total_policy_count").gauge().value()).isEqualTo(1); + assertThat(prometheusMeterRegistry.get("total_ric_count").gauge().value()).isEqualTo(1); + assertThat(prometheusMeterRegistry.get("total_policy_type_count").gauge().value()).isEqualTo(2); + assertThat(prometheusMeterRegistry.get("total_policy_count").gauge().value()).isEqualTo(1); } - private RefreshCounterTask createRefreshCounterTask() { - return new RefreshCounterTask(rics, policyTypes, policies, prometheusMeterRegistry); + private Meters createMeters() { + return new Meters(rics, types, policies, prometheusMeterRegistry); } } diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java index ab58027a..76838bb8 100644 --- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java +++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/ApplicationTest.java @@ -44,6 +44,7 @@ import java.util.Collections; import java.util.List; import org.json.JSONObject; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1ClientFactory; @@ -87,6 +88,7 @@ import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.test.context.TestPropertySource; +import org.springframework.util.FileSystemUtils; import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Mono; @@ -98,8 +100,9 @@ import reactor.util.annotation.Nullable; "server.ssl.key-store=./config/keystore.jks", // "app.webclient.trust-store=./config/truststore.jks", // "app.webclient.trust-store-used=true", // - "app.vardata-directory=./target/testdata", // - "app.filepath=" // + "app.vardata-directory=/tmp/pmstest", // + "app.filepath=", // + "app.s3.bucket=" // If this is set, S3 will be used to store data. }) class ApplicationTest { private static final Logger logger = LoggerFactory.getLogger(ApplicationTest.class); @@ -178,6 +181,15 @@ class ApplicationTest { this.securityContext.setAuthTokenFilePath(null); } + @AfterAll + static void clearTestDir() { + try { + FileSystemUtils.deleteRecursively(Path.of("/tmp/pmstest")); + } catch (Exception e) { + logger.warn("Could test directory : {}", e.getMessage()); + } + } + @AfterEach void verifyNoRicLocks() { for (Ric ric : this.rics.getRics()) { @@ -205,7 +217,7 @@ class ApplicationTest { } @Test - void testPersistencyPolicies() throws ServiceException { + void testPersistencyPolicies() throws Exception { Ric ric = this.addRic("ric1"); PolicyType type = this.addPolicyType("type1", ric.id()); @@ -213,37 +225,50 @@ class ApplicationTest { for (int i = 0; i < noOfPolicies; ++i) { addPolicy("id" + i, type.getId(), "service", ric.id()); } + waitforS3(); { Policies policies = new Policies(this.applicationConfig); - policies.restoreFromDatabase(ric, this.policyTypes); + policies.restoreFromDatabase(ric, this.policyTypes).blockLast(); assertThat(policies.size()).isEqualTo(noOfPolicies); } { restClient().delete("/policies/id2").block(); Policies policies = new Policies(this.applicationConfig); - policies.restoreFromDatabase(ric, this.policyTypes); + policies.restoreFromDatabase(ric, this.policyTypes).blockLast(); assertThat(policies.size()).isEqualTo(noOfPolicies - 1); } } @Test - void testPersistencyPolicyTypes() throws ServiceException { + void testPersistencyPolicyTypes() throws Exception { Ric ric = this.addRic("ric1"); this.addPolicyType("type1", ric.id()); + waitforS3(); + PolicyTypes types = new PolicyTypes(this.applicationConfig); + types.restoreFromDatabase().blockLast(); assertThat(types.size()).isEqualTo(1); } + @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests. + private void waitforS3() throws Exception { + if (applicationConfig.isS3Enabled()) { + Thread.sleep(1000); + } + } + @Test - void testPersistencyService() throws ServiceException { + void testPersistencyService() throws Exception { final String SERVICE = "serviceName"; putService(SERVICE, 1234, HttpStatus.CREATED); assertThat(this.services.size()).isEqualTo(1); Service service = this.services.getService(SERVICE); + waitforS3(); Services servicesRestored = new Services(this.applicationConfig); + servicesRestored.restoreFromDatabase().blockLast(); Service serviceRestored = servicesRestored.getService(SERVICE); assertThat(servicesRestored.size()).isEqualTo(1); assertThat(serviceRestored.getCallbackUrl()).isEqualTo(service.getCallbackUrl()); diff --git a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java index d9b36d9c..b2bf58e4 100644 --- a/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java +++ b/a1-policy-management/src/test/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervisionTest.java @@ -34,6 +34,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Vector; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; |