diff options
Diffstat (limited to 'a1-policy-management/src/main')
13 files changed, 664 insertions, 234 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/BeanFactory.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/BeanFactory.java index 8fc8bc8c..0d93eae2 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/BeanFactory.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/BeanFactory.java @@ -20,11 +20,13 @@ package org.onap.ccsdk.oran.a1policymanagementservice; -import com.fasterxml.jackson.databind.ObjectMapper; + import org.apache.catalina.connector.Connector; import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1ClientFactory; import org.onap.ccsdk.oran.a1policymanagementservice.clients.SecurityContext; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; +import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies; +import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services; import org.springframework.beans.factory.annotation.Autowired; @@ -52,18 +54,27 @@ public class BeanFactory { @Bean public Services getServices(@Autowired ApplicationConfig applicationConfig) { - return new Services(applicationConfig); + Services services = new Services(applicationConfig); + services.restoreFromDatabase().subscribe(); + return services; } @Bean - public A1ClientFactory getA1ClientFactory(@Autowired ApplicationConfig applicationConfig, - @Autowired SecurityContext securityContext) { - return new A1ClientFactory(applicationConfig, securityContext); + public PolicyTypes getPolicyTypes(@Autowired ApplicationConfig applicationConfig) { + PolicyTypes types = new PolicyTypes(applicationConfig); + types.restoreFromDatabase().blockLast(); + return types; + } + + @Bean + public Policies getPolicies(@Autowired ApplicationConfig applicationConfig) { + return new Policies(applicationConfig); } @Bean - public ObjectMapper mapper() { - return new ObjectMapper(); + public A1ClientFactory getA1ClientFactory(@Autowired ApplicationConfig applicationConfig, + @Autowired SecurityContext securityContext) { + return new A1ClientFactory(applicationConfig, securityContext); } @Bean diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java index 0e2294fc..6dbf318d 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java @@ -20,6 +20,8 @@ package org.onap.ccsdk.oran.a1policymanagementservice.configuration; +import com.google.common.base.Strings; + import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -82,6 +84,22 @@ public class ApplicationConfig { @Value("${app.webclient.http.proxy-type:HTTP}") private String httpProxyType = "HTTP"; + @Getter + @Value("${app.s3.endpointOverride:}") + private String s3EndpointOverride; + + @Getter + @Value("${app.s3.accessKeyId:}") + private String s3AccessKeyId; + + @Getter + @Value("${app.s3.secretAccessKey:}") + private String s3SecretAccessKey; + + @Getter + @Value("${app.s3.bucket:}") + private String s3Bucket; + private Map<String, RicConfig> ricConfigs = new HashMap<>(); @Getter @@ -182,4 +200,9 @@ public class ApplicationConfig { return Flux.fromIterable(modifications); } + + public boolean isS3Enabled() { + return !(Strings.isNullOrEmpty(s3EndpointOverride) || Strings.isNullOrEmpty(s3Bucket)); + } + } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshCounterTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/Meters.java index 95999a04..203bfacc 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshCounterTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/Meters.java @@ -18,52 +18,28 @@ * ========================LICENSE_END=================================== */ -package org.onap.ccsdk.oran.a1policymanagementservice.tasks; +package org.onap.ccsdk.oran.a1policymanagementservice.configuration; import io.micrometer.core.instrument.MeterRegistry; -import java.lang.invoke.MethodHandles; -import lombok.AccessLevel; -import lombok.Getter; + import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies; import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** - * The aim is to collect statistical values from the A1 Policy Management Service. + * The aim is to collect statistical values from the A1 Policy Management + * Service. + * The counters are being updated every minute. */ @Component -public class RefreshCounterTask { - - private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - @Autowired - private final Rics rics; - - @Autowired - private final PolicyTypes policyTypes; +public class Meters { @Autowired - private final Policies policies; - - @Autowired - @Getter(AccessLevel.PUBLIC) - private final MeterRegistry meterRegistry; - - @Autowired - public RefreshCounterTask(Rics rics, PolicyTypes policyTypes, Policies policies, MeterRegistry meterRegistry) { - this.rics = rics; - this.policyTypes = policyTypes; - this.policies = policies; - this.meterRegistry = meterRegistry; - - logger.trace("Counters have been initialized."); + public Meters(Rics rics, PolicyTypes policyTypes, Policies policies, MeterRegistry meterRegistry) { meterRegistry.gauge("total_ric_count", rics, Rics::size); meterRegistry.gauge("total_policy_type_count", policyTypes, PolicyTypes::size); meterRegistry.gauge("total_policy_count", policies, Policies::size); } - } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java index ee2cae58..500ddd2a 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/v2/PolicyController.java @@ -22,6 +22,7 @@ package org.onap.ccsdk.oran.a1policymanagementservice.controllers.v2; import com.google.gson.Gson; import com.google.gson.GsonBuilder; + import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.media.Content; @@ -29,12 +30,15 @@ import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.responses.ApiResponses; import io.swagger.v3.oas.annotations.tags.Tag; + import java.lang.invoke.MethodHandles; import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.List; + import lombok.Getter; + import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1ClientFactory; import org.onap.ccsdk.oran.a1policymanagementservice.controllers.VoidResponse; import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.EntityNotFoundException; diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/DataStore.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/DataStore.java new file mode 100644 index 00000000..51b57dee --- /dev/null +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/DataStore.java @@ -0,0 +1,55 @@ +/*- + * ========================LICENSE_START================================= + * ONAP : ccsdk oran + * ====================================================================== + * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * ====================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.onap.ccsdk.oran.a1policymanagementservice.datastore; + +import com.google.common.base.Strings; + +import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public interface DataStore { + + public Flux<String> listObjects(String prefix); + + public Mono<byte[]> readObject(String name); + + public Mono<byte[]> writeObject(String name, byte[] fileData); + + public Mono<Boolean> deleteObject(String name); + + public Mono<String> createDataStore(); + + public Mono<String> deleteAllObjects(); + + public static DataStore create(ApplicationConfig appConfig, String location) { + if (appConfig.isS3Enabled()) { + return new S3ObjectStore(appConfig, location); + } else if (!Strings.isNullOrEmpty(appConfig.getVardataDirectory())) { + return new FileStore(appConfig, location); + } else { + return new NullStore(location); + } + + } + +} diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/FileStore.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/FileStore.java new file mode 100644 index 00000000..565120e1 --- /dev/null +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/FileStore.java @@ -0,0 +1,158 @@ +/*- + * ========================LICENSE_START================================= + * ONAP : ccsdk oran + * ====================================================================== + * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * ====================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + + +package org.onap.ccsdk.oran.a1policymanagementservice.datastore; + +import com.google.common.base.Strings; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Stream; + +import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +class FileStore implements DataStore { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + ApplicationConfig applicationConfig; + private final String location; + + public FileStore(ApplicationConfig applicationConfig, String location) { + this.applicationConfig = applicationConfig; + this.location = location; + } + + @Override + public Flux<String> listObjects(String prefix) { + Path root = Path.of(path().toString(), prefix); + if (!root.toFile().exists()) { + root = root.getParent(); + } + + logger.debug("Listing files in: {}", root); + + List<String> result = new ArrayList<>(); + try (Stream<Path> stream = Files.walk(root, Integer.MAX_VALUE)) { + + stream.forEach(path -> filterListFiles(path, prefix, result)); + + return Flux.fromIterable(result); + } catch (Exception e) { + logger.warn("Could not list filed in {}, reason; {}", root, e.getMessage()); + return Flux.error(e); + } + } + + private void filterListFiles(Path path, String prefix, List<String> result) { + if (path.toFile().isFile() && externalName(path).startsWith(prefix)) { + result.add(externalName(path)); + } else { + logger.trace("Ignoring file/directory {}, prefix: {}", path, prefix); + } + } + + private String externalName(Path path) { + String fullName = path.toString(); + String externalName = fullName.substring(path().toString().length()); + if (externalName.startsWith("/")) { + externalName = externalName.substring(1); + } + return externalName; + } + + @Override + public Mono<byte[]> readObject(String fileName) { + try { + byte[] contents = Files.readAllBytes(path(fileName)); + return Mono.just(contents); + } catch (Exception e) { + return Mono.error(e); + } + } + + @Override + public Mono<Boolean> deleteObject(String name) { + try { + Files.delete(path(name)); + return Mono.just(true); + } catch (Exception e) { + logger.debug("Could not delete file: {}, reason: {}", path(name), e.getMessage()); + return Mono.just(false); + } + } + + @Override + public Mono<String> createDataStore() { + try { + if (!Strings.isNullOrEmpty(applicationConfig.getVardataDirectory())) { + Files.createDirectories(path()); + } + } catch (IOException e) { + logger.error("Could not create directory: {}, reason: {}", path(), e.getMessage()); + } + return Mono.just("OK"); + } + + private Path path(String name) { + return Path.of(path().toString(), name); + } + + private Path path() { + return Path.of(applicationConfig.getVardataDirectory(), "database", this.location); + } + + @Override + public Mono<String> deleteAllObjects() { + return listObjects("") // + .flatMap(this::deleteObject) // + .collectList() // + .map(o -> "OK"); + } + + @Override + public Mono<byte[]> writeObject(String fileName, byte[] fileData) { + try { + if (!Strings.isNullOrEmpty(applicationConfig.getVardataDirectory())) { + Files.createDirectories(path(fileName).getParent()); + } + File outputFile = path(fileName).toFile(); + + try (FileOutputStream outputStream = new FileOutputStream(outputFile)) { + outputStream.write(fileData); + } + } catch (IOException e) { + logger.warn("Could not write file: {}, reason; {}", path(fileName), e.getMessage()); + } + return Mono.just(fileData); + } + +} diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/NullStore.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/NullStore.java new file mode 100644 index 00000000..1ecd8f64 --- /dev/null +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/NullStore.java @@ -0,0 +1,68 @@ +/*- + * ========================LICENSE_START================================= + * ONAP : ccsdk oran + * ====================================================================== + * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * ====================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.onap.ccsdk.oran.a1policymanagementservice.datastore; + +import java.lang.invoke.MethodHandles; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +class NullStore implements DataStore { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public NullStore(String location) { + logger.warn("No storage defined for: {}", location); + } + + @Override + public Flux<String> listObjects(String prefix) { + return Flux.empty(); + } + + @Override + public Mono<byte[]> readObject(String name) { + return Mono.just(new byte[0]); + } + + @Override + public Mono<byte[]> writeObject(String name, byte[] fileData) { + return Mono.just(new byte[0]); + } + + @Override + public Mono<Boolean> deleteObject(String name) { + return Mono.just(false); + } + + @Override + public Mono<String> createDataStore() { + return Mono.just(""); + } + + @Override + public Mono<String> deleteAllObjects() { + return Mono.just(""); + } + +} diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/S3ObjectStore.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/S3ObjectStore.java new file mode 100644 index 00000000..4c7c3c3e --- /dev/null +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/datastore/S3ObjectStore.java @@ -0,0 +1,227 @@ +/*- + * ========================LICENSE_START================================= + * ONAP : ccsdk oran + * ====================================================================== + * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * ====================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.onap.ccsdk.oran.a1policymanagementservice.datastore; + +import java.net.URI; +import java.util.concurrent.CompletableFuture; + +import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.BytesWrapper; +import software.amazon.awssdk.core.ResponseBytes; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; +import software.amazon.awssdk.services.s3.model.CreateBucketRequest; +import software.amazon.awssdk.services.s3.model.CreateBucketResponse; +import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; +import software.amazon.awssdk.services.s3.model.DeleteBucketResponse; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectResponse; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.ListObjectsRequest; +import software.amazon.awssdk.services.s3.model.ListObjectsResponse; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.services.s3.model.S3Object; + +class S3ObjectStore implements DataStore { + private static final Logger logger = LoggerFactory.getLogger(S3ObjectStore.class); + private final ApplicationConfig applicationConfig; + + private static S3AsyncClient s3AsynchClient; + private final String location; + + public S3ObjectStore(ApplicationConfig applicationConfig, String location) { + this.applicationConfig = applicationConfig; + this.location = location; + + getS3AsynchClient(applicationConfig); + } + + private static synchronized S3AsyncClient getS3AsynchClient(ApplicationConfig applicationConfig) { + if (applicationConfig.isS3Enabled() && s3AsynchClient == null) { + s3AsynchClient = getS3AsyncClientBuilder(applicationConfig).build(); + } + return s3AsynchClient; + } + + private static S3AsyncClientBuilder getS3AsyncClientBuilder(ApplicationConfig applicationConfig) { + URI uri = URI.create(applicationConfig.getS3EndpointOverride()); + return S3AsyncClient.builder() // + .region(Region.US_EAST_1) // + .endpointOverride(uri) // + .credentialsProvider(StaticCredentialsProvider.create( // + AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), // + applicationConfig.getS3SecretAccessKey()))); + } + + @Override + public Flux<String> listObjects(String prefix) { + return listObjectsInBucket(bucket(), location + "/" + prefix).map(S3Object::key) // + .map(this::externalName); + } + + @Override + public Mono<Boolean> deleteObject(String name) { + DeleteObjectRequest request = DeleteObjectRequest.builder() // + .bucket(bucket()) // + .key(key(name)) // + .build(); + + CompletableFuture<DeleteObjectResponse> future = s3AsynchClient.deleteObject(request); + + return Mono.fromFuture(future).map(resp -> true); + } + + @Override + public Mono<byte[]> readObject(String name) { + return getDataFromS3Object(bucket(), name); + } + + @Override + public Mono<byte[]> writeObject(String name, byte[] fileData) { + + PutObjectRequest request = PutObjectRequest.builder() // + .bucket(bucket()) // + .key(key(name)) // + .build(); + + AsyncRequestBody body = AsyncRequestBody.fromBytes(fileData); + + CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body); + + return Mono.fromFuture(future) // + .map(putObjectResponse -> fileData) // + .doOnError(t -> logger.error("Failed to store object '{}' in S3 {}", key(name), t.getMessage())); + } + + @Override + public Mono<String> createDataStore() { + return createS3Bucket(bucket()); + } + + private Mono<String> createS3Bucket(String s3Bucket) { + + CreateBucketRequest request = CreateBucketRequest.builder() // + .bucket(s3Bucket) // + .build(); + + CompletableFuture<CreateBucketResponse> future = s3AsynchClient.createBucket(request); + + return Mono.fromFuture(future) // + .map(f -> s3Bucket) // + .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage())) + .onErrorResume(t -> Mono.just(s3Bucket)); + } + + @Override + public Mono<String> deleteAllObjects() { + return listObjects("") // + .flatMap(this::deleteObject) // + .collectList() // + .map(resp -> "OK").onErrorResume(t -> Mono.just("NOK")); + } + + public Mono<String> deleteBucket() { + DeleteBucketRequest request = DeleteBucketRequest.builder() // + .bucket(bucket()) // + .build(); + + CompletableFuture<DeleteBucketResponse> future = s3AsynchClient.deleteBucket(request); + + return Mono.fromFuture(future) // + .doOnError(t -> logger.warn("Could not delete bucket: {}, reason: {}", bucket(), t.getMessage())) + .map(resp -> bucket()) // + .doOnNext(resp -> logger.debug("Deleted bucket: {}", bucket())).onErrorResume(t -> Mono.just("NOK")); + } + + private String bucket() { + return applicationConfig.getS3Bucket(); + } + + private Flux<S3Object> listObjectsInBucket(String bucket, String prefix) { + + return listObjectsRequest(bucket, prefix, null) // + .expand(response -> listObjectsRequest(bucket, prefix, response)) // + .map(ListObjectsResponse::contents) // + .doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) // + .doOnError(t -> logger.warn("Error fromlist objects: {}", t.getMessage())) // + .flatMap(Flux::fromIterable) // + .doOnNext(obj -> logger.debug("Found object: {}", obj.key())); + } + + private Mono<ListObjectsResponse> listObjectsRequest(String bucket, String prefix, + ListObjectsResponse prevResponse) { + ListObjectsRequest.Builder builder = ListObjectsRequest.builder() // + .bucket(bucket) // + .maxKeys(1000) // + .prefix(prefix); + + if (prevResponse != null) { + if (Boolean.TRUE.equals(prevResponse.isTruncated())) { + builder.marker(prevResponse.nextMarker()); + } else { + return Mono.empty(); + } + } + + ListObjectsRequest listObjectsRequest = builder.build(); + CompletableFuture<ListObjectsResponse> future = s3AsynchClient.listObjects(listObjectsRequest); + return Mono.fromFuture(future); + } + + private Mono<byte[]> getDataFromS3Object(String bucket, String name) { + + GetObjectRequest request = GetObjectRequest.builder() // + .bucket(bucket) // + .key(key(name)) // + .build(); + + CompletableFuture<ResponseBytes<GetObjectResponse>> future = + s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes()); + + return Mono.fromFuture(future) // + .map(BytesWrapper::asByteArray) // + .doOnError(t -> logger.error("Failed to get file from S3, key:{}, bucket: {}, {}", key(name), bucket, + t.getMessage())) // + .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key(name))) // + .onErrorResume(t -> Mono.empty()); + } + + private String key(String name) { + return location + "/" + name; + } + + private String externalName(String internalName) { + return internalName.substring(key("").length()); + } + +} diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java index ef924748..d808b57d 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java @@ -2,7 +2,7 @@ * ========================LICENSE_START================================= * ONAP : ccsdk oran * ====================================================================== - * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved. + * Copyright (C) 2019-2022 Nordix Foundation. All rights reserved. * ====================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,18 +23,14 @@ package org.onap.ccsdk.oran.a1policymanagementservice.repository; import com.google.gson.Gson; import com.google.gson.GsonBuilder; -import java.io.File; -import java.io.FileOutputStream; -import java.io.PrintStream; import java.lang.invoke.MethodHandles; -import java.nio.file.Files; -import java.nio.file.Path; import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.Vector; @@ -42,17 +38,17 @@ import lombok.Builder; import lombok.Getter; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; +import org.onap.ccsdk.oran.a1policymanagementservice.datastore.DataStore; import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.EntityNotFoundException; -import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Configuration; -import org.springframework.util.FileSystemUtils; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.util.annotation.Nullable; @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally -@Configuration public class Policies { @Getter @@ -73,12 +69,31 @@ public class Policies { private MultiMap<Policy> policiesRic = new MultiMap<>(); private MultiMap<Policy> policiesService = new MultiMap<>(); private MultiMap<Policy> policiesType = new MultiMap<>(); + private final DataStore dataStore; private final ApplicationConfig appConfig; private static Gson gson = new GsonBuilder().create(); public Policies(@Autowired ApplicationConfig appConfig) { this.appConfig = appConfig; + this.dataStore = DataStore.create(appConfig, "policies"); + } + + public Flux<Policy> restoreFromDatabase(Ric ric, PolicyTypes types) { + return dataStore.createDataStore() // + .flatMapMany(x -> dataStore.listObjects(getPath(ric))) // + .flatMap(dataStore::readObject) // + .map(String::new) // + .map(json -> gson.fromJson(json, PersistentPolicyInfo.class)) // + .map(policyInfo -> toPolicy(policyInfo, ric, types)) // + .doOnNext(this::put) // + .filter(Objects::nonNull) // + .doOnError(t -> logger.warn("Could not restore policy database for RIC: {}, reason : {}", ric.id(), + t.getMessage())) // + .doFinally(sig -> logger.debug("Restored policy database for RIC: {}, number of policies: {}", ric.id(), + this.policiesRic.get(ric.id()).size())) // + .onErrorResume(t -> Flux.empty()) // + ; } public synchronized void put(Policy policy) { @@ -133,11 +148,7 @@ public class Policies { public synchronized void remove(Policy policy) { if (!policy.isTransient()) { - try { - Files.delete(getPath(policy)); - } catch (Exception e) { - logger.debug("Could not delete policy from database: {}", e.getMessage()); - } + dataStore.deleteObject(getPath(policy)).subscribe(); } policiesId.remove(policy.getId()); policiesRic.remove(policy.getRic().id(), policy.getId()); @@ -175,24 +186,15 @@ public class Policies { Set<String> keys = policiesId.keySet(); removeId(keys.iterator().next()); } - try { - if (this.appConfig.getVardataDirectory() != null) { - FileSystemUtils.deleteRecursively(getDatabasePath()); - } - } catch (Exception e) { - logger.warn("Could not delete policy database : {}", e.getMessage()); - } + dataStore.deleteAllObjects().onErrorResume(t -> Mono.empty()).subscribe(); } public void store(Policy policy) { - try { - Files.createDirectories(getDatabasePath(policy.getRic())); - try (PrintStream out = new PrintStream(new FileOutputStream(getFile(policy)))) { - out.print(gson.toJson(toStorageObject(policy))); - } - } catch (Exception e) { - logger.warn("Could not store policy: {} {}", policy.getId(), e.getMessage()); - } + + byte[] bytes = gson.toJson(toStorageObject(policy)).getBytes(); + this.dataStore.writeObject(this.getPath(policy), bytes) // + .doOnError(t -> logger.error("Could not store job in S3, reason: {}", t.getMessage())) // + .subscribe(); } private boolean isMatch(String filterValue, String actualValue) { @@ -218,29 +220,6 @@ public class Policies { return filtered; } - private File getFile(Policy policy) throws ServiceException { - return getPath(policy).toFile(); - } - - private Path getPath(Policy policy) throws ServiceException { - return Path.of(getDatabaseDirectory(policy.getRic()), policy.getId() + ".json"); - } - - public synchronized void restoreFromDatabase(Ric ric, PolicyTypes types) { - try { - Files.createDirectories(getDatabasePath(ric)); - for (File file : getDatabasePath(ric).toFile().listFiles()) { - String json = Files.readString(file.toPath()); - PersistentPolicyInfo policyStorage = gson.fromJson(json, PersistentPolicyInfo.class); - this.put(toPolicy(policyStorage, ric, types)); - } - logger.debug("Restored policy database for RIC: {}, number of policies: {}", ric.id(), - this.policiesRic.get(ric.id()).size()); - } catch (Exception e) { - logger.warn("Could not restore policy database for RIC: {}, reason : {}", ric.id(), e.getMessage()); - } - } - private PersistentPolicyInfo toStorageObject(Policy p) { return PersistentPolicyInfo.builder() // .id(p.getId()) // @@ -254,35 +233,30 @@ public class Policies { .build(); } - Policy toPolicy(PersistentPolicyInfo p, Ric ric, PolicyTypes types) throws EntityNotFoundException { - return Policy.builder() // - .id(p.getId()) // - .isTransient(p.isTransient()) // - .json(p.getJson()) // - .lastModified(Instant.parse(p.lastModified)) // - .ownerServiceId(p.getOwnerServiceId()) // - .ric(ric) // - .statusNotificationUri(p.getStatusNotificationUri()) // - .type(types.getType(p.getTypeId())) // - .build(); - } - - private Path getDatabasePath(Ric ric) throws ServiceException { - return Path.of(getDatabaseDirectory(ric)); + private Policy toPolicy(PersistentPolicyInfo p, Ric ric, PolicyTypes types) { + try { + return Policy.builder() // + .id(p.getId()) // + .isTransient(p.isTransient()) // + .json(p.getJson()) // + .lastModified(Instant.parse(p.lastModified)) // + .ownerServiceId(p.getOwnerServiceId()) // + .ric(ric) // + .statusNotificationUri(p.getStatusNotificationUri()) // + .type(types.getType(p.getTypeId())) // + .build(); + } catch (EntityNotFoundException e) { + logger.warn("Not found: {}", e.getMessage()); + return null; + } } - private String getDatabaseDirectory(Ric ric) throws ServiceException { - return getDatabaseDirectory() + "/" + ric.id(); + private String getPath(Policy policy) { + return getPath(policy.getRic()) + "/" + policy.getId() + ".json"; } - private String getDatabaseDirectory() throws ServiceException { - if (appConfig.getVardataDirectory() == null) { - throw new ServiceException("No database storage provided"); - } - return appConfig.getVardataDirectory() + "/database/policyInstances"; + private String getPath(Ric ric) { + return ric.id(); } - private Path getDatabasePath() throws ServiceException { - return Path.of(getDatabaseDirectory()); - } } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java index 327dee60..b4e9c654 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java @@ -23,13 +23,7 @@ package org.onap.ccsdk.oran.a1policymanagementservice.repository; import com.google.gson.Gson; import com.google.gson.GsonBuilder; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.PrintStream; import java.lang.invoke.MethodHandles; -import java.nio.file.Files; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -38,25 +32,25 @@ import java.util.Map; import java.util.Vector; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; +import org.onap.ccsdk.oran.a1policymanagementservice.datastore.DataStore; import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.EntityNotFoundException; import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Configuration; import org.springframework.lang.Nullable; -import org.springframework.util.FileSystemUtils; -@Configuration +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + public class PolicyTypes { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private Map<String, PolicyType> types = new HashMap<>(); - private final ApplicationConfig appConfig; private static Gson gson = new GsonBuilder().create(); + private final DataStore dataStore; public PolicyTypes(@Autowired ApplicationConfig appConfig) { - this.appConfig = appConfig; - restoreFromDatabase(); + this.dataStore = DataStore.create(appConfig, "policytypes"); } public synchronized PolicyType getType(String name) throws EntityNotFoundException { @@ -113,55 +107,26 @@ public class PolicyTypes { public synchronized void clear() { this.types.clear(); - try { - FileSystemUtils.deleteRecursively(getDatabasePath()); - } catch (IOException | ServiceException e) { - logger.warn("Could not delete policy type database : {}", e.getMessage()); - } + dataStore.deleteAllObjects().onErrorResume(t -> Mono.empty()).subscribe(); } public void store(PolicyType type) { - try { - Files.createDirectories(getDatabasePath()); - try (PrintStream out = new PrintStream(new FileOutputStream(getFile(type)))) { - out.print(gson.toJson(type)); - } - } catch (ServiceException e) { - logger.debug("Could not store policy type: {} {}", type.getId(), e.getMessage()); - } catch (IOException e) { - logger.warn("Could not store policy type: {} {}", type.getId(), e.getMessage()); - } + byte[] bytes = gson.toJson(type).getBytes(); + dataStore.writeObject(getPath(type), bytes) // + .doOnError(t -> logger.warn("Could not store policy type: {} {}", type.getId(), t.getMessage())) + .subscribe(); } - private File getFile(PolicyType type) throws ServiceException { - return Path.of(getDatabaseDirectory(), type.getId() + ".json").toFile(); - } + public Flux<PolicyType> restoreFromDatabase() { - void restoreFromDatabase() { - try { - Files.createDirectories(getDatabasePath()); - for (File file : getDatabasePath().toFile().listFiles()) { - String json = Files.readString(file.toPath()); - PolicyType type = gson.fromJson(json, PolicyType.class); - this.types.put(type.getId(), type); - } - logger.debug("Restored type database,no of types: {}", this.types.size()); - } catch (ServiceException e) { - logger.debug("Could not restore policy type database : {}", e.getMessage()); - } catch (Exception e) { - logger.warn("Could not restore policy type database : {}", e.getMessage()); - } - } - - private String getDatabaseDirectory() throws ServiceException { - if (appConfig.getVardataDirectory() == null) { - throw new ServiceException("No policy type storage provided"); - } - return appConfig.getVardataDirectory() + "/database/policyTypes"; - } + return this.dataStore.createDataStore().flatMapMany(x -> dataStore.listObjects("")) // + .flatMap(dataStore::readObject) // + .map(String::new) // + .map(json -> gson.fromJson(json, PolicyType.class)).doOnNext(type -> this.types.put(type.getId(), type)) // + .doOnError(t -> logger.warn("Could not restore policy type database : {}", t.getMessage())) // + .doFinally(sig -> logger.debug("Restored type database,no of types: {}", this.types.size())) + .onErrorResume(t -> Flux.empty()); // - private Path getDatabasePath() throws ServiceException { - return Path.of(getDatabaseDirectory()); } private static Collection<PolicyType> filterTypeName(Collection<PolicyType> types, String typeName) { @@ -188,4 +153,8 @@ public class PolicyTypes { return result; } + private String getPath(PolicyType type) { + return type.getId() + ".json"; + } + } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Services.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Services.java index 68a8d7dd..9c2846a9 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Services.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Services.java @@ -2,7 +2,7 @@ * ========================LICENSE_START================================= * ONAP : ccsdk oran * ====================================================================== - * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved. + * Copyright (C) 2019-2022 Nordix Foundation. All rights reserved. * ====================================================================== * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,33 +22,29 @@ package org.onap.ccsdk.oran.a1policymanagementservice.repository; import com.google.gson.Gson; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.PrintStream; -import java.nio.file.Files; -import java.nio.file.Path; import java.util.HashMap; import java.util.Map; import java.util.Vector; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; +import org.onap.ccsdk.oran.a1policymanagementservice.datastore.DataStore; import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.util.FileSystemUtils; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; public class Services { private static final Logger logger = LoggerFactory.getLogger(Services.class); private static Gson gson = Service.createGson(); - private final ApplicationConfig appConfig; + private final DataStore dataStore; private Map<String, Service> registeredServices = new HashMap<>(); public Services(@Autowired ApplicationConfig appConfig) { - this.appConfig = appConfig; - restoreFromDatabase(); + this.dataStore = DataStore.create(appConfig, "services"); } public synchronized Service getService(String name) throws ServiceException { @@ -77,11 +73,7 @@ public class Services { public synchronized void remove(String name) { Service service = registeredServices.remove(name); if (service != null) { - try { - Files.delete(getPath(service)); - } catch (Exception e) { - // Doesn't matter. - } + dataStore.deleteObject(getPath(service)).subscribe(); } } @@ -91,59 +83,29 @@ public class Services { public synchronized void clear() { registeredServices.clear(); - try { - FileSystemUtils.deleteRecursively(getDatabasePath()); - } catch (Exception e) { - logger.warn("Could not delete services database : {}", e.getMessage()); - } + dataStore.deleteAllObjects().onErrorResume(t -> Mono.empty()).subscribe(); } public void store(Service service) { - try { - Files.createDirectories(getDatabasePath()); - try (PrintStream out = new PrintStream(new FileOutputStream(getFile(service)))) { - String str = gson.toJson(service); - out.print(str); - } - } catch (ServiceException e) { - logger.debug("Could not store service: {} {}", service.getName(), e.getMessage()); - } catch (IOException e) { - logger.warn("Could not store pservice: {} {}", service.getName(), e.getMessage()); - } - } - - private File getFile(Service service) throws ServiceException { - return getPath(service).toFile(); + byte[] bytes = gson.toJson(service).getBytes(); + dataStore.writeObject(getPath(service), bytes) // + .doOnError(t -> logger.warn("Could not service: {} {}", service.getName(), t.getMessage())).subscribe(); } - private Path getPath(Service service) throws ServiceException { - return Path.of(getDatabaseDirectory(), service.getName() + ".json"); + public Flux<Service> restoreFromDatabase() { + return dataStore.createDataStore().flatMapMany(ds -> dataStore.listObjects("")) // + .flatMap(dataStore::readObject, 1) // + .map(String::new) // + .map(json -> gson.fromJson(json, Service.class)) + .doOnNext(service -> this.registeredServices.put(service.getName(), service)) + .doOnError(t -> logger.warn("Could not restore services database : {}", t.getMessage())) + .doFinally(sig -> logger.debug("Restored type database,no of services: {}", + this.registeredServices.size())) // + .onErrorResume(t -> Flux.empty()); // } - void restoreFromDatabase() { - try { - Files.createDirectories(getDatabasePath()); - for (File file : getDatabasePath().toFile().listFiles()) { - String json = Files.readString(file.toPath()); - Service service = gson.fromJson(json, Service.class); - this.registeredServices.put(service.getName(), service); - } - logger.debug("Restored type database,no of services: {}", this.registeredServices.size()); - } catch (ServiceException e) { - logger.debug("Could not restore services database : {}", e.getMessage()); - } catch (Exception e) { - logger.warn("Could not restore services database : {}", e.getMessage()); - } + private String getPath(Service service) { + return service.getName() + ".json"; } - private String getDatabaseDirectory() throws ServiceException { - if (appConfig.getVardataDirectory() == null) { - throw new ServiceException("No storage provided"); - } - return appConfig.getVardataDirectory() + "/database/services"; - } - - private Path getDatabasePath() throws ServiceException { - return Path.of(getDatabaseDirectory()); - } } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java index 983e92e6..567bb8d2 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java @@ -105,7 +105,7 @@ public class RefreshConfigTask { refreshTask = createRefreshTask() // .subscribe( notUsed -> logger.debug("Refreshed configuration data"), throwable -> logger - .error("Configuration refresh terminated due to exception {}", throwable.toString()), + .error("Configuration refresh terminated due to exception {}", throwable.getMessage()), () -> logger.error("Configuration refresh terminated")); } @@ -128,6 +128,7 @@ public class RefreshConfigTask { .flatMap(this::parseConfiguration) // .flatMap(this::updateConfig, CONCURRENCY) // .flatMap(this::handleUpdatedRicConfig) // + .doOnError(t -> logger.error("Cannot update config {}", t.getMessage())) .doFinally(signal -> logger.error("Configuration refresh task is terminated: {}", signal)); } @@ -200,7 +201,7 @@ public class RefreshConfigTask { void addRic(Ric ric) { this.rics.put(ric); if (this.appConfig.getVardataDirectory() != null) { - this.policies.restoreFromDatabase(ric, this.policyTypes); + this.policies.restoreFromDatabase(ric, this.policyTypes).subscribe(); } logger.debug("Added RIC: {}", ric.id()); } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java index 7a5f73d4..fdeb47e2 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java @@ -21,6 +21,7 @@ package org.onap.ccsdk.oran.a1policymanagementservice.tasks; import java.util.Collection; + import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1Client; import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1ClientFactory; import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClientFactory; @@ -40,6 +41,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; + import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; |