summaryrefslogtreecommitdiffstats
path: root/a1-policy-management/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'a1-policy-management/src/main/java')
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/Application.java68
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/BeanFactory.java97
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/SwaggerConfig.java93
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/aspect/LogAspect.java63
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1Client.java60
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1ClientFactory.java140
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1UriBuilder.java32
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java334
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/OscA1Client.java213
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncJsonHelper.java120
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncOnapA1Client.java194
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncOscA1Client.java287
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion1.java148
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java155
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfigParser.java189
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/AsyncConfiguration.java45
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ControllerConfig.java38
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/RicConfig.java37
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/WebClientConfig.java45
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/PolicyController.java482
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/PolicyInfo.java57
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/RicInfo.java51
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/RicRepositoryController.java109
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/ServiceController.java184
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/ServiceRegistrationInfo.java61
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/ServiceStatus.java51
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/StatusController.java48
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java172
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java157
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapRequestMessage.java55
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapResponseMessage.java43
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/exceptions/EnvironmentLoaderException.java30
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/exceptions/ServiceException.java34
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Lock.java197
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java131
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policy.java42
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyType.java32
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java64
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Ric.java163
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Rics.java77
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Service.java62
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Services.java69
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/EnvironmentProcessor.java91
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java263
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java212
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java214
-rw-r--r--a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/ServiceSupervision.java124
47 files changed, 5633 insertions, 0 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/Application.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/Application.java
new file mode 100644
index 00000000..ae114f67
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/Application.java
@@ -0,0 +1,68 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice;
+
+import org.onap.ccsdk.oran.a1policymanagementservice.dmaap.DmaapMessageConsumer;
+import org.onap.ccsdk.oran.a1policymanagementservice.tasks.RefreshConfigTask;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.Bean;
+
+@SpringBootApplication
+public class Application {
+
+ @Autowired
+ private RefreshConfigTask configRefresh;
+
+ @Autowired
+ private DmaapMessageConsumer dmaapMessageConsumer;
+
+ public static void main(String[] args) {
+ SpringApplication.run(Application.class);
+ }
+
+ /**
+ * Starts the configuration refresh task and reads the configuration.
+ *
+ * @param ctx the application context.
+ *
+ * @return the command line runner for the configuration refresh task.
+ */
+ @Bean
+ public CommandLineRunner configRefreshRunner(ApplicationContext ctx) {
+ return args -> configRefresh.start();
+ }
+
+ /**
+ * Starts the DMaaP message consumer service.
+ *
+ * @param ctx the application context.
+ *
+ * @return the command line runner for the DMaaP message consumer service.
+ */
+ @Bean
+ public CommandLineRunner dmaapMessageConsumerRunner(ApplicationContext ctx) {
+ return args -> dmaapMessageConsumer.start();
+ }
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/BeanFactory.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/BeanFactory.java
new file mode 100644
index 00000000..54a5edaa
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/BeanFactory.java
@@ -0,0 +1,97 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.catalina.connector.Connector;
+import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1ClientFactory;
+import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
+import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+class BeanFactory {
+ private final ApplicationConfig applicationConfig = new ApplicationConfig();
+
+ @Value("${server.http-port}")
+ private int httpPort = 0;
+
+ @Bean
+ public Policies getPolicies() {
+ return new Policies();
+ }
+
+ @Bean
+ public PolicyTypes getPolicyTypes() {
+ return new PolicyTypes();
+ }
+
+ @Bean
+ public Rics getRics() {
+ return new Rics();
+ }
+
+ @Bean
+ public ApplicationConfig getApplicationConfig() {
+ return this.applicationConfig;
+ }
+
+ @Bean
+ Services getServices() {
+ return new Services();
+ }
+
+ @Bean
+ A1ClientFactory getA1ClientFactory() {
+ return new A1ClientFactory(this.applicationConfig);
+ }
+
+ @Bean
+ public ObjectMapper mapper() {
+ return new ObjectMapper();
+ }
+
+ @Bean
+ public ServletWebServerFactory servletContainer() {
+ TomcatServletWebServerFactory tomcat = new TomcatServletWebServerFactory();
+ if (httpPort > 0) {
+ tomcat.addAdditionalTomcatConnectors(getHttpConnector(httpPort));
+ }
+ return tomcat;
+ }
+
+ private static Connector getHttpConnector(int httpPort) {
+ Connector connector = new Connector(TomcatServletWebServerFactory.DEFAULT_PROTOCOL);
+ connector.setScheme("http");
+ connector.setPort(httpPort);
+ connector.setSecure(false);
+ return connector;
+ }
+
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/SwaggerConfig.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/SwaggerConfig.java
new file mode 100644
index 00000000..7e25ba13
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/SwaggerConfig.java
@@ -0,0 +1,93 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice;
+
+import com.google.common.base.Predicates;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry;
+import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;
+
+import springfox.documentation.builders.ApiInfoBuilder;
+import springfox.documentation.builders.PathSelectors;
+import springfox.documentation.builders.RequestHandlerSelectors;
+import springfox.documentation.service.ApiInfo;
+import springfox.documentation.spi.DocumentationType;
+import springfox.documentation.spring.web.plugins.Docket;
+import springfox.documentation.swagger2.annotations.EnableSwagger2;
+
+/**
+ * Swagger configuration class that uses swagger2 documentation type and scans
+ * all the controllers under org.onap.ccsdk.oran.a1policymanagementservice.controllers package. To
+ * access the swagger gui go to http://ip:port/swagger-ui.html
+ *
+ */
+@Configuration
+@EnableSwagger2
+public class SwaggerConfig extends WebMvcConfigurationSupport {
+
+ static final String API_TITLE = "A1 Policy management service";
+ static final String DESCRIPTION = "This page lists all the rest apis for the service.";
+ static final String VERSION = "1.0";
+ @SuppressWarnings("squid:S1075") // Refactor your code to get this URI from a customizable parameter.
+ static final String RESOURCES_PATH = "classpath:/META-INF/resources/";
+ static final String WEBJARS_PATH = RESOURCES_PATH + "webjars/";
+ static final String SWAGGER_UI = "swagger-ui.html";
+ static final String WEBJARS = "/webjars/**";
+
+ /**
+ * Gets the API info.
+ *
+ * @return the API info.
+ */
+ @Bean
+ public Docket api() {
+ return new Docket(DocumentationType.SWAGGER_2) //
+ .apiInfo(apiInfo()) //
+ .select() //
+ .apis(RequestHandlerSelectors.any()) //
+ .paths(PathSelectors.any()) //
+ .paths(Predicates.not(PathSelectors.regex("/error"))) //
+ // this endpoint is not implemented, but was visible for Swagger
+ .paths(Predicates.not(PathSelectors.regex("/actuator.*"))) //
+ // this endpoint is implemented by spring framework, exclude for now
+ .build();
+ }
+
+ private static ApiInfo apiInfo() {
+ return new ApiInfoBuilder() //
+ .title(API_TITLE) //
+ .description(DESCRIPTION) //
+ .version(VERSION) //
+ .build();
+ }
+
+ @Override
+ protected void addResourceHandlers(ResourceHandlerRegistry registry) {
+ registry.addResourceHandler(SWAGGER_UI) //
+ .addResourceLocations(RESOURCES_PATH);
+
+ registry.addResourceHandler(WEBJARS) //
+ .addResourceLocations(WEBJARS_PATH);
+ }
+
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/aspect/LogAspect.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/aspect/LogAspect.java
new file mode 100644
index 00000000..16f4a7f8
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/aspect/LogAspect.java
@@ -0,0 +1,63 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.aspect;
+
+import org.aspectj.lang.JoinPoint;
+import org.aspectj.lang.ProceedingJoinPoint;
+import org.aspectj.lang.annotation.After;
+import org.aspectj.lang.annotation.Around;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.annotation.Before;
+import org.aspectj.lang.reflect.MethodSignature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StopWatch;
+
+@Aspect
+@Component
+public class LogAspect {
+
+ private static final Logger logger = LoggerFactory.getLogger(LogAspect.class);
+
+ @Around("execution(* org.onap.ccsdk.oran.a1policymanagementservice..*(..)))")
+ public void executimeTime(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
+ final StopWatch stopWatch = new StopWatch();
+ stopWatch.start();
+ proceedingJoinPoint.proceed();
+ stopWatch.stop();
+ MethodSignature methodSignature = (MethodSignature) proceedingJoinPoint.getSignature();
+ String className = methodSignature.getDeclaringType().getSimpleName();
+ String methodName = methodSignature.getName();
+ logger.trace("Execution time of {}.{}: {} ms", className, methodName, stopWatch.getTotalTimeMillis());
+ }
+
+ @Before("execution(* org.onap.ccsdk.oran.a1policymanagementservice..*(..)))")
+ public void entryLog(final JoinPoint joinPoint) {
+ logger.trace("Entering method: {}", joinPoint.getSignature().getName());
+ }
+
+ @After("execution(* org.onap.ccsdk.oran.a1policymanagementservice..*(..)))")
+ public void exitLog(final JoinPoint joinPoint) {
+ logger.trace("Exiting method: {}", joinPoint.getSignature().getName());
+ }
+
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1Client.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1Client.java
new file mode 100644
index 00000000..6f1bf357
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1Client.java
@@ -0,0 +1,60 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.clients;
+
+import java.util.List;
+
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Common interface for 'A1' Policy access. Implementations of this interface
+ * adapts to the different southbound REST APIs supported.
+ */
+public interface A1Client {
+
+ public enum A1ProtocolType {
+ UNKNOWN, //
+ STD_V1_1, // STD A1 version 1.1
+ OSC_V1, // OSC 'A1'
+ SDNC_OSC_STD_V1_1, // SDNC_OSC with STD A1 version 1.1 southbound
+ SDNC_OSC_OSC_V1, // SDNC_OSC with OSC 'A1' southbound
+ SDNC_ONAP
+ }
+
+ public Mono<A1ProtocolType> getProtocolVersion();
+
+ public Mono<List<String>> getPolicyTypeIdentities();
+
+ public Mono<List<String>> getPolicyIdentities();
+
+ public Mono<String> getPolicyTypeSchema(String policyTypeId);
+
+ public Mono<String> putPolicy(Policy policy);
+
+ public Mono<String> deletePolicy(Policy policy);
+
+ public Flux<String> deleteAllPolicies();
+
+ public Mono<String> getPolicyStatus(Policy policy);
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1ClientFactory.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1ClientFactory.java
new file mode 100644
index 00000000..f8f79e53
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1ClientFactory.java
@@ -0,0 +1,140 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.clients;
+
+import lombok.Getter;
+
+import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1Client.A1ProtocolType;
+import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
+import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ControllerConfig;
+import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import reactor.core.publisher.Mono;
+
+/**
+ * Factory for A1 clients that supports four different protocol versions of the
+ * A1 api.
+ */
+public class A1ClientFactory {
+
+ private static final Logger logger = LoggerFactory.getLogger(A1ClientFactory.class);
+
+ @Getter
+ private final ApplicationConfig appConfig;
+
+ @Autowired
+ public A1ClientFactory(ApplicationConfig appConfig) {
+ this.appConfig = appConfig;
+ }
+
+ /**
+ * Creates an A1 client with the correct A1 protocol for the provided Ric.
+ *
+ * <p>
+ * It detects the protocol version by trial and error, since there is no
+ * getVersion method specified in the A1 api yet.
+ *
+ * <p>
+ * As a side effect it also sets the protocol version in the provided Ric. This
+ * means that after the first successful creation it won't have to try which
+ * protocol to use, but can create the client directly.
+ *
+ * @param ric The RIC to get a client for.
+ * @return a client with the correct protocol, or a ServiceException if none of
+ * the protocols are supported by the Ric.
+ */
+ public Mono<A1Client> createA1Client(Ric ric) {
+ return getProtocolVersion(ric) //
+ .flatMap(version -> createA1ClientMono(ric, version));
+ }
+
+ A1Client createClient(Ric ric, A1ProtocolType version) throws ServiceException {
+ if (version == A1ProtocolType.STD_V1_1) {
+ assertNoControllerConfig(ric, version);
+ return new StdA1ClientVersion1(ric.getConfig(), this.appConfig.getWebClientConfig());
+ } else if (version == A1ProtocolType.OSC_V1) {
+ assertNoControllerConfig(ric, version);
+ return new OscA1Client(ric.getConfig(), this.appConfig.getWebClientConfig());
+ } else if (version == A1ProtocolType.SDNC_OSC_STD_V1_1 || version == A1ProtocolType.SDNC_OSC_OSC_V1) {
+ return new SdncOscA1Client(version, ric.getConfig(), getControllerConfig(ric),
+ this.appConfig.getWebClientConfig());
+ } else if (version == A1ProtocolType.SDNC_ONAP) {
+ return new SdncOnapA1Client(ric.getConfig(), getControllerConfig(ric), this.appConfig.getWebClientConfig());
+ } else {
+ logger.error("Unhandled protocol: {}", version);
+ throw new ServiceException("Unhandled protocol");
+ }
+ }
+
+ private ControllerConfig getControllerConfig(Ric ric) throws ServiceException {
+ String controllerName = ric.getConfig().controllerName();
+ if (controllerName.isEmpty()) {
+ ric.setProtocolVersion(A1ProtocolType.UNKNOWN);
+ throw new ServiceException("No controller configured for RIC: " + ric.name());
+ }
+ try {
+ return this.appConfig.getControllerConfig(controllerName);
+ } catch (ServiceException e) {
+ ric.setProtocolVersion(A1ProtocolType.UNKNOWN);
+ throw e;
+ }
+ }
+
+ private void assertNoControllerConfig(Ric ric, A1ProtocolType version) throws ServiceException {
+ if (!ric.getConfig().controllerName().isEmpty()) {
+ ric.setProtocolVersion(A1ProtocolType.UNKNOWN);
+ throw new ServiceException(
+ "Controller config should be empty, ric: " + ric.name() + " when using protocol version: " + version);
+ }
+ }
+
+ private Mono<A1Client> createA1ClientMono(Ric ric, A1ProtocolType version) {
+ try {
+ return Mono.just(createClient(ric, version));
+ } catch (ServiceException e) {
+ return Mono.error(e);
+ }
+ }
+
+ private Mono<A1Client.A1ProtocolType> getProtocolVersion(Ric ric) {
+ if (ric.getProtocolVersion() == A1ProtocolType.UNKNOWN) {
+ return fetchVersion(ric, A1ProtocolType.STD_V1_1) //
+ .onErrorResume(notUsed -> fetchVersion(ric, A1ProtocolType.OSC_V1)) //
+ .onErrorResume(notUsed -> fetchVersion(ric, A1ProtocolType.SDNC_OSC_STD_V1_1)) //
+ .onErrorResume(notUsed -> fetchVersion(ric, A1ProtocolType.SDNC_ONAP)) //
+ .doOnNext(ric::setProtocolVersion)
+ .doOnNext(version -> logger.debug("Established protocol version:{} for Ric: {}", version, ric.name())) //
+ .doOnError(notUsed -> logger.warn("Could not get protocol version from RIC: {}", ric.name())) //
+ .onErrorResume(
+ notUsed -> Mono.error(new ServiceException("Protocol negotiation failed for " + ric.name())));
+ } else {
+ return Mono.just(ric.getProtocolVersion());
+ }
+ }
+
+ private Mono<A1ProtocolType> fetchVersion(Ric ric, A1ProtocolType protocolType) {
+ return createA1ClientMono(ric, protocolType) //
+ .flatMap(A1Client::getProtocolVersion);
+ }
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1UriBuilder.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1UriBuilder.java
new file mode 100644
index 00000000..d29226c8
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1UriBuilder.java
@@ -0,0 +1,32 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.clients;
+
+/**
+ * Builder for A1 influenced REST APIs
+ */
+interface A1UriBuilder {
+ String createPutPolicyUri(String type, String policyId);
+
+ String createDeleteUri(String type, String policyId);
+
+ String createGetPolicyStatusUri(String type, String policyId);
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java
new file mode 100644
index 00000000..55532e6c
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java
@@ -0,0 +1,334 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.clients;
+
+import io.netty.channel.ChannelOption;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import io.netty.handler.timeout.WriteTimeoutHandler;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import javax.net.ssl.KeyManagerFactory;
+
+import org.onap.ccsdk.oran.a1policymanagementservice.configuration.WebClientConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.http.client.reactive.ReactorClientHttpConnector;
+import org.springframework.lang.Nullable;
+import org.springframework.util.ResourceUtils;
+import org.springframework.web.reactive.function.client.ExchangeStrategies;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+
+import reactor.core.publisher.Mono;
+import reactor.netty.http.client.HttpClient;
+import reactor.netty.resources.ConnectionProvider;
+import reactor.netty.tcp.TcpClient;
+
+/**
+ * Generic reactive REST client.
+ */
+public class AsyncRestClient {
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private WebClient webClient = null;
+ private final String baseUrl;
+ private static final AtomicInteger sequenceNumber = new AtomicInteger();
+ private final WebClientConfig clientConfig;
+ static KeyStore clientTrustStore = null;
+ private boolean sslEnabled = true;
+
+ public AsyncRestClient(String baseUrl) {
+ this(baseUrl, null);
+ this.sslEnabled = false;
+ }
+
+ public AsyncRestClient(String baseUrl, WebClientConfig config) {
+ this.baseUrl = baseUrl;
+ this.clientConfig = config;
+ }
+
+ public Mono<ResponseEntity<String>> postForEntity(String uri, @Nullable String body) {
+ Object traceTag = createTraceTag();
+ logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri);
+ logger.trace("{} POST body: {}", traceTag, body);
+ Mono<String> bodyProducer = body != null ? Mono.just(body) : Mono.empty();
+ return getWebClient() //
+ .flatMap(client -> {
+ RequestHeadersSpec<?> request = client.post() //
+ .uri(uri) //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .body(bodyProducer, String.class);
+ return retrieve(traceTag, request);
+ });
+ }
+
+ public Mono<String> post(String uri, @Nullable String body) {
+ return postForEntity(uri, body) //
+ .flatMap(this::toBody);
+ }
+
+ public Mono<String> postWithAuthHeader(String uri, String body, String username, String password) {
+ Object traceTag = createTraceTag();
+ logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri);
+ logger.trace("{} POST body: {}", traceTag, body);
+ return getWebClient() //
+ .flatMap(client -> {
+ RequestHeadersSpec<?> request = client.post() //
+ .uri(uri) //
+ .headers(headers -> headers.setBasicAuth(username, password)) //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .bodyValue(body);
+ return retrieve(traceTag, request) //
+ .flatMap(this::toBody);
+ });
+ }
+
+ public Mono<ResponseEntity<String>> putForEntity(String uri, String body) {
+ Object traceTag = createTraceTag();
+ logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
+ logger.trace("{} PUT body: {}", traceTag, body);
+ return getWebClient() //
+ .flatMap(client -> {
+ RequestHeadersSpec<?> request = client.put() //
+ .uri(uri) //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .bodyValue(body);
+ return retrieve(traceTag, request);
+ });
+ }
+
+ public Mono<ResponseEntity<String>> putForEntity(String uri) {
+ Object traceTag = createTraceTag();
+ logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
+ logger.trace("{} PUT body: <empty>", traceTag);
+ return getWebClient() //
+ .flatMap(client -> {
+ RequestHeadersSpec<?> request = client.put() //
+ .uri(uri);
+ return retrieve(traceTag, request);
+ });
+ }
+
+ public Mono<String> put(String uri, String body) {
+ return putForEntity(uri, body) //
+ .flatMap(this::toBody);
+ }
+
+ public Mono<ResponseEntity<String>> getForEntity(String uri) {
+ Object traceTag = createTraceTag();
+ logger.debug("{} GET uri = '{}{}''", traceTag, baseUrl, uri);
+ return getWebClient() //
+ .flatMap(client -> {
+ RequestHeadersSpec<?> request = client.get().uri(uri);
+ return retrieve(traceTag, request);
+ });
+ }
+
+ public Mono<String> get(String uri) {
+ return getForEntity(uri) //
+ .flatMap(this::toBody);
+ }
+
+ public Mono<ResponseEntity<String>> deleteForEntity(String uri) {
+ Object traceTag = createTraceTag();
+ logger.debug("{} DELETE uri = '{}{}''", traceTag, baseUrl, uri);
+ return getWebClient() //
+ .flatMap(client -> {
+ RequestHeadersSpec<?> request = client.delete().uri(uri);
+ return retrieve(traceTag, request);
+ });
+ }
+
+ public Mono<String> delete(String uri) {
+ return deleteForEntity(uri) //
+ .flatMap(this::toBody);
+ }
+
+ private Mono<ResponseEntity<String>> retrieve(Object traceTag, RequestHeadersSpec<?> request) {
+ final Class<String> clazz = String.class;
+ return request.retrieve() //
+ .toEntity(clazz) //
+ .doOnNext(entity -> logger.trace("{} Received: {}", traceTag, entity.getBody())) //
+ .doOnError(throwable -> onHttpError(traceTag, throwable));
+ }
+
+ private static Object createTraceTag() {
+ return sequenceNumber.incrementAndGet();
+ }
+
+ private void onHttpError(Object traceTag, Throwable t) {
+ if (t instanceof WebClientResponseException) {
+ WebClientResponseException exception = (WebClientResponseException) t;
+ logger.debug("{} HTTP error status = '{}', body '{}'", traceTag, exception.getStatusCode(),
+ exception.getResponseBodyAsString());
+ } else {
+ logger.debug("{} HTTP error", traceTag, t);
+ }
+ }
+
+ private Mono<String> toBody(ResponseEntity<String> entity) {
+ if (entity.getBody() == null) {
+ return Mono.just("");
+ } else {
+ return Mono.just(entity.getBody());
+ }
+ }
+
+ private boolean isCertificateEntry(KeyStore trustStore, String alias) {
+ try {
+ return trustStore.isCertificateEntry(alias);
+ } catch (KeyStoreException e) {
+ logger.error("Error reading truststore {}", e.getMessage());
+ return false;
+ }
+ }
+
+ private Certificate getCertificate(KeyStore trustStore, String alias) {
+ try {
+ return trustStore.getCertificate(alias);
+ } catch (KeyStoreException e) {
+ logger.error("Error reading truststore {}", e.getMessage());
+ return null;
+ }
+ }
+
+ private static synchronized KeyStore getTrustStore(String trustStorePath, String trustStorePass)
+ throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException {
+ if (clientTrustStore == null) {
+ KeyStore store = KeyStore.getInstance(KeyStore.getDefaultType());
+ store.load(new FileInputStream(ResourceUtils.getFile(trustStorePath)), trustStorePass.toCharArray());
+ clientTrustStore = store;
+ }
+ return clientTrustStore;
+ }
+
+ private SslContext createSslContextRejectingUntrustedPeers(String trustStorePath, String trustStorePass,
+ KeyManagerFactory keyManager)
+ throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException {
+
+ final KeyStore trustStore = getTrustStore(trustStorePath, trustStorePass);
+ List<Certificate> certificateList = Collections.list(trustStore.aliases()).stream() //
+ .filter(alias -> isCertificateEntry(trustStore, alias)) //
+ .map(alias -> getCertificate(trustStore, alias)) //
+ .collect(Collectors.toList());
+ final X509Certificate[] certificates = certificateList.toArray(new X509Certificate[certificateList.size()]);
+
+ return SslContextBuilder.forClient() //
+ .keyManager(keyManager) //
+ .trustManager(certificates) //
+ .build();
+ }
+
+ private SslContext createSslContext(KeyManagerFactory keyManager)
+ throws NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException {
+ if (this.clientConfig.isTrustStoreUsed()) {
+ return createSslContextRejectingUntrustedPeers(this.clientConfig.trustStore(),
+ this.clientConfig.trustStorePassword(), keyManager);
+ } else {
+ // Trust anyone
+ return SslContextBuilder.forClient() //
+ .keyManager(keyManager) //
+ .trustManager(InsecureTrustManagerFactory.INSTANCE) //
+ .build();
+ }
+ }
+
+ private TcpClient createTcpClientSecure(SslContext sslContext) {
+ return TcpClient.create(ConnectionProvider.newConnection()) //
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10_000) //
+ .secure(c -> c.sslContext(sslContext)) //
+ .doOnConnected(connection -> {
+ connection.addHandlerLast(new ReadTimeoutHandler(30));
+ connection.addHandlerLast(new WriteTimeoutHandler(30));
+ });
+ }
+
+ private TcpClient createTcpClientInsecure() {
+ return TcpClient.create(ConnectionProvider.newConnection()) //
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10_000) //
+ .doOnConnected(connection -> {
+ connection.addHandlerLast(new ReadTimeoutHandler(30));
+ connection.addHandlerLast(new WriteTimeoutHandler(30));
+ });
+ }
+
+ private WebClient createWebClient(String baseUrl, TcpClient tcpClient) {
+ HttpClient httpClient = HttpClient.from(tcpClient);
+ ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
+ ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder() //
+ .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(-1)) //
+ .build();
+ return WebClient.builder() //
+ .clientConnector(connector) //
+ .baseUrl(baseUrl) //
+ .exchangeStrategies(exchangeStrategies) //
+ .build();
+ }
+
+ private Mono<WebClient> getWebClient() {
+ if (this.webClient == null) {
+ try {
+ if (this.sslEnabled) {
+ final KeyManagerFactory keyManager =
+ KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ final KeyStore keyStore = KeyStore.getInstance(this.clientConfig.keyStoreType());
+ final String keyStoreFile = this.clientConfig.keyStore();
+ final String keyStorePassword = this.clientConfig.keyStorePassword();
+ final String keyPassword = this.clientConfig.keyPassword();
+ try (final InputStream inputStream = new FileInputStream(keyStoreFile)) {
+ keyStore.load(inputStream, keyStorePassword.toCharArray());
+ }
+ keyManager.init(keyStore, keyPassword.toCharArray());
+ SslContext sslContext = createSslContext(keyManager);
+ TcpClient tcpClient = createTcpClientSecure(sslContext);
+ this.webClient = createWebClient(this.baseUrl, tcpClient);
+ } else {
+ TcpClient tcpClient = createTcpClientInsecure();
+ this.webClient = createWebClient(this.baseUrl, tcpClient);
+ }
+ } catch (Exception e) {
+ logger.error("Could not create WebClient {}", e.getMessage());
+ return Mono.error(e);
+ }
+ }
+ return Mono.just(this.webClient);
+ }
+
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/OscA1Client.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/OscA1Client.java
new file mode 100644
index 00000000..4f26b860
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/OscA1Client.java
@@ -0,0 +1,213 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.clients;
+
+import java.lang.invoke.MethodHandles;
+import java.util.List;
+
+import org.json.JSONObject;
+import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
+import org.onap.ccsdk.oran.a1policymanagementservice.configuration.WebClientConfig;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Client for accessing OSC A1 REST API
+ */
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+public class OscA1Client implements A1Client {
+ static final int CONCURRENCY_RIC = 1; // How may paralell requests that is sent to one NearRT RIC
+
+ public static class UriBuilder implements A1UriBuilder {
+ private final RicConfig ricConfig;
+
+ public UriBuilder(RicConfig ricConfig) {
+ this.ricConfig = ricConfig;
+ }
+
+ @Override
+ public String createPutPolicyUri(String type, String policyId) {
+ return createPolicyUri(type, policyId);
+ }
+
+ /**
+ * /a1-p/policytypes/{policy_type_id}/policies
+ */
+ public String createGetPolicyIdsUri(String type) {
+ return createPolicyTypeUri(type) + "/policies";
+ }
+
+ @Override
+ public String createDeleteUri(String type, String policyId) {
+ return createPolicyUri(type, policyId);
+ }
+
+ /**
+ * ​/a1-p​/policytypes​/{policy_type_id}​/policies​/{policy_instance_id}​/status
+ */
+ @Override
+ public String createGetPolicyStatusUri(String type, String policyId) {
+ return createPolicyUri(type, policyId) + "/status";
+ }
+
+ /**
+ * ​/a1-p​/healthcheck
+ */
+ public String createHealtcheckUri() {
+ return baseUri() + "/healthcheck";
+ }
+
+ /**
+ * /a1-p/policytypes/{policy_type_id}
+ */
+ public String createGetSchemaUri(String type) {
+ return this.createPolicyTypeUri(type);
+ }
+
+ /**
+ * ​/a1-p​/policytypes​/{policy_type_id}
+ */
+ public String createPolicyTypesUri() {
+ return baseUri() + "/policytypes";
+ }
+
+ /**
+ * ​/a1-p​/policytypes​/{policy_type_id}​/policies​/{policy_instance_id}
+ */
+ private String createPolicyUri(String type, String id) {
+ return createPolicyTypeUri(type) + "/policies/" + id;
+ }
+
+ /**
+ * /a1-p/policytypes/{policy_type_id}
+ */
+ private String createPolicyTypeUri(String type) {
+ return createPolicyTypesUri() + "/" + type;
+ }
+
+ private String baseUri() {
+ return ricConfig.baseUrl() + "/a1-p";
+ }
+ }
+
+ private static final String TITLE = "title";
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final AsyncRestClient restClient;
+ private final UriBuilder uri;
+
+ public OscA1Client(RicConfig ricConfig, WebClientConfig clientConfig) {
+ this(ricConfig, new AsyncRestClient("", clientConfig));
+ }
+
+ public OscA1Client(RicConfig ricConfig, AsyncRestClient restClient) {
+ this.restClient = restClient;
+ logger.debug("OscA1Client for ric: {}", ricConfig.name());
+
+ uri = new UriBuilder(ricConfig);
+ }
+
+ public static Mono<String> extractCreateSchema(String policyTypeResponse, String policyTypeId) {
+ try {
+ JSONObject obj = new JSONObject(policyTypeResponse);
+ JSONObject schemaObj = obj.getJSONObject("create_schema");
+ schemaObj.put(TITLE, policyTypeId);
+ return Mono.just(schemaObj.toString());
+ } catch (Exception e) {
+ String exceptionString = e.toString();
+ logger.error("Unexpected response for policy type: {}, exception: {}", policyTypeResponse, exceptionString);
+ return Mono.error(e);
+ }
+ }
+
+ @Override
+ public Mono<List<String>> getPolicyTypeIdentities() {
+ return getPolicyTypeIds() //
+ .collectList();
+ }
+
+ @Override
+ public Mono<List<String>> getPolicyIdentities() {
+ return getPolicyTypeIds() //
+ .flatMap(this::getPolicyIdentitiesByType) //
+ .collectList();
+ }
+
+ @Override
+ public Mono<String> getPolicyTypeSchema(String policyTypeId) {
+ String schemaUri = uri.createGetSchemaUri(policyTypeId);
+ return restClient.get(schemaUri) //
+ .flatMap(response -> extractCreateSchema(response, policyTypeId));
+ }
+
+ @Override
+ public Mono<String> putPolicy(Policy policy) {
+ String policyUri = this.uri.createPutPolicyUri(policy.type().name(), policy.id());
+ return restClient.put(policyUri, policy.json());
+ }
+
+ @Override
+ public Mono<String> deletePolicy(Policy policy) {
+ return deletePolicyById(policy.type().name(), policy.id());
+ }
+
+ @Override
+ public Mono<A1ProtocolType> getProtocolVersion() {
+ return restClient.get(uri.createHealtcheckUri()) //
+ .flatMap(notUsed -> Mono.just(A1ProtocolType.OSC_V1));
+ }
+
+ @Override
+ public Flux<String> deleteAllPolicies() {
+ return getPolicyTypeIds() //
+ .flatMap(this::deletePoliciesForType, CONCURRENCY_RIC);
+ }
+
+ @Override
+ public Mono<String> getPolicyStatus(Policy policy) {
+ String statusUri = uri.createGetPolicyStatusUri(policy.type().name(), policy.id());
+ return restClient.get(statusUri);
+
+ }
+
+ private Flux<String> getPolicyTypeIds() {
+ return restClient.get(uri.createPolicyTypesUri()) //
+ .flatMapMany(SdncJsonHelper::parseJsonArrayOfString);
+ }
+
+ private Flux<String> getPolicyIdentitiesByType(String typeId) {
+ return restClient.get(uri.createGetPolicyIdsUri(typeId)) //
+ .flatMapMany(SdncJsonHelper::parseJsonArrayOfString);
+ }
+
+ private Mono<String> deletePolicyById(String typeId, String policyId) {
+ String policyUri = uri.createDeleteUri(typeId, policyId);
+ return restClient.delete(policyUri);
+ }
+
+ private Flux<String> deletePoliciesForType(String typeId) {
+ return getPolicyIdentitiesByType(typeId) //
+ .flatMap(policyId -> deletePolicyById(typeId, policyId), CONCURRENCY_RIC);
+ }
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncJsonHelper.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncJsonHelper.java
new file mode 100644
index 00000000..fdbbd533
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncJsonHelper.java
@@ -0,0 +1,120 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.clients;
+
+import com.google.gson.FieldNamingPolicy;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Common json functionality used by the SDNC clients
+ */
+@SuppressWarnings("java:S1192") // Same text in several traces
+class SdncJsonHelper {
+ private static Gson gson = new GsonBuilder() //
+ .setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_DASHES) //
+ .create();
+ private static final String OUTPUT = "output";
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private SdncJsonHelper() {
+ }
+
+ public static Flux<String> parseJsonArrayOfString(String inputString) {
+ try {
+ List<String> arrayList = new ArrayList<>();
+ if (!inputString.isEmpty()) {
+ JSONArray jsonArray = new JSONArray(inputString);
+ for (int i = 0; i < jsonArray.length(); i++) {
+ Object value = jsonArray.get(i);
+ arrayList.add(value.toString());
+ }
+ }
+ return Flux.fromIterable(arrayList);
+ } catch (JSONException ex) { // invalid json
+ logger.debug("Invalid json {}", ex.getMessage());
+ return Flux.error(ex);
+ }
+ }
+
+ public static <T> String createInputJsonString(T params) {
+ JsonElement paramsJson = gson.toJsonTree(params);
+ JsonObject jsonObj = new JsonObject();
+ jsonObj.add("input", paramsJson);
+ return gson.toJson(jsonObj);
+ }
+
+ public static <T> String createOutputJsonString(T params) {
+ JsonElement paramsJson = gson.toJsonTree(params);
+ JsonObject jsonObj = new JsonObject();
+ jsonObj.add(OUTPUT, paramsJson);
+ return gson.toJson(jsonObj);
+ }
+
+ public static Mono<JSONObject> getOutput(String response) {
+ try {
+ JSONObject outputJson = new JSONObject(response);
+ JSONObject responseParams = outputJson.getJSONObject(OUTPUT);
+ return Mono.just(responseParams);
+ } catch (JSONException ex) { // invalid json
+ logger.debug("Invalid json {}", ex.getMessage());
+ return Mono.error(ex);
+ }
+ }
+
+ public static Mono<String> getValueFromResponse(String response, String key) {
+ return getOutput(response) //
+ .flatMap(responseParams -> {
+ if (!responseParams.has(key)) {
+ return Mono.just("");
+ }
+ String value = responseParams.get(key).toString();
+ return Mono.just(value);
+ });
+ }
+
+ public static Mono<String> extractPolicySchema(String inputString) {
+ try {
+ JSONObject jsonObject = new JSONObject(inputString);
+ JSONObject schemaObject = jsonObject.getJSONObject("policySchema");
+ String schemaString = schemaObject.toString();
+ return Mono.just(schemaString);
+ } catch (JSONException ex) { // invalid json
+ logger.debug("Invalid json {}", ex.getMessage());
+ return Mono.error(ex);
+ }
+ }
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncOnapA1Client.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncOnapA1Client.java
new file mode 100644
index 00000000..90e61492
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncOnapA1Client.java
@@ -0,0 +1,194 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.clients;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ControllerConfig;
+import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
+import org.onap.ccsdk.oran.a1policymanagementservice.configuration.WebClientConfig;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Client for accessing the A1 adapter in the SDNC controller in ONAP
+ */
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+public class SdncOnapA1Client implements A1Client {
+ @Value.Immutable
+ @Gson.TypeAdapters
+ interface SdncOnapAdapterInput {
+ public String nearRtRicId();
+
+ public Optional<String> policyTypeId();
+
+ public Optional<String> policyInstanceId();
+
+ public Optional<String> policyInstance();
+
+ public Optional<List<String>> properties();
+ }
+
+ private static final String URL_PREFIX = "/A1-ADAPTER-API:";
+
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private final ControllerConfig controllerConfig;
+ private final RicConfig ricConfig;
+ private final AsyncRestClient restClient;
+
+ public SdncOnapA1Client(RicConfig ricConfig, ControllerConfig controllerConfig, WebClientConfig clientConfig) {
+ this(ricConfig, controllerConfig,
+ new AsyncRestClient(controllerConfig.baseUrl() + "/restconf/operations", clientConfig));
+ logger.debug("SdncOnapA1Client for ric: {}, a1ControllerBaseUrl: {}", ricConfig.name(),
+ controllerConfig.baseUrl());
+ }
+
+ public SdncOnapA1Client(RicConfig ricConfig, ControllerConfig controllerConfig, AsyncRestClient restClient) {
+ this.ricConfig = ricConfig;
+ this.controllerConfig = controllerConfig;
+ this.restClient = restClient;
+ }
+
+ @Override
+ public Mono<List<String>> getPolicyTypeIdentities() {
+ return getPolicyTypeIds() //
+ .collectList();
+ }
+
+ @Override
+ public Mono<List<String>> getPolicyIdentities() {
+ return getPolicyTypeIds() //
+ .flatMap(this::getPolicyIdentitiesByType) //
+ .collectList();
+ }
+
+ @Override
+ public Mono<String> getPolicyTypeSchema(String policyTypeId) {
+ SdncOnapAdapterInput inputParams = ImmutableSdncOnapAdapterInput.builder() //
+ .nearRtRicId(ricConfig.baseUrl()) //
+ .policyTypeId(policyTypeId) //
+ .build();
+ String inputJsonString = SdncJsonHelper.createInputJsonString(inputParams);
+ logger.debug("POST getPolicyType inputJsonString = {}", inputJsonString);
+
+ return restClient
+ .postWithAuthHeader(URL_PREFIX + "getPolicyType", inputJsonString, controllerConfig.userName(),
+ controllerConfig.password()) //
+ .flatMap(response -> SdncJsonHelper.getValueFromResponse(response, "policy-type")) //
+ .flatMap(SdncJsonHelper::extractPolicySchema);
+ }
+
+ @Override
+ public Mono<String> putPolicy(Policy policy) {
+ SdncOnapAdapterInput inputParams = ImmutableSdncOnapAdapterInput.builder() //
+ .nearRtRicId(ricConfig.baseUrl()) //
+ .policyTypeId(policy.type().name()) //
+ .policyInstanceId(policy.id()) //
+ .policyInstance(policy.json()) //
+ .properties(new ArrayList<>()) //
+ .build();
+
+ String inputJsonString = SdncJsonHelper.createInputJsonString(inputParams);
+ logger.debug("POST putPolicy inputJsonString = {}", inputJsonString);
+
+ return restClient.postWithAuthHeader(URL_PREFIX + "createPolicyInstance", inputJsonString,
+ controllerConfig.userName(), controllerConfig.password());
+ }
+
+ @Override
+ public Mono<String> deletePolicy(Policy policy) {
+ return deletePolicyByTypeId(policy.type().name(), policy.id());
+ }
+
+ @Override
+ public Flux<String> deleteAllPolicies() {
+ return getPolicyTypeIds() //
+ .flatMap(this::deletePoliciesForType); //
+ }
+
+ @Override
+ public Mono<A1ProtocolType> getProtocolVersion() {
+ return getPolicyTypeIdentities() //
+ .flatMap(notUsed -> Mono.just(A1ProtocolType.SDNC_ONAP));
+ }
+
+ @Override
+ public Mono<String> getPolicyStatus(Policy policy) {
+ return Mono.error(new Exception("Status not implemented in the controller"));
+ }
+
+ private Flux<String> getPolicyTypeIds() {
+ SdncOnapAdapterInput inputParams = ImmutableSdncOnapAdapterInput.builder() //
+ .nearRtRicId(ricConfig.baseUrl()) //
+ .build();
+ String inputJsonString = SdncJsonHelper.createInputJsonString(inputParams);
+ logger.debug("POST getPolicyTypeIdentities inputJsonString = {}", inputJsonString);
+
+ return restClient
+ .postWithAuthHeader(URL_PREFIX + "getPolicyTypes", inputJsonString, controllerConfig.userName(),
+ controllerConfig.password()) //
+ .flatMap(response -> SdncJsonHelper.getValueFromResponse(response, "policy-type-id-list")) //
+ .flatMapMany(SdncJsonHelper::parseJsonArrayOfString);
+ }
+
+ private Flux<String> getPolicyIdentitiesByType(String policyTypeId) {
+ SdncOnapAdapterInput inputParams = ImmutableSdncOnapAdapterInput.builder() //
+ .nearRtRicId(ricConfig.baseUrl()) //
+ .policyTypeId(policyTypeId) //
+ .build();
+ String inputJsonString = SdncJsonHelper.createInputJsonString(inputParams);
+ logger.debug("POST getPolicyIdentities inputJsonString = {}", inputJsonString);
+
+ return restClient
+ .postWithAuthHeader(URL_PREFIX + "getPolicyInstances", inputJsonString, controllerConfig.userName(),
+ controllerConfig.password()) //
+ .flatMap(response -> SdncJsonHelper.getValueFromResponse(response, "policy-instance-id-list")) //
+ .flatMapMany(SdncJsonHelper::parseJsonArrayOfString);
+ }
+
+ private Flux<String> deletePoliciesForType(String typeId) {
+ return getPolicyIdentitiesByType(typeId) //
+ .flatMap(policyId -> deletePolicyByTypeId(typeId, policyId)); //
+ }
+
+ private Mono<String> deletePolicyByTypeId(String policyTypeId, String policyId) {
+ SdncOnapAdapterInput inputParams = ImmutableSdncOnapAdapterInput.builder() //
+ .nearRtRicId(ricConfig.baseUrl()) //
+ .policyTypeId(policyTypeId) //
+ .policyInstanceId(policyId) //
+ .build();
+ String inputJsonString = SdncJsonHelper.createInputJsonString(inputParams);
+ logger.debug("POST deletePolicy inputJsonString = {}", inputJsonString);
+
+ return restClient.postWithAuthHeader(URL_PREFIX + "deletePolicyInstance", inputJsonString,
+ controllerConfig.userName(), controllerConfig.password());
+ }
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncOscA1Client.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncOscA1Client.java
new file mode 100644
index 00000000..c9e613c5
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncOscA1Client.java
@@ -0,0 +1,287 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.clients;
+
+import com.google.gson.FieldNamingPolicy;
+import com.google.gson.GsonBuilder;
+
+import java.lang.invoke.MethodHandles;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import org.immutables.value.Value;
+import org.json.JSONObject;
+import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ControllerConfig;
+import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
+import org.onap.ccsdk.oran.a1policymanagementservice.configuration.WebClientConfig;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Client for accessing the A1 adapter in the SDNC controller in OSC.
+ */
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+public class SdncOscA1Client implements A1Client {
+
+ static final int CONCURRENCY_RIC = 1; // How may paralell requests that is sent to one NearRT RIC
+
+ @Value.Immutable
+ @org.immutables.gson.Gson.TypeAdapters
+ public interface AdapterRequest {
+ public String nearRtRicUrl();
+
+ public Optional<String> body();
+ }
+
+ @Value.Immutable
+ @org.immutables.gson.Gson.TypeAdapters
+ public interface AdapterOutput {
+ public Optional<String> body();
+
+ public int httpStatus();
+ }
+
+ static com.google.gson.Gson gson = new GsonBuilder() //
+ .setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_DASHES) //
+ .create(); //
+
+ private static final String GET_POLICY_RPC = "getA1Policy";
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private final ControllerConfig controllerConfig;
+ private final AsyncRestClient restClient;
+ private final RicConfig ricConfig;
+ private final A1ProtocolType protocolType;
+
+ /**
+ * Constructor that creates the REST client to use.
+ *
+ * @param protocolType the southbound protocol of the controller. Supported protocols are SDNC_OSC_STD_V1_1 and
+ * SDNC_OSC_OSC_V1
+ * @param ricConfig the configuration of the Ric to communicate with
+ * @param controllerConfig the configuration of the SDNC controller to use
+ *
+ * @throws IllegalArgumentException when the protocolType is wrong.
+ */
+ public SdncOscA1Client(A1ProtocolType protocolType, RicConfig ricConfig, ControllerConfig controllerConfig,
+ WebClientConfig clientConfig) {
+ this(protocolType, ricConfig, controllerConfig,
+ new AsyncRestClient(controllerConfig.baseUrl() + "/restconf/operations", clientConfig));
+ logger.debug("SdncOscA1Client for ric: {}, a1Controller: {}", ricConfig.name(), controllerConfig);
+ }
+
+ /**
+ * Constructor where the REST client to use is provided.
+ *
+ * @param protocolType the southbound protocol of the controller. Supported protocols are SDNC_OSC_STD_V1_1 and
+ * SDNC_OSC_OSC_V1
+ * @param ricConfig the configuration of the Ric to communicate with
+ * @param controllerConfig the configuration of the SDNC controller to use
+ * @param restClient the REST client to use
+ *
+ * @throws IllegalArgumentException when the protocolType is wrong.
+ */
+ public SdncOscA1Client(A1ProtocolType protocolType, RicConfig ricConfig, ControllerConfig controllerConfig,
+ AsyncRestClient restClient) {
+ if (!(A1ProtocolType.SDNC_OSC_STD_V1_1.equals(protocolType)
+ || A1ProtocolType.SDNC_OSC_OSC_V1.equals(protocolType))) {
+ throw new IllegalArgumentException("Protocol type must be " + A1ProtocolType.SDNC_OSC_STD_V1_1 + " or "
+ + A1ProtocolType.SDNC_OSC_OSC_V1 + ", was: " + protocolType);
+ }
+ this.restClient = restClient;
+ this.ricConfig = ricConfig;
+ this.protocolType = protocolType;
+ this.controllerConfig = controllerConfig;
+ }
+
+ @Override
+ public Mono<List<String>> getPolicyTypeIdentities() {
+ if (this.protocolType == A1ProtocolType.SDNC_OSC_STD_V1_1) {
+ return Mono.just(Arrays.asList(""));
+ } else {
+ OscA1Client.UriBuilder uri = new OscA1Client.UriBuilder(ricConfig);
+ final String ricUrl = uri.createPolicyTypesUri();
+ return post(GET_POLICY_RPC, ricUrl, Optional.empty()) //
+ .flatMapMany(SdncJsonHelper::parseJsonArrayOfString) //
+ .collectList();
+ }
+
+ }
+
+ @Override
+ public Mono<List<String>> getPolicyIdentities() {
+ return getPolicyIds() //
+ .collectList();
+ }
+
+ @Override
+ public Mono<String> getPolicyTypeSchema(String policyTypeId) {
+ if (this.protocolType == A1ProtocolType.SDNC_OSC_STD_V1_1) {
+ return Mono.just("{}");
+ } else {
+ OscA1Client.UriBuilder uri = new OscA1Client.UriBuilder(ricConfig);
+ final String ricUrl = uri.createGetSchemaUri(policyTypeId);
+ return post(GET_POLICY_RPC, ricUrl, Optional.empty()) //
+ .flatMap(response -> OscA1Client.extractCreateSchema(response, policyTypeId));
+ }
+ }
+
+ @Override
+ public Mono<String> putPolicy(Policy policy) {
+ return getUriBuilder() //
+ .flatMap(builder -> {
+ String ricUrl = builder.createPutPolicyUri(policy.type().name(), policy.id());
+ return post("putA1Policy", ricUrl, Optional.of(policy.json()));
+ });
+ }
+
+ @Override
+ public Mono<String> deletePolicy(Policy policy) {
+ return deletePolicyById(policy.type().name(), policy.id());
+ }
+
+ @Override
+ public Flux<String> deleteAllPolicies() {
+ if (this.protocolType == A1ProtocolType.SDNC_OSC_STD_V1_1) {
+ return getPolicyIds() //
+ .flatMap(policyId -> deletePolicyById("", policyId), CONCURRENCY_RIC); //
+ } else {
+ OscA1Client.UriBuilder uriBuilder = new OscA1Client.UriBuilder(ricConfig);
+ return getPolicyTypeIdentities() //
+ .flatMapMany(Flux::fromIterable) //
+ .flatMap(type -> oscDeleteInstancesForType(uriBuilder, type), CONCURRENCY_RIC);
+ }
+ }
+
+ private Flux<String> oscGetInstancesForType(OscA1Client.UriBuilder uriBuilder, String type) {
+ return post(GET_POLICY_RPC, uriBuilder.createGetPolicyIdsUri(type), Optional.empty()) //
+ .flatMapMany(SdncJsonHelper::parseJsonArrayOfString);
+ }
+
+ private Flux<String> oscDeleteInstancesForType(OscA1Client.UriBuilder uriBuilder, String type) {
+ return oscGetInstancesForType(uriBuilder, type) //
+ .flatMap(instance -> deletePolicyById(type, instance), CONCURRENCY_RIC);
+ }
+
+ @Override
+ public Mono<A1ProtocolType> getProtocolVersion() {
+ return tryStdProtocolVersion() //
+ .onErrorResume(t -> tryOscProtocolVersion());
+ }
+
+ @Override
+ public Mono<String> getPolicyStatus(Policy policy) {
+ return getUriBuilder() //
+ .flatMap(builder -> {
+ String ricUrl = builder.createGetPolicyStatusUri(policy.type().name(), policy.id());
+ return post("getA1PolicyStatus", ricUrl, Optional.empty());
+ });
+ }
+
+ private Mono<A1UriBuilder> getUriBuilder() {
+ if (protocolType == A1ProtocolType.SDNC_OSC_STD_V1_1) {
+ return Mono.just(new StdA1ClientVersion1.UriBuilder(ricConfig));
+ } else {
+ return Mono.just(new OscA1Client.UriBuilder(ricConfig));
+ }
+ }
+
+ private Mono<A1ProtocolType> tryOscProtocolVersion() {
+ OscA1Client.UriBuilder oscApiuriBuilder = new OscA1Client.UriBuilder(ricConfig);
+ return post(GET_POLICY_RPC, oscApiuriBuilder.createHealtcheckUri(), Optional.empty()) //
+ .flatMap(x -> Mono.just(A1ProtocolType.SDNC_OSC_OSC_V1));
+ }
+
+ private Mono<A1ProtocolType> tryStdProtocolVersion() {
+ StdA1ClientVersion1.UriBuilder uriBuilder = new StdA1ClientVersion1.UriBuilder(ricConfig);
+ return post(GET_POLICY_RPC, uriBuilder.createGetPolicyIdsUri(), Optional.empty()) //
+ .flatMap(x -> Mono.just(A1ProtocolType.SDNC_OSC_STD_V1_1));
+ }
+
+ private Flux<String> getPolicyIds() {
+ if (this.protocolType == A1ProtocolType.SDNC_OSC_STD_V1_1) {
+ StdA1ClientVersion1.UriBuilder uri = new StdA1ClientVersion1.UriBuilder(ricConfig);
+ final String ricUrl = uri.createGetPolicyIdsUri();
+ return post(GET_POLICY_RPC, ricUrl, Optional.empty()) //
+ .flatMapMany(SdncJsonHelper::parseJsonArrayOfString);
+ } else {
+ OscA1Client.UriBuilder uri = new OscA1Client.UriBuilder(ricConfig);
+ return getPolicyTypeIdentities() //
+ .flatMapMany(Flux::fromIterable)
+ .flatMap(type -> post(GET_POLICY_RPC, uri.createGetPolicyIdsUri(type), Optional.empty())) //
+ .flatMap(SdncJsonHelper::parseJsonArrayOfString);
+ }
+ }
+
+ private Mono<String> deletePolicyById(String type, String policyId) {
+ return getUriBuilder() //
+ .flatMap(builder -> {
+ String ricUrl = builder.createDeleteUri(type, policyId);
+ return post("deleteA1Policy", ricUrl, Optional.empty());
+ });
+ }
+
+ private Mono<String> post(String rpcName, String ricUrl, Optional<String> body) {
+ AdapterRequest inputParams = ImmutableAdapterRequest.builder() //
+ .nearRtRicUrl(ricUrl) //
+ .body(body) //
+ .build();
+ final String inputJsonString = SdncJsonHelper.createInputJsonString(inputParams);
+ logger.debug("POST inputJsonString = {}", inputJsonString);
+
+ return restClient
+ .postWithAuthHeader(controllerUrl(rpcName), inputJsonString, this.controllerConfig.userName(),
+ this.controllerConfig.password()) //
+ .flatMap(this::extractResponseBody);
+ }
+
+ private Mono<String> extractResponse(JSONObject responseOutput) {
+ AdapterOutput output = gson.fromJson(responseOutput.toString(), ImmutableAdapterOutput.class);
+ Optional<String> optionalBody = output.body();
+ String body = optionalBody.isPresent() ? optionalBody.get() : "";
+ if (HttpStatus.valueOf(output.httpStatus()).is2xxSuccessful()) {
+ return Mono.just(body);
+ } else {
+ logger.debug("Error response: {} {}", output.httpStatus(), body);
+ byte[] responseBodyBytes = body.getBytes(StandardCharsets.UTF_8);
+ WebClientResponseException responseException = new WebClientResponseException(output.httpStatus(),
+ "statusText", null, responseBodyBytes, StandardCharsets.UTF_8, null);
+
+ return Mono.error(responseException);
+ }
+ }
+
+ private Mono<String> extractResponseBody(String responseStr) {
+ return SdncJsonHelper.getOutput(responseStr) //
+ .flatMap(this::extractResponse);
+ }
+
+ private String controllerUrl(String rpcName) {
+ return "/A1-ADAPTER-API:" + rpcName;
+ }
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion1.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion1.java
new file mode 100644
index 00000000..0f763537
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion1.java
@@ -0,0 +1,148 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.clients;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
+import org.onap.ccsdk.oran.a1policymanagementservice.configuration.WebClientConfig;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Client for accessing standard A1 REST API version 1.1
+ */
+public class StdA1ClientVersion1 implements A1Client {
+
+ public static class UriBuilder implements A1UriBuilder {
+
+ private final RicConfig ricConfig;
+
+ public UriBuilder(RicConfig ricConfig) {
+ this.ricConfig = ricConfig;
+ }
+
+ /**
+ * /A1-P/v1/policies/{policyId}
+ */
+ @Override
+ public String createPutPolicyUri(String type, String policyId) {
+ return policiesBaseUri() + policyId;
+ }
+
+ /**
+ * /A1-P/v1/policies
+ */
+ public String createGetPolicyIdsUri() {
+ return baseUri() + "/policies";
+ }
+
+ /**
+ * /A1-P/v1/policies/{policyId}
+ */
+ @Override
+ public String createDeleteUri(String type, String policyId) {
+ return policiesBaseUri() + policyId;
+ }
+
+ /**
+ * /A1-P/v1/policies/{policyId}/status
+ */
+ public String createGetPolicyStatusUri(String type, String policyId) {
+ return policiesBaseUri() + policyId + "/status";
+ }
+
+ private String baseUri() {
+ return ricConfig.baseUrl() + "/A1-P/v1";
+ }
+
+ private String policiesBaseUri() {
+ return createGetPolicyIdsUri() + "/";
+ }
+ }
+
+ private final AsyncRestClient restClient;
+ private final UriBuilder uri;
+
+ public StdA1ClientVersion1(RicConfig ricConfig, WebClientConfig webClientConfig) {
+ this(new AsyncRestClient("", webClientConfig), ricConfig);
+ }
+
+ public StdA1ClientVersion1(AsyncRestClient restClient, RicConfig ricConfig) {
+ this.restClient = restClient;
+ this.uri = new UriBuilder(ricConfig);
+ }
+
+ @Override
+ public Mono<List<String>> getPolicyIdentities() {
+ return getPolicyIds() //
+ .collectList();
+ }
+
+ @Override
+ public Mono<String> putPolicy(Policy policy) {
+ return restClient.put(uri.createPutPolicyUri(policy.type().name(), policy.id()), policy.json());
+ }
+
+ @Override
+ public Mono<List<String>> getPolicyTypeIdentities() {
+ return Mono.just(Arrays.asList(""));
+ }
+
+ @Override
+ public Mono<String> getPolicyTypeSchema(String policyTypeId) {
+ return Mono.just("{}");
+ }
+
+ @Override
+ public Mono<String> deletePolicy(Policy policy) {
+ return deletePolicyById(policy.id());
+ }
+
+ @Override
+ public Flux<String> deleteAllPolicies() {
+ return getPolicyIds() //
+ .flatMap(this::deletePolicyById); //
+ }
+
+ @Override
+ public Mono<A1ProtocolType> getProtocolVersion() {
+ return getPolicyIdentities() //
+ .flatMap(x -> Mono.just(A1ProtocolType.STD_V1_1));
+ }
+
+ @Override
+ public Mono<String> getPolicyStatus(Policy policy) {
+ return restClient.get(uri.createGetPolicyStatusUri(policy.type().name(), policy.id()));
+ }
+
+ private Flux<String> getPolicyIds() {
+ return restClient.get(uri.createGetPolicyIdsUri()) //
+ .flatMapMany(SdncJsonHelper::parseJsonArrayOfString);
+ }
+
+ private Mono<String> deletePolicyById(String policyId) {
+ return restClient.delete(uri.createDeleteUri("", policyId));
+ }
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java
new file mode 100644
index 00000000..f03a0ff8
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfig.java
@@ -0,0 +1,155 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.configuration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.validation.constraints.NotEmpty;
+
+import lombok.Getter;
+
+import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import reactor.core.publisher.Flux;
+
+@EnableConfigurationProperties
+@ConfigurationProperties()
+public class ApplicationConfig {
+ @NotEmpty
+ @Getter
+ @Value("${app.filepath}")
+ private String localConfigurationFilePath;
+
+ @Value("${server.ssl.key-store-type}")
+ private String sslKeyStoreType = "";
+
+ @Value("${server.ssl.key-store-password}")
+ private String sslKeyStorePassword = "";
+
+ @Value("${server.ssl.key-store}")
+ private String sslKeyStore = "";
+
+ @Value("${server.ssl.key-password}")
+ private String sslKeyPassword = "";
+
+ @Value("${app.webclient.trust-store-used}")
+ private boolean sslTrustStoreUsed = false;
+
+ @Value("${app.webclient.trust-store-password}")
+ private String sslTrustStorePassword = "";
+
+ @Value("${app.webclient.trust-store}")
+ private String sslTrustStore = "";
+
+ private Map<String, RicConfig> ricConfigs = new HashMap<>();
+
+ @Getter
+ private String dmaapConsumerTopicUrl;
+
+ @Getter
+ private String dmaapProducerTopicUrl;
+
+ private Map<String, ControllerConfig> controllerConfigs = new HashMap<>();
+
+ public synchronized Collection<RicConfig> getRicConfigs() {
+ return this.ricConfigs.values();
+ }
+
+ public WebClientConfig getWebClientConfig() {
+ return ImmutableWebClientConfig.builder() //
+ .keyStoreType(this.sslKeyStoreType) //
+ .keyStorePassword(this.sslKeyStorePassword) //
+ .keyStore(this.sslKeyStore) //
+ .keyPassword(this.sslKeyPassword) //
+ .isTrustStoreUsed(this.sslTrustStoreUsed) //
+ .trustStore(this.sslTrustStore) //
+ .trustStorePassword(this.sslTrustStorePassword) //
+ .build();
+ }
+
+ public synchronized ControllerConfig getControllerConfig(String name) throws ServiceException {
+ ControllerConfig controllerConfig = this.controllerConfigs.get(name);
+ if (controllerConfig == null) {
+ throw new ServiceException("Could not find controller config: " + name);
+ }
+ return controllerConfig;
+ }
+
+ public synchronized RicConfig getRic(String ricName) throws ServiceException {
+ RicConfig ricConfig = this.ricConfigs.get(ricName);
+ if (ricConfig == null) {
+ throw new ServiceException("Could not find ric configuration: " + ricName);
+ }
+ return ricConfig;
+ }
+
+ public static class RicConfigUpdate {
+ public enum Type {
+ ADDED, CHANGED, REMOVED
+ }
+
+ @Getter
+ private final RicConfig ricConfig;
+ @Getter
+ private final Type type;
+
+ RicConfigUpdate(RicConfig ric, Type event) {
+ this.ricConfig = ric;
+ this.type = event;
+ }
+ }
+
+ public synchronized Flux<RicConfigUpdate> setConfiguration(
+ ApplicationConfigParser.ConfigParserResult parserResult) {
+
+ Collection<RicConfigUpdate> modifications = new ArrayList<>();
+ this.controllerConfigs = parserResult.controllerConfigs();
+
+ this.dmaapConsumerTopicUrl = parserResult.dmaapConsumerTopicUrl();
+ this.dmaapProducerTopicUrl = parserResult.dmaapProducerTopicUrl();
+
+ Map<String, RicConfig> newRicConfigs = new HashMap<>();
+ for (RicConfig newConfig : parserResult.ricConfigs()) {
+ RicConfig oldConfig = this.ricConfigs.get(newConfig.name());
+ this.ricConfigs.remove(newConfig.name());
+ if (oldConfig == null) {
+ newRicConfigs.put(newConfig.name(), newConfig);
+ modifications.add(new RicConfigUpdate(newConfig, RicConfigUpdate.Type.ADDED));
+ } else if (!newConfig.equals(oldConfig)) {
+ modifications.add(new RicConfigUpdate(newConfig, RicConfigUpdate.Type.CHANGED));
+ newRicConfigs.put(newConfig.name(), newConfig);
+ } else {
+ newRicConfigs.put(oldConfig.name(), oldConfig);
+ }
+ }
+ for (RicConfig deletedConfig : this.ricConfigs.values()) {
+ modifications.add(new RicConfigUpdate(deletedConfig, RicConfigUpdate.Type.REMOVED));
+ }
+ this.ricConfigs = newRicConfigs;
+
+ return Flux.fromIterable(modifications);
+ }
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfigParser.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfigParser.java
new file mode 100644
index 00000000..f3fbb2fb
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ApplicationConfigParser.java
@@ -0,0 +1,189 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.configuration;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import javax.validation.constraints.NotNull;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
+
+/**
+ * Parser for the Json representing of the component configuration.
+ */
+public class ApplicationConfigParser {
+
+ private static final String CONFIG = "config";
+ private static final String CONTROLLER = "controller";
+
+ @Value.Immutable
+ @Gson.TypeAdapters
+ public interface ConfigParserResult {
+ List<RicConfig> ricConfigs();
+
+ Map<String, ControllerConfig> controllerConfigs();
+
+ String dmaapConsumerTopicUrl();
+
+ String dmaapProducerTopicUrl();
+
+ }
+
+ public ConfigParserResult parse(JsonObject root) throws ServiceException {
+
+ String dmaapProducerTopicUrl = "";
+ String dmaapConsumerTopicUrl = "";
+
+ JsonObject pmsConfigJson = root.getAsJsonObject(CONFIG);
+
+ if (pmsConfigJson == null) {
+ throw new ServiceException("Missing root configuration \"" + CONFIG + "\" in JSON: " + root);
+ }
+
+ JsonObject json = pmsConfigJson.getAsJsonObject("streams_publishes");
+ if (json != null) {
+ dmaapProducerTopicUrl = parseDmaapConfig(json);
+ }
+
+ json = pmsConfigJson.getAsJsonObject("streams_subscribes");
+ if (json != null) {
+ dmaapConsumerTopicUrl = parseDmaapConfig(json);
+ }
+
+ List<RicConfig> ricConfigs = parseRics(pmsConfigJson);
+ Map<String, ControllerConfig> controllerConfigs = parseControllerConfigs(pmsConfigJson);
+ checkConfigurationConsistency(ricConfigs, controllerConfigs);
+
+ return ImmutableConfigParserResult.builder() //
+ .dmaapConsumerTopicUrl(dmaapConsumerTopicUrl) //
+ .dmaapProducerTopicUrl(dmaapProducerTopicUrl) //
+ .ricConfigs(ricConfigs) //
+ .controllerConfigs(controllerConfigs) //
+ .build();
+ }
+
+ private void checkConfigurationConsistency(List<RicConfig> ricConfigs,
+ Map<String, ControllerConfig> controllerConfigs) throws ServiceException {
+ Set<String> ricUrls = new HashSet<>();
+ Set<String> ricNames = new HashSet<>();
+ for (RicConfig ric : ricConfigs) {
+ if (!ricUrls.add(ric.baseUrl())) {
+ throw new ServiceException("Configuration error, more than one RIC URL: " + ric.baseUrl());
+ }
+ if (!ricNames.add(ric.name())) {
+ throw new ServiceException("Configuration error, more than one RIC with name: " + ric.name());
+ }
+ if (!ric.controllerName().isEmpty() && controllerConfigs.get(ric.controllerName()) == null) {
+ throw new ServiceException(
+ "Configuration error, controller configuration not found: " + ric.controllerName());
+ }
+
+ }
+ }
+
+ private List<RicConfig> parseRics(JsonObject config) throws ServiceException {
+ List<RicConfig> result = new ArrayList<>();
+ for (JsonElement ricElem : getAsJsonArray(config, "ric")) {
+ JsonObject ricAsJson = ricElem.getAsJsonObject();
+ JsonElement controllerNameElement = ricAsJson.get(CONTROLLER);
+ ImmutableRicConfig ricConfig = ImmutableRicConfig.builder() //
+ .name(get(ricAsJson, "name").getAsString()) //
+ .baseUrl(get(ricAsJson, "baseUrl").getAsString()) //
+ .managedElementIds(parseManagedElementIds(get(ricAsJson, "managedElementIds").getAsJsonArray())) //
+ .controllerName(controllerNameElement != null ? controllerNameElement.getAsString() : "") //
+ .build();
+ result.add(ricConfig);
+ }
+ return result;
+ }
+
+ Map<String, ControllerConfig> parseControllerConfigs(JsonObject config) throws ServiceException {
+ if (config.get(CONTROLLER) == null) {
+ return new HashMap<>();
+ }
+ Map<String, ControllerConfig> result = new HashMap<>();
+ for (JsonElement element : getAsJsonArray(config, CONTROLLER)) {
+ JsonObject controllerAsJson = element.getAsJsonObject();
+ ImmutableControllerConfig controllerConfig = ImmutableControllerConfig.builder() //
+ .name(get(controllerAsJson, "name").getAsString()) //
+ .baseUrl(get(controllerAsJson, "baseUrl").getAsString()) //
+ .password(get(controllerAsJson, "password").getAsString()) //
+ .userName(get(controllerAsJson, "userName").getAsString()) // )
+ .build();
+
+ if (result.put(controllerConfig.name(), controllerConfig) != null) {
+ throw new ServiceException(
+ "Configuration error, more than one controller with name: " + controllerConfig.name());
+ }
+ }
+ return result;
+ }
+
+ private List<String> parseManagedElementIds(JsonArray asJsonObject) {
+ Iterator<JsonElement> iterator = asJsonObject.iterator();
+ List<String> managedElementIds = new ArrayList<>();
+ while (iterator.hasNext()) {
+ managedElementIds.add(iterator.next().getAsString());
+
+ }
+ return managedElementIds;
+ }
+
+ private static JsonElement get(JsonObject obj, String memberName) throws ServiceException {
+ JsonElement elem = obj.get(memberName);
+ if (elem == null) {
+ throw new ServiceException("Could not find member: '" + memberName + "' in: " + obj);
+ }
+ return elem;
+ }
+
+ private JsonArray getAsJsonArray(JsonObject obj, String memberName) throws ServiceException {
+ return get(obj, memberName).getAsJsonArray();
+ }
+
+ private String parseDmaapConfig(JsonObject streamCfg) throws ServiceException {
+ Set<Entry<String, JsonElement>> streamConfigEntries = streamCfg.entrySet();
+ if (streamConfigEntries.size() != 1) {
+ throw new ServiceException(
+ "Invalid configuration. Number of streams must be one, config: " + streamConfigEntries);
+ }
+ JsonObject streamConfigEntry = streamConfigEntries.iterator().next().getValue().getAsJsonObject();
+ JsonObject dmaapInfo = get(streamConfigEntry, "dmaap_info").getAsJsonObject();
+ return getAsString(dmaapInfo, "topic_url");
+ }
+
+ private static @NotNull String getAsString(JsonObject obj, String memberName) throws ServiceException {
+ return get(obj, memberName).getAsString();
+ }
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/AsyncConfiguration.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/AsyncConfiguration.java
new file mode 100644
index 00000000..9558c744
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/AsyncConfiguration.java
@@ -0,0 +1,45 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.configuration;
+
+import java.util.concurrent.Executor;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.AsyncConfigurer;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+@Configuration
+@EnableAsync
+public class AsyncConfiguration implements AsyncConfigurer {
+
+ @Override
+ @Bean(name = "threadPoolTaskExecutor")
+ public Executor getAsyncExecutor() {
+ // Set this configuration value from common properties file
+ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+ executor.setCorePoolSize(5);
+ executor.setMaxPoolSize(10);
+ executor.setQueueCapacity(25);
+ return executor;
+ }
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ControllerConfig.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ControllerConfig.java
new file mode 100644
index 00000000..5f00cf69
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/ControllerConfig.java
@@ -0,0 +1,38 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.configuration;
+
+import org.immutables.value.Value;
+
+@Value.Immutable
+@Value.Style(redactedMask = "####")
+
+public interface ControllerConfig {
+ public String name();
+
+ public String baseUrl();
+
+ public String userName();
+
+ @Value.Redacted
+ public String password();
+
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/RicConfig.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/RicConfig.java
new file mode 100644
index 00000000..4d141cc9
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/RicConfig.java
@@ -0,0 +1,37 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.configuration;
+
+import com.google.common.collect.ImmutableList;
+
+import org.immutables.value.Value;
+
+@Value.Immutable
+public interface RicConfig {
+ public String name();
+
+ public String controllerName();
+
+ public String baseUrl();
+
+ public ImmutableList<String> managedElementIds();
+
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/WebClientConfig.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/WebClientConfig.java
new file mode 100644
index 00000000..ef7bd87c
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/configuration/WebClientConfig.java
@@ -0,0 +1,45 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.configuration;
+
+import org.immutables.value.Value;
+
+@Value.Immutable
+@Value.Style(redactedMask = "####")
+public interface WebClientConfig {
+ public String keyStoreType();
+
+ @Value.Redacted
+ public String keyStorePassword();
+
+ public String keyStore();
+
+ @Value.Redacted
+ public String keyPassword();
+
+ public boolean isTrustStoreUsed();
+
+ @Value.Redacted
+ public String trustStorePassword();
+
+ public String trustStore();
+
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/PolicyController.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/PolicyController.java
new file mode 100644
index 00000000..39876cfc
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/PolicyController.java
@@ -0,0 +1,482 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.controllers;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import lombok.Getter;
+
+import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1ClientFactory;
+import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.ImmutablePolicy;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock.LockType;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyType;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Service;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.DeleteMapping;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Mono;
+
+@RestController
+@Api(tags = "A1 Policy Management")
+public class PolicyController {
+
+ public static class RejectionException extends Exception {
+ private static final long serialVersionUID = 1L;
+ @Getter
+ private final HttpStatus status;
+
+ public RejectionException(String message, HttpStatus status) {
+ super(message);
+ this.status = status;
+ }
+ }
+
+ @Autowired
+ private Rics rics;
+ @Autowired
+ private PolicyTypes policyTypes;
+ @Autowired
+ private Policies policies;
+ @Autowired
+ private A1ClientFactory a1ClientFactory;
+ @Autowired
+ private Services services;
+
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private static Gson gson = new GsonBuilder() //
+ .serializeNulls() //
+ .create(); //
+
+ @GetMapping("/policy_schemas")
+ @ApiOperation(value = "Returns policy type schema definitions")
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 200, message = "Policy schemas", response = Object.class, responseContainer = "List"), //
+ @ApiResponse(code = 404, message = "RIC is not found", response = String.class)})
+ public ResponseEntity<String> getPolicySchemas( //
+ @ApiParam(name = "ric", required = false, value = "The name of the Near-RT RIC to get the definitions for.") //
+ @RequestParam(name = "ric", required = false) String ricName) {
+ if (ricName == null) {
+ Collection<PolicyType> types = this.policyTypes.getAll();
+ return new ResponseEntity<>(toPolicyTypeSchemasJson(types), HttpStatus.OK);
+ } else {
+ try {
+ Collection<PolicyType> types = rics.getRic(ricName).getSupportedPolicyTypes();
+ return new ResponseEntity<>(toPolicyTypeSchemasJson(types), HttpStatus.OK);
+ } catch (ServiceException e) {
+ return new ResponseEntity<>(e.toString(), HttpStatus.NOT_FOUND);
+ }
+ }
+ }
+
+ @GetMapping("/policy_schema")
+ @ApiOperation(value = "Returns one policy type schema definition")
+ @ApiResponses(
+ value = { //
+ @ApiResponse(code = 200, message = "Policy schema", response = Object.class),
+ @ApiResponse(code = 404, message = "RIC is not found", response = String.class)})
+ public ResponseEntity<String> getPolicySchema( //
+ @ApiParam(name = "id", required = true, value = "The ID of the policy type to get the definition for.") //
+ @RequestParam(name = "id", required = true) String id) {
+ try {
+ PolicyType type = policyTypes.getType(id);
+ return new ResponseEntity<>(type.schema(), HttpStatus.OK);
+ } catch (ServiceException e) {
+ return new ResponseEntity<>(e.toString(), HttpStatus.NOT_FOUND);
+ }
+ }
+
+ @GetMapping("/policy_types")
+ @ApiOperation(value = "Query policy type names")
+ @ApiResponses(
+ value = {
+ @ApiResponse(
+ code = 200,
+ message = "Policy type names",
+ response = String.class,
+ responseContainer = "List"),
+ @ApiResponse(code = 404, message = "RIC is not found", response = String.class)})
+ public ResponseEntity<String> getPolicyTypes( //
+ @ApiParam(name = "ric", required = false, value = "The name of the Near-RT RIC to get types for.") //
+ @RequestParam(name = "ric", required = false) String ricName) {
+ if (ricName == null) {
+ Collection<PolicyType> types = this.policyTypes.getAll();
+ return new ResponseEntity<>(toPolicyTypeIdsJson(types), HttpStatus.OK);
+ } else {
+ try {
+ Collection<PolicyType> types = rics.getRic(ricName).getSupportedPolicyTypes();
+ return new ResponseEntity<>(toPolicyTypeIdsJson(types), HttpStatus.OK);
+ } catch (ServiceException e) {
+ return new ResponseEntity<>(e.toString(), HttpStatus.NOT_FOUND);
+ }
+ }
+ }
+
+ @GetMapping("/policy")
+ @ApiOperation(value = "Returns a policy configuration") //
+ @ApiResponses(
+ value = { //
+ @ApiResponse(code = 200, message = "Policy found", response = Object.class), //
+ @ApiResponse(code = 404, message = "Policy is not found")} //
+ )
+ public ResponseEntity<String> getPolicy( //
+ @ApiParam(name = "id", required = true, value = "The ID of the policy instance.") //
+ @RequestParam(name = "id", required = true) String id) {
+ try {
+ Policy p = policies.getPolicy(id);
+ return new ResponseEntity<>(p.json(), HttpStatus.OK);
+ } catch (ServiceException e) {
+ return new ResponseEntity<>(e.getMessage(), HttpStatus.NOT_FOUND);
+ }
+ }
+
+ @DeleteMapping("/policy")
+ @ApiOperation(value = "Delete a policy", response = Object.class)
+ @ApiResponses(
+ value = { //
+ @ApiResponse(code = 204, message = "Policy deleted", response = Object.class),
+ @ApiResponse(code = 404, message = "Policy is not found", response = String.class),
+ @ApiResponse(code = 423, message = "RIC is not operational", response = String.class)})
+ public Mono<ResponseEntity<Object>> deletePolicy( //
+ @ApiParam(name = "id", required = true, value = "The ID of the policy instance.") //
+ @RequestParam(name = "id", required = true) String id) {
+ try {
+ Policy policy = policies.getPolicy(id);
+ keepServiceAlive(policy.ownerServiceName());
+ Ric ric = policy.ric();
+ return ric.getLock().lock(LockType.SHARED) //
+ .flatMap(notUsed -> assertRicStateIdle(ric)) //
+ .flatMap(notUsed -> a1ClientFactory.createA1Client(policy.ric())) //
+ .doOnNext(notUsed -> policies.remove(policy)) //
+ .flatMap(client -> client.deletePolicy(policy)) //
+ .doOnNext(notUsed -> ric.getLock().unlockBlocking()) //
+ .doOnError(notUsed -> ric.getLock().unlockBlocking()) //
+ .flatMap(notUsed -> Mono.just(new ResponseEntity<>(HttpStatus.NO_CONTENT)))
+ .onErrorResume(this::handleException);
+ } catch (ServiceException e) {
+ return Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND));
+ }
+ }
+
+ @PutMapping(path = "/policy")
+ @ApiOperation(value = "Put a policy", response = String.class)
+ @ApiResponses(
+ value = { //
+ @ApiResponse(code = 201, message = "Policy created", response = Object.class), //
+ @ApiResponse(code = 200, message = "Policy updated", response = Object.class), //
+ @ApiResponse(code = 423, message = "RIC is not operational", response = String.class), //
+ @ApiResponse(code = 404, message = "RIC or policy type is not found", response = String.class) //
+ })
+ public Mono<ResponseEntity<Object>> putPolicy( //
+ @ApiParam(name = "type", required = false, value = "The name of the policy type.") //
+ @RequestParam(name = "type", required = false, defaultValue = "") String typeName, //
+ @ApiParam(name = "id", required = true, value = "The ID of the policy instance.") //
+ @RequestParam(name = "id", required = true) String instanceId, //
+ @ApiParam(name = "ric", required = true, value = "The name of the Near-RT RIC where the policy will be " + //
+ "created.") //
+ @RequestParam(name = "ric", required = true) String ricName, //
+ @ApiParam(name = "service", required = true, value = "The name of the service creating the policy.") //
+ @RequestParam(name = "service", required = true) String service, //
+ @ApiParam(name = "transient", required = false, value = "If the policy is transient or not (boolean " + //
+ "defaulted to false). A policy is transient if it will be forgotten when the service needs to " + //
+ "reconnect to the Near-RT RIC.") //
+ @RequestParam(name = "transient", required = false, defaultValue = "false") boolean isTransient, //
+ @RequestBody Object jsonBody) {
+
+ String jsonString = gson.toJson(jsonBody);
+ Ric ric = rics.get(ricName);
+ PolicyType type = policyTypes.get(typeName);
+ keepServiceAlive(service);
+ if (ric == null || type == null) {
+ return Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND));
+ }
+ Policy policy = ImmutablePolicy.builder() //
+ .id(instanceId) //
+ .json(jsonString) //
+ .type(type) //
+ .ric(ric) //
+ .ownerServiceName(service) //
+ .lastModified(getTimeStampUtc()) //
+ .isTransient(isTransient) //
+ .build();
+
+ final boolean isCreate = this.policies.get(policy.id()) == null;
+
+ return ric.getLock().lock(LockType.SHARED) //
+ .flatMap(notUsed -> assertRicStateIdle(ric)) //
+ .flatMap(notUsed -> checkSupportedType(ric, type)) //
+ .flatMap(notUsed -> validateModifiedPolicy(policy)) //
+ .flatMap(notUsed -> a1ClientFactory.createA1Client(ric)) //
+ .flatMap(client -> client.putPolicy(policy)) //
+ .doOnNext(notUsed -> policies.put(policy)) //
+ .doOnNext(notUsed -> ric.getLock().unlockBlocking()) //
+ .doOnError(trowable -> ric.getLock().unlockBlocking()) //
+ .flatMap(notUsed -> Mono.just(new ResponseEntity<>(isCreate ? HttpStatus.CREATED : HttpStatus.OK))) //
+ .onErrorResume(this::handleException);
+ }
+
+ @SuppressWarnings({"unchecked"})
+ private <T> Mono<ResponseEntity<T>> createResponseEntity(String message, HttpStatus status) {
+ ResponseEntity<T> re = new ResponseEntity<>((T) message, status);
+ return Mono.just(re);
+ }
+
+ private <T> Mono<ResponseEntity<T>> handleException(Throwable throwable) {
+ if (throwable instanceof WebClientResponseException) {
+ WebClientResponseException e = (WebClientResponseException) throwable;
+ return createResponseEntity(e.getResponseBodyAsString(), e.getStatusCode());
+ } else if (throwable instanceof RejectionException) {
+ RejectionException e = (RejectionException) throwable;
+ return createResponseEntity(e.getMessage(), e.getStatus());
+ } else {
+ return createResponseEntity(throwable.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ private Mono<Object> validateModifiedPolicy(Policy policy) {
+ // Check that ric is not updated
+ Policy current = this.policies.get(policy.id());
+ if (current != null && !current.ric().name().equals(policy.ric().name())) {
+ RejectionException e = new RejectionException("Policy cannot change RIC, policyId: " + current.id() + //
+ ", RIC name: " + current.ric().name() + //
+ ", new name: " + policy.ric().name(), HttpStatus.CONFLICT);
+ logger.debug("Request rejected, {}", e.getMessage());
+ return Mono.error(e);
+ }
+ return Mono.just("OK");
+ }
+
+ private Mono<Object> checkSupportedType(Ric ric, PolicyType type) {
+ if (!ric.isSupportingType(type.name())) {
+ logger.debug("Request rejected, type not supported, RIC: {}", ric);
+ RejectionException e = new RejectionException(
+ "Type: " + type.name() + " not supported by RIC: " + ric.name(), HttpStatus.NOT_FOUND);
+ return Mono.error(e);
+ }
+ return Mono.just("OK");
+ }
+
+ private Mono<Object> assertRicStateIdle(Ric ric) {
+ if (ric.getState() == Ric.RicState.AVAILABLE) {
+ return Mono.just("OK");
+ } else {
+ logger.debug("Request rejected RIC not IDLE, ric: {}", ric);
+ RejectionException e = new RejectionException(
+ "Ric is not operational, RIC name: " + ric.name() + ", state: " + ric.getState(), HttpStatus.LOCKED);
+ return Mono.error(e);
+ }
+ }
+
+ @GetMapping("/policies")
+ @ApiOperation(value = "Query policies")
+ @ApiResponses(
+ value = {
+ @ApiResponse(code = 200, message = "Policies", response = PolicyInfo.class, responseContainer = "List"),
+ @ApiResponse(code = 404, message = "RIC or type not found", response = String.class)})
+ public ResponseEntity<String> getPolicies( //
+ @ApiParam(name = "type", required = false, value = "The name of the policy type to get policies for.") //
+ @RequestParam(name = "type", required = false) String type, //
+ @ApiParam(name = "ric", required = false, value = "The name of the Near-RT RIC to get policies for.") //
+ @RequestParam(name = "ric", required = false) String ric, //
+ @ApiParam(name = "service", required = false, value = "The name of the service to get policies for.") //
+ @RequestParam(name = "service", required = false) String service) //
+ {
+ if ((type != null && this.policyTypes.get(type) == null)) {
+ return new ResponseEntity<>("Policy type not found", HttpStatus.NOT_FOUND);
+ }
+ if ((ric != null && this.rics.get(ric) == null)) {
+ return new ResponseEntity<>("RIC not found", HttpStatus.NOT_FOUND);
+ }
+
+ String filteredPolicies = policiesToJson(filter(type, ric, service));
+ return new ResponseEntity<>(filteredPolicies, HttpStatus.OK);
+ }
+
+ @GetMapping("/policy_ids")
+ @ApiOperation(value = "Query policies, only IDs returned")
+ @ApiResponses(
+ value = {@ApiResponse(code = 200, message = "Policy ids", response = String.class, responseContainer = "List"),
+ @ApiResponse(code = 404, message = "RIC or type not found", response = String.class)})
+ public ResponseEntity<String> getPolicyIds( //
+ @ApiParam(name = "type", required = false, value = "The name of the policy type to get policies for.") //
+ @RequestParam(name = "type", required = false) String type, //
+ @ApiParam(name = "ric", required = false, value = "The name of the Near-RT RIC to get policies for.") //
+ @RequestParam(name = "ric", required = false) String ric, //
+ @ApiParam(name = "service", required = false, value = "The name of the service to get policies for.") //
+ @RequestParam(name = "service", required = false) String service) //
+ {
+ if ((type != null && this.policyTypes.get(type) == null)) {
+ return new ResponseEntity<>("Policy type not found", HttpStatus.NOT_FOUND);
+ }
+ if ((ric != null && this.rics.get(ric) == null)) {
+ return new ResponseEntity<>("RIC not found", HttpStatus.NOT_FOUND);
+ }
+
+ String policyIdsJson = toPolicyIdsJson(filter(type, ric, service));
+ return new ResponseEntity<>(policyIdsJson, HttpStatus.OK);
+ }
+
+ @GetMapping("/policy_status")
+ @ApiOperation(value = "Returns a policy status") //
+ @ApiResponses(
+ value = { //
+ @ApiResponse(code = 200, message = "Policy status", response = Object.class), //
+ @ApiResponse(code = 404, message = "Policy is not found", response = String.class)} //
+ )
+ public Mono<ResponseEntity<String>> getPolicyStatus( //
+ @ApiParam(name = "id", required = true, value = "The ID of the policy.") @RequestParam(
+ name = "id", //
+ required = true) String id) {
+ try {
+ Policy policy = policies.getPolicy(id);
+
+ return a1ClientFactory.createA1Client(policy.ric()) //
+ .flatMap(client -> client.getPolicyStatus(policy)) //
+ .flatMap(status -> Mono.just(new ResponseEntity<>(status, HttpStatus.OK)))
+ .onErrorResume(this::handleException);
+ } catch (ServiceException e) {
+ return Mono.just(new ResponseEntity<>(e.getMessage(), HttpStatus.NOT_FOUND));
+ }
+ }
+
+ private void keepServiceAlive(String name) {
+ Service s = this.services.get(name);
+ if (s != null) {
+ s.keepAlive();
+ }
+ }
+
+ private boolean include(String filter, String value) {
+ return filter == null || value.equals(filter);
+ }
+
+ private Collection<Policy> filter(Collection<Policy> collection, String type, String ric, String service) {
+ if (type == null && ric == null && service == null) {
+ return collection;
+ }
+ List<Policy> filtered = new ArrayList<>();
+ for (Policy p : collection) {
+ if (include(type, p.type().name()) && include(ric, p.ric().name())
+ && include(service, p.ownerServiceName())) {
+ filtered.add(p);
+ }
+ }
+ return filtered;
+ }
+
+ private Collection<Policy> filter(String type, String ric, String service) {
+ if (type != null) {
+ return filter(policies.getForType(type), null, ric, service);
+ } else if (service != null) {
+ return filter(policies.getForService(service), type, ric, null);
+ } else if (ric != null) {
+ return filter(policies.getForRic(ric), type, null, service);
+ } else {
+ return policies.getAll();
+ }
+ }
+
+ private String policiesToJson(Collection<Policy> policies) {
+ List<PolicyInfo> v = new ArrayList<>(policies.size());
+ for (Policy p : policies) {
+ PolicyInfo policyInfo = new PolicyInfo();
+ policyInfo.id = p.id();
+ policyInfo.json = fromJson(p.json());
+ policyInfo.ric = p.ric().name();
+ policyInfo.type = p.type().name();
+ policyInfo.service = p.ownerServiceName();
+ policyInfo.lastModified = p.lastModified();
+ if (!policyInfo.validate()) {
+ logger.error("BUG, all fields must be set");
+ }
+ v.add(policyInfo);
+ }
+ return gson.toJson(v);
+ }
+
+ private Object fromJson(String jsonStr) {
+ return gson.fromJson(jsonStr, Object.class);
+ }
+
+ private String toPolicyTypeSchemasJson(Collection<PolicyType> types) {
+ StringBuilder result = new StringBuilder();
+ result.append("[");
+ boolean first = true;
+ for (PolicyType t : types) {
+ if (!first) {
+ result.append(",");
+ }
+ first = false;
+ result.append(t.schema());
+ }
+ result.append("]");
+ return result.toString();
+ }
+
+ private String toPolicyTypeIdsJson(Collection<PolicyType> types) {
+ List<String> v = new ArrayList<>(types.size());
+ for (PolicyType t : types) {
+ v.add(t.name());
+ }
+ return gson.toJson(v);
+ }
+
+ private String toPolicyIdsJson(Collection<Policy> policies) {
+ List<String> v = new ArrayList<>(policies.size());
+ for (Policy p : policies) {
+ v.add(p.id());
+ }
+ return gson.toJson(v);
+ }
+
+ private String getTimeStampUtc() {
+ return java.time.Instant.now().toString();
+ }
+
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/PolicyInfo.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/PolicyInfo.java
new file mode 100644
index 00000000..266a8449
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/PolicyInfo.java
@@ -0,0 +1,57 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.controllers;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+
+import org.immutables.gson.Gson;
+
+@Gson.TypeAdapters
+@ApiModel(value = "PolicyInfo")
+public class PolicyInfo {
+
+ @ApiModelProperty(value = "identity of the policy")
+ public String id;
+
+ @ApiModelProperty(value = "name of the policy type")
+ public String type;
+
+ @ApiModelProperty(value = "identity of the target Near-RT RIC")
+ public String ric;
+
+ @ApiModelProperty(value = "the configuration of the policy")
+ public Object json;
+
+ @ApiModelProperty(value = "the name of the service owning the policy")
+ public String service;
+
+ @ApiModelProperty(value = "timestamp, last modification time")
+ public String lastModified;
+
+ PolicyInfo() {
+ }
+
+ public boolean validate() {
+ return id != null && type != null && ric != null && json != null && service != null && lastModified != null;
+ }
+
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/RicInfo.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/RicInfo.java
new file mode 100644
index 00000000..0b3b090a
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/RicInfo.java
@@ -0,0 +1,51 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.controllers;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+
+import java.util.Collection;
+
+import org.immutables.gson.Gson;
+
+@Gson.TypeAdapters
+@ApiModel(value = "RicInfo")
+class RicInfo {
+ @ApiModelProperty(value = "identity of the ric")
+ public final String ricName;
+
+ @ApiModelProperty(value = "O1 identities for managed entities")
+ public final Collection<String> managedElementIds;
+
+ @ApiModelProperty(value = "supported policy types")
+ public final Collection<String> policyTypes;
+
+ @ApiModelProperty(value = "state info")
+ public final String state;
+
+ RicInfo(String name, Collection<String> managedElementIds, Collection<String> policyTypes, String state) {
+ this.ricName = name;
+ this.managedElementIds = managedElementIds;
+ this.policyTypes = policyTypes;
+ this.state = state;
+ }
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/RicRepositoryController.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/RicRepositoryController.java
new file mode 100644
index 00000000..88c27a96
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/RicRepositoryController.java
@@ -0,0 +1,109 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.controllers;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@Api(tags = "RIC Repository")
+public class RicRepositoryController {
+
+ @Autowired
+ private Rics rics;
+
+ @Autowired
+ PolicyTypes types;
+
+ private static Gson gson = new GsonBuilder() //
+ .serializeNulls() //
+ .create(); //
+
+ /**
+ * Example: http://localhost:8081/rics?managedElementId=kista_1
+ */
+ @GetMapping("/ric")
+ @ApiOperation(value = "Returns the name of a RIC managing one Mananged Element")
+ @ApiResponses(
+ value = { //
+ @ApiResponse(code = 200, message = "RIC is found", response = String.class), //
+ @ApiResponse(code = 404, message = "RIC is not found", response = String.class) //
+ })
+ public ResponseEntity<String> getRic( //
+ @ApiParam(name = "managedElementId", required = true, value = "The ID of the Managed Element") //
+ @RequestParam(name = "managedElementId", required = true) String managedElementId) {
+ Optional<Ric> ric = this.rics.lookupRicForManagedElement(managedElementId);
+
+ if (ric.isPresent()) {
+ return new ResponseEntity<>(ric.get().name(), HttpStatus.OK);
+ } else {
+ return new ResponseEntity<>("No RIC found", HttpStatus.NOT_FOUND);
+ }
+ }
+
+ /**
+ * @return a Json array of all RIC data Example: http://localhost:8081/ric
+ */
+ @GetMapping("/rics")
+ @ApiOperation(value = "Query Near-RT RIC information")
+ @ApiResponses(
+ value = { //
+ @ApiResponse(code = 200, message = "OK", response = RicInfo.class, responseContainer = "List"), //
+ @ApiResponse(code = 404, message = "Policy type is not found", response = String.class)})
+ public ResponseEntity<String> getRics( //
+ @ApiParam(name = "policyType", required = false, value = "The name of the policy type") //
+ @RequestParam(name = "policyType", required = false) String supportingPolicyType) {
+ if ((supportingPolicyType != null) && (this.types.get(supportingPolicyType) == null)) {
+ return new ResponseEntity<>("Policy type not found", HttpStatus.NOT_FOUND);
+ }
+
+ List<RicInfo> result = new ArrayList<>();
+ for (Ric ric : rics.getRics()) {
+ if (supportingPolicyType == null || ric.isSupportingType(supportingPolicyType)) {
+ result.add(new RicInfo(ric.name(), ric.getManagedElementIds(), ric.getSupportedPolicyTypeNames(),
+ ric.getState().toString()));
+ }
+ }
+
+ return new ResponseEntity<>(gson.toJson(result), HttpStatus.OK);
+ }
+
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/ServiceController.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/ServiceController.java
new file mode 100644
index 00000000..f06dd583
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/ServiceController.java
@@ -0,0 +1,184 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.controllers;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Service;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.DeleteMapping;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@Api(tags = "Service registry and supervision")
+public class ServiceController {
+
+ private final Services services;
+ private final Policies policies;
+
+ private static Gson gson = new GsonBuilder() //
+ .create(); //
+
+ @Autowired
+ ServiceController(Services services, Policies policies) {
+ this.services = services;
+ this.policies = policies;
+ }
+
+ @GetMapping("/services")
+ @ApiOperation(value = "Returns service information")
+ @ApiResponses(
+ value = { //
+ @ApiResponse(code = 200, message = "OK", response = ServiceStatus.class, responseContainer = "List"), //
+ @ApiResponse(code = 404, message = "Service is not found", response = String.class)})
+ public ResponseEntity<String> getServices(//
+ @ApiParam(name = "name", required = false, value = "The name of the service") //
+ @RequestParam(name = "name", required = false) String name) {
+ if (name != null && this.services.get(name) == null) {
+ return new ResponseEntity<>("Service not found", HttpStatus.NOT_FOUND);
+ }
+
+ Collection<ServiceStatus> servicesStatus = new ArrayList<>();
+ for (Service s : this.services.getAll()) {
+ if (name == null || name.equals(s.getName())) {
+ servicesStatus.add(toServiceStatus(s));
+ }
+ }
+
+ String res = gson.toJson(servicesStatus);
+ return new ResponseEntity<>(res, HttpStatus.OK);
+ }
+
+ private ServiceStatus toServiceStatus(Service s) {
+ return new ServiceStatus(s.getName(), s.getKeepAliveInterval().toSeconds(), s.timeSinceLastPing().toSeconds(),
+ s.getCallbackUrl());
+ }
+
+ private void validateRegistrationInfo(ServiceRegistrationInfo registrationInfo)
+ throws ServiceException, MalformedURLException {
+ if (registrationInfo.serviceName.isEmpty()) {
+ throw new ServiceException("Missing mandatory parameter 'serviceName'");
+ }
+ if (registrationInfo.keepAliveIntervalSeconds < 0) {
+ throw new ServiceException("Keepalive interval shoul be greater or equal to 0");
+ }
+ if (!registrationInfo.callbackUrl.isEmpty()) {
+ new URL(registrationInfo.callbackUrl);
+ }
+ }
+
+ @ApiOperation(value = "Register a service")
+ @ApiResponses(
+ value = { //
+ @ApiResponse(code = 200, message = "Service updated", response = String.class),
+ @ApiResponse(code = 201, message = "Service created", response = String.class), //
+ @ApiResponse(code = 400, message = "The ServiceRegistrationInfo is not accepted", response = String.class)})
+ @PutMapping("/service")
+ public ResponseEntity<String> putService(//
+ @RequestBody ServiceRegistrationInfo registrationInfo) {
+ try {
+ validateRegistrationInfo(registrationInfo);
+ final boolean isCreate = this.services.get(registrationInfo.serviceName) == null;
+ this.services.put(toService(registrationInfo));
+ return new ResponseEntity<>("OK", isCreate ? HttpStatus.CREATED : HttpStatus.OK);
+ } catch (Exception e) {
+ return new ResponseEntity<>(e.getMessage(), HttpStatus.BAD_REQUEST);
+ }
+ }
+
+ @ApiOperation(value = "Delete a service")
+ @ApiResponses(
+ value = { //
+ @ApiResponse(code = 204, message = "OK"),
+ @ApiResponse(code = 404, message = "Service not found", response = String.class)})
+ @DeleteMapping("/services")
+ public ResponseEntity<String> deleteService(//
+ @ApiParam(name = "name", required = true, value = "The name of the service") //
+ @RequestParam(name = "name", required = true) String serviceName) {
+ try {
+ Service service = removeService(serviceName);
+ // Remove the policies from the repo and let the consistency monitoring
+ // do the rest.
+ removePolicies(service);
+ return new ResponseEntity<>("OK", HttpStatus.NO_CONTENT);
+ } catch (Exception e) {
+ return new ResponseEntity<>(e.getMessage(), HttpStatus.NOT_FOUND);
+ }
+ }
+
+ @ApiOperation(value = "Heartbeat from a serice")
+ @ApiResponses(
+ value = { //
+ @ApiResponse(code = 200, message = "Service supervision timer refreshed, OK"),
+ @ApiResponse(code = 404, message = "The service is not found, needs re-registration")})
+ @PutMapping("/services/keepalive")
+ public ResponseEntity<String> keepAliveService(//
+ @ApiParam(name = "name", required = true, value = "The name of the service") //
+ @RequestParam(name = "name", required = true) String serviceName) {
+ try {
+ services.getService(serviceName).keepAlive();
+ return new ResponseEntity<>("OK", HttpStatus.OK);
+ } catch (ServiceException e) {
+ return new ResponseEntity<>(e.getMessage(), HttpStatus.NOT_FOUND);
+ }
+ }
+
+ private Service removeService(String name) throws ServiceException {
+ Service service = this.services.getService(name); // Just to verify that it exists
+ this.services.remove(service.getName());
+ return service;
+ }
+
+ private void removePolicies(Service service) {
+ Collection<Policy> policyList = this.policies.getForService(service.getName());
+ for (Policy policy : policyList) {
+ this.policies.remove(policy);
+ }
+ }
+
+ private Service toService(ServiceRegistrationInfo s) {
+ return new Service(s.serviceName, Duration.ofSeconds(s.keepAliveIntervalSeconds), s.callbackUrl);
+ }
+
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/ServiceRegistrationInfo.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/ServiceRegistrationInfo.java
new file mode 100644
index 00000000..132c6276
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/ServiceRegistrationInfo.java
@@ -0,0 +1,61 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.controllers;
+
+import com.google.gson.annotations.SerializedName;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+
+import org.immutables.gson.Gson;
+
+@Gson.TypeAdapters
+@ApiModel(value = "ServiceRegistrationInfo")
+public class ServiceRegistrationInfo {
+
+ @ApiModelProperty(value = "identity of the service", required = true, allowEmptyValue = false)
+ @SerializedName(value = "serviceName", alternate = {"name"})
+
+ public String serviceName = "";
+
+ @ApiModelProperty(
+ value = "keep alive interval for the service. This is a heartbeat supervision of the service, "
+ + "which in regular intevals must invoke a 'keepAlive' REST call. "
+ + "When a service does not invoke this call within the given time, it is considered unavailble. "
+ + "An unavailable service will be automatically deregistered and its policies will be deleted. "
+ + "Value 0 means no timeout supervision.")
+ @SerializedName("keepAliveIntervalSeconds")
+ public long keepAliveIntervalSeconds = 0;
+
+ @ApiModelProperty(value = "callback for notifying of RIC synchronization", required = false, allowEmptyValue = true)
+ @SerializedName("callbackUrl")
+ public String callbackUrl = "";
+
+ public ServiceRegistrationInfo() {
+ }
+
+ public ServiceRegistrationInfo(String name, long keepAliveIntervalSeconds, String callbackUrl) {
+ this.serviceName = name;
+ this.keepAliveIntervalSeconds = keepAliveIntervalSeconds;
+ this.callbackUrl = callbackUrl;
+ }
+
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/ServiceStatus.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/ServiceStatus.java
new file mode 100644
index 00000000..d9117099
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/ServiceStatus.java
@@ -0,0 +1,51 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.controllers;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+
+import org.immutables.gson.Gson;
+
+@Gson.TypeAdapters
+@ApiModel(value = "ServiceStatus")
+public class ServiceStatus {
+
+ @ApiModelProperty(value = "identity of the service")
+ public final String serviceName;
+
+ @ApiModelProperty(value = "policy keep alive timeout")
+ public final long keepAliveIntervalSeconds;
+
+ @ApiModelProperty(value = "time since last invocation by the service")
+ public final long timeSinceLastActivitySeconds;
+
+ @ApiModelProperty(value = "callback for notifying of RIC synchronization")
+ public String callbackUrl;
+
+ ServiceStatus(String name, long keepAliveIntervalSeconds, long timeSincePingSeconds, String callbackUrl) {
+ this.serviceName = name;
+ this.keepAliveIntervalSeconds = keepAliveIntervalSeconds;
+ this.timeSinceLastActivitySeconds = timeSincePingSeconds;
+ this.callbackUrl = callbackUrl;
+ }
+
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/StatusController.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/StatusController.java
new file mode 100644
index 00000000..f43a96c2
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/controllers/StatusController.java
@@ -0,0 +1,48 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.controllers;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
+
+@RestController
+@Api(tags = "Health check")
+public class StatusController {
+
+ @GetMapping("/status")
+ @ApiOperation(value = "Returns status and statistics of this service")
+ @ApiResponses(
+ value = { //
+ @ApiResponse(code = 200, message = "Service is living", response = String.class) //
+ })
+ public Mono<ResponseEntity<String>> getStatus() {
+ return Mono.just(new ResponseEntity<>("hunky dory", HttpStatus.OK));
+ }
+
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java
new file mode 100644
index 00000000..3063a5f5
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java
@@ -0,0 +1,172 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.dmaap;
+
+import com.google.common.collect.Iterables;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient;
+import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
+import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.http.ResponseEntity;
+import org.springframework.stereotype.Component;
+
+/**
+ * The class fetches incoming requests from DMAAP. It uses the timeout parameter
+ * that lets the MessageRouter keep the connection with the Kafka open until
+ * requests are sent in.
+ *
+ * <p>
+ * this service will regularly check the configuration and start polling DMaaP
+ * if the configuration is added. If the DMaaP configuration is removed, then
+ * the service will stop polling and resume checking for configuration.
+ *
+ * <p>
+ * Each received request is processed by {@link DmaapMessageHandler}.
+ */
+@Component
+public class DmaapMessageConsumer {
+
+ protected static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(10);
+
+ private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumer.class);
+
+ private final ApplicationConfig applicationConfig;
+
+ private DmaapMessageHandler dmaapMessageHandler = null;
+
+ @Value("${server.http-port}")
+ private int localServerHttpPort;
+
+ @Autowired
+ public DmaapMessageConsumer(ApplicationConfig applicationConfig) {
+ this.applicationConfig = applicationConfig;
+ }
+
+ /**
+ * Starts the consumer. If there is a DMaaP configuration, it will start polling
+ * for messages. Otherwise it will check regularly for the configuration.
+ *
+ * @return the running thread, for test purposes.
+ */
+ public Thread start() {
+ Thread thread = new Thread(this::messageHandlingLoop);
+ thread.start();
+ return thread;
+ }
+
+ private void messageHandlingLoop() {
+ while (!isStopped()) {
+ try {
+ if (isDmaapConfigured()) {
+ Iterable<String> dmaapMsgs = fetchAllMessages();
+ if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) {
+ logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
+ for (String msg : dmaapMsgs) {
+ processMsg(msg);
+ }
+ }
+ } else {
+ sleep(TIME_BETWEEN_DMAAP_RETRIES); // wait for configuration
+ }
+ } catch (Exception e) {
+ logger.warn("{}", e.getMessage());
+ sleep(TIME_BETWEEN_DMAAP_RETRIES);
+ }
+ }
+ }
+
+ protected boolean isStopped() {
+ return false;
+ }
+
+ protected boolean isDmaapConfigured() {
+ String producerTopicUrl = applicationConfig.getDmaapProducerTopicUrl();
+ String consumerTopicUrl = applicationConfig.getDmaapConsumerTopicUrl();
+ return (!producerTopicUrl.isEmpty() && !consumerTopicUrl.isEmpty());
+ }
+
+ private static List<String> parseMessages(String jsonString) {
+ JsonArray arrayOfMessages = JsonParser.parseString(jsonString).getAsJsonArray();
+ List<String> result = new ArrayList<>();
+ for (JsonElement element : arrayOfMessages) {
+ if (element.isJsonPrimitive()) {
+ result.add(element.getAsString());
+ } else {
+ String messageAsString = element.toString();
+ result.add(messageAsString);
+ }
+ }
+ return result;
+ }
+
+ protected Iterable<String> fetchAllMessages() throws ServiceException {
+ String topicUrl = this.applicationConfig.getDmaapConsumerTopicUrl();
+ AsyncRestClient consumer = getMessageRouterConsumer();
+ ResponseEntity<String> response = consumer.getForEntity(topicUrl).block();
+ logger.debug("DMaaP consumer received {} : {}", response.getStatusCode(), response.getBody());
+ if (response.getStatusCode().is2xxSuccessful()) {
+ return parseMessages(response.getBody());
+ } else {
+ throw new ServiceException("Cannot fetch because of Error respons: " + response.getStatusCode().toString()
+ + " " + response.getBody());
+ }
+ }
+
+ private void processMsg(String msg) {
+ logger.debug("Message Reveived from DMAAP : {}", msg);
+ getDmaapMessageHandler().handleDmaapMsg(msg);
+ }
+
+ protected DmaapMessageHandler getDmaapMessageHandler() {
+ if (this.dmaapMessageHandler == null) {
+ String pmsBaseUrl = "http://localhost:" + this.localServerHttpPort;
+ AsyncRestClient pmsClient = new AsyncRestClient(pmsBaseUrl, this.applicationConfig.getWebClientConfig());
+ AsyncRestClient producer = new AsyncRestClient(this.applicationConfig.getDmaapProducerTopicUrl(),
+ this.applicationConfig.getWebClientConfig());
+ this.dmaapMessageHandler = new DmaapMessageHandler(producer, pmsClient);
+ }
+ return this.dmaapMessageHandler;
+ }
+
+ protected void sleep(Duration duration) {
+ try {
+ Thread.sleep(duration.toMillis());
+ } catch (Exception e) {
+ logger.error("Failed to put the thread to sleep", e);
+ }
+ }
+
+ protected AsyncRestClient getMessageRouterConsumer() {
+ return new AsyncRestClient("", this.applicationConfig.getWebClientConfig());
+ }
+
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java
new file mode 100644
index 00000000..040d8b3e
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageHandler.java
@@ -0,0 +1,157 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.dmaap;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
+
+import java.util.Optional;
+
+import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient;
+import org.onap.ccsdk.oran.a1policymanagementservice.dmaap.DmaapRequestMessage.Operation;
+import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.reactive.function.client.WebClientException;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Mono;
+
+/**
+ * The class handles incoming requests from DMAAP.
+ * <p>
+ * That means: invoke a REST call towards this services and to send back a response though DMAAP
+ */
+public class DmaapMessageHandler {
+ private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class);
+ private static Gson gson = new GsonBuilder() //
+ .create(); //
+ private final AsyncRestClient dmaapClient;
+ private final AsyncRestClient pmsClient;
+
+ public DmaapMessageHandler(AsyncRestClient dmaapClient, AsyncRestClient pmsClient) {
+ this.pmsClient = pmsClient;
+ this.dmaapClient = dmaapClient;
+ }
+
+ public void handleDmaapMsg(String msg) {
+ try {
+ String result = this.createTask(msg).block();
+ logger.debug("handleDmaapMsg: {}", result);
+ } catch (Exception throwable) {
+ logger.warn("handleDmaapMsg failure {}", throwable.getMessage());
+ }
+ }
+
+ Mono<String> createTask(String msg) {
+ try {
+ DmaapRequestMessage dmaapRequestMessage = gson.fromJson(msg, ImmutableDmaapRequestMessage.class);
+ return this.invokePolicyManagementService(dmaapRequestMessage) //
+ .onErrorResume(t -> handlePolicyManagementServiceCallError(t, dmaapRequestMessage)) //
+ .flatMap(
+ response -> sendDmaapResponse(response.getBody(), dmaapRequestMessage, response.getStatusCode()));
+ } catch (Exception e) {
+ String errorMsg = "Received unparsable message from DMAAP: \"" + msg + "\", reason: " + e.getMessage();
+ return Mono.error(new ServiceException(errorMsg)); // Cannot make any response
+ }
+ }
+
+ private Mono<ResponseEntity<String>> handlePolicyManagementServiceCallError(Throwable error,
+ DmaapRequestMessage dmaapRequestMessage) {
+ logger.debug("Policy Management Service call failed: {}", error.getMessage());
+ HttpStatus status = HttpStatus.INTERNAL_SERVER_ERROR;
+ String errorMessage = error.getMessage();
+ if (error instanceof WebClientResponseException) {
+ WebClientResponseException exception = (WebClientResponseException) error;
+ status = exception.getStatusCode();
+ errorMessage = exception.getResponseBodyAsString();
+ } else if (error instanceof ServiceException) {
+ status = HttpStatus.BAD_REQUEST;
+ errorMessage = error.getMessage();
+ } else if (!(error instanceof WebClientException)) {
+ logger.warn("Unexpected exception ", error);
+ }
+ return sendDmaapResponse(errorMessage, dmaapRequestMessage, status) //
+ .flatMap(notUsed -> Mono.empty());
+ }
+
+ private Mono<ResponseEntity<String>> invokePolicyManagementService(DmaapRequestMessage dmaapRequestMessage) {
+ DmaapRequestMessage.Operation operation = dmaapRequestMessage.operation();
+ String uri = dmaapRequestMessage.url();
+
+ if (operation == Operation.DELETE) {
+ return pmsClient.deleteForEntity(uri);
+ } else if (operation == Operation.GET) {
+ return pmsClient.getForEntity(uri);
+ } else if (operation == Operation.PUT) {
+ return pmsClient.putForEntity(uri, payload(dmaapRequestMessage));
+ } else if (operation == Operation.POST) {
+ return pmsClient.postForEntity(uri, payload(dmaapRequestMessage));
+ } else {
+ return Mono.error(new ServiceException("Not implemented operation: " + operation));
+ }
+ }
+
+ private String payload(DmaapRequestMessage message) {
+ Optional<JsonObject> payload = message.payload();
+ if (payload.isPresent()) {
+ return gson.toJson(payload.get());
+ } else {
+ logger.warn("Expected payload in message from DMAAP: {}", message);
+ return "";
+ }
+ }
+
+ private Mono<String> sendDmaapResponse(String response, DmaapRequestMessage dmaapRequestMessage,
+ HttpStatus status) {
+ return createDmaapResponseMessage(dmaapRequestMessage, response, status) //
+ .flatMap(this::sendToDmaap) //
+ .onErrorResume(this::handleResponseCallError);
+ }
+
+ private Mono<String> sendToDmaap(String body) {
+ logger.debug("sendToDmaap: {} ", body);
+ return dmaapClient.post("", "[" + body + "]");
+ }
+
+ private Mono<String> handleResponseCallError(Throwable t) {
+ logger.debug("Failed to send response to DMaaP: {}", t.getMessage());
+ return Mono.empty();
+ }
+
+ private Mono<String> createDmaapResponseMessage(DmaapRequestMessage dmaapRequestMessage, String response,
+ HttpStatus status) {
+ DmaapResponseMessage dmaapResponseMessage = ImmutableDmaapResponseMessage.builder() //
+ .status(status.toString()) //
+ .message(response == null ? "" : response) //
+ .type("response") //
+ .correlationId(dmaapRequestMessage.correlationId() == null ? "" : dmaapRequestMessage.correlationId()) //
+ .originatorId(dmaapRequestMessage.originatorId() == null ? "" : dmaapRequestMessage.originatorId()) //
+ .requestId(dmaapRequestMessage.requestId() == null ? "" : dmaapRequestMessage.requestId()) //
+ .timestamp(dmaapRequestMessage.timestamp() == null ? "" : dmaapRequestMessage.timestamp()) //
+ .build();
+ String str = gson.toJson(dmaapResponseMessage);
+ return Mono.just(str);
+
+ }
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapRequestMessage.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapRequestMessage.java
new file mode 100644
index 00000000..f41c51c5
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapRequestMessage.java
@@ -0,0 +1,55 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.dmaap;
+
+import com.google.gson.JsonObject;
+
+import java.util.Optional;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+
+@Value.Immutable
+@Gson.TypeAdapters
+public interface DmaapRequestMessage {
+
+ public enum Operation {
+ PUT, GET, DELETE, POST
+ }
+
+ String correlationId();
+
+ String target();
+
+ String timestamp();
+
+ String apiVersion();
+
+ String originatorId();
+
+ String requestId();
+
+ Operation operation();
+
+ String url();
+
+ Optional<JsonObject> payload();
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapResponseMessage.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapResponseMessage.java
new file mode 100644
index 00000000..04024742
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapResponseMessage.java
@@ -0,0 +1,43 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.dmaap;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+
+@Value.Immutable
+@Gson.TypeAdapters
+public interface DmaapResponseMessage {
+
+ String type();
+
+ String correlationId();
+
+ String timestamp();
+
+ String originatorId();
+
+ String requestId();
+
+ String status();
+
+ String message();
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/exceptions/EnvironmentLoaderException.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/exceptions/EnvironmentLoaderException.java
new file mode 100644
index 00000000..4e1009f0
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/exceptions/EnvironmentLoaderException.java
@@ -0,0 +1,30 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.exceptions;
+
+public class EnvironmentLoaderException extends ServiceException {
+
+ private static final long serialVersionUID = 1L;
+
+ public EnvironmentLoaderException(String message) {
+ super(message);
+ }
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/exceptions/ServiceException.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/exceptions/ServiceException.java
new file mode 100644
index 00000000..17e651aa
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/exceptions/ServiceException.java
@@ -0,0 +1,34 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.exceptions;
+
+public class ServiceException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ public ServiceException(String message) {
+ super(message);
+ }
+
+ public ServiceException(String message, Exception originalException) {
+ super(message, originalException);
+ }
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Lock.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Lock.java
new file mode 100644
index 00000000..0cde2e12
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Lock.java
@@ -0,0 +1,197 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.repository;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoSink;
+
+/**
+ * A resource lock. Exclusive means that the caller takes exclusive ownership of
+ * the resurce. Non exclusive lock means that several users can lock the
+ * resource (for shared usage).
+ */
+public class Lock {
+ private static final Logger logger = LoggerFactory.getLogger(Lock.class);
+
+ private boolean isExclusive = false;
+ private int lockCounter = 0;
+ private final List<LockRequest> lockRequestQueue = new LinkedList<>();
+ private static AsynchCallbackExecutor callbackProcessor = new AsynchCallbackExecutor();
+
+ public enum LockType {
+ EXCLUSIVE, SHARED
+ }
+
+ /** The caller thread will be blocked util the lock is granted. */
+ public synchronized void lockBlocking(LockType locktype) {
+ while (!tryLock(locktype)) {
+ this.waitForUnlock();
+ }
+ }
+
+ /** Reactive version. The Lock will be emitted when the lock is granted */
+ public synchronized Mono<Lock> lock(LockType lockType) {
+ if (tryLock(lockType)) {
+ return Mono.just(this);
+ } else {
+ return Mono.create(monoSink -> addToQueue(monoSink, lockType));
+ }
+ }
+
+ public Mono<Lock> unlock() {
+ return Mono.create(monoSink -> {
+ unlockBlocking();
+ monoSink.success(this);
+ });
+ }
+
+ public synchronized void unlockBlocking() {
+ if (lockCounter <= 0) {
+ lockCounter = -1; // Might as well stop, to make it easier to find the problem
+ logger.error("Number of unlocks must match the number of locks");
+ }
+ this.lockCounter--;
+ if (lockCounter == 0) {
+ isExclusive = false;
+ }
+ this.notifyAll();
+ this.processQueuedEntries();
+ }
+
+ @Override
+ public synchronized String toString() {
+ return "Lock cnt: " + this.lockCounter + " exclusive: " + this.isExclusive + " queued: "
+ + this.lockRequestQueue.size();
+ }
+
+ /** returns the current number of granted locks */
+ public synchronized int getLockCounter() {
+ return this.lockCounter;
+ }
+
+ private void processQueuedEntries() {
+ List<LockRequest> granted = new ArrayList<>();
+ for (Iterator<LockRequest> i = lockRequestQueue.iterator(); i.hasNext();) {
+ LockRequest request = i.next();
+ if (tryLock(request.lockType)) {
+ i.remove();
+ granted.add(request);
+ }
+ }
+ callbackProcessor.addAll(granted);
+ }
+
+ private synchronized void addToQueue(MonoSink<Lock> callback, LockType lockType) {
+ lockRequestQueue.add(new LockRequest(callback, lockType, this));
+ processQueuedEntries();
+ }
+
+ @SuppressWarnings("java:S2274") // Always invoke wait() and await() methods inside a loop
+ private synchronized void waitForUnlock() {
+ try {
+ this.wait();
+ } catch (InterruptedException e) {
+ logger.warn("waitForUnlock interrupted", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private boolean tryLock(LockType lockType) {
+ if (this.isExclusive) {
+ return false;
+ }
+ if (lockType == LockType.EXCLUSIVE && lockCounter > 0) {
+ return false;
+ }
+ lockCounter++;
+ this.isExclusive = lockType == LockType.EXCLUSIVE;
+ return true;
+ }
+
+ /**
+ * Represents a queued lock request
+ */
+ private static class LockRequest {
+ final MonoSink<Lock> callback;
+ final LockType lockType;
+ final Lock lock;
+
+ LockRequest(MonoSink<Lock> callback, LockType lockType, Lock lock) {
+ this.callback = callback;
+ this.lockType = lockType;
+ this.lock = lock;
+ }
+ }
+
+ /**
+ * A separate thread that calls a MonoSink to continue. This is done after a
+ * queued lock is granted.
+ */
+ private static class AsynchCallbackExecutor implements Runnable {
+ private List<LockRequest> lockRequestQueue = new LinkedList<>();
+
+ public AsynchCallbackExecutor() {
+ Thread thread = new Thread(this);
+ thread.start();
+ }
+
+ public synchronized void addAll(List<LockRequest> requests) {
+ this.lockRequestQueue.addAll(requests);
+ this.notifyAll();
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ for (LockRequest request : consume()) {
+ request.callback.success(request.lock);
+ }
+ waitForNewEntries();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("Interrupted {}", e.getMessage());
+ }
+ }
+
+ private synchronized List<LockRequest> consume() {
+ List<LockRequest> q = this.lockRequestQueue;
+ this.lockRequestQueue = new LinkedList<>();
+ return q;
+ }
+
+ @SuppressWarnings("java:S2274")
+ private synchronized void waitForNewEntries() throws InterruptedException {
+ if (this.lockRequestQueue.isEmpty()) {
+ this.wait();
+ }
+ }
+ }
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java
new file mode 100644
index 00000000..162d05f4
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policies.java
@@ -0,0 +1,131 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.repository;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Vector;
+
+import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
+
+public class Policies {
+ private Map<String, Policy> policiesId = new HashMap<>();
+ private Map<String, Map<String, Policy>> policiesRic = new HashMap<>();
+ private Map<String, Map<String, Policy>> policiesService = new HashMap<>();
+ private Map<String, Map<String, Policy>> policiesType = new HashMap<>();
+
+ public synchronized void put(Policy policy) {
+ policiesId.put(policy.id(), policy);
+ multiMapPut(policiesRic, policy.ric().name(), policy);
+ multiMapPut(policiesService, policy.ownerServiceName(), policy);
+ multiMapPut(policiesType, policy.type().name(), policy);
+ }
+
+ private void multiMapPut(Map<String, Map<String, Policy>> multiMap, String key, Policy value) {
+ multiMap.computeIfAbsent(key, k -> new HashMap<>()).put(value.id(), value);
+ }
+
+ private void multiMapRemove(Map<String, Map<String, Policy>> multiMap, String key, Policy value) {
+ Map<String, Policy> map = multiMap.get(key);
+ if (map != null) {
+ map.remove(value.id());
+ if (map.isEmpty()) {
+ multiMap.remove(key);
+ }
+ }
+ }
+
+ private Collection<Policy> multiMapGet(Map<String, Map<String, Policy>> multiMap, String key) {
+ Map<String, Policy> map = multiMap.get(key);
+ if (map == null) {
+ return Collections.emptyList();
+ }
+ return new Vector<>(map.values());
+ }
+
+ public synchronized boolean containsPolicy(String id) {
+ return policiesId.containsKey(id);
+ }
+
+ public synchronized Policy get(String id) {
+ return policiesId.get(id);
+ }
+
+ public synchronized Policy getPolicy(String id) throws ServiceException {
+ Policy p = policiesId.get(id);
+ if (p == null) {
+ throw new ServiceException("Could not find policy: " + id);
+ }
+ return p;
+ }
+
+ public synchronized Collection<Policy> getAll() {
+ return new Vector<>(policiesId.values());
+ }
+
+ public synchronized Collection<Policy> getForService(String service) {
+ return multiMapGet(policiesService, service);
+ }
+
+ public synchronized Collection<Policy> getForRic(String ric) {
+ return multiMapGet(policiesRic, ric);
+ }
+
+ public synchronized Collection<Policy> getForType(String type) {
+ return multiMapGet(policiesType, type);
+ }
+
+ public synchronized Policy removeId(String id) {
+ Policy p = policiesId.get(id);
+ if (p != null) {
+ remove(p);
+ }
+ return p;
+ }
+
+ public synchronized void remove(Policy policy) {
+ policiesId.remove(policy.id());
+ multiMapRemove(policiesRic, policy.ric().name(), policy);
+ multiMapRemove(policiesService, policy.ownerServiceName(), policy);
+ multiMapRemove(policiesType, policy.type().name(), policy);
+ }
+
+ public synchronized void removePoliciesForRic(String ricName) {
+ Collection<Policy> policiesForRic = getForRic(ricName);
+ for (Policy policy : policiesForRic) {
+ remove(policy);
+ }
+ }
+
+ public synchronized int size() {
+ return policiesId.size();
+ }
+
+ public synchronized void clear() {
+ while (policiesId.size() > 0) {
+ Set<String> keys = policiesId.keySet();
+ removeId(keys.iterator().next());
+ }
+ }
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policy.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policy.java
new file mode 100644
index 00000000..957b5f3e
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Policy.java
@@ -0,0 +1,42 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.repository;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+
+@Value.Immutable
+@Gson.TypeAdapters
+public interface Policy {
+ public String id();
+
+ public String json();
+
+ public String ownerServiceName();
+
+ public Ric ric();
+
+ public PolicyType type();
+
+ public String lastModified();
+
+ public boolean isTransient();
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyType.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyType.java
new file mode 100644
index 00000000..9524756f
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyType.java
@@ -0,0 +1,32 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.repository;
+
+import org.immutables.gson.Gson;
+import org.immutables.value.Value;
+
+@Value.Immutable
+@Gson.TypeAdapters
+public interface PolicyType {
+ public String name();
+
+ public String schema();
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java
new file mode 100644
index 00000000..9311e58e
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/PolicyTypes.java
@@ -0,0 +1,64 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.repository;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Vector;
+
+import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
+
+public class PolicyTypes {
+ private Map<String, PolicyType> types = new HashMap<>();
+
+ public synchronized PolicyType getType(String name) throws ServiceException {
+ PolicyType t = types.get(name);
+ if (t == null) {
+ throw new ServiceException("Could not find type: " + name);
+ }
+ return t;
+ }
+
+ public synchronized PolicyType get(String name) {
+ return types.get(name);
+ }
+
+ public synchronized void put(PolicyType type) {
+ types.put(type.name(), type);
+ }
+
+ public synchronized boolean contains(String policyType) {
+ return types.containsKey(policyType);
+ }
+
+ public synchronized Collection<PolicyType> getAll() {
+ return new Vector<>(types.values());
+ }
+
+ public synchronized int size() {
+ return types.size();
+ }
+
+ public synchronized void clear() {
+ this.types.clear();
+ }
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Ric.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Ric.java
new file mode 100644
index 00000000..14ac981f
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Ric.java
@@ -0,0 +1,163 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.repository;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Vector;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1Client.A1ProtocolType;
+import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState;
+
+/**
+ * Represents the dynamic information about a Near-RT RIC.
+ */
+public class Ric {
+
+ @Setter
+ private RicConfig ricConfig;
+ private RicState state = RicState.UNAVAILABLE;
+ private Map<String, PolicyType> supportedPolicyTypes = new HashMap<>();
+ @Getter
+ @Setter
+ private A1ProtocolType protocolVersion = A1ProtocolType.UNKNOWN;
+
+ @Getter
+ private final Lock lock = new Lock();
+
+ /**
+ * Creates the Ric. Initial state is {@link RicState.UNDEFINED}.
+ *
+ * @param ricConfig The {@link RicConfig} for this Ric.
+ */
+ public Ric(RicConfig ricConfig) {
+ this.ricConfig = ricConfig;
+ }
+
+ public String name() {
+ return ricConfig.name();
+ }
+
+ public RicConfig getConfig() {
+ return this.ricConfig;
+ }
+
+ public synchronized RicState getState() {
+ return this.state;
+ }
+
+ public synchronized void setState(RicState state) {
+ this.state = state;
+ }
+
+ /**
+ * Gets the nodes managed by this Ric.
+ *
+ * @return a vector containing the nodes managed by this Ric.
+ */
+ public synchronized Collection<String> getManagedElementIds() {
+ return ricConfig.managedElementIds();
+ }
+
+ /**
+ * Determines if the given node is managed by this Ric.
+ *
+ * @param managedElementId the node name to check.
+ * @return true if the given node is managed by this Ric.
+ */
+ public synchronized boolean isManaging(String managedElementId) {
+ return ricConfig.managedElementIds().contains(managedElementId);
+ }
+
+ /**
+ * Gets the policy types supported by this Ric.
+ *
+ * @return the policy types supported by this Ric in an unmodifiable list.
+ */
+ public synchronized Collection<PolicyType> getSupportedPolicyTypes() {
+ return new Vector<>(supportedPolicyTypes.values());
+ }
+
+ public synchronized Collection<String> getSupportedPolicyTypeNames() {
+ return new Vector<>(supportedPolicyTypes.keySet());
+ }
+
+ /**
+ * Adds a policy type as supported by this Ric.
+ *
+ * @param type the policy type to support.
+ */
+ public synchronized void addSupportedPolicyType(PolicyType type) {
+ supportedPolicyTypes.put(type.name(), type);
+ }
+
+ /**
+ * Removes all policy type as supported by this Ric.
+ */
+ public synchronized void clearSupportedPolicyTypes() {
+ supportedPolicyTypes.clear();
+ }
+
+ /**
+ * Checks if a type is supported by this Ric.
+ *
+ * @param typeName the name of the type to check if it is supported.
+ *
+ * @return true if the given type is supported by this Ric, false otherwise.
+ */
+ public synchronized boolean isSupportingType(String typeName) {
+ return supportedPolicyTypes.containsKey(typeName);
+ }
+
+ @Override
+ public synchronized String toString() {
+ return Ric.class.getSimpleName() + ": " + "name: " + name() + ", state: " + state + ", baseUrl: "
+ + ricConfig.baseUrl() + ", managedNodes: " + ricConfig.managedElementIds();
+ }
+
+ /**
+ * Represents the states possible for a Ric.
+ */
+ public enum RicState {
+ /**
+ * The Policy Management Service's view of the Ric may be inconsistent.
+ */
+ UNAVAILABLE,
+ /**
+ * The normal state. Policies can be configured.
+ */
+ AVAILABLE,
+ /**
+ * The Policy Management Service is synchronizing the view of the Ric.
+ */
+ SYNCHRONIZING,
+
+ /**
+ * A consistency check between the Policy Management Service and the Ric is done
+ */
+ CONSISTENCY_CHECK
+ }
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Rics.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Rics.java
new file mode 100644
index 00000000..d810dfa0
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Rics.java
@@ -0,0 +1,77 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.repository;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Vector;
+
+import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
+
+/**
+ * Dynamic representation of all Rics in the system.
+ */
+public class Rics {
+ Map<String, Ric> registeredRics = new HashMap<>();
+
+ public synchronized void put(Ric ric) {
+ registeredRics.put(ric.name(), ric);
+ }
+
+ public synchronized Collection<Ric> getRics() {
+ return new Vector<>(registeredRics.values());
+ }
+
+ public synchronized Ric getRic(String name) throws ServiceException {
+ Ric ric = registeredRics.get(name);
+ if (ric == null) {
+ throw new ServiceException("Could not find ric: " + name);
+ }
+ return ric;
+ }
+
+ public synchronized Ric get(String name) {
+ return registeredRics.get(name);
+ }
+
+ public synchronized void remove(String name) {
+ registeredRics.remove(name);
+ }
+
+ public synchronized int size() {
+ return registeredRics.size();
+ }
+
+ public synchronized void clear() {
+ this.registeredRics.clear();
+ }
+
+ public synchronized Optional<Ric> lookupRicForManagedElement(String managedElementId) {
+ for (Ric ric : this.registeredRics.values()) {
+ if (ric.getManagedElementIds().contains(managedElementId)) {
+ return Optional.of(ric);
+ }
+ }
+ return Optional.empty();
+ }
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Service.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Service.java
new file mode 100644
index 00000000..823490ce
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Service.java
@@ -0,0 +1,62 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.repository;
+
+import java.time.Duration;
+import java.time.Instant;
+
+import lombok.Getter;
+
+public class Service {
+ @Getter
+ private final String name;
+ private final Duration keepAliveInterval;
+ private Instant lastPing;
+ private final String callbackUrl;
+
+ public Service(String name, Duration keepAliveInterval, String callbackUrl) {
+ this.name = name;
+ this.keepAliveInterval = keepAliveInterval;
+ this.callbackUrl = callbackUrl;
+ keepAlive();
+ }
+
+ public synchronized Duration getKeepAliveInterval() {
+ return this.keepAliveInterval;
+ }
+
+ public synchronized void keepAlive() {
+ this.lastPing = Instant.now();
+ }
+
+ public synchronized boolean isExpired() {
+ return this.keepAliveInterval.getSeconds() > 0 && timeSinceLastPing().compareTo(this.keepAliveInterval) > 0;
+ }
+
+ public synchronized Duration timeSinceLastPing() {
+ return Duration.between(this.lastPing, Instant.now());
+ }
+
+ public synchronized String getCallbackUrl() {
+ return this.callbackUrl;
+ }
+
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Services.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Services.java
new file mode 100644
index 00000000..63633f67
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/repository/Services.java
@@ -0,0 +1,69 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.repository;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Vector;
+
+import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Services {
+ private static final Logger logger = LoggerFactory.getLogger(Services.class);
+
+ private Map<String, Service> registeredServices = new HashMap<>();
+
+ public synchronized Service getService(String name) throws ServiceException {
+ Service s = registeredServices.get(name);
+ if (s == null) {
+ throw new ServiceException("Could not find service: " + name);
+ }
+ return s;
+ }
+
+ public synchronized Service get(String name) {
+ return registeredServices.get(name);
+ }
+
+ public synchronized void put(Service service) {
+ logger.debug("Put service: {}", service.getName());
+ service.keepAlive();
+ registeredServices.put(service.getName(), service);
+ }
+
+ public synchronized Iterable<Service> getAll() {
+ return new Vector<>(registeredServices.values());
+ }
+
+ public synchronized void remove(String name) {
+ registeredServices.remove(name);
+ }
+
+ public synchronized int size() {
+ return registeredServices.size();
+ }
+
+ public synchronized void clear() {
+ registeredServices.clear();
+ }
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/EnvironmentProcessor.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/EnvironmentProcessor.java
new file mode 100644
index 00000000..840de817
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/EnvironmentProcessor.java
@@ -0,0 +1,91 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.tasks;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.EnvironmentLoaderException;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableEnvProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Mono;
+
+/**
+ * This class reads a few environment variables used for locating the Consul
+ * (Config Binding Service).
+ */
+class EnvironmentProcessor {
+
+ private static final int DEFAULT_CONSUL_PORT = 8500;
+ private static final Logger logger = LoggerFactory.getLogger(EnvironmentProcessor.class);
+
+ private EnvironmentProcessor() {
+ }
+
+ static Mono<EnvProperties> readEnvironmentVariables(Properties systemEnvironment) {
+
+ EnvProperties envProperties;
+ try {
+ envProperties = ImmutableEnvProperties.builder() //
+ .consulHost(getConsulHost(systemEnvironment)) //
+ .consulPort(getConsultPort(systemEnvironment)) //
+ .cbsName(getConfigBindingService(systemEnvironment)) //
+ .appName(getService(systemEnvironment)) //
+ .build();
+ } catch (EnvironmentLoaderException e) {
+ return Mono.error(e);
+ }
+ logger.trace("Evaluated environment system variables {}", envProperties);
+ return Mono.just(envProperties);
+ }
+
+ private static String getConsulHost(Properties systemEnvironments) throws EnvironmentLoaderException {
+ return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_HOST"))
+ .orElseThrow(() -> new EnvironmentLoaderException("$CONSUL_HOST environment has not been defined"));
+ }
+
+ private static Integer getConsultPort(Properties systemEnvironments) {
+ return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_PORT")) //
+ .map(Integer::valueOf) //
+ .orElseGet(EnvironmentProcessor::getDefaultPortOfConsul);
+ }
+
+ private static String getConfigBindingService(Properties systemEnvironments) throws EnvironmentLoaderException {
+ return Optional.ofNullable(systemEnvironments.getProperty("CONFIG_BINDING_SERVICE")) //
+ .orElseThrow(
+ () -> new EnvironmentLoaderException("$CONFIG_BINDING_SERVICE environment has not been defined"));
+ }
+
+ private static String getService(Properties systemEnvironments) throws EnvironmentLoaderException {
+ return Optional
+ .ofNullable(Optional.ofNullable(systemEnvironments.getProperty("HOSTNAME"))
+ .orElse(systemEnvironments.getProperty("SERVICE_NAME")))
+ .orElseThrow(() -> new EnvironmentLoaderException(
+ "Neither $HOSTNAME/$SERVICE_NAME have not been defined as system environment"));
+ }
+
+ private static Integer getDefaultPortOfConsul() {
+ logger.warn("$CONSUL_PORT variable will be set to default port {}", DEFAULT_CONSUL_PORT);
+ return DEFAULT_CONSUL_PORT;
+ }
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
new file mode 100644
index 00000000..f113da9e
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java
@@ -0,0 +1,263 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.tasks;
+
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.google.gson.TypeAdapterFactory;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.time.Duration;
+import java.util.Properties;
+import java.util.ServiceLoader;
+
+import javax.validation.constraints.NotNull;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+
+import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1ClientFactory;
+import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
+import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig.RicConfigUpdate;
+import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfigParser;
+import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Regularly refreshes the configuration from Consul or from a local configuration file.
+ */
+@Component
+public class RefreshConfigTask {
+
+ private static final Logger logger = LoggerFactory.getLogger(RefreshConfigTask.class);
+
+ @Value("#{systemEnvironment}")
+ public Properties systemEnvironment;
+
+ /**
+ * The time between refreshes of the configuration.
+ */
+ static final Duration CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1);
+
+ final ApplicationConfig appConfig;
+ @Getter(AccessLevel.PROTECTED)
+ private Disposable refreshTask = null;
+ private boolean isConsulUsed = false;
+
+ private final Rics rics;
+ private final A1ClientFactory a1ClientFactory;
+ private final Policies policies;
+ private final Services services;
+ private final PolicyTypes policyTypes;
+
+ @Autowired
+ public RefreshConfigTask(ApplicationConfig appConfig, Rics rics, Policies policies, Services services,
+ PolicyTypes policyTypes, A1ClientFactory a1ClientFactory) {
+ this.appConfig = appConfig;
+ this.rics = rics;
+ this.policies = policies;
+ this.services = services;
+ this.policyTypes = policyTypes;
+ this.a1ClientFactory = a1ClientFactory;
+ }
+
+ public void start() {
+ logger.debug("Starting refreshConfigTask");
+ stop();
+ refreshTask = createRefreshTask() //
+ .subscribe(notUsed -> logger.debug("Refreshed configuration data"),
+ throwable -> logger.error("Configuration refresh terminated due to exception {}", throwable.toString()),
+ () -> logger.error("Configuration refresh terminated"));
+ }
+
+ public void stop() {
+ if (refreshTask != null) {
+ refreshTask.dispose();
+ }
+ }
+
+ Flux<RicConfigUpdate.Type> createRefreshTask() {
+ Flux<JsonObject> loadFromFile = Flux.interval(Duration.ZERO, CONFIG_REFRESH_INTERVAL) //
+ .filter(notUsed -> !this.isConsulUsed) //
+ .flatMap(notUsed -> loadConfigurationFromFile()) //
+ .onErrorResume(this::ignoreErrorFlux) //
+ .doOnNext(json -> logger.debug("loadFromFile succeeded")) //
+ .doOnTerminate(() -> logger.error("loadFromFile Terminate"));
+
+ Flux<JsonObject> loadFromConsul = Flux.interval(Duration.ZERO, CONFIG_REFRESH_INTERVAL) //
+ .flatMap(i -> getEnvironment(systemEnvironment)) //
+ .flatMap(this::createCbsClient) //
+ .flatMap(this::getFromCbs) //
+ .onErrorResume(this::ignoreErrorMono) //
+ .doOnNext(json -> logger.debug("loadFromConsul succeeded")) //
+ .doOnNext(json -> this.isConsulUsed = true) //
+ .doOnTerminate(() -> logger.error("loadFromConsul Terminated"));
+
+ return Flux.merge(loadFromFile, loadFromConsul) //
+ .flatMap(this::parseConfiguration) //
+ .flatMap(this::updateConfig) //
+ .doOnNext(this::handleUpdatedRicConfig) //
+ .flatMap(configUpdate -> Flux.just(configUpdate.getType())) //
+ .doOnTerminate(() -> logger.error("Configuration refresh task is terminated"));
+ }
+
+ Mono<EnvProperties> getEnvironment(Properties systemEnvironment) {
+ return EnvironmentProcessor.readEnvironmentVariables(systemEnvironment) //
+ .onErrorResume(t -> Mono.empty());
+ }
+
+ Mono<CbsClient> createCbsClient(EnvProperties env) {
+ return CbsClientFactory.createCbsClient(env) //
+ .onErrorResume(this::ignoreErrorMono);
+ }
+
+ private Mono<JsonObject> getFromCbs(CbsClient cbsClient) {
+ try {
+ final CbsRequest getConfigRequest = CbsRequests.getAll(RequestDiagnosticContext.create());
+ return cbsClient.get(getConfigRequest) //
+ .onErrorResume(this::ignoreErrorMono);
+ } catch (Exception e) {
+ return ignoreErrorMono(e);
+ }
+ }
+
+ private <R> Flux<R> ignoreErrorFlux(Throwable throwable) {
+ String errMsg = throwable.toString();
+ logger.warn("Could not refresh application configuration. {}", errMsg);
+ return Flux.empty();
+ }
+
+ private <R> Mono<R> ignoreErrorMono(Throwable throwable) {
+ String errMsg = throwable.toString();
+ logger.warn("Could not refresh application configuration. {}", errMsg);
+ return Mono.empty();
+ }
+
+ private Mono<ApplicationConfigParser.ConfigParserResult> parseConfiguration(JsonObject jsonObject) {
+ try {
+ ApplicationConfigParser parser = new ApplicationConfigParser();
+ return Mono.just(parser.parse(jsonObject));
+ } catch (Exception e) {
+ String str = e.toString();
+ logger.error("Could not parse configuration {}", str);
+ return Mono.empty();
+ }
+ }
+
+ private Flux<RicConfigUpdate> updateConfig(ApplicationConfigParser.ConfigParserResult config) {
+ return this.appConfig.setConfiguration(config);
+ }
+
+ boolean fileExists(String filepath) {
+ return (filepath != null && (new File(filepath).exists()));
+ }
+
+ private void handleUpdatedRicConfig(RicConfigUpdate updatedInfo) {
+ synchronized (this.rics) {
+ String ricName = updatedInfo.getRicConfig().name();
+ RicConfigUpdate.Type event = updatedInfo.getType();
+ if (event == RicConfigUpdate.Type.ADDED) {
+ addRic(updatedInfo.getRicConfig());
+ } else if (event == RicConfigUpdate.Type.REMOVED) {
+ rics.remove(ricName);
+ this.policies.removePoliciesForRic(ricName);
+ } else if (event == RicConfigUpdate.Type.CHANGED) {
+ Ric ric = this.rics.get(ricName);
+ if (ric == null) {
+ // Should not happen,just for robustness
+ addRic(updatedInfo.getRicConfig());
+ } else {
+ ric.setRicConfig(updatedInfo.getRicConfig());
+ }
+ }
+ }
+ }
+
+ private void addRic(RicConfig config) {
+ Ric ric = new Ric(config);
+ this.rics.put(ric);
+ runRicSynchronization(ric);
+ }
+
+ void runRicSynchronization(Ric ric) {
+ RicSynchronizationTask synchronizationTask =
+ new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services);
+ synchronizationTask.run(ric);
+ }
+
+ /**
+ * Reads the configuration from file.
+ */
+ Flux<JsonObject> loadConfigurationFromFile() {
+ String filepath = appConfig.getLocalConfigurationFilePath();
+ if (!fileExists(filepath)) {
+ return Flux.empty();
+ }
+
+ GsonBuilder gsonBuilder = new GsonBuilder();
+ ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
+
+ try (InputStream inputStream = createInputStream(filepath)) {
+ JsonObject rootObject = getJsonElement(inputStream).getAsJsonObject();
+ ApplicationConfigParser appParser = new ApplicationConfigParser();
+ appParser.parse(rootObject);
+ logger.debug("Local configuration file loaded: {}", filepath);
+ return Flux.just(rootObject);
+ } catch (Exception e) {
+ logger.error("Local configuration file not loaded: {}, {}", filepath, e.getMessage());
+ return Flux.empty();
+ }
+ }
+
+ JsonElement getJsonElement(InputStream inputStream) {
+ return JsonParser.parseReader(new InputStreamReader(inputStream));
+ }
+
+ InputStream createInputStream(@NotNull String filepath) throws IOException {
+ return new BufferedInputStream(new FileInputStream(filepath));
+ }
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
new file mode 100644
index 00000000..832ec117
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java
@@ -0,0 +1,212 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.tasks;
+
+import java.util.Collection;
+
+import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1Client;
+import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1ClientFactory;
+import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock.LockType;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Regularly checks the existing rics towards the local repository to keep it
+ * consistent. When the policy types or instances in the Near-RT RIC is not
+ * consistent, a synchronization is performed.
+ */
+@Component
+@EnableScheduling
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+public class RicSupervision {
+ private static final Logger logger = LoggerFactory.getLogger(RicSupervision.class);
+
+ private final Rics rics;
+ private final Policies policies;
+ private final PolicyTypes policyTypes;
+ private final A1ClientFactory a1ClientFactory;
+ private final Services services;
+
+ private static class SynchStartedException extends ServiceException {
+ private static final long serialVersionUID = 1L;
+
+ public SynchStartedException(String message) {
+ super(message);
+ }
+ }
+
+ private static class RicData {
+ RicData(Ric ric, A1Client a1Client) {
+ this.ric = ric;
+ this.a1Client = a1Client;
+ }
+
+ A1Client getClient() {
+ return a1Client;
+ }
+
+ final Ric ric;
+ private final A1Client a1Client;
+ }
+
+ @Autowired
+ public RicSupervision(Rics rics, Policies policies, A1ClientFactory a1ClientFactory, PolicyTypes policyTypes,
+ Services services) {
+ this.rics = rics;
+ this.policies = policies;
+ this.a1ClientFactory = a1ClientFactory;
+ this.policyTypes = policyTypes;
+ this.services = services;
+ }
+
+ /**
+ * Regularly contacts all Rics to check if they are alive and synchronized.
+ */
+ @Scheduled(fixedRate = 1000 * 60)
+ public void checkAllRics() {
+ logger.debug("Checking Rics starting");
+ createTask().subscribe(null, null, () -> logger.debug("Checking all RICs completed"));
+ }
+
+ private Flux<RicData> createTask() {
+ return Flux.fromIterable(rics.getRics()) //
+ .flatMap(this::createRicData) //
+ .flatMap(this::checkOneRic);
+ }
+
+ private Mono<RicData> checkOneRic(RicData ricData) {
+ return checkRicState(ricData) //
+ .flatMap(x -> ricData.ric.getLock().lock(LockType.EXCLUSIVE)) //
+ .flatMap(notUsed -> setRicState(ricData)) //
+ .flatMap(x -> checkRicPolicies(ricData)) //
+ .flatMap(x -> checkRicPolicyTypes(ricData)) //
+ .doOnNext(x -> onRicCheckedOk(ricData)) //
+ .doOnError(t -> onRicCheckedError(t, ricData)) //
+ .onErrorResume(throwable -> Mono.empty());
+ }
+
+ private void onRicCheckedError(Throwable t, RicData ricData) {
+ logger.debug("Ric: {} check stopped, exception: {}", ricData.ric.name(), t.getMessage());
+ if (t instanceof SynchStartedException) {
+ // this is just a temporary state,
+ ricData.ric.setState(RicState.AVAILABLE);
+ } else {
+ ricData.ric.setState(RicState.UNAVAILABLE);
+ }
+ ricData.ric.getLock().unlockBlocking();
+ }
+
+ private void onRicCheckedOk(RicData ricData) {
+ logger.debug("Ric: {} checked OK", ricData.ric.name());
+ ricData.ric.setState(RicState.AVAILABLE);
+ ricData.ric.getLock().unlockBlocking();
+ }
+
+ @SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields
+ private Mono<RicData> setRicState(RicData ric) {
+ synchronized (ric) {
+ if (ric.ric.getState() == RicState.CONSISTENCY_CHECK) {
+ logger.debug("Ric: {} is already being checked", ric.ric.getConfig().name());
+ return Mono.empty();
+ }
+ ric.ric.setState(RicState.CONSISTENCY_CHECK);
+ return Mono.just(ric);
+ }
+ }
+
+ private Mono<RicData> createRicData(Ric ric) {
+ return Mono.just(ric) //
+ .flatMap(aRic -> this.a1ClientFactory.createA1Client(ric)) //
+ .flatMap(a1Client -> Mono.just(new RicData(ric, a1Client)));
+ }
+
+ private Mono<RicData> checkRicState(RicData ric) {
+ if (ric.ric.getState() == RicState.UNAVAILABLE) {
+ return startSynchronization(ric) //
+ .onErrorResume(t -> Mono.empty());
+ } else if (ric.ric.getState() == RicState.SYNCHRONIZING || ric.ric.getState() == RicState.CONSISTENCY_CHECK) {
+ return Mono.empty();
+ } else {
+ return Mono.just(ric);
+ }
+ }
+
+ private Mono<RicData> checkRicPolicies(RicData ric) {
+ return ric.getClient().getPolicyIdentities() //
+ .flatMap(ricP -> validateInstances(ricP, ric));
+ }
+
+ private Mono<RicData> validateInstances(Collection<String> ricPolicies, RicData ric) {
+ synchronized (this.policies) {
+ if (ricPolicies.size() != policies.getForRic(ric.ric.name()).size()) {
+ return startSynchronization(ric);
+ }
+
+ for (String policyId : ricPolicies) {
+ if (!policies.containsPolicy(policyId)) {
+ return startSynchronization(ric);
+ }
+ }
+ return Mono.just(ric);
+ }
+ }
+
+ private Mono<RicData> checkRicPolicyTypes(RicData ric) {
+ return ric.getClient().getPolicyTypeIdentities() //
+ .flatMap(ricTypes -> validateTypes(ricTypes, ric));
+ }
+
+ private Mono<RicData> validateTypes(Collection<String> ricTypes, RicData ric) {
+ if (ricTypes.size() != ric.ric.getSupportedPolicyTypes().size()) {
+ return startSynchronization(ric);
+ }
+ for (String typeName : ricTypes) {
+ if (!ric.ric.isSupportingType(typeName)) {
+ return startSynchronization(ric);
+ }
+ }
+ return Mono.just(ric);
+ }
+
+ private Mono<RicData> startSynchronization(RicData ric) {
+ RicSynchronizationTask synchronizationTask = createSynchronizationTask();
+ synchronizationTask.run(ric.ric);
+ return Mono.error(new SynchStartedException("Syncronization started"));
+ }
+
+ RicSynchronizationTask createSynchronizationTask() {
+ return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services);
+ }
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
new file mode 100644
index 00000000..1eefeee4
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java
@@ -0,0 +1,214 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.tasks;
+
+import static org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicState;
+
+import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1Client;
+import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1ClientFactory;
+import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.ImmutablePolicyType;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock.LockType;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyType;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyTypes;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Service;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.BaseSubscriber;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.SignalType;
+
+/**
+ * Synchronizes the content of a RIC with the content in the repository. This
+ * means:
+ * <p>
+ * load all policy types
+ * <p>
+ * send all policy instances to the RIC
+ * <p>
+ * if that fails remove all policy instances
+ * <p>
+ * Notify subscribing services
+ */
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+public class RicSynchronizationTask {
+
+ private static final Logger logger = LoggerFactory.getLogger(RicSynchronizationTask.class);
+ static final int CONCURRENCY_RIC = 1; // How may paralell requests that is sent to one NearRT RIC
+
+ private final A1ClientFactory a1ClientFactory;
+ private final PolicyTypes policyTypes;
+ private final Policies policies;
+ private final Services services;
+
+ public RicSynchronizationTask(A1ClientFactory a1ClientFactory, PolicyTypes policyTypes, Policies policies,
+ Services services) {
+ this.a1ClientFactory = a1ClientFactory;
+ this.policyTypes = policyTypes;
+ this.policies = policies;
+ this.services = services;
+ }
+
+ public void run(Ric ric) {
+ logger.debug("Handling ric: {}", ric.getConfig().name());
+
+ if (ric.getState() == RicState.SYNCHRONIZING) {
+ logger.debug("Ric: {} is already being synchronized", ric.getConfig().name());
+ return;
+ }
+
+ ric.getLock().lock(LockType.EXCLUSIVE) //
+ .flatMap(notUsed -> setRicState(ric)) //
+ .flatMap(lock -> this.a1ClientFactory.createA1Client(ric)) //
+ .flatMapMany(client -> runSynchronization(ric, client)) //
+ .onErrorResume(throwable -> deleteAllPolicyInstances(ric, throwable))
+ .subscribe(new BaseSubscriber<Object>() {
+ @Override
+ protected void hookOnError(Throwable throwable) {
+ logger.warn("Synchronization failure for ric: {}, reason: {}", ric.name(), throwable.getMessage());
+ ric.setState(RicState.UNAVAILABLE);
+ }
+
+ @Override
+ protected void hookOnComplete() {
+ onSynchronizationComplete(ric);
+ }
+
+ @Override
+ protected void hookFinally(SignalType type) {
+ ric.getLock().unlockBlocking();
+ }
+ });
+ }
+
+ @SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields
+ private Mono<Ric> setRicState(Ric ric) {
+ synchronized (ric) {
+ if (ric.getState() == RicState.SYNCHRONIZING) {
+ logger.debug("Ric: {} is already being synchronized", ric.getConfig().name());
+ return Mono.empty();
+ }
+ ric.setState(RicState.SYNCHRONIZING);
+ return Mono.just(ric);
+ }
+ }
+
+ private Flux<Object> runSynchronization(Ric ric, A1Client a1Client) {
+ Flux<PolicyType> synchronizedTypes = synchronizePolicyTypes(ric, a1Client);
+ Flux<?> policiesDeletedInRic = a1Client.deleteAllPolicies();
+ Flux<Policy> policiesRecreatedInRic = recreateAllPoliciesInRic(ric, a1Client);
+
+ return Flux.concat(synchronizedTypes, policiesDeletedInRic, policiesRecreatedInRic);
+ }
+
+ private void onSynchronizationComplete(Ric ric) {
+ logger.debug("Synchronization completed for: {}", ric.name());
+ ric.setState(RicState.AVAILABLE);
+ notifyAllServices("Synchronization completed for:" + ric.name());
+ }
+
+ private void notifyAllServices(String body) {
+ for (Service service : services.getAll()) {
+ String url = service.getCallbackUrl();
+ if (url.length() > 0) {
+ createNotificationClient(url) //
+ .put("", body) //
+ .subscribe( //
+ notUsed -> logger.debug("Service {} notified", service.getName()),
+ throwable -> logger.warn("Service notification failed for service: {}. Cause: {}",
+ service.getName(), throwable.getMessage()),
+ () -> logger.debug("All services notified"));
+ }
+ }
+ }
+
+ private Flux<Object> deleteAllPolicyInstances(Ric ric, Throwable t) {
+ logger.debug("Recreation of policies failed for ric: {}, reason: {}", ric.name(), t.getMessage());
+ deleteAllPoliciesInRepository(ric);
+
+ Flux<PolicyType> synchronizedTypes = this.a1ClientFactory.createA1Client(ric) //
+ .flatMapMany(a1Client -> synchronizePolicyTypes(ric, a1Client));
+ Flux<?> deletePoliciesInRic = this.a1ClientFactory.createA1Client(ric) //
+ .flatMapMany(A1Client::deleteAllPolicies) //
+ .doOnComplete(() -> deleteAllPoliciesInRepository(ric));
+
+ return Flux.concat(synchronizedTypes, deletePoliciesInRic);
+ }
+
+ AsyncRestClient createNotificationClient(final String url) {
+ return new AsyncRestClient(url, this.a1ClientFactory.getAppConfig().getWebClientConfig());
+ }
+
+ private Flux<PolicyType> synchronizePolicyTypes(Ric ric, A1Client a1Client) {
+ return a1Client.getPolicyTypeIdentities() //
+ .doOnNext(x -> ric.clearSupportedPolicyTypes()) //
+ .flatMapMany(Flux::fromIterable) //
+ .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) //
+ .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client), CONCURRENCY_RIC) //
+ .doOnNext(ric::addSupportedPolicyType); //
+ }
+
+ private Mono<PolicyType> getPolicyType(String policyTypeId, A1Client a1Client) {
+ if (policyTypes.contains(policyTypeId)) {
+ return Mono.just(policyTypes.get(policyTypeId));
+ }
+ return a1Client.getPolicyTypeSchema(policyTypeId) //
+ .flatMap(schema -> createPolicyType(policyTypeId, schema));
+ }
+
+ private Mono<PolicyType> createPolicyType(String policyTypeId, String schema) {
+ PolicyType pt = ImmutablePolicyType.builder().name(policyTypeId).schema(schema).build();
+ policyTypes.put(pt);
+ return Mono.just(pt);
+ }
+
+ private void deleteAllPoliciesInRepository(Ric ric) {
+ for (Policy policy : policies.getForRic(ric.name())) {
+ this.policies.remove(policy);
+ }
+ }
+
+ private Flux<Policy> putPolicy(Policy policy, Ric ric, A1Client a1Client) {
+ logger.debug("Recreating policy: {}, for ric: {}", policy.id(), ric.getConfig().name());
+ return a1Client.putPolicy(policy) //
+ .flatMapMany(notUsed -> Flux.just(policy));
+ }
+
+ private boolean checkTransient(Policy policy) {
+ if (policy.isTransient()) {
+ this.policies.remove(policy);
+ }
+ return policy.isTransient();
+ }
+
+ private Flux<Policy> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
+ return Flux.fromIterable(policies.getForRic(ric.name())) //
+ .filter(policy -> !checkTransient(policy)) //
+ .flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC);
+ }
+
+}
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/ServiceSupervision.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/ServiceSupervision.java
new file mode 100644
index 00000000..c066e12c
--- /dev/null
+++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/ServiceSupervision.java
@@ -0,0 +1,124 @@
+/*-
+ * ========================LICENSE_START=================================
+ * ONAP : ccsdk oran
+ * ======================================================================
+ * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved.
+ * ======================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.onap.ccsdk.oran.a1policymanagementservice.tasks;
+
+import java.time.Duration;
+
+import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1ClientFactory;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock.LockType;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Service;
+import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.stereotype.Component;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Periodically checks that services with a keepAliveInterval set are alive. If
+ * a service is deemed not alive, all the service's policies are deleted, both
+ * in the repository and in the affected Rics, and the service is removed from
+ * the repository. This means that the service needs to register again after
+ * this.
+ */
+@Component
+@EnableScheduling
+@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+public class ServiceSupervision {
+ private static final Logger logger = LoggerFactory.getLogger(ServiceSupervision.class);
+ static final int CONCURRENCY_RIC = 1; // How may paralell requests that is sent
+ private final Services services;
+ private final Policies policies;
+ private A1ClientFactory a1ClientFactory;
+ private final Duration checkInterval;
+
+ @Autowired
+ public ServiceSupervision(Services services, Policies policies, A1ClientFactory a1ClientFactory) {
+ this(services, policies, a1ClientFactory, Duration.ofMinutes(1));
+ }
+
+ public ServiceSupervision(Services services, Policies policies, A1ClientFactory a1ClientFactory,
+ Duration checkInterval) {
+ this.services = services;
+ this.policies = policies;
+ this.a1ClientFactory = a1ClientFactory;
+ this.checkInterval = checkInterval;
+ start();
+ }
+
+ private void start() {
+ logger.debug("Checking services starting");
+ createTask().subscribe(null, null, () -> logger.error("Checking services unexpectedly terminated"));
+ }
+
+ private Flux<?> createTask() {
+ return Flux.interval(this.checkInterval) //
+ .flatMap(notUsed -> checkAllServices());
+ }
+
+ Flux<Policy> checkAllServices() {
+ return Flux.fromIterable(services.getAll()) //
+ .filter(Service::isExpired) //
+ .doOnNext(service -> logger.info("Service is expired: {}", service.getName())) //
+ .doOnNext(service -> services.remove(service.getName())) //
+ .flatMap(this::getAllPoliciesForService) //
+ .flatMap(this::deletePolicy, CONCURRENCY_RIC);
+ }
+
+ @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+ private Flux<Policy> deletePolicy(Policy policy) {
+ Lock lock = policy.ric().getLock();
+ return lock.lock(LockType.SHARED) //
+ .doOnNext(notUsed -> policies.remove(policy)) //
+ .flatMap(notUsed -> deletePolicyInRic(policy))
+ .doOnNext(notUsed -> logger.debug("Policy deleted due to service inactivity: {}, service: {}", policy.id(),
+ policy.ownerServiceName())) //
+ .doOnNext(notUsed -> lock.unlockBlocking()) //
+ .doOnError(throwable -> lock.unlockBlocking()) //
+ .doOnError(throwable -> logger.debug("Failed to delete inactive policy: {}, reason: {}", policy.id(),
+ throwable.getMessage())) //
+ .flatMapMany(notUsed -> Flux.just(policy)) //
+ .onErrorResume(throwable -> Flux.empty());
+ }
+
+ private Flux<Policy> getAllPoliciesForService(Service service) {
+ return Flux.fromIterable(policies.getForService(service.getName()));
+ }
+
+ private Mono<Policy> deletePolicyInRic(Policy policy) {
+ return a1ClientFactory.createA1Client(policy.ric()) //
+ .flatMap(client -> client.deletePolicy(policy) //
+ .onErrorResume(exception -> handleDeleteFromRicFailure(policy, exception)) //
+ .map(nothing -> policy));
+ }
+
+ private Mono<String> handleDeleteFromRicFailure(Policy policy, Throwable e) {
+ logger.warn("Could not delete policy: {} from ric: {}. Cause: {}", policy.id(), policy.ric().name(),
+ e.getMessage());
+ return Mono.empty();
+ }
+}