aboutsummaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh
diff options
context:
space:
mode:
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java11
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AaiHttpClientConfig.java84
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AppConfig.java285
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigRefreshScheduler.java97
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java93
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java142
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfigParser.java102
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java112
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java18
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/DmaapPublisherTaskConfig.java44
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java85
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java136
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/AppInfoController.java (renamed from prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java)49
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java9
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/exceptions/AaiFailureException.java (renamed from prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/exceptions/EnvironmentLoaderException.java)9
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java128
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PnfRegistrationFields.java46
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java29
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java52
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTask.java29
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTaskImpl.java107
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTask.java29
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java208
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java12
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java42
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java14
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java50
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java153
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java38
29 files changed, 1194 insertions, 1019 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java
index a1fe5770..1d2a65d3 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java
@@ -20,27 +20,27 @@
package org.onap.dcaegen2.services.prh;
-import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.INVOCATION_ID;
-import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.REQUEST_ID;
-
import java.util.Map;
import java.util.UUID;
+
import org.slf4j.MDC;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
+import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.INVOCATION_ID;
+
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
*/
@SpringBootApplication(exclude = {JacksonAutoConfiguration.class})
-@Configuration
@EnableScheduling
+@EnableConfigurationProperties
public class MainApp {
public static void main(String[] args) {
@@ -49,7 +49,6 @@ public class MainApp {
@Bean
Map<String, String> mdcContextMap() {
- MDC.put(REQUEST_ID, "SampleRequestID");
MDC.put(INVOCATION_ID, UUID.randomUUID().toString());
return MDC.getCopyOfContextMap();
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AaiHttpClientConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AaiHttpClientConfig.java
new file mode 100644
index 00000000..e09322d3
--- /dev/null
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AaiHttpClientConfig.java
@@ -0,0 +1,84 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.configuration;
+
+import java.nio.charset.StandardCharsets;
+import java.util.function.BiFunction;
+import org.onap.dcaegen2.services.prh.model.AaiJsonBodyBuilderImpl;
+import org.onap.dcaegen2.services.prh.model.AaiPnfResultModel;
+import org.onap.dcaegen2.services.prh.model.AaiServiceInstanceResultModel;
+import org.onap.dcaegen2.services.prh.model.utils.PrhModelAwareGsonBuilder;
+import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.AaiHttpClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.AaiHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.get.AaiGetServiceInstanceClient;
+import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.get.AaiHttpGetClient;
+import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.patch.AaiHttpPatchClient;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.model.AaiModel;
+import org.onap.dcaegen2.services.sdk.rest.services.model.AaiServiceInstanceQueryModel;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class AaiHttpClientConfig {
+ @Autowired
+ private CbsConfiguration cbsConfiguration;
+
+ @Bean
+ public AaiHttpClient<AaiModel, HttpResponse> getPatchClientFactory() {
+ return createLazyConfigClient(
+ (config, client) -> new AaiHttpPatchClient(config, new AaiJsonBodyBuilderImpl(), client));
+ }
+
+ @Bean
+ public AaiHttpClient<AaiServiceInstanceQueryModel, AaiServiceInstanceResultModel> getServiceInstanceClient() {
+ return createLazyConfigClient(
+ (config, client) -> new AaiGetServiceInstanceClient(config, client)
+ .map(httpResponse -> {
+ httpResponse.throwIfUnsuccessful();
+ return httpResponse.bodyAsJson(StandardCharsets.UTF_8,
+ PrhModelAwareGsonBuilder.createGson(), AaiServiceInstanceResultModel.class);
+ }));
+ }
+
+ @Bean
+ public AaiHttpClient<AaiModel, AaiPnfResultModel> getGetClient() {
+ return createLazyConfigClient(
+ (config, client) -> new AaiHttpGetClient(config, client)
+ .map(httpResponse -> {
+ httpResponse.throwIfUnsuccessful();
+ return httpResponse.bodyAsJson(StandardCharsets.UTF_8,
+ PrhModelAwareGsonBuilder.createGson(), AaiPnfResultModel.class);
+ }));
+ }
+
+ private <T, U> AaiHttpClient<T, U> createLazyConfigClient(
+ final BiFunction<AaiClientConfiguration, RxHttpClient, AaiHttpClient<T, U>> factoryMethod) {
+
+ return x -> factoryMethod.apply(
+ cbsConfiguration.getAaiClientConfiguration(),
+ new AaiHttpClientFactory(cbsConfiguration.getAaiClientConfiguration()).build()
+ ).getAaiResponse(x);
+ }
+}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AppConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AppConfig.java
deleted file mode 100644
index 99886302..00000000
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AppConfig.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * PNF-REGISTRATION-HANDLER
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.services.prh.configuration;
-
-import java.util.Objects;
-import java.util.Optional;
-import java.util.function.Predicate;
-import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.ImmutableAaiClientConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Configuration;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18
- */
-
-@Configuration
-@EnableConfigurationProperties
-public class AppConfig extends PrhAppConfig {
-
- private static Predicate<String> isEmpty = String::isEmpty;
- @Value("${dmaap.dmaapConsumerConfiguration.dmaapHostName:}")
- public String consumerDmaapHostName;
-
- @Value("${dmaap.dmaapConsumerConfiguration.dmaapPortNumber:}")
- public Integer consumerDmaapPortNumber;
-
- @Value("${dmaap.dmaapConsumerConfiguration.dmaapTopicName:}")
- public String consumerDmaapTopicName;
-
- @Value("${dmaap.dmaapConsumerConfiguration.dmaapProtocol:}")
- public String consumerDmaapProtocol;
-
- @Value("${dmaap.dmaapConsumerConfiguration.dmaapUserName:}")
- public String consumerDmaapUserName;
-
- @Value("${dmaap.dmaapConsumerConfiguration.dmaapUserPassword:}")
- public String consumerDmaapUserPassword;
-
- @Value("${dmaap.dmaapConsumerConfiguration.dmaapContentType:}")
- public String consumerDmaapContentType;
-
- @Value("${dmaap.dmaapConsumerConfiguration.consumerId:}")
- public String consumerId;
-
- @Value("${dmaap.dmaapConsumerConfiguration.consumerGroup:}")
- public String consumerGroup;
-
- @Value("${dmaap.dmaapConsumerConfiguration.timeoutMs:}")
- public Integer consumerTimeoutMs;
-
- @Value("${dmaap.dmaapConsumerConfiguration.message-limit:}")
- public Integer consumerMessageLimit;
-
- @Value("${dmaap.dmaapProducerConfiguration.dmaapHostName:}")
- public String producerDmaapHostName;
-
- @Value("${dmaap.dmaapProducerConfiguration.dmaapPortNumber:}")
- public Integer producerDmaapPortNumber;
-
- @Value("${dmaap.dmaapProducerConfiguration.dmaapTopicName:}")
- public String producerDmaapTopicName;
-
- @Value("${dmaap.dmaapProducerConfiguration.dmaapProtocol:}")
- public String producerDmaapProtocol;
-
- @Value("${dmaap.dmaapProducerConfiguration.dmaapUserName:}")
- public String producerDmaapUserName;
-
- @Value("${dmaap.dmaapProducerConfiguration.dmaapUserPassword:}")
- public String producerDmaapUserPassword;
-
- @Value("${dmaap.dmaapProducerConfiguration.dmaapContentType:}")
- public String producerDmaapContentType;
-
- @Value("${aai.aaiClientConfiguration.aaiHost:}")
- public String aaiHost;
-
- @Value("${aai.aaiClientConfiguration.aaiHostPortNumber:}")
- public Integer aaiPort;
-
- @Value("${aai.aaiClientConfiguration.aaiProtocol:}")
- public String aaiProtocol;
-
- @Value("${aai.aaiClientConfiguration.aaiUserName:}")
- public String aaiUserName;
-
- @Value("${aai.aaiClientConfiguration.aaiUserPassword:}")
- public String aaiUserPassword;
-
- @Value("${aai.aaiClientConfiguration.aaiIgnoreSslCertificateErrors:}")
- public Boolean aaiIgnoreSslCertificateErrors;
-
- @Value("${aai.aaiClientConfiguration.aaiBasePath:}")
- public String aaiBasePath;
-
- @Value("${aai.aaiClientConfiguration.aaiPnfPath:}")
- public String aaiPnfPath;
-
- @Value("${security.trustStorePath:}")
- public String trustStorePath;
-
- @Value("${security.trustStorePasswordPath:}")
- public String trustStorePasswordPath;
-
- @Value("${security.keyStorePath:}")
- public String keyStorePath;
-
- @Value("${security.keyStorePasswordPath:}")
- public String keyStorePasswordPath;
-
- @Value("${security.enableAaiCertAuth:}")
- public Boolean enableAaiCertAuth;
-
- @Value("${security.enableDmaapCertAuth:}")
- public Boolean enableDmaapCertAuth;
-
- @Override
- public DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
- if (noFileConfiguration(dmaapConsumerConfiguration)) {
- return null;
- }
- return new ImmutableDmaapConsumerConfiguration.Builder()
- .dmaapUserPassword(
- Optional.ofNullable(consumerDmaapUserPassword).filter(isEmpty.negate())
- .orElse(dmaapConsumerConfiguration.dmaapUserPassword()))
- .dmaapUserName(
- Optional.ofNullable(consumerDmaapUserName).filter(isEmpty.negate())
- .orElse(dmaapConsumerConfiguration.dmaapUserName()))
- .dmaapHostName(
- Optional.ofNullable(consumerDmaapHostName).filter(isEmpty.negate())
- .orElse(dmaapConsumerConfiguration.dmaapHostName()))
- .dmaapPortNumber(
- Optional.ofNullable(consumerDmaapPortNumber).filter(p -> !p.toString().isEmpty())
- .orElse(dmaapConsumerConfiguration.dmaapPortNumber()))
- .dmaapProtocol(
- Optional.ofNullable(consumerDmaapProtocol).filter(isEmpty.negate())
- .orElse(dmaapConsumerConfiguration.dmaapProtocol()))
- .dmaapContentType(
- Optional.ofNullable(consumerDmaapContentType).filter(isEmpty.negate())
- .orElse(dmaapConsumerConfiguration.dmaapContentType()))
- .dmaapTopicName(
- Optional.ofNullable(consumerDmaapTopicName).filter(isEmpty.negate())
- .orElse(dmaapConsumerConfiguration.dmaapTopicName()))
- .messageLimit(
- Optional.ofNullable(consumerMessageLimit).filter(p -> !p.toString().isEmpty())
- .orElse(dmaapConsumerConfiguration.messageLimit()))
- .timeoutMs(Optional.ofNullable(consumerTimeoutMs).filter(p -> !p.toString().isEmpty())
- .orElse(dmaapConsumerConfiguration.timeoutMs()))
- .consumerGroup(Optional.ofNullable(consumerGroup).filter(isEmpty.negate())
- .orElse(dmaapConsumerConfiguration.consumerGroup()))
- .consumerId(Optional.ofNullable(consumerId).filter(isEmpty.negate())
- .orElse(dmaapConsumerConfiguration.consumerId()))
- .trustStorePath(
- Optional.ofNullable(trustStorePath).filter(isEmpty.negate())
- .orElse(dmaapConsumerConfiguration.trustStorePath()))
- .trustStorePasswordPath(
- Optional.ofNullable(trustStorePasswordPath).filter(isEmpty.negate())
- .orElse(dmaapConsumerConfiguration.trustStorePasswordPath()))
- .keyStorePath(
- Optional.ofNullable(keyStorePath).filter(isEmpty.negate())
- .orElse(dmaapConsumerConfiguration.keyStorePath()))
- .keyStorePasswordPath(
- Optional.ofNullable(keyStorePasswordPath).filter(isEmpty.negate())
- .orElse(dmaapConsumerConfiguration.keyStorePasswordPath()))
- .enableDmaapCertAuth(
- Optional.ofNullable(enableDmaapCertAuth).filter(p -> !p.toString().isEmpty())
- .orElse(dmaapConsumerConfiguration.enableDmaapCertAuth()))
- .build();
- }
-
- @Override
- public AaiClientConfiguration getAaiClientConfiguration() {
- if (noFileConfiguration(aaiClientConfiguration)) {
- return null;
- }
- return new ImmutableAaiClientConfiguration.Builder()
- .aaiHost(Optional.ofNullable(aaiHost).filter(isEmpty.negate()).orElse(aaiClientConfiguration.aaiHost()))
- .aaiPort(
- Optional.ofNullable(aaiPort).filter(p -> !p.toString().isEmpty())
- .orElse(aaiClientConfiguration.aaiPort()))
- .aaiIgnoreSslCertificateErrors(
- Optional.ofNullable(aaiIgnoreSslCertificateErrors).filter(p -> !p.toString().isEmpty())
- .orElse(aaiClientConfiguration.aaiIgnoreSslCertificateErrors()))
- .aaiProtocol(
- Optional.ofNullable(aaiProtocol).filter(isEmpty.negate()).orElse(aaiClientConfiguration.aaiProtocol()))
- .aaiUserName(
- Optional.ofNullable(aaiUserName).filter(isEmpty.negate()).orElse(aaiClientConfiguration.aaiUserName()))
- .aaiUserPassword(Optional.ofNullable(aaiUserPassword).filter(isEmpty.negate())
- .orElse(aaiClientConfiguration.aaiUserPassword()))
- .aaiBasePath(Optional.ofNullable(aaiBasePath).filter(isEmpty.negate())
- .orElse(aaiClientConfiguration.aaiBasePath()))
- .aaiPnfPath(
- Optional.ofNullable(aaiPnfPath).filter(isEmpty.negate()).orElse(aaiClientConfiguration.aaiPnfPath()))
- .aaiHeaders(aaiClientConfiguration.aaiHeaders())
- .trustStorePath(
- Optional.ofNullable(trustStorePath).filter(isEmpty.negate())
- .orElse(aaiClientConfiguration.trustStorePath()))
- .trustStorePasswordPath(
- Optional.ofNullable(trustStorePasswordPath).filter(isEmpty.negate())
- .orElse(aaiClientConfiguration.trustStorePasswordPath()))
- .keyStorePath(
- Optional.ofNullable(keyStorePath).filter(isEmpty.negate())
- .orElse(aaiClientConfiguration.keyStorePath()))
- .keyStorePasswordPath(
- Optional.ofNullable(keyStorePasswordPath).filter(isEmpty.negate())
- .orElse(aaiClientConfiguration.keyStorePasswordPath()))
- .enableAaiCertAuth(
- Optional.ofNullable(enableAaiCertAuth).filter(p -> !p.toString().isEmpty())
- .orElse(aaiClientConfiguration.enableAaiCertAuth()))
- .build();
- }
-
- @Override
- public DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
- if (noFileConfiguration(dmaapPublisherConfiguration)) {
- return null;
- }
- return new ImmutableDmaapPublisherConfiguration.Builder()
- .dmaapContentType(
- Optional.ofNullable(producerDmaapContentType).filter(isEmpty.negate())
- .orElse(dmaapPublisherConfiguration.dmaapContentType()))
- .dmaapHostName(
- Optional.ofNullable(producerDmaapHostName).filter(isEmpty.negate())
- .orElse(dmaapPublisherConfiguration.dmaapHostName()))
- .dmaapPortNumber(
- Optional.ofNullable(producerDmaapPortNumber).filter(p -> !p.toString().isEmpty())
- .orElse(dmaapPublisherConfiguration.dmaapPortNumber()))
- .dmaapProtocol(
- Optional.ofNullable(producerDmaapProtocol).filter(isEmpty.negate())
- .orElse(dmaapPublisherConfiguration.dmaapProtocol()))
- .dmaapTopicName(
- Optional.ofNullable(producerDmaapTopicName).filter(isEmpty.negate())
- .orElse(dmaapPublisherConfiguration.dmaapTopicName()))
- .dmaapUserName(
- Optional.ofNullable(producerDmaapUserName).filter(isEmpty.negate())
- .orElse(dmaapPublisherConfiguration.dmaapUserName()))
- .dmaapUserPassword(
- Optional.ofNullable(producerDmaapUserPassword).filter(isEmpty.negate())
- .orElse(dmaapPublisherConfiguration.dmaapUserPassword()))
- .trustStorePath(
- Optional.ofNullable(trustStorePath).filter(isEmpty.negate())
- .orElse(dmaapPublisherConfiguration.trustStorePath()))
- .trustStorePasswordPath(
- Optional.ofNullable(trustStorePasswordPath).filter(isEmpty.negate())
- .orElse(dmaapPublisherConfiguration.trustStorePasswordPath()))
- .keyStorePath(
- Optional.ofNullable(keyStorePath).filter(isEmpty.negate())
- .orElse(dmaapPublisherConfiguration.keyStorePath()))
- .keyStorePasswordPath(
- Optional.ofNullable(keyStorePasswordPath).filter(isEmpty.negate())
- .orElse(dmaapPublisherConfiguration.keyStorePasswordPath()))
- .enableDmaapCertAuth(
- Optional.ofNullable(enableDmaapCertAuth).filter(p -> !p.toString().isEmpty())
- .orElse(dmaapPublisherConfiguration.enableDmaapCertAuth()))
- .build();
- }
-
- private boolean noFileConfiguration(Object object) {
- return Objects.isNull(object);
- }
-}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigRefreshScheduler.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigRefreshScheduler.java
new file mode 100644
index 00000000..d87f21ed
--- /dev/null
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigRefreshScheduler.java
@@ -0,0 +1,97 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.configuration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.cloud.context.environment.EnvironmentChangeEvent;
+import org.springframework.cloud.context.refresh.ContextRefresher;
+import org.springframework.context.event.EventListener;
+import org.springframework.core.env.Environment;
+import org.springframework.stereotype.Component;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.time.Duration;
+
+@Component
+public class CbsConfigRefreshScheduler {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CbsConfigRefreshScheduler.class);
+ private static final String CBS_UPDATES_INTERVAL_PROPERTY = "cbs.updates-interval";
+ private static final Duration NO_UPDATES = Duration.ZERO;
+
+ private final ContextRefresher contextRefresher;
+ private final Environment environment;
+ private final Scheduler scheduler;
+ private volatile Disposable refreshEventsStreamHandle;
+
+
+ public CbsConfigRefreshScheduler(ContextRefresher contextRefresher, Environment environment) {
+ this.contextRefresher = contextRefresher;
+ this.environment = environment;
+ this.scheduler = Schedulers.newElastic("conf-updates");
+ }
+
+ @PostConstruct
+ public void startPollingForCbsUpdates() {
+ startPollingForCbsUpdates(getCbsUpdatesInterval());
+ }
+
+ private void startPollingForCbsUpdates(Duration updatesInterval) {
+ if (!updatesInterval.equals(NO_UPDATES)) {
+ LOGGER.info("Configuring pulling for CBS updates in every {}", updatesInterval);
+ refreshEventsStreamHandle = Flux.interval(updatesInterval, scheduler)
+ .doOnNext(i -> {
+ LOGGER.debug("Requesting context refresh");
+ contextRefresher.refresh();
+ })
+ .onErrorContinue((e, o) -> LOGGER.error("Failed fetching config updates from CBS", e))
+ .subscribe();
+ }
+ }
+
+ @EventListener
+ public void onEnvironmentChanged(EnvironmentChangeEvent event) {
+ if (event.getKeys().contains(CBS_UPDATES_INTERVAL_PROPERTY)) {
+ LOGGER.info("CBS config polling interval changed to {}", environment.getProperty(CBS_UPDATES_INTERVAL_PROPERTY));
+ stopPollingForCbsUpdates();
+ startPollingForCbsUpdates(getCbsUpdatesInterval());
+ }
+ }
+
+ private Duration getCbsUpdatesInterval() {
+ return environment.getProperty(CBS_UPDATES_INTERVAL_PROPERTY, Duration.class, NO_UPDATES);
+ }
+
+ @PreDestroy
+ private void stopPollingForCbsUpdates() {
+ if(refreshEventsStreamHandle != null) {
+ LOGGER.debug("Stopping pulling for CBS updates");
+ refreshEventsStreamHandle.dispose();
+ }
+ }
+
+}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java
new file mode 100644
index 00000000..c1226359
--- /dev/null
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java
@@ -0,0 +1,93 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.configuration;
+
+import com.google.gson.JsonObject;
+import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+
+public class CbsConfiguration implements Config {
+ private static final Logger LOGGER = LoggerFactory.getLogger(CbsConfiguration.class);
+ private static final String CBS_CONFIG_MISSING = "CBS config missing";
+ private AaiClientConfiguration aaiClientCBSConfiguration;
+ private MessageRouterPublisher messageRouterPublisher;
+ private MessageRouterSubscriber messageRouterSubscriber;
+ private MessageRouterPublishRequest messageRouterCBSPublishRequest;
+ private MessageRouterSubscribeRequest messageRouterCBSSubscribeRequest;
+ private MessageRouterPublishRequest messageRouterCBSUpdatePublishRequest;
+
+
+ public void parseCBSConfig(JsonObject jsonObject) {
+ LOGGER.info("Received application configuration: {}", jsonObject);
+ CbsContentParser consulConfigurationParser = new CbsContentParser(jsonObject);
+
+ aaiClientCBSConfiguration = consulConfigurationParser.getAaiClientConfig();
+
+ messageRouterPublisher = DmaapClientFactory.createMessageRouterPublisher(
+ consulConfigurationParser.getMessageRouterPublisherConfig());
+ messageRouterCBSPublishRequest = consulConfigurationParser.getMessageRouterPublishRequest();
+ messageRouterCBSUpdatePublishRequest = consulConfigurationParser.getMessageRouterUpdatePublishRequest();
+
+ messageRouterSubscriber = DmaapClientFactory.createMessageRouterSubscriber(
+ consulConfigurationParser.getMessageRouterSubscriberConfig());
+ messageRouterCBSSubscribeRequest = consulConfigurationParser.getMessageRouterSubscribeRequest();
+ }
+
+
+ @Override
+ public MessageRouterPublisher getMessageRouterPublisher() {
+ return Optional.ofNullable(messageRouterPublisher).orElseThrow(() -> new RuntimeException(CBS_CONFIG_MISSING));
+ }
+
+ @Override
+ public MessageRouterSubscriber getMessageRouterSubscriber() {
+ return Optional.ofNullable(messageRouterSubscriber).orElseThrow(() -> new RuntimeException(CBS_CONFIG_MISSING));
+ }
+
+ @Override
+ public MessageRouterPublishRequest getMessageRouterPublishRequest() {
+ return Optional.ofNullable(messageRouterCBSPublishRequest).orElseThrow(() -> new RuntimeException(CBS_CONFIG_MISSING));
+ }
+
+ @Override
+ public MessageRouterPublishRequest getMessageRouterUpdatePublishRequest() {
+ return Optional.ofNullable(messageRouterCBSUpdatePublishRequest).orElseThrow(() -> new RuntimeException(CBS_CONFIG_MISSING));
+ }
+
+ @Override
+ public AaiClientConfiguration getAaiClientConfiguration() {
+ return Optional.ofNullable(aaiClientCBSConfiguration).orElseThrow(() -> new RuntimeException(CBS_CONFIG_MISSING));
+ }
+
+ @Override
+ public MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() {
+ return Optional.ofNullable(messageRouterCBSSubscribeRequest).orElseThrow(() -> new RuntimeException(CBS_CONFIG_MISSING));
+ }
+}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java
new file mode 100644
index 00000000..85b4c035
--- /dev/null
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java
@@ -0,0 +1,142 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.configuration;
+
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
+import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.ImmutableAaiClientConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams;
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
+import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys;
+import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore;
+import org.onap.dcaegen2.services.sdk.security.ssl.Passwords;
+import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys;
+
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Map;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamWithName;
+
+/**
+ * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 8/21/18
+ */
+class CbsContentParser {
+ private static final String SECURITY_TRUST_STORE_PATH = "security.trustStorePath";
+ private static final String SECURITY_TRUST_STORE_PASS_PATH = "security.trustStorePasswordPath";
+ private static final String SECURITY_KEY_STORE_PATH = "security.keyStorePath";
+ private static final String SECURITY_KEY_STORE_PASS_PATH = "security.keyStorePasswordPath";
+ private static final String CONFIG = "config";
+ private static final String PNF_UPDATE = "pnf-update";
+ private static final String PNF_READY = "pnf-ready";
+ private static final String VES_REG_OUTPUT = "ves-reg-output";
+
+ private final JsonObject jsonObject;
+
+ CbsContentParser(JsonObject jsonObject) {
+ this.jsonObject = jsonObject.getAsJsonObject(CONFIG);
+ }
+
+ MessageRouterPublishRequest getMessageRouterPublishRequest() {
+ return getMessageRouterPublishRequest(PNF_READY);
+ }
+
+ MessageRouterPublishRequest getMessageRouterUpdatePublishRequest() {
+ return getMessageRouterPublishRequest(PNF_UPDATE);
+ }
+
+ private MessageRouterPublishRequest getMessageRouterPublishRequest(String streamName) {
+ RawDataStream<JsonObject> sink = DataStreams.namedSinks(jsonObject).find(streamWithName(streamName)).get();
+ MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink);
+
+ return ImmutableMessageRouterPublishRequest.builder()
+ .sinkDefinition(parsedSink)
+ .build();
+ }
+
+ MessageRouterPublisherConfig getMessageRouterPublisherConfig() {
+ return ImmutableMessageRouterPublisherConfig.builder()
+ .securityKeys(isDmaapCertAuthEnabled(jsonObject) ? createSecurityKeys(jsonObject) : null)
+ .build();
+ }
+
+ MessageRouterSubscriberConfig getMessageRouterSubscriberConfig() {
+ return ImmutableMessageRouterSubscriberConfig.builder()
+ .securityKeys(isDmaapCertAuthEnabled(jsonObject) ? createSecurityKeys(jsonObject) : null)
+ .build();
+ }
+
+ private SecurityKeys createSecurityKeys(JsonObject config) {
+ return ImmutableSecurityKeys.builder()
+ .keyStore(ImmutableSecurityKeysStore.of(Paths.get(config.get(SECURITY_KEY_STORE_PATH).getAsString())))
+ .keyStorePassword(Passwords.fromPath(Paths.get(config.get(SECURITY_KEY_STORE_PASS_PATH).getAsString())))
+ .trustStore(ImmutableSecurityKeysStore.of(Paths.get(config.get(SECURITY_TRUST_STORE_PATH).getAsString())))
+ .trustStorePassword(Passwords.fromPath(Paths.get(config.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString())))
+ .build();
+ }
+
+ private boolean isDmaapCertAuthEnabled(JsonObject config) {
+ return config.get("security.enableDmaapCertAuth").getAsBoolean();
+ }
+
+ AaiClientConfiguration getAaiClientConfig() {
+ return new ImmutableAaiClientConfiguration.Builder()
+ .pnfUrl(jsonObject.get("aai.aaiClientConfiguration.pnfUrl").getAsString())
+ .aaiUserName(jsonObject.get("aai.aaiClientConfiguration.aaiUserName").getAsString())
+ .aaiServiceInstancePath(jsonObject.get("aai.aaiClientConfiguration.aaiServiceInstancePath").getAsString())
+ .aaiIgnoreSslCertificateErrors(
+ jsonObject.get("aai.aaiClientConfiguration.aaiIgnoreSslCertificateErrors").getAsBoolean())
+ .aaiUserPassword(jsonObject.get("aai.aaiClientConfiguration.aaiUserPassword").getAsString())
+ .trustStorePath(jsonObject.get(SECURITY_TRUST_STORE_PATH).getAsString())
+ .trustStorePasswordPath(jsonObject.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString())
+ .keyStorePath(jsonObject.get(SECURITY_KEY_STORE_PATH).getAsString())
+ .keyStorePasswordPath(jsonObject.get(SECURITY_KEY_STORE_PASS_PATH).getAsString())
+ .enableAaiCertAuth(jsonObject.get("security.enableAaiCertAuth").getAsBoolean())
+ .aaiHeaders(new Gson().fromJson(jsonObject.get("aai.aaiClientConfiguration.aaiHeaders"),
+ new TypeToken<Map<String, String>>(){}.getType()))
+ .build();
+ }
+
+ MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() {
+ RawDataStream<JsonObject> source = DataStreams.namedSources(jsonObject).find(streamWithName(VES_REG_OUTPUT)).get();
+ MessageRouterSource parsedSource = StreamFromGsonParsers.messageRouterSourceParser().unsafeParse(source);
+
+ return ImmutableMessageRouterSubscribeRequest.builder()
+ .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString())
+ .sourceDefinition(parsedSource)
+ .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString())
+ .timeout(Duration.ofMillis(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMs").getAsLong()))
+ .build();
+ }
+} \ No newline at end of file
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfigParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfigParser.java
deleted file mode 100644
index 9d7b3396..00000000
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfigParser.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * PNF-REGISTRATION-HANDLER
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.services.prh.configuration;
-
-import com.google.gson.JsonObject;
-import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.ImmutableAaiClientConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 8/21/18
- */
-class CloudConfigParser {
-
- private static final String SECURITY_TRUST_STORE_PATH = "security.trustStorePath";
- private static final String SECURITY_TRUST_STORE_PASS_PATH = "security.trustStorePasswordPath";
- private static final String SECURITY_KEY_STORE_PATH = "security.keyStorePath";
- private static final String SECURITY_KEY_STORE_PASS_PATH = "security.keyStorePasswordPath";
- private final JsonObject jsonObject;
-
- CloudConfigParser(JsonObject jsonObject) {
- this.jsonObject = jsonObject;
- }
-
- DmaapPublisherConfiguration getDmaapPublisherConfig() {
- return new ImmutableDmaapPublisherConfiguration.Builder()
- .dmaapTopicName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapTopicName").getAsString())
- .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString())
- .dmaapPortNumber(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapPortNumber").getAsInt())
- .dmaapProtocol(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapProtocol").getAsString())
- .dmaapContentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString())
- .dmaapHostName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapHostName").getAsString())
- .dmaapUserName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserName").getAsString())
- .trustStorePath(jsonObject.get(SECURITY_TRUST_STORE_PATH).getAsString())
- .trustStorePasswordPath(jsonObject.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString())
- .keyStorePath(jsonObject.get(SECURITY_KEY_STORE_PATH).getAsString())
- .keyStorePasswordPath(jsonObject.get(SECURITY_KEY_STORE_PASS_PATH).getAsString())
- .enableDmaapCertAuth(jsonObject.get("security.enableDmaapCertAuth").getAsBoolean())
- .build();
- }
-
- AaiClientConfiguration getAaiClientConfig() {
- return new ImmutableAaiClientConfiguration.Builder()
- .aaiHost(jsonObject.get("aai.aaiClientConfiguration.aaiHost").getAsString())
- .aaiPort(jsonObject.get("aai.aaiClientConfiguration.aaiHostPortNumber").getAsInt())
- .aaiUserName(jsonObject.get("aai.aaiClientConfiguration.aaiUserName").getAsString())
- .aaiPnfPath(jsonObject.get("aai.aaiClientConfiguration.aaiPnfPath").getAsString())
- .aaiIgnoreSslCertificateErrors(
- jsonObject.get("aai.aaiClientConfiguration.aaiIgnoreSslCertificateErrors").getAsBoolean())
- .aaiUserPassword(jsonObject.get("aai.aaiClientConfiguration.aaiUserPassword").getAsString())
- .aaiProtocol(jsonObject.get("aai.aaiClientConfiguration.aaiProtocol").getAsString())
- .aaiBasePath(jsonObject.get("aai.aaiClientConfiguration.aaiBasePath").getAsString())
- .trustStorePath(jsonObject.get(SECURITY_TRUST_STORE_PATH).getAsString())
- .trustStorePasswordPath(jsonObject.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString())
- .keyStorePath(jsonObject.get(SECURITY_KEY_STORE_PATH).getAsString())
- .keyStorePasswordPath(jsonObject.get(SECURITY_KEY_STORE_PASS_PATH).getAsString())
- .enableAaiCertAuth(jsonObject.get("security.enableAaiCertAuth").getAsBoolean())
- .build();
- }
-
- DmaapConsumerConfiguration getDmaapConsumerConfig() {
- return new ImmutableDmaapConsumerConfiguration.Builder()
- .timeoutMs(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMs").getAsInt())
- .dmaapHostName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapHostName").getAsString())
- .dmaapUserName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserName").getAsString())
- .dmaapUserPassword(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserPassword").getAsString())
- .dmaapTopicName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapTopicName").getAsString())
- .dmaapPortNumber(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapPortNumber").getAsInt())
- .dmaapContentType(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapContentType").getAsString())
- .messageLimit(jsonObject.get("dmaap.dmaapConsumerConfiguration.messageLimit").getAsInt())
- .dmaapProtocol(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapProtocol").getAsString())
- .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString())
- .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString())
- .trustStorePath(jsonObject.get(SECURITY_TRUST_STORE_PATH).getAsString())
- .trustStorePasswordPath(jsonObject.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString())
- .keyStorePath(jsonObject.get(SECURITY_KEY_STORE_PATH).getAsString())
- .keyStorePasswordPath(jsonObject.get(SECURITY_KEY_STORE_PASS_PATH).getAsString())
- .enableDmaapCertAuth(jsonObject.get("security.enableDmaapCertAuth").getAsBoolean())
- .build();
- }
-} \ No newline at end of file
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java
deleted file mode 100644
index 08c99621..00000000
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * PNF-REGISTRATION-HANDLER
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.services.prh.configuration;
-
-import com.google.gson.JsonObject;
-import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.ImmutableAaiClientConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.*;
-import org.springframework.scheduling.annotation.EnableScheduling;
-import reactor.core.publisher.Flux;
-import reactor.core.scheduler.Schedulers;
-
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 8/9/18
- */
-@Configuration
-@ComponentScan("org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers")
-@EnableConfigurationProperties
-@EnableScheduling
-@Primary
-public class CloudConfiguration extends AppConfig {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(CloudConfiguration.class);
- private CloudConfigurationClient prhConfigurationProvider;
-
- private AaiClientConfiguration aaiClientCloudConfiguration;
- private DmaapPublisherConfiguration dmaapPublisherCloudConfiguration;
- private DmaapConsumerConfiguration dmaapConsumerCloudConfiguration;
-
- @Value("#{systemEnvironment}")
- private Properties systemEnvironment;
-
-
- @Autowired
- public void setThreadPoolTaskScheduler(CloudConfigurationClient prhConfigurationProvider) {
- this.prhConfigurationProvider = prhConfigurationProvider;
- }
-
- public void runTask() {
- Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment))
- .subscribeOn(Schedulers.parallel())
- .subscribe(this::parsingConfigSuccess, this::parsingConfigError);
- }
-
- private void parsingConfigError(Throwable throwable) {
- LOGGER.warn("Failed to process system environments", throwable);
- }
-
- private void cloudConfigError(Throwable throwable) {
- LOGGER.warn("Failed to gather configuration from ConfigBindingService/Consul", throwable);
- }
-
- private void parsingConfigSuccess(EnvProperties envProperties) {
- LOGGER.debug("Fetching PRH configuration from ConfigBindingService/Consul");
- prhConfigurationProvider.callForServiceConfigurationReactive(envProperties)
- .subscribe(this::parseCloudConfig, this::cloudConfigError);
- }
-
- private void parseCloudConfig(JsonObject jsonObject) {
- LOGGER.info("Received application configuration: {}", jsonObject);
- CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject);
- dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig();
- aaiClientCloudConfiguration = ImmutableAaiClientConfiguration.copyOf(cloudConfigParser.getAaiClientConfig())
- .withAaiHeaders(aaiClientConfiguration.aaiHeaders());
- dmaapConsumerCloudConfiguration = cloudConfigParser.getDmaapConsumerConfig();
- }
-
- @Override
- public DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
- return Optional.ofNullable(dmaapPublisherCloudConfiguration).orElse(super.getDmaapPublisherConfiguration());
- }
-
- @Override
- public AaiClientConfiguration getAaiClientConfiguration() {
- return Optional.ofNullable(aaiClientCloudConfiguration).orElse(super.getAaiClientConfiguration());
- }
-
- @Override
- public DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
- return Optional.ofNullable(dmaapConsumerCloudConfiguration).orElse(super.getDmaapConsumerConfiguration());
- }
-}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java
index 613e9a83..5fe6d703 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* PNF-REGISTRATION-HANDLER
* ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2019 NOKIA Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,19 +21,25 @@
package org.onap.dcaegen2.services.prh.configuration;
import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/25/18
*/
public interface Config {
- DmaapConsumerConfiguration getDmaapConsumerConfiguration();
+ MessageRouterSubscribeRequest getMessageRouterSubscribeRequest();
AaiClientConfiguration getAaiClientConfiguration();
- DmaapPublisherConfiguration getDmaapPublisherConfiguration();
+ MessageRouterPublishRequest getMessageRouterPublishRequest();
- void initFileStreamReader();
+ MessageRouterPublishRequest getMessageRouterUpdatePublishRequest();
+
+ MessageRouterPublisher getMessageRouterPublisher();
+
+ MessageRouterSubscriber getMessageRouterSubscriber();
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/DmaapPublisherTaskConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/DmaapPublisherTaskConfig.java
new file mode 100644
index 00000000..e5eb1e66
--- /dev/null
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/DmaapPublisherTaskConfig.java
@@ -0,0 +1,44 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2018-2019 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.configuration;
+
+import org.onap.dcaegen2.services.prh.tasks.DmaapPublisherTask;
+import org.onap.dcaegen2.services.prh.tasks.DmaapPublisherTaskImpl;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class DmaapPublisherTaskConfig {
+ @Bean(name = "ReadyPublisherTask")
+ @Autowired
+ public DmaapPublisherTask getReadyPublisherTask(Config config) {
+ return new DmaapPublisherTaskImpl(
+ config::getMessageRouterPublishRequest, config::getMessageRouterPublisher);
+ }
+
+ @Bean(name = "UpdatePublisherTask")
+ @Autowired
+ public DmaapPublisherTask getUpdatePublisherTask(Config config) {
+ return new DmaapPublisherTaskImpl(
+ config::getMessageRouterUpdatePublishRequest, config::getMessageRouterPublisher);
+ }
+}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java
deleted file mode 100644
index 793fcc27..00000000
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * PNF-REGISTRATION-HANDLER
- * ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcaegen2.services.prh.configuration;
-
-import org.onap.dcaegen2.services.prh.exceptions.EnvironmentLoaderException;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties;
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import reactor.core.publisher.Mono;
-
-import java.util.Optional;
-import java.util.Properties;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 8/10/18
- */
-final class EnvironmentProcessor {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(EnvironmentProcessor.class);
- private static final int DEFAULT_CONSUL_PORT = 8500;
-
- private EnvironmentProcessor() {
- }
-
- static Mono<EnvProperties> evaluate(Properties systemEnvironment) {
- LOGGER.debug("Loading configuration from system environment variables");
- EnvProperties envProperties;
- try {
- envProperties = ImmutableEnvProperties.builder().consulHost(getConsulHost(systemEnvironment))
- .consulPort(getConsultPort(systemEnvironment)).cbsName(getConfigBindingService(systemEnvironment))
- .appName(getService(systemEnvironment)).build();
- } catch (EnvironmentLoaderException e) {
- return Mono.error(e);
- }
- LOGGER.info("Evaluated system environment variables: {}", envProperties);
- return Mono.just(envProperties);
- }
-
- private static String getConsulHost(Properties systemEnvironments) throws EnvironmentLoaderException {
- return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_HOST"))
- .orElseThrow(() -> new EnvironmentLoaderException("$CONSUL_HOST environment has not been defined"));
- }
-
- private static Integer getConsultPort(Properties systemEnvironments) {
- return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_PORT")).map(Integer::valueOf)
- .orElseGet(EnvironmentProcessor::getDefaultPortOfConsul);
- }
-
- private static String getConfigBindingService(Properties systemEnvironments) throws EnvironmentLoaderException {
- return Optional.ofNullable(systemEnvironments.getProperty("CONFIG_BINDING_SERVICE"))
- .orElseThrow(
- () -> new EnvironmentLoaderException("$CONFIG_BINDING_SERVICE environment has not been defined"));
- }
-
- private static String getService(Properties systemEnvironments) throws EnvironmentLoaderException {
- return Optional.ofNullable(Optional.ofNullable(systemEnvironments.getProperty("HOSTNAME"))
- .orElse(systemEnvironments.getProperty("SERVICE_NAME")))
- .orElseThrow(() -> new EnvironmentLoaderException(
- "Neither $HOSTNAME/$SERVICE_NAME have not been defined as system environment"));
- }
-
- private static Integer getDefaultPortOfConsul() {
- LOGGER.warn("$CONSUL_PORT environment has not been defined, using default port: {}", DEFAULT_CONSUL_PORT);
- return DEFAULT_CONSUL_PORT;
- }
-}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java
index 612fab48..c21fd400 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* PNF-REGISTRATION-HANDLER
* ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2019 NOKIA Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,136 +20,38 @@
package org.onap.dcaegen2.services.prh.configuration;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-import com.google.gson.JsonSyntaxException;
-import com.google.gson.TypeAdapterFactory;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.util.ServiceLoader;
-import javax.validation.constraints.NotNull;
-import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Configuration;
+import org.springframework.boot.context.event.ApplicationStartedEvent;
+import org.springframework.context.event.EventListener;
import org.springframework.core.io.Resource;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StreamUtils;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18
*/
-@Configuration
-@EnableConfigurationProperties
-@ConfigurationProperties("app")
-public abstract class PrhAppConfig implements Config {
-
- private static final String CONFIG = "configs";
- private static final String AAI = "aai";
- private static final String DMAAP = "dmaap";
- private static final String AAI_CONFIG = "aaiClientConfiguration";
- private static final String DMAAP_PRODUCER = "dmaapProducerConfiguration";
- private static final String DMAAP_CONSUMER = "dmaapConsumerConfiguration";
- private static final String SECURITY = "security";
-
+@Component
+public class PrhAppConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(PrhAppConfig.class);
- AaiClientConfiguration aaiClientConfiguration;
-
- DmaapConsumerConfiguration dmaapConsumerConfiguration;
-
- DmaapPublisherConfiguration dmaapPublisherConfiguration;
-
- @Value("classpath:prh_endpoints.json")
- private Resource resourceFile;
-
- @Override
- public DmaapConsumerConfiguration getDmaapConsumerConfiguration() {
- return dmaapConsumerConfiguration;
- }
-
- @Override
- public AaiClientConfiguration getAaiClientConfiguration() {
- return aaiClientConfiguration;
- }
-
- @Override
- public DmaapPublisherConfiguration getDmaapPublisherConfiguration() {
- return dmaapPublisherConfiguration;
- }
-
- @Override
- public void initFileStreamReader() {
+ @Value("classpath:git_info.json")
+ private Resource gitInfo;
- GsonBuilder gsonBuilder = new GsonBuilder();
- ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
- JsonParser parser = new JsonParser();
-
- try (InputStream inputStream = resourceFile.getInputStream()) {
- JsonElement rootElement = getJsonElement(parser, inputStream);
- if (rootElement.isJsonObject()) {
- deserializeAaiConfiguration(gsonBuilder, rootElement);
- deserializeDmaapConsumerConfiguration(gsonBuilder, rootElement);
- deserializeDmaapPublisherConfiguration(gsonBuilder, rootElement);
- }
- }
- catch (IOException e) {
- LOGGER.warn("Problem with file loading, file ", e);
- }
- }
-
- private void deserializeDmaapPublisherConfiguration(GsonBuilder gsonBuilder, JsonElement rootElement) {
- dmaapPublisherConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects(
- rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(DMAAP)
- .getAsJsonObject(DMAAP_PRODUCER),
- rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
- DmaapPublisherConfiguration.class);
- }
-
- private void deserializeDmaapConsumerConfiguration(GsonBuilder gsonBuilder, JsonElement rootElement) {
- dmaapConsumerConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects(
- rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(DMAAP)
- .getAsJsonObject(DMAAP_CONSUMER),
- rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
- DmaapConsumerConfiguration.class);
- }
-
- private void deserializeAaiConfiguration(GsonBuilder gsonBuilder, JsonElement rootElement) {
- aaiClientConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects(
- rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(AAI).getAsJsonObject(AAI_CONFIG),
- rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)),
- AaiClientConfiguration.class);
- }
-
- JsonElement getJsonElement(JsonParser parser, InputStream inputStream) {
- return parser.parse(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
- }
-
- private JsonObject concatenateJsonObjects(JsonObject target, JsonObject source) {
- source.entrySet()
- .forEach(entry -> target.add(entry.getKey(), entry.getValue()));
- return target;
- }
- private <T> T deserializeType(@NotNull GsonBuilder gsonBuilder, @NotNull JsonObject jsonObject,
- @NotNull Class<T> type) {
- try {
- return gsonBuilder.create().fromJson(jsonObject, type);
- } catch (JsonSyntaxException e) {
- LOGGER.warn("Problem with Json deserialization", e);
- return null;
+ @EventListener(ApplicationStartedEvent.class)
+ public void onApplicationStarted() throws IOException {
+ if(LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Git info={}", StreamUtils.copyToString(gitInfo.getInputStream(), Charset.defaultCharset()));
}
}
- void setResourceFile(Resource resourceFile) {
- this.resourceFile = resourceFile;
+ public Resource getGitInfo() {
+ return gitInfo;
}
-} \ No newline at end of file
+}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/AppInfoController.java
index c09cc945..7475814f 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/AppInfoController.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* PNF-REGISTRATION-HANDLER
* ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2019 NOKIA Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,12 +24,14 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
+import org.onap.dcaegen2.services.prh.configuration.PrhAppConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.core.io.Resource;
import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
@@ -37,29 +39,28 @@ import reactor.core.publisher.Mono;
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/19/18
*/
@RestController
-@Api(value = "HeartbeatController", description = "Check liveness of PRH service")
-public class HeartbeatController {
+@Api(value = "AppInfoController", description = "Provides basic information about application")
+public class AppInfoController {
- private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatController.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(AppInfoController.class);
+ private final PrhAppConfig config;
- /**
- * Endpoint for checking that PRH is alive.
- *
- * @return HTTP Status Code
- */
- @RequestMapping(value = "heartbeat", method = RequestMethod.GET)
- @ApiOperation(value = "Returns liveness of PRH service")
- @ApiResponses(value = {
- @ApiResponse(code = 200, message = "PRH sevice is living"),
- @ApiResponse(code = 401, message = "You are not authorized to view the resource"),
- @ApiResponse(code = 403, message = "Accessing the resource you were trying to reach is forbidden"),
- @ApiResponse(code = 404, message = "The resource you were trying to reach is not found")
- }
- )
+ public AppInfoController(PrhAppConfig config) {
+ this.config = config;
+ }
+
+ @GetMapping(value = "heartbeat", produces = MediaType.TEXT_PLAIN_VALUE)
+ @ApiOperation("Returns liveness of PRH service")
+ @ApiResponses(@ApiResponse(code = 200, message = "Service is alive"))
public Mono<ResponseEntity<String>> heartbeat() {
- LOGGER.trace("Receiving heartbeat request");
- return Mono.defer(() ->
- Mono.just(new ResponseEntity<>("alive", HttpStatus.OK))
+ LOGGER.trace("Heartbeat request received");
+ return Mono.defer(() -> Mono.just(new ResponseEntity<>("alive", HttpStatus.OK))
);
}
-} \ No newline at end of file
+
+ @GetMapping(value = "version", produces = MediaType.APPLICATION_JSON_VALUE)
+ @ApiOperation("Returns version information")
+ public Mono<Resource> version() {
+ return Mono.defer(() -> Mono.just(config.getGitInfo()));
+ }
+}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java
index aa913654..a0aa17e3 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java
@@ -60,15 +60,18 @@ public class ScheduleController {
@ApiOperation(value = "Receiving stop scheduling worker request")
public Mono<ResponseEntity<String>> stopTask() {
LOGGER.trace("Receiving stop scheduling worker request");
- return scheduledTasksRunner.getResponseFromCancellationOfTasks();
+ return Mono.defer(() -> {
+ scheduledTasksRunner.cancelTasks();
+ return Mono.just(new ResponseEntity<>("PRH Service has been stopped!", HttpStatus.OK));
+ }
+ );
}
- @ApiOperation(value = "Sends success or error response on starting task execution")
private ResponseEntity<String> createStartTaskResponse(boolean wasScheduled) {
if (wasScheduled) {
return new ResponseEntity<>("PRH Service has been started!", HttpStatus.CREATED);
} else {
- return new ResponseEntity<>("PRH Service is still running!", HttpStatus.NOT_ACCEPTABLE);
+ return new ResponseEntity<>("PRH Service is already running!", HttpStatus.NOT_ACCEPTABLE);
}
}
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/exceptions/EnvironmentLoaderException.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/exceptions/AaiFailureException.java
index 5fef80d8..6741bb2d 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/exceptions/EnvironmentLoaderException.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/exceptions/AaiFailureException.java
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* PNF-REGISTRATION-HANDLER
* ================================================================================
- * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,12 +20,9 @@
package org.onap.dcaegen2.services.prh.exceptions;
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 8/10/18
- */
-public class EnvironmentLoaderException extends Exception {
+public class AaiFailureException extends PrhTaskException {
- public EnvironmentLoaderException(String message) {
+ public AaiFailureException(String message) {
super(message);
}
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
index 4749b520..11939b53 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java
@@ -23,32 +23,57 @@ package org.onap.dcaegen2.services.prh.service;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
-import java.util.Optional;
-import java.util.stream.StreamSupport;
+import io.vavr.collection.List;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import java.util.Optional;
+import java.util.stream.StreamSupport;
+
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.ADDITIONAL_FIELDS;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_EVENT_HEADER;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_JSON_OBJECT;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_STRING;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.CORRELATION_ID;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_MODEL;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_TYPE;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_VENDOR;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EVENT;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.NF_ROLE;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.OAM_IPV_4_ADDRESS;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.OAM_IPV_6_ADDRESS;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.PNF_REGISTRATION_FIELDS;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SERIAL_NUMBER;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SOURCE_NAME;
+import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SW_VERSION;
+
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18
*/
+@Component
public class DmaapConsumerJsonParser {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerJsonParser.class);
- private static final String EVENT = "event";
- private static final String COMMON_EVENT_HEADER = "commonEventHeader";
- private static final String PNF_REGISTRATION_FIELDS = "pnfRegistrationFields";
- private static final String OAM_IPV_4_ADDRESS = "oamV4IpAddress";
- private static final String OAM_IPV_6_ADDRESS = "oamV6IpAddress";
- private static final String SOURCE_NAME = "sourceName";
- private static final String CORRELATION_ID = "correlationId";
+ private String pnfSourceName;
+ private String pnfOamIpv4Address;
+ private String pnfOamIpv6Address;
+ private String pnfSerialNumberOptionalField;
+ private String pnfEquipVendorOptionalField;
+ private String pnfEquipModelOptionalField;
+ private String pnfEquipTypeOptionalField;
+ private String pnfNfRoleOptionalField;
+ private String pnfSwVersionOptionalField;
+ private JsonObject pnfAdditionalFields;
/**
* Extract info from string and create @see {@link org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel}.
@@ -56,31 +81,24 @@ public class DmaapConsumerJsonParser {
* @param monoMessage - results from DMaaP
* @return reactive DMaaPModel
*/
- public Flux<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) {
- return monoMessage
- .flatMapMany(this::getJsonParserMessage)
- .flatMap(this::createJsonConsumerModel);
+ public Flux<ConsumerDmaapModel> getJsonObject(Mono<MessageRouterSubscribeResponse> monoMessage) {
+ return monoMessage.flatMapMany(msgRouterResponse -> getConsumerDmaapModelFromJsonArray(msgRouterResponse.items()));
}
- private Mono<JsonElement> getJsonParserMessage(String message) {
- return StringUtils.isEmpty(message) ? logErrorAndReturnMonoEmpty("DmaaP response is empty")
- : Mono.fromCallable(() -> new JsonParser().parse(message));
- }
+ private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(List<JsonElement> items) {
+ LOGGER.debug("DmaapConsumerJsonParser input for parsing: {}", items);
- private Flux<ConsumerDmaapModel> createJsonConsumerModel(JsonElement jsonElement) {
- return jsonElement.isJsonObject()
- ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject())))
- : getConsumerDmaapModelFromJsonArray(jsonElement);
- }
-
- private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) {
+ if (items.size() == 0) {
+ LOGGER.debug("Nothing to consume from DMaaP");
+ return Flux.empty();
+ }
return create(
- Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false)
- .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
- .orElseGet(JsonObject::new)))));
+ Flux.defer(() -> Flux.fromStream(StreamSupport.stream(items.spliterator(), false)
+ .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray)
+ .orElseGet(JsonObject::new)))));
}
- public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
+ Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) {
JsonParser jsonParser = new JsonParser();
return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject())
: Optional.of(jsonParser.parse(element.toString()).getAsJsonObject());
@@ -94,43 +112,63 @@ public class DmaapConsumerJsonParser {
}
private Mono<ConsumerDmaapModel> transform(JsonObject responseFromDmaap) {
-
JsonObject commonEventHeader = responseFromDmaap.getAsJsonObject(EVENT)
.getAsJsonObject(COMMON_EVENT_HEADER);
JsonObject pnfRegistrationFields = responseFromDmaap.getAsJsonObject(EVENT)
.getAsJsonObject(PNF_REGISTRATION_FIELDS);
- String pnfSourceName = getValueFromJson(commonEventHeader, SOURCE_NAME);
- String pnfOamIpv4Address = getValueFromJson(pnfRegistrationFields, OAM_IPV_4_ADDRESS);
- String pnfOamIpv6Address = getValueFromJson(pnfRegistrationFields, OAM_IPV_6_ADDRESS);
-
- return (StringUtils.isEmpty(pnfSourceName) || !ipPropertiesNotEmpty(pnfOamIpv4Address, pnfOamIpv6Address))
+ this.pnfSourceName = getValueFromJson(commonEventHeader, SOURCE_NAME);
+ this.pnfNfRoleOptionalField = getValueFromJson(commonEventHeader, NF_ROLE);
+ this.pnfOamIpv4Address = getValueFromJson(pnfRegistrationFields, OAM_IPV_4_ADDRESS);
+ this.pnfOamIpv6Address = getValueFromJson(pnfRegistrationFields, OAM_IPV_6_ADDRESS);
+ this.pnfSerialNumberOptionalField = getValueFromJson(pnfRegistrationFields, SERIAL_NUMBER);
+ this.pnfEquipVendorOptionalField = getValueFromJson(pnfRegistrationFields, EQUIP_VENDOR);
+ this.pnfEquipModelOptionalField = getValueFromJson(pnfRegistrationFields, EQUIP_MODEL);
+ this.pnfEquipTypeOptionalField = getValueFromJson(pnfRegistrationFields, EQUIP_TYPE);
+ this.pnfSwVersionOptionalField = getValueFromJson(pnfRegistrationFields, SW_VERSION);
+ this.pnfAdditionalFields = pnfRegistrationFields.getAsJsonObject(ADDITIONAL_FIELDS);
+
+ return (StringUtils.isEmpty(pnfSourceName))
? logErrorAndReturnMonoEmpty("Incorrect json, consumerDmaapModel can not be created: "
- + printMessage(pnfSourceName, pnfOamIpv4Address, pnfOamIpv6Address)) :
+ + printMessage()) :
Mono.just(ImmutableConsumerDmaapModel.builder()
.correlationId(pnfSourceName)
.ipv4(pnfOamIpv4Address)
- .ipv6(pnfOamIpv6Address).build());
+ .ipv6(pnfOamIpv6Address)
+ .serialNumber(pnfSerialNumberOptionalField)
+ .equipVendor(pnfEquipVendorOptionalField)
+ .equipModel(pnfEquipModelOptionalField)
+ .equipType(pnfEquipTypeOptionalField)
+ .nfRole(pnfNfRoleOptionalField)
+ .swVersion(pnfSwVersionOptionalField)
+ .additionalFields(pnfAdditionalFields).build());
}
private String getValueFromJson(JsonObject jsonObject, String jsonKey) {
return jsonObject.has(jsonKey) ? jsonObject.get(jsonKey).getAsString() : "";
}
- private boolean ipPropertiesNotEmpty(String ipv4, String ipv6) {
- return (!StringUtils.isEmpty(ipv4)) || !(StringUtils.isEmpty(ipv6));
- }
-
private boolean containsHeader(JsonObject jsonObject) {
return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(PNF_REGISTRATION_FIELDS);
}
- private String printMessage(String sourceName, String oamIpv4Address, String oamIpv6Address) {
+ private String printMessage() {
return String.format("%n{"
- + "\"" + CORRELATION_ID + "\": \"%s\","
- + "\"" + OAM_IPV_4_ADDRESS + "\": \"%s\","
- + "\"" + OAM_IPV_6_ADDRESS + "\": \"%s\""
- + "%n}", sourceName, oamIpv4Address, oamIpv6Address);
+ + "\"" + CORRELATION_ID + COMMON_FORMAT_FOR_STRING + ","
+ + "\"" + OAM_IPV_4_ADDRESS + COMMON_FORMAT_FOR_STRING + ","
+ + "\"" + OAM_IPV_6_ADDRESS + COMMON_FORMAT_FOR_STRING + ","
+ + "\"" + SERIAL_NUMBER + COMMON_FORMAT_FOR_STRING + ","
+ + "\"" + EQUIP_VENDOR + COMMON_FORMAT_FOR_STRING + ","
+ + "\"" + EQUIP_MODEL + COMMON_FORMAT_FOR_STRING + ","
+ + "\"" + EQUIP_TYPE + COMMON_FORMAT_FOR_STRING + ","
+ + "\"" + NF_ROLE + COMMON_FORMAT_FOR_STRING + ","
+ + "\"" + SW_VERSION + COMMON_FORMAT_FOR_STRING + ","
+ + "\"" + ADDITIONAL_FIELDS + COMMON_FORMAT_FOR_JSON_OBJECT
+ + "%n}", this.pnfSourceName, this.pnfOamIpv4Address, this.pnfOamIpv6Address,
+ this.pnfSerialNumberOptionalField, this.pnfEquipVendorOptionalField,
+ this.pnfEquipModelOptionalField, this.pnfEquipTypeOptionalField,
+ this.pnfNfRoleOptionalField, this.pnfSwVersionOptionalField, this.pnfAdditionalFields
+ );
}
private <T> Mono<T> logErrorAndReturnMonoEmpty(String messageForLogger) {
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PnfRegistrationFields.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PnfRegistrationFields.java
new file mode 100644
index 00000000..a40795b1
--- /dev/null
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PnfRegistrationFields.java
@@ -0,0 +1,46 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+
+package org.onap.dcaegen2.services.prh.service;
+
+class PnfRegistrationFields {
+
+ static final String COMMON_FORMAT_FOR_STRING = "\": \"%s\"";
+ static final String COMMON_FORMAT_FOR_JSON_OBJECT = "\": %s";
+ static final String EVENT = "event";
+ static final String COMMON_EVENT_HEADER = "commonEventHeader";
+ static final String PNF_REGISTRATION_FIELDS = "pnfRegistrationFields";
+ static final String OAM_IPV_4_ADDRESS = "oamV4IpAddress";
+ static final String OAM_IPV_6_ADDRESS = "oamV6IpAddress";
+ static final String SOURCE_NAME = "sourceName";
+ static final String CORRELATION_ID = "correlationId";
+
+ // optional fields
+ static final String SERIAL_NUMBER = "serialNumber";
+ static final String EQUIP_VENDOR = "vendorName";
+ static final String EQUIP_MODEL = "modelNumber";
+ static final String EQUIP_TYPE = "unitType";
+ static final String NF_ROLE = "nfNamingCode";
+ static final String SW_VERSION = "softwareVersion";
+ static final String ADDITIONAL_FIELDS = "additionalFields";
+
+ private PnfRegistrationFields() {}
+}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java
index 8e31807a..123eb5a3 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java
@@ -20,34 +20,11 @@
package org.onap.dcaegen2.services.prh.tasks;
-import javax.net.ssl.SSLException;
-import org.onap.dcaegen2.services.prh.exceptions.AaiNotFoundException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.AaiReactiveWebClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.patch.AaiReactiveHttpPatchClient;
-
-import org.onap.dcaegen2.services.sdk.rest.services.ssl.SslFactory;
-import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
-
-/**
- * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
- */
-public abstract class AaiProducerTask {
-
- abstract Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel message) throws AaiNotFoundException;
-
- abstract AaiReactiveHttpPatchClient resolveClient() throws SSLException;
-
- protected abstract AaiClientConfiguration resolveConfiguration();
-
- protected abstract Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel)
- throws PrhTaskException, SSLException;
-
- WebClient buildWebClient() throws SSLException {
- return new AaiReactiveWebClientFactory(new SslFactory(), resolveConfiguration()).build();
- }
+@FunctionalInterface
+interface AaiProducerTask {
+ Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java
index 905eb72b..7aaff47d 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java
@@ -20,17 +20,14 @@
package org.onap.dcaegen2.services.prh.tasks;
-import javax.net.ssl.SSLException;
-import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.exceptions.AaiNotFoundException;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.model.JsonBodyBuilderImpl;
import org.onap.dcaegen2.services.prh.model.utils.HttpUtils;
-import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration;
-import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.patch.AaiReactiveHttpPatchClient;
-
+import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.AaiHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.model.AaiModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -42,48 +39,33 @@ import reactor.core.publisher.Mono;
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
@Component
-public class AaiProducerTaskImpl extends AaiProducerTask {
+public class AaiProducerTaskImpl implements AaiProducerTask {
private static final Logger LOGGER = LoggerFactory.getLogger(AaiProducerTaskImpl.class);
- private final Config config;
- private AaiReactiveHttpPatchClient aaiReactiveHttpPatchClient;
+ private final AaiHttpClient<AaiModel, HttpResponse> aaiHttpPatchClient;
@Autowired
- public AaiProducerTaskImpl(Config config) {
- this.config = config;
- }
-
- @Override
- Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel consumerDmaapModel) {
- LOGGER.info("Publish to AAI DmaapModel");
- return aaiReactiveHttpPatchClient.getAaiProducerResponse(consumerDmaapModel)
- .flatMap(response -> {
- if (HttpUtils.isSuccessfulResponseCode(response.statusCode().value())) {
- return Mono.just(consumerDmaapModel);
- }
- return Mono
- .error(new AaiNotFoundException("Incorrect response code for continuation of tasks workflow"));
- });
+ public AaiProducerTaskImpl(final AaiHttpClient<AaiModel, HttpResponse> aaiHttpPatchClient) {
+ this.aaiHttpPatchClient = aaiHttpPatchClient;
}
- @Override
- AaiReactiveHttpPatchClient resolveClient() throws SSLException {
- return new AaiReactiveHttpPatchClient(resolveConfiguration(), new JsonBodyBuilderImpl()).createAaiWebClient(buildWebClient());
- }
-
- @Override
- protected AaiClientConfiguration resolveConfiguration() {
- return config.getAaiClientConfiguration();
+ private Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel consumerDmaapModel) {
+ Mono<HttpResponse> response = aaiHttpPatchClient.getAaiResponse(consumerDmaapModel);
+ return response.flatMap(r -> {
+ if (HttpUtils.isSuccessfulResponseCode(r.statusCode())) {
+ return Mono.just(consumerDmaapModel);
+ }
+ return Mono
+ .error(new AaiNotFoundException("Incorrect response code for continuation of tasks workflow" + r.statusCode()));
+ });
}
@Override
- protected Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel)
- throws PrhTaskException, SSLException {
+ public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException {
if (consumerDmaapModel == null) {
throw new DmaapNotFoundException("Invoked null object to DMaaP task");
}
- aaiReactiveHttpPatchClient = resolveClient();
LOGGER.debug("Method called with arg {}", consumerDmaapModel);
return publish(consumerDmaapModel);
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTask.java
new file mode 100644
index 00000000..f1b900ac
--- /dev/null
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTask.java
@@ -0,0 +1,29 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.tasks;
+
+import org.onap.dcaegen2.services.sdk.rest.services.model.AaiModel;
+import reactor.core.publisher.Mono;
+
+@FunctionalInterface
+public interface AaiQueryTask {
+ Mono<Boolean> execute(final AaiModel aaiModel);
+}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTaskImpl.java
new file mode 100644
index 00000000..ed0cbcd9
--- /dev/null
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTaskImpl.java
@@ -0,0 +1,107 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.tasks;
+
+import org.onap.dcaegen2.services.prh.model.*;
+import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.AaiHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.model.AaiModel;
+import org.onap.dcaegen2.services.sdk.rest.services.model.AaiServiceInstanceQueryModel;
+import org.onap.dcaegen2.services.sdk.rest.services.model.ImmutableAaiServiceInstanceQueryModel;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Mono;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+@Component
+public class AaiQueryTaskImpl implements AaiQueryTask {
+ static final String ACTIVE_STATUS = "Active";
+ static final String RELATED_TO = "service-instance";
+ static final String CUSTOMER = "customer.global-customer-id";
+ static final String SERVICE_TYPE = "service-subscription.service-type";
+ static final String SERVICE_INSTANCE_ID = "service-instance.service-instance-id";
+
+ private final AaiHttpClient<AaiModel, AaiPnfResultModel> getPnfModelClient;
+ private final AaiHttpClient<AaiServiceInstanceQueryModel, AaiServiceInstanceResultModel> getServiceClient;
+
+ @Autowired
+ public AaiQueryTaskImpl(
+ final AaiHttpClient<AaiModel, AaiPnfResultModel> getPnfModelClient,
+ final AaiHttpClient<AaiServiceInstanceQueryModel, AaiServiceInstanceResultModel> getServiceClient) {
+ this.getPnfModelClient = getPnfModelClient;
+ this.getServiceClient = getServiceClient;
+ }
+
+ @Override
+ public Mono<Boolean> execute(AaiModel aaiModel) {
+ return getPnfModelClient
+ .getAaiResponse(aaiModel)
+ .flatMap(this::checkIfPnfHasRelationToService)
+ .flatMap(getServiceClient::getAaiResponse)
+ .map(this::checkIfRelatedServiceInstanceIsActive)
+ .defaultIfEmpty(false);
+ }
+
+ private Mono<AaiServiceInstanceQueryModel> checkIfPnfHasRelationToService(final AaiPnfResultModel model) {
+ return Mono
+ .justOrEmpty(model.getRelationshipList())
+ .map(this::findRelatedTo)
+ .flatMap(Mono::justOrEmpty)
+ .map(RelationshipDict::getRelationshipData)
+ .flatMap(x -> {
+ final Optional<String> customer = findValue(x, CUSTOMER);
+ final Optional<String> serviceType = findValue(x, SERVICE_TYPE);
+ final Optional<String> serviceInstanceId= findValue(x, SERVICE_INSTANCE_ID);
+
+ return customer.isPresent() && serviceType.isPresent() && serviceInstanceId.isPresent()
+ ? Mono.just(ImmutableAaiServiceInstanceQueryModel
+ .builder()
+ .customerId(customer.get())
+ .serviceType(serviceType.get())
+ .serviceInstanceId(serviceInstanceId.get())
+ .build())
+ : Mono.empty();
+ });
+ }
+
+ private Boolean checkIfRelatedServiceInstanceIsActive(final AaiServiceInstanceResultModel model) {
+ return ACTIVE_STATUS.equalsIgnoreCase(model.getOrchestrationStatus());
+ }
+
+ private Optional<RelationshipDict> findRelatedTo(final Relationship data) {
+ return Optional.ofNullable(data.getRelationship())
+ .map(Stream::of)
+ .orElseGet(Stream::empty)
+ .flatMap(List::stream)
+ .filter(x -> RELATED_TO.equals(x.getRelatedTo()))
+ .findFirst();
+ }
+
+ private Optional<String> findValue(final List<RelationshipData> data, final String key) {
+ return data
+ .stream()
+ .filter(y -> key.equals(y.getRelationshipKey()))
+ .findFirst()
+ .map(RelationshipData::getRelationshipValue);
+ }
+}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTask.java
new file mode 100644
index 00000000..12e13140
--- /dev/null
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTask.java
@@ -0,0 +1,29 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.tasks;
+
+import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
+import reactor.core.publisher.Mono;
+
+@FunctionalInterface
+public interface BbsActionsTask {
+ Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel);
+}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java
new file mode 100644
index 00000000..8b893211
--- /dev/null
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java
@@ -0,0 +1,208 @@
+/*
+ * ============LICENSE_START=======================================================
+ * PNF-REGISTRATION-HANDLER
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcaegen2.services.prh.tasks;
+
+import static io.netty.handler.codec.http.HttpHeaders.Values.APPLICATION_JSON;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.http.HttpHeaders.CONTENT_TYPE;
+import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod.DELETE;
+import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod.GET;
+import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod.PUT;
+
+import com.google.gson.JsonObject;
+import io.vavr.collection.HashMap;
+import io.vavr.collection.Map;
+import org.onap.dcaegen2.services.prh.configuration.Config;
+import org.onap.dcaegen2.services.prh.exceptions.AaiFailureException;
+import org.onap.dcaegen2.services.prh.model.AaiPnfResultModel;
+import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
+import org.onap.dcaegen2.services.prh.model.ImmutableRelationshipDict;
+import org.onap.dcaegen2.services.prh.model.Relationship;
+import org.onap.dcaegen2.services.prh.model.RelationshipDict;
+import org.onap.dcaegen2.services.prh.model.bbs.ImmutableLogicalLink;
+import org.onap.dcaegen2.services.prh.model.bbs.LogicalLink;
+import org.onap.dcaegen2.services.prh.model.utils.HttpUtils;
+import org.onap.dcaegen2.services.prh.model.utils.PrhModelAwareGsonBuilder;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient;
+import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@Component
+public class BbsActionsTaskImpl implements BbsActionsTask {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(BbsActionsTaskImpl.class);
+ private static final String ATTACHMENT_POINT = "attachment-point";
+ private static final String LOGICAL_LINK_URI = "/network/logical-links/logical-link";
+ private static final String PNF_URI = "/network/pnfs/pnf";
+ private static final String LINK_KEY = "logical-link.link-name";
+ private static final String ERROR_PREFIX = "Incorrect response when performing BBS-related actions: ";
+ private static final String LOGICAL_LINK = "logical-link";
+
+ private final Config config;
+ private final RxHttpClient httpClient;
+
+ @Autowired
+ BbsActionsTaskImpl(Config config) {
+ this(config, RxHttpClientFactory.createInsecure());
+ }
+
+ BbsActionsTaskImpl(Config config, RxHttpClient httpClient) {
+ this.config = config;
+ this.httpClient = httpClient;
+ }
+
+ public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) {
+ JsonObject additionalFields = consumerDmaapModel.getAdditionalFields();
+ if (additionalFields == null || !additionalFields.has(ATTACHMENT_POINT)) {
+ return Mono.just(consumerDmaapModel);
+ }
+ String linkName = additionalFields.get(ATTACHMENT_POINT).getAsString();
+ if (linkName.isEmpty()) {
+ LOGGER.warn("Attachment point is empty! Ignore related actions.");
+ return Mono.just(consumerDmaapModel);
+ }
+ String pnfName = consumerDmaapModel.getCorrelationId();
+
+ return getLinksByPnf(pnfName)
+ .flatMap(x -> Flux.fromIterable(x.getRelationshipData()))
+ .filter(x -> LINK_KEY.equals(x.getRelationshipKey()))
+ .map(x -> x.getRelationshipValue())
+ .flatMap(oldLinkName -> getLogicalLink(oldLinkName))
+ .filter(oldLink -> oldLink.getLinkType().equals(ATTACHMENT_POINT))
+ .flatMap(oldLink -> deleteLogicalLinkInAai(oldLink.getLinkName(), oldLink.getResourceVersion()))
+ .then(createLogicalLinkInAai(linkName, pnfName))
+ .flatMap(response -> handleFinalResponse(response, consumerDmaapModel));
+ }
+
+ private Flux<RelationshipDict> getLinksByPnf(String pnfName) {
+ return httpClient.call(buildGetRequest(PNF_URI + "/" + pnfName))
+ .flatMap(response -> handleResponse(response, "GET PNF request. Pnf name: " + pnfName))
+ .map(httpResponse -> httpResponse.bodyAsJson(UTF_8,
+ PrhModelAwareGsonBuilder.createGson(), AaiPnfResultModel.class))
+ .flatMapMany(pnfModel -> Flux.fromStream(pnfModel.getRelationshipList().getRelationship().stream()))
+ .filter(x -> LOGICAL_LINK.equals(x.getRelatedTo()));
+ }
+
+ private Mono<LogicalLink> getLogicalLink(String linkName) {
+ ImmutableHttpRequest request = buildGetRequest(LOGICAL_LINK_URI + "/" + linkName);
+ return httpClient.call(request)
+ .flatMap(response -> handleResponse(response, "GET LogicalLink request. Link name: " + linkName))
+ .map(httpResponse -> httpResponse.bodyAsJson(UTF_8,
+ PrhModelAwareGsonBuilder.createGson(), LogicalLink.class));
+ }
+
+ private Mono<HttpResponse> createLogicalLinkInAai(String linkName, String pnfName) {
+ ImmutableHttpRequest request = buildLogicalLinkPutRequest(linkName, pnfName);
+ LOGGER.debug("Creating logical link in AAI {} ", request);
+ return httpClient.call(request)
+ .flatMap(response -> handleResponse(response, "PUT LogicalLink request. Link name: " + linkName));
+ }
+
+ private Mono<HttpResponse> deleteLogicalLinkInAai(String linkName, String resourceVersion) {
+ ImmutableHttpRequest request = buildLogicalLinkDeleteRequest(linkName, resourceVersion);
+ LOGGER.debug("Deleting logical link in AAI {} ", request);
+ return httpClient.call(request)
+ .flatMap(response -> handleResponse(response, "DELETE LogicalLink request. Link name: " + linkName));
+ }
+
+ private ImmutableHttpRequest buildGetRequest(String path) {
+ String uri = buildUri(path);
+ Map<String, String> aaiHeaders = HashMap
+ .ofAll(config.getAaiClientConfiguration().aaiHeaders());
+
+ return ImmutableHttpRequest
+ .builder()
+ .method(GET)
+ .url(uri)
+ .customHeaders(aaiHeaders)
+ .build();
+ }
+
+ private ImmutableHttpRequest buildLogicalLinkPutRequest(String linkName, String pnfName) {
+ String uri = buildUri(LOGICAL_LINK_URI + "/" + linkName);
+ ImmutableLogicalLink logicalLink = prepareModelBuilder(linkName, pnfName).build();
+ RequestBody requestBody = RequestBody.fromString(PrhModelAwareGsonBuilder.createGson().toJson(logicalLink));
+
+ // FIXME: AAI headers for PUT are different than PATCH (taken from prh_endpoints.json)
+ Map<String, String> aaiHeaders = HashMap
+ .ofAll(config.getAaiClientConfiguration().aaiHeaders())
+ .put(CONTENT_TYPE, APPLICATION_JSON);
+
+ return ImmutableHttpRequest
+ .builder()
+ .method(PUT)
+ .url(uri)
+ .body(requestBody)
+ .customHeaders(aaiHeaders)
+ .build();
+ }
+
+ private ImmutableHttpRequest buildLogicalLinkDeleteRequest(String linkName, String resourceVersion) {
+ String uri = buildUri(LOGICAL_LINK_URI + "/" + linkName + "?resource-version=" + resourceVersion);
+
+ Map<String, String> aaiHeaders = HashMap
+ .ofAll(config.getAaiClientConfiguration().aaiHeaders())
+ .put(CONTENT_TYPE, APPLICATION_JSON);
+
+ return ImmutableHttpRequest
+ .builder()
+ .method(DELETE)
+ .url(uri)
+ .customHeaders(aaiHeaders)
+ .build();
+ }
+
+ private ImmutableLogicalLink.Builder prepareModelBuilder(String linkName, String pnfName) {
+ Relationship relationship = org.onap.dcaegen2.services.prh.model.ImmutableRelationship.builder()
+ .addRelationship(
+ ImmutableRelationshipDict.builder().relatedLink(PNF_URI + "/" + pnfName).build()).build();
+
+ return ImmutableLogicalLink
+ .builder()
+ .linkName(linkName)
+ .linkType(ATTACHMENT_POINT)
+ .relationshipList(relationship);
+ }
+
+ private Mono<HttpResponse> handleResponse(HttpResponse response, String msg) {
+ return HttpUtils.isSuccessfulResponseCode(response.statusCode()) ? Mono.just(response) :
+ Mono.error(new AaiFailureException(ERROR_PREFIX + response.statusCode() + ". Occurred in " + msg));
+ }
+
+ private Mono<? extends ConsumerDmaapModel> handleFinalResponse(
+ HttpResponse response, ConsumerDmaapModel consumerDmaapModel) {
+ return HttpUtils.isSuccessfulResponseCode(response.statusCode())
+ ? Mono.just(consumerDmaapModel) : Mono.error(new AaiFailureException(ERROR_PREFIX + response.statusCode()));
+ }
+
+ private String buildUri(String path) {
+ return config.getAaiClientConfiguration().pnfUrl().replace(PNF_URI, path);
+ }
+}
+
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
index e3ea8962..2deafd8d 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java
@@ -20,22 +20,14 @@
package org.onap.dcaegen2.services.prh.tasks;
-import javax.net.ssl.SSLException;
-
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-
import reactor.core.publisher.Flux;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
+@FunctionalInterface
interface DmaapConsumerTask {
-
- void initConfigs();
-
- Flux<ConsumerDmaapModel> execute(String object) throws SSLException;
-
- DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException;
+ Flux<ConsumerDmaapModel> execute();
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
index fd7bca1e..a990e502 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java
@@ -20,19 +20,15 @@
package org.onap.dcaegen2.services.prh.tasks;
-import javax.net.ssl.SSLException;
import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
/**
@@ -41,39 +37,21 @@ import reactor.core.publisher.Flux;
@Component
public class DmaapConsumerTaskImpl implements DmaapConsumerTask {
- private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class);
private final Config config;
private final DmaapConsumerJsonParser dmaapConsumerJsonParser;
- private final ConsumerReactiveHttpClientFactory httpClientFactory;
- @Autowired
- public DmaapConsumerTaskImpl(Config config) {
- this(config, new DmaapConsumerJsonParser(),
- new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory()));
- }
- DmaapConsumerTaskImpl(Config prhAppConfig,
- DmaapConsumerJsonParser dmaapConsumerJsonParser,
- ConsumerReactiveHttpClientFactory httpClientFactory) {
+ public DmaapConsumerTaskImpl(Config prhAppConfig, DmaapConsumerJsonParser dmaapConsumerJsonParser) {
this.config = prhAppConfig;
this.dmaapConsumerJsonParser = dmaapConsumerJsonParser;
- this.httpClientFactory = httpClientFactory;
}
@Override
- public void initConfigs() {
- config.initFileStreamReader();
+ public Flux<ConsumerDmaapModel> execute() {
+ MessageRouterSubscriber messageRouterSubscriber = config.getMessageRouterSubscriber();
+ MessageRouterSubscribeRequest messageRouterSubscribeRequest = config.getMessageRouterSubscribeRequest();
+ Mono<MessageRouterSubscribeResponse> response = messageRouterSubscriber.get(messageRouterSubscribeRequest);
+ return dmaapConsumerJsonParser.getJsonObject(response);
}
- @Override
- public Flux<ConsumerDmaapModel> execute(String object) throws SSLException {
- DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient();
- LOGGER.debug("Method called with arg {}", object);
- return dmaapConsumerJsonParser.getJsonObject(dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse());
- }
-
- @Override
- public DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException {
- return httpClientFactory.create(config.getDmaapConsumerConfiguration());
- }
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
index 4d6c0f87..0cdc40ea 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java
@@ -22,17 +22,13 @@ package org.onap.dcaegen2.services.prh.tasks;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
-
-import org.springframework.http.ResponseEntity;
-import reactor.core.publisher.Mono;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import reactor.core.publisher.Flux;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
*/
-interface DmaapPublisherTask {
-
- Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
-
- DMaaPPublisherReactiveHttpClient resolveClient();
+@FunctionalInterface
+public interface DmaapPublisherTask {
+ Flux<MessageRouterPublishResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException;
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
index 63e01c12..2890d195 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java
@@ -20,54 +20,46 @@
package org.onap.dcaegen2.services.prh.tasks;
-import org.onap.dcaegen2.services.prh.configuration.Config;
import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException;
-
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.model.JsonBodyBuilderImpl;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory;
-import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory;
-
+import org.onap.dcaegen2.services.prh.model.PnfReadyJsonBodyBuilder;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.ResponseEntity;
-import org.springframework.stereotype.Component;
-import reactor.core.publisher.Mono;
+import reactor.core.publisher.Flux;
+
+import java.util.function.Supplier;
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18
*/
-@Component
public class DmaapPublisherTaskImpl implements DmaapPublisherTask {
private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class);
- private final Config config;
- private final PublisherReactiveHttpClientFactory httpClientFactory;
- @Autowired
- public DmaapPublisherTaskImpl(Config config) {
- this(config, new PublisherReactiveHttpClientFactory(new DmaaPRestTemplateFactory(),new JsonBodyBuilderImpl()));
- }
+ private final Supplier<MessageRouterPublishRequest> publishRequestSupplier;
+ private final Supplier<MessageRouterPublisher> publisherSupplier;
+ private final PnfReadyJsonBodyBuilder pnfReadyJsonBodyBuilder = new PnfReadyJsonBodyBuilder();
- DmaapPublisherTaskImpl(Config config, PublisherReactiveHttpClientFactory httpClientFactory) {
- this.config = config;
- this.httpClientFactory = httpClientFactory;
+
+ public DmaapPublisherTaskImpl(Supplier<MessageRouterPublishRequest> publishRequestSupplier,
+ Supplier<MessageRouterPublisher> publisherSupplier) {
+ this.publishRequestSupplier = publishRequestSupplier;
+ this.publisherSupplier = publisherSupplier;
}
@Override
- public Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException {
+ public Flux<MessageRouterPublishResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException {
if (consumerDmaapModel == null) {
throw new DmaapNotFoundException("Invoked null object to DMaaP task");
}
- DMaaPPublisherReactiveHttpClient dmaapPublisherReactiveHttpClient = resolveClient();
LOGGER.info("Method called with arg {}", consumerDmaapModel);
- return dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel);
- }
-
- @Override
- public DMaaPPublisherReactiveHttpClient resolveClient() {
- return httpClientFactory.create(config.getDmaapPublisherConfiguration());
+ MessageRouterPublisher messageRouterPublisher = publisherSupplier.get();
+ MessageRouterPublishRequest messageRouterPublishRequest = publishRequestSupplier.get();
+ return messageRouterPublisher.put(
+ messageRouterPublishRequest,
+ Flux.just(pnfReadyJsonBodyBuilder.createJsonBody(consumerDmaapModel)));
}
} \ No newline at end of file
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
index 2924225b..74c6c426 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java
@@ -20,139 +20,176 @@
package org.onap.dcaegen2.services.prh.tasks;
-import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.INSTANCE_UUID;
-import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.RESPONSE_CODE;
-
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.function.Predicate;
-import javax.net.ssl.SSLException;
import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException;
import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException;
import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel;
-import org.onap.dcaegen2.services.prh.model.logging.MdcVariables;
-
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse;
+import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.ResponseEntity;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Predicate;
+
+import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.INSTANCE_UUID;
+import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE;
+
/**
* @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18
*/
@Component
public class ScheduledTasks {
- private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTasks.class);
private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE");
+
private final DmaapConsumerTask dmaapConsumerTask;
- private final DmaapPublisherTask dmaapProducerTask;
+ private final DmaapPublisherTask dmaapReadyProducerTask;
+ private final DmaapPublisherTask dmaapUpdateProducerTask;
+ private final AaiQueryTask aaiQueryTask;
private final AaiProducerTask aaiProducerTask;
+ private final BbsActionsTask bbsActionsTask;
private Map<String, String> mdcContextMap;
/**
* Constructor for tasks registration in PRHWorkflow.
*
- * @param dmaapConsumerTask - fist task
- * @param dmaapPublisherTask - third task
- * @param aaiPublisherTask - second task
+ * @param dmaapConsumerTask - fist task
+ * @param dmaapReadyPublisherTask - third task
+ * @param dmaapUpdatePublisherTask - fourth task
+ * @param aaiPublisherTask - second task
*/
@Autowired
- public ScheduledTasks(DmaapConsumerTask dmaapConsumerTask, DmaapPublisherTask dmaapPublisherTask,
- AaiProducerTask aaiPublisherTask, Map<String, String> mdcContextMap) {
+ public ScheduledTasks(
+ final DmaapConsumerTask dmaapConsumerTask,
+ @Qualifier("ReadyPublisherTask") final DmaapPublisherTask dmaapReadyPublisherTask,
+ @Qualifier("UpdatePublisherTask") final DmaapPublisherTask dmaapUpdatePublisherTask,
+ final AaiQueryTask aaiQueryTask,
+ final AaiProducerTask aaiPublisherTask,
+ final BbsActionsTask bbsActionsTask,
+ final Map<String, String> mdcContextMap) {
this.dmaapConsumerTask = dmaapConsumerTask;
- this.dmaapProducerTask = dmaapPublisherTask;
+ this.dmaapReadyProducerTask = dmaapReadyPublisherTask;
+ this.dmaapUpdateProducerTask = dmaapUpdatePublisherTask;
+ this.aaiQueryTask = aaiQueryTask;
this.aaiProducerTask = aaiPublisherTask;
+ this.bbsActionsTask = bbsActionsTask;
this.mdcContextMap = mdcContextMap;
}
+ static class State {
+ public final ConsumerDmaapModel dmaapModel;
+ public final Boolean activationStatus;
+
+ public State(final ConsumerDmaapModel dmaapModel, final Boolean activationStatus) {
+ this.dmaapModel = dmaapModel;
+ this.activationStatus = activationStatus;
+ }
+ }
+
/**
* Main function for scheduling prhWorkflow.
*/
public void scheduleMainPrhEventTask() {
MdcVariables.setMdcContextMap(mdcContextMap);
try {
- logger.trace("Execution of tasks was registered");
+ LOGGER.trace("Execution of tasks was registered");
CountDownLatch mainCountDownLatch = new CountDownLatch(1);
consumeFromDMaaPMessage()
- .doOnError(DmaapEmptyResponseException.class, error ->
- logger.warn("Nothing to consume from DMaaP")
- )
- .flatMap(this::publishToAaiConfiguration)
- .doOnError(exception ->
- logger.warn("AAIProducerTask exception has been registered: ", exception))
- .onErrorResume(resumePrhPredicate(), exception -> Mono.empty())
- .flatMap(this::publishToDmaapConfiguration)
- .doOnError(exception ->
- logger.warn("DMaaPProducerTask exception has been registered: ", exception))
- .onErrorResume(resumePrhPredicate(), exception -> Mono.empty())
- .doOnTerminate(mainCountDownLatch::countDown)
- .subscribe(this::onSuccess, this::onError, this::onComplete);
+ .doOnError(DmaapEmptyResponseException.class, error ->
+ LOGGER.warn("Nothing to consume from DMaaP")
+ )
+ .flatMap(this::queryAaiForConfiguration)
+ .flatMap(this::publishToAaiConfiguration)
+ .flatMap(this::processAdditionalFields)
+ .flatMap(this::publishToDmaapConfiguration)
+ .onErrorResume(resumePrhPredicate(), exception -> Mono.empty())
+ .doOnTerminate(mainCountDownLatch::countDown)
+ .subscribe(this::onSuccess, this::onError, this::onComplete);
mainCountDownLatch.await();
} catch (InterruptedException e) {
- logger.warn("Interruption problem on countDownLatch ", e);
+ LOGGER.warn("Interruption problem on countDownLatch ", e);
Thread.currentThread().interrupt();
}
}
-
private void onComplete() {
- logger.info("PRH tasks have been completed");
+ LOGGER.info("PRH tasks have been completed");
}
- private void onSuccess(ResponseEntity<String> responseCode) {
- MDC.put(RESPONSE_CODE, responseCode.getStatusCode().toString());
- logger.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}",
- responseCode.getStatusCode().value());
- MDC.remove(RESPONSE_CODE);
+ private void onSuccess(MessageRouterPublishResponse response) {
+ if (response.successful()) {
+ String statusCodeOk = HttpStatus.OK.name();
+ MDC.put(RESPONSE_CODE, statusCodeOk);
+ LOGGER.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", statusCodeOk);
+ MDC.remove(RESPONSE_CODE);
+ }
}
private void onError(Throwable throwable) {
if (!(throwable instanceof DmaapEmptyResponseException)) {
- logger.warn("Chain of tasks have been aborted due to errors in PRH workflow", throwable);
+ LOGGER.warn("Chain of tasks have been aborted due to errors in PRH workflow", throwable);
}
}
-
private Flux<ConsumerDmaapModel> consumeFromDMaaPMessage() {
return Flux.defer(() -> {
MdcVariables.setMdcContextMap(mdcContextMap);
MDC.put(INSTANCE_UUID, UUID.randomUUID().toString());
- logger.info(INVOKE, "Init configs");
- dmaapConsumerTask.initConfigs();
- return consumeFromDMaaP();
+ LOGGER.info(INVOKE, "Init configs");
+ return dmaapConsumerTask.execute();
});
}
- private Flux<ConsumerDmaapModel> consumeFromDMaaP() {
- try {
- return dmaapConsumerTask.execute("");
- } catch (SSLException e) {
- return Flux.error(e);
- }
+ private Mono<State> queryAaiForConfiguration(final ConsumerDmaapModel monoDMaaPModel) {
+ return aaiQueryTask
+ .execute(monoDMaaPModel)
+ .map(x -> new State(monoDMaaPModel, x));
}
- private Mono<ConsumerDmaapModel> publishToAaiConfiguration(ConsumerDmaapModel monoDMaaPModel) {
+ private Mono<State> publishToAaiConfiguration(final State state) {
try {
- return aaiProducerTask.execute(monoDMaaPModel);
- } catch (PrhTaskException | SSLException e) {
+ return state.activationStatus
+ ? Mono.just(state)
+ : aaiProducerTask
+ .execute(state.dmaapModel)
+ .map(x -> state);
+ } catch (PrhTaskException e) {
+ LOGGER.warn("AAIProducerTask exception has been registered: ", e);
return Mono.error(e);
}
}
- private Mono<ResponseEntity<String>> publishToDmaapConfiguration(ConsumerDmaapModel monoAaiModel) {
+ private Mono<State> processAdditionalFields(final State state) {
+ if (state.activationStatus) {
+ LOGGER.debug("Re-registration - Logical links won't be updated.");
+ return Mono.just(state);
+ }
+ return bbsActionsTask.execute(state.dmaapModel).map(x -> state);
+ }
+
+ private Flux<MessageRouterPublishResponse> publishToDmaapConfiguration(final State state) {
try {
- return dmaapProducerTask.execute(monoAaiModel);
+ if (state.activationStatus) {
+ LOGGER.debug("Re-registration - Using PNF_UPDATE DMaaP topic.");
+ return dmaapUpdateProducerTask.execute(state.dmaapModel);
+ }
+ return dmaapReadyProducerTask.execute(state.dmaapModel);
} catch (PrhTaskException e) {
- return Mono.error(e);
+ LOGGER.warn("DMaaPProducerTask exception has been registered: ", e);
+ return Flux.error(e);
}
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java
index 956ffead..25ed262e 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java
@@ -20,26 +20,20 @@
package org.onap.dcaegen2.services.prh.tasks;
-import io.swagger.annotations.ApiOperation;
import java.time.Duration;
-import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
-import javax.annotation.PostConstruct;
-import org.onap.dcaegen2.services.prh.configuration.CloudConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.annotation.Configuration;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
+import org.springframework.context.event.EventListener;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
-import reactor.core.publisher.Mono;
/**
@@ -48,38 +42,30 @@ import reactor.core.publisher.Mono;
@Configuration
@EnableScheduling
public class ScheduledTasksRunner {
-
private static final int SCHEDULING_DELAY_FOR_PRH_TASKS = 10;
- private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5;
private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTasksRunner.class);
private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY");
private static volatile List<ScheduledFuture> scheduledPrhTaskFutureList = new ArrayList<>();
private final TaskScheduler taskScheduler;
private final ScheduledTasks scheduledTask;
- private final CloudConfiguration cloudConfiguration;
- @Autowired
- public ScheduledTasksRunner(TaskScheduler taskScheduler,
- ScheduledTasks scheduledTask,
- CloudConfiguration cloudConfiguration) {
+ public ScheduledTasksRunner(TaskScheduler taskScheduler, ScheduledTasks scheduledTask) {
this.taskScheduler = taskScheduler;
this.scheduledTask = scheduledTask;
- this.cloudConfiguration = cloudConfiguration;
+ }
+
+ @EventListener
+ public void onApplicationStartedEvent(ApplicationStartedEvent applicationStartedEvent) {
+ tryToStartTask();
}
/**
* Function which have to stop tasks execution.
- *
- * @return response entity about status of cancellation operation
*/
- @ApiOperation(value = "Get response on stopping task execution")
- public synchronized Mono<ResponseEntity<String>> getResponseFromCancellationOfTasks() {
+ public synchronized void cancelTasks() {
scheduledPrhTaskFutureList.forEach(x -> x.cancel(false));
scheduledPrhTaskFutureList.clear();
- return Mono.defer(() ->
- Mono.just(new ResponseEntity<>("PRH Service has already been stopped!", HttpStatus.CREATED))
- );
}
/**
@@ -87,16 +73,10 @@ public class ScheduledTasksRunner {
*
* @return status of operation execution: true - started, false - not started
*/
-
- @PostConstruct
- @ApiOperation(value = "Start task if possible")
public synchronized boolean tryToStartTask() {
LOGGER.info(ENTRY, "Start scheduling PRH workflow");
if (scheduledPrhTaskFutureList.isEmpty()) {
scheduledPrhTaskFutureList.add(taskScheduler
- .scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(),
- Duration.ofMinutes(SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY)));
- scheduledPrhTaskFutureList.add(taskScheduler
.scheduleWithFixedDelay(scheduledTask::scheduleMainPrhEventTask,
Duration.ofSeconds(SCHEDULING_DELAY_FOR_PRH_TASKS)));
return true;