diff options
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2')
36 files changed, 1630 insertions, 1019 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/bootstrap/CbsBootstrapConfiguration.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/bootstrap/CbsBootstrapConfiguration.java new file mode 100644 index 00000000..f668a581 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/bootstrap/CbsBootstrapConfiguration.java @@ -0,0 +1,58 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.bootstrap; + + +import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +@EnableConfigurationProperties(CbsProperties.class) +public class CbsBootstrapConfiguration { + private static final CbsConfiguration CBS_CONFIGURATION = new CbsConfiguration(); + + @Bean + public CbsProperties cbsProperties() { + return new CbsProperties(); + } + + @Bean + @ConditionalOnProperty(value = "cbs.enabled", matchIfMissing = true) + public CbsPropertySourceLocator cbsPropertySourceLocator( + CbsProperties cbsProperties, + CbsConfiguration cbsConfiguration) { + + return new CbsPropertySourceLocator( + cbsProperties, + new CbsJsonToPropertyMapConverter(), + new CbsClientConfigurationResolver(cbsProperties), + new CbsClientFactoryFacade(), + cbsConfiguration); + } + + @Bean + public CbsConfiguration cbsConfiguration() { + return CBS_CONFIGURATION; + } +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/bootstrap/CbsClientConfigurationResolver.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/bootstrap/CbsClientConfigurationResolver.java new file mode 100644 index 00000000..41389980 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/bootstrap/CbsClientConfigurationResolver.java @@ -0,0 +1,46 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.bootstrap; + +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class CbsClientConfigurationResolver { + + private static final Logger LOGGER = LoggerFactory.getLogger(CbsClientConfigurationResolver.class); + private final CbsProperties cbsProperties; + + CbsClientConfigurationResolver(CbsProperties cbsProperties) { + this.cbsProperties = cbsProperties; + } + + CbsClientConfiguration resolveCbsClientConfiguration() { + try { + return CbsClientConfiguration.fromEnvironment(); + } catch (Exception e) { + LOGGER.warn("Failed resolving CBS client configuration from system environments: " + e); + } + LOGGER.info("Falling back to use default CBS client configuration properties"); + return cbsProperties.toCbsClientConfiguration(); + } + +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/bootstrap/CbsClientFactoryFacade.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/bootstrap/CbsClientFactoryFacade.java new file mode 100644 index 00000000..a68a35c9 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/bootstrap/CbsClientFactoryFacade.java @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.bootstrap; + +import org.jetbrains.annotations.NotNull; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration; +import reactor.core.publisher.Mono; + +class CbsClientFactoryFacade { + @NotNull + Mono<CbsClient> createCbsClient(CbsClientConfiguration cbsClientConfiguration) { + return CbsClientFactory.createCbsClient(cbsClientConfiguration); + } +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/bootstrap/CbsJsonToPropertyMapConverter.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/bootstrap/CbsJsonToPropertyMapConverter.java new file mode 100644 index 00000000..79bf4656 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/bootstrap/CbsJsonToPropertyMapConverter.java @@ -0,0 +1,62 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.bootstrap; + +import com.google.gson.JsonObject; +import com.google.gson.JsonPrimitive; + +import java.util.Map; + +import static java.util.stream.Collectors.toMap; + +class CbsJsonToPropertyMapConverter { + + private static final String CBS_CONFIG_ROOT_PROPERTY = "config"; + + Map<String, Object> convertToMap(JsonObject jsonObject) { + verifyExpectedCbsJsonFormat(jsonObject); + JsonObject config = jsonObject.getAsJsonObject(CBS_CONFIG_ROOT_PROPERTY); + return config.entrySet().stream() + .filter(entry -> entry.getValue().isJsonPrimitive()) + .collect(toMap(Map.Entry::getKey, entry -> unpack(entry.getValue().getAsJsonPrimitive()))); + } + + private static void verifyExpectedCbsJsonFormat(JsonObject jsonObject) { + if (!jsonObject.has(CBS_CONFIG_ROOT_PROPERTY)) { + throw new IllegalArgumentException("Missing expected '" + CBS_CONFIG_ROOT_PROPERTY + "'" + + " property in json from CBS."); + } + } + + private Object unpack(JsonPrimitive value) { + if (value.isString()) { + return value.getAsString(); + } + if (value.isBoolean()) { + return value.getAsBoolean(); + } + if (value.isNumber()) { + return value.getAsLong(); + } + throw new IllegalArgumentException("Unexpected JsonPrimitive type found in configuration form CBS: " + value); + } + +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/bootstrap/CbsProperties.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/bootstrap/CbsProperties.java new file mode 100644 index 00000000..18d4021b --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/bootstrap/CbsProperties.java @@ -0,0 +1,98 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.bootstrap; + + +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.ImmutableCbsClientConfiguration; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.NestedConfigurationProperty; + +import java.time.Duration; + +@ConfigurationProperties("cbs") +public class CbsProperties { + + private Boolean enabled; + private Duration updatesInterval; + @NestedConfigurationProperty + private RetryProperties fetchRetries = new RetryProperties(); + private String hostname; + private Integer port; + private String appName; + + CbsClientConfiguration toCbsClientConfiguration() { + return ImmutableCbsClientConfiguration.builder() + .hostname(hostname) + .port(port) + .appName(appName) + .build(); + } + + public String getHostname() { + return hostname; + } + + public void setHostname(String hostname) { + this.hostname = hostname; + } + + public Integer getPort() { + return port; + } + + public void setPort(Integer port) { + this.port = port; + } + + public String getAppName() { + return appName; + } + + public void setAppName(String appName) { + this.appName = appName; + } + + public Boolean getEnabled() { + return enabled; + } + + public void setEnabled(Boolean enabled) { + this.enabled = enabled; + } + + public Duration getUpdatesInterval() { + return updatesInterval; + } + + public void setUpdatesInterval(Duration updatesInterval) { + this.updatesInterval = updatesInterval; + } + + public RetryProperties getFetchRetries() { + return fetchRetries; + } + + public void setFetchRetries(RetryProperties fetchRetries) { + this.fetchRetries = fetchRetries; + } + +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/bootstrap/CbsPropertySourceLocator.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/bootstrap/CbsPropertySourceLocator.java new file mode 100644 index 00000000..7b660202 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/bootstrap/CbsPropertySourceLocator.java @@ -0,0 +1,84 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.bootstrap; + +import com.google.gson.JsonObject; +import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsClientConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.bootstrap.config.PropertySourceLocator; +import org.springframework.core.env.Environment; +import org.springframework.core.env.MapPropertySource; +import org.springframework.core.env.PropertySource; + +import java.util.Map; + +public class CbsPropertySourceLocator implements PropertySourceLocator { + private static final Logger LOGGER = LoggerFactory.getLogger(CbsPropertySourceLocator.class); + + private final CbsProperties cbsProperties; + private final CbsJsonToPropertyMapConverter cbsJsonToPropertyMapConverter; + private final CbsClientConfigurationResolver cbsClientConfigurationResolver; + private final CbsClientFactoryFacade cbsClientFactoryFacade; + private final CbsConfiguration cbsConfiguration; + + public CbsPropertySourceLocator(CbsProperties cbsProperties, + CbsJsonToPropertyMapConverter cbsJsonToPropertyMapConverter, + CbsClientConfigurationResolver cbsClientConfigurationResolver, + CbsClientFactoryFacade cbsClientFactoryFacade, + CbsConfiguration cbsConfiguration) { + this.cbsProperties = cbsProperties; + this.cbsJsonToPropertyMapConverter = cbsJsonToPropertyMapConverter; + this.cbsClientConfigurationResolver = cbsClientConfigurationResolver; + this.cbsClientFactoryFacade = cbsClientFactoryFacade; + this.cbsConfiguration = cbsConfiguration; + } + + @Override + public PropertySource<?> locate(Environment environment) { + CbsClientConfiguration cbsClientConfiguration = cbsClientConfigurationResolver.resolveCbsClientConfiguration(); + LOGGER.info("Fetching configuration from Config Binding Service @ {}:{} for {}", + cbsClientConfiguration.hostname(), cbsClientConfiguration.port(), cbsClientConfiguration.appName()); + Map<String, Object> properties = cbsClientFactoryFacade.createCbsClient(cbsClientConfiguration) + .flatMap(cbsClient -> cbsClient.get(CbsRequests.getAll(RequestDiagnosticContext.create()))) + .doOnError(e -> LOGGER.warn("Failed fetching config properties from CBS - retrying...", e)) + .retryBackoff(cbsProperties.getFetchRetries().getMaxAttempts(), + cbsProperties.getFetchRetries().getFirstBackoff(), + cbsProperties.getFetchRetries().getMaxBackoff()) + .doOnNext(this::updateCbsConfig) + .map(cbsJsonToPropertyMapConverter::convertToMap) + .block(); + return new MapPropertySource("cbs", properties); + } + + private void updateCbsConfig(JsonObject jsonObject) { + try { + cbsConfiguration.parseCBSConfig(jsonObject); + } catch (Exception e) { + LOGGER.error("Failed parsing configuration from CBS", e); + throw e; + } + } + +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/bootstrap/RetryProperties.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/bootstrap/RetryProperties.java new file mode 100644 index 00000000..44108a72 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/bootstrap/RetryProperties.java @@ -0,0 +1,54 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.bootstrap; + +import java.time.Duration; + +public class RetryProperties { + + private Integer maxAttempts = 10; + private Duration firstBackoff = Duration.ofSeconds(3); + private Duration maxBackoff = Duration.ofSeconds(15); + + public Integer getMaxAttempts() { + return maxAttempts; + } + + public void setMaxAttempts(Integer maxAttempts) { + this.maxAttempts = maxAttempts; + } + + public Duration getFirstBackoff() { + return firstBackoff; + } + + public void setFirstBackoff(Duration firstBackoff) { + this.firstBackoff = firstBackoff; + } + + public Duration getMaxBackoff() { + return maxBackoff; + } + + public void setMaxBackoff(Duration maxBackoff) { + this.maxBackoff = maxBackoff; + } +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java index a1fe5770..1d2a65d3 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/MainApp.java @@ -20,27 +20,27 @@ package org.onap.dcaegen2.services.prh; -import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.INVOCATION_ID; -import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.REQUEST_ID; - import java.util.Map; import java.util.UUID; + import org.slf4j.MDC; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler; +import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.INVOCATION_ID; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 */ @SpringBootApplication(exclude = {JacksonAutoConfiguration.class}) -@Configuration @EnableScheduling +@EnableConfigurationProperties public class MainApp { public static void main(String[] args) { @@ -49,7 +49,6 @@ public class MainApp { @Bean Map<String, String> mdcContextMap() { - MDC.put(REQUEST_ID, "SampleRequestID"); MDC.put(INVOCATION_ID, UUID.randomUUID().toString()); return MDC.getCopyOfContextMap(); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AaiHttpClientConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AaiHttpClientConfig.java new file mode 100644 index 00000000..e09322d3 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AaiHttpClientConfig.java @@ -0,0 +1,84 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.configuration; + +import java.nio.charset.StandardCharsets; +import java.util.function.BiFunction; +import org.onap.dcaegen2.services.prh.model.AaiJsonBodyBuilderImpl; +import org.onap.dcaegen2.services.prh.model.AaiPnfResultModel; +import org.onap.dcaegen2.services.prh.model.AaiServiceInstanceResultModel; +import org.onap.dcaegen2.services.prh.model.utils.PrhModelAwareGsonBuilder; +import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.AaiHttpClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.AaiHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.get.AaiGetServiceInstanceClient; +import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.get.AaiHttpGetClient; +import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.patch.AaiHttpPatchClient; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.model.AaiModel; +import org.onap.dcaegen2.services.sdk.rest.services.model.AaiServiceInstanceQueryModel; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class AaiHttpClientConfig { + @Autowired + private CbsConfiguration cbsConfiguration; + + @Bean + public AaiHttpClient<AaiModel, HttpResponse> getPatchClientFactory() { + return createLazyConfigClient( + (config, client) -> new AaiHttpPatchClient(config, new AaiJsonBodyBuilderImpl(), client)); + } + + @Bean + public AaiHttpClient<AaiServiceInstanceQueryModel, AaiServiceInstanceResultModel> getServiceInstanceClient() { + return createLazyConfigClient( + (config, client) -> new AaiGetServiceInstanceClient(config, client) + .map(httpResponse -> { + httpResponse.throwIfUnsuccessful(); + return httpResponse.bodyAsJson(StandardCharsets.UTF_8, + PrhModelAwareGsonBuilder.createGson(), AaiServiceInstanceResultModel.class); + })); + } + + @Bean + public AaiHttpClient<AaiModel, AaiPnfResultModel> getGetClient() { + return createLazyConfigClient( + (config, client) -> new AaiHttpGetClient(config, client) + .map(httpResponse -> { + httpResponse.throwIfUnsuccessful(); + return httpResponse.bodyAsJson(StandardCharsets.UTF_8, + PrhModelAwareGsonBuilder.createGson(), AaiPnfResultModel.class); + })); + } + + private <T, U> AaiHttpClient<T, U> createLazyConfigClient( + final BiFunction<AaiClientConfiguration, RxHttpClient, AaiHttpClient<T, U>> factoryMethod) { + + return x -> factoryMethod.apply( + cbsConfiguration.getAaiClientConfiguration(), + new AaiHttpClientFactory(cbsConfiguration.getAaiClientConfiguration()).build() + ).getAaiResponse(x); + } +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AppConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AppConfig.java deleted file mode 100644 index 99886302..00000000 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AppConfig.java +++ /dev/null @@ -1,285 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * PNF-REGISTRATION-HANDLER - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcaegen2.services.prh.configuration; - -import java.util.Objects; -import java.util.Optional; -import java.util.function.Predicate; -import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.ImmutableAaiClientConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Configuration; - -/** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18 - */ - -@Configuration -@EnableConfigurationProperties -public class AppConfig extends PrhAppConfig { - - private static Predicate<String> isEmpty = String::isEmpty; - @Value("${dmaap.dmaapConsumerConfiguration.dmaapHostName:}") - public String consumerDmaapHostName; - - @Value("${dmaap.dmaapConsumerConfiguration.dmaapPortNumber:}") - public Integer consumerDmaapPortNumber; - - @Value("${dmaap.dmaapConsumerConfiguration.dmaapTopicName:}") - public String consumerDmaapTopicName; - - @Value("${dmaap.dmaapConsumerConfiguration.dmaapProtocol:}") - public String consumerDmaapProtocol; - - @Value("${dmaap.dmaapConsumerConfiguration.dmaapUserName:}") - public String consumerDmaapUserName; - - @Value("${dmaap.dmaapConsumerConfiguration.dmaapUserPassword:}") - public String consumerDmaapUserPassword; - - @Value("${dmaap.dmaapConsumerConfiguration.dmaapContentType:}") - public String consumerDmaapContentType; - - @Value("${dmaap.dmaapConsumerConfiguration.consumerId:}") - public String consumerId; - - @Value("${dmaap.dmaapConsumerConfiguration.consumerGroup:}") - public String consumerGroup; - - @Value("${dmaap.dmaapConsumerConfiguration.timeoutMs:}") - public Integer consumerTimeoutMs; - - @Value("${dmaap.dmaapConsumerConfiguration.message-limit:}") - public Integer consumerMessageLimit; - - @Value("${dmaap.dmaapProducerConfiguration.dmaapHostName:}") - public String producerDmaapHostName; - - @Value("${dmaap.dmaapProducerConfiguration.dmaapPortNumber:}") - public Integer producerDmaapPortNumber; - - @Value("${dmaap.dmaapProducerConfiguration.dmaapTopicName:}") - public String producerDmaapTopicName; - - @Value("${dmaap.dmaapProducerConfiguration.dmaapProtocol:}") - public String producerDmaapProtocol; - - @Value("${dmaap.dmaapProducerConfiguration.dmaapUserName:}") - public String producerDmaapUserName; - - @Value("${dmaap.dmaapProducerConfiguration.dmaapUserPassword:}") - public String producerDmaapUserPassword; - - @Value("${dmaap.dmaapProducerConfiguration.dmaapContentType:}") - public String producerDmaapContentType; - - @Value("${aai.aaiClientConfiguration.aaiHost:}") - public String aaiHost; - - @Value("${aai.aaiClientConfiguration.aaiHostPortNumber:}") - public Integer aaiPort; - - @Value("${aai.aaiClientConfiguration.aaiProtocol:}") - public String aaiProtocol; - - @Value("${aai.aaiClientConfiguration.aaiUserName:}") - public String aaiUserName; - - @Value("${aai.aaiClientConfiguration.aaiUserPassword:}") - public String aaiUserPassword; - - @Value("${aai.aaiClientConfiguration.aaiIgnoreSslCertificateErrors:}") - public Boolean aaiIgnoreSslCertificateErrors; - - @Value("${aai.aaiClientConfiguration.aaiBasePath:}") - public String aaiBasePath; - - @Value("${aai.aaiClientConfiguration.aaiPnfPath:}") - public String aaiPnfPath; - - @Value("${security.trustStorePath:}") - public String trustStorePath; - - @Value("${security.trustStorePasswordPath:}") - public String trustStorePasswordPath; - - @Value("${security.keyStorePath:}") - public String keyStorePath; - - @Value("${security.keyStorePasswordPath:}") - public String keyStorePasswordPath; - - @Value("${security.enableAaiCertAuth:}") - public Boolean enableAaiCertAuth; - - @Value("${security.enableDmaapCertAuth:}") - public Boolean enableDmaapCertAuth; - - @Override - public DmaapConsumerConfiguration getDmaapConsumerConfiguration() { - if (noFileConfiguration(dmaapConsumerConfiguration)) { - return null; - } - return new ImmutableDmaapConsumerConfiguration.Builder() - .dmaapUserPassword( - Optional.ofNullable(consumerDmaapUserPassword).filter(isEmpty.negate()) - .orElse(dmaapConsumerConfiguration.dmaapUserPassword())) - .dmaapUserName( - Optional.ofNullable(consumerDmaapUserName).filter(isEmpty.negate()) - .orElse(dmaapConsumerConfiguration.dmaapUserName())) - .dmaapHostName( - Optional.ofNullable(consumerDmaapHostName).filter(isEmpty.negate()) - .orElse(dmaapConsumerConfiguration.dmaapHostName())) - .dmaapPortNumber( - Optional.ofNullable(consumerDmaapPortNumber).filter(p -> !p.toString().isEmpty()) - .orElse(dmaapConsumerConfiguration.dmaapPortNumber())) - .dmaapProtocol( - Optional.ofNullable(consumerDmaapProtocol).filter(isEmpty.negate()) - .orElse(dmaapConsumerConfiguration.dmaapProtocol())) - .dmaapContentType( - Optional.ofNullable(consumerDmaapContentType).filter(isEmpty.negate()) - .orElse(dmaapConsumerConfiguration.dmaapContentType())) - .dmaapTopicName( - Optional.ofNullable(consumerDmaapTopicName).filter(isEmpty.negate()) - .orElse(dmaapConsumerConfiguration.dmaapTopicName())) - .messageLimit( - Optional.ofNullable(consumerMessageLimit).filter(p -> !p.toString().isEmpty()) - .orElse(dmaapConsumerConfiguration.messageLimit())) - .timeoutMs(Optional.ofNullable(consumerTimeoutMs).filter(p -> !p.toString().isEmpty()) - .orElse(dmaapConsumerConfiguration.timeoutMs())) - .consumerGroup(Optional.ofNullable(consumerGroup).filter(isEmpty.negate()) - .orElse(dmaapConsumerConfiguration.consumerGroup())) - .consumerId(Optional.ofNullable(consumerId).filter(isEmpty.negate()) - .orElse(dmaapConsumerConfiguration.consumerId())) - .trustStorePath( - Optional.ofNullable(trustStorePath).filter(isEmpty.negate()) - .orElse(dmaapConsumerConfiguration.trustStorePath())) - .trustStorePasswordPath( - Optional.ofNullable(trustStorePasswordPath).filter(isEmpty.negate()) - .orElse(dmaapConsumerConfiguration.trustStorePasswordPath())) - .keyStorePath( - Optional.ofNullable(keyStorePath).filter(isEmpty.negate()) - .orElse(dmaapConsumerConfiguration.keyStorePath())) - .keyStorePasswordPath( - Optional.ofNullable(keyStorePasswordPath).filter(isEmpty.negate()) - .orElse(dmaapConsumerConfiguration.keyStorePasswordPath())) - .enableDmaapCertAuth( - Optional.ofNullable(enableDmaapCertAuth).filter(p -> !p.toString().isEmpty()) - .orElse(dmaapConsumerConfiguration.enableDmaapCertAuth())) - .build(); - } - - @Override - public AaiClientConfiguration getAaiClientConfiguration() { - if (noFileConfiguration(aaiClientConfiguration)) { - return null; - } - return new ImmutableAaiClientConfiguration.Builder() - .aaiHost(Optional.ofNullable(aaiHost).filter(isEmpty.negate()).orElse(aaiClientConfiguration.aaiHost())) - .aaiPort( - Optional.ofNullable(aaiPort).filter(p -> !p.toString().isEmpty()) - .orElse(aaiClientConfiguration.aaiPort())) - .aaiIgnoreSslCertificateErrors( - Optional.ofNullable(aaiIgnoreSslCertificateErrors).filter(p -> !p.toString().isEmpty()) - .orElse(aaiClientConfiguration.aaiIgnoreSslCertificateErrors())) - .aaiProtocol( - Optional.ofNullable(aaiProtocol).filter(isEmpty.negate()).orElse(aaiClientConfiguration.aaiProtocol())) - .aaiUserName( - Optional.ofNullable(aaiUserName).filter(isEmpty.negate()).orElse(aaiClientConfiguration.aaiUserName())) - .aaiUserPassword(Optional.ofNullable(aaiUserPassword).filter(isEmpty.negate()) - .orElse(aaiClientConfiguration.aaiUserPassword())) - .aaiBasePath(Optional.ofNullable(aaiBasePath).filter(isEmpty.negate()) - .orElse(aaiClientConfiguration.aaiBasePath())) - .aaiPnfPath( - Optional.ofNullable(aaiPnfPath).filter(isEmpty.negate()).orElse(aaiClientConfiguration.aaiPnfPath())) - .aaiHeaders(aaiClientConfiguration.aaiHeaders()) - .trustStorePath( - Optional.ofNullable(trustStorePath).filter(isEmpty.negate()) - .orElse(aaiClientConfiguration.trustStorePath())) - .trustStorePasswordPath( - Optional.ofNullable(trustStorePasswordPath).filter(isEmpty.negate()) - .orElse(aaiClientConfiguration.trustStorePasswordPath())) - .keyStorePath( - Optional.ofNullable(keyStorePath).filter(isEmpty.negate()) - .orElse(aaiClientConfiguration.keyStorePath())) - .keyStorePasswordPath( - Optional.ofNullable(keyStorePasswordPath).filter(isEmpty.negate()) - .orElse(aaiClientConfiguration.keyStorePasswordPath())) - .enableAaiCertAuth( - Optional.ofNullable(enableAaiCertAuth).filter(p -> !p.toString().isEmpty()) - .orElse(aaiClientConfiguration.enableAaiCertAuth())) - .build(); - } - - @Override - public DmaapPublisherConfiguration getDmaapPublisherConfiguration() { - if (noFileConfiguration(dmaapPublisherConfiguration)) { - return null; - } - return new ImmutableDmaapPublisherConfiguration.Builder() - .dmaapContentType( - Optional.ofNullable(producerDmaapContentType).filter(isEmpty.negate()) - .orElse(dmaapPublisherConfiguration.dmaapContentType())) - .dmaapHostName( - Optional.ofNullable(producerDmaapHostName).filter(isEmpty.negate()) - .orElse(dmaapPublisherConfiguration.dmaapHostName())) - .dmaapPortNumber( - Optional.ofNullable(producerDmaapPortNumber).filter(p -> !p.toString().isEmpty()) - .orElse(dmaapPublisherConfiguration.dmaapPortNumber())) - .dmaapProtocol( - Optional.ofNullable(producerDmaapProtocol).filter(isEmpty.negate()) - .orElse(dmaapPublisherConfiguration.dmaapProtocol())) - .dmaapTopicName( - Optional.ofNullable(producerDmaapTopicName).filter(isEmpty.negate()) - .orElse(dmaapPublisherConfiguration.dmaapTopicName())) - .dmaapUserName( - Optional.ofNullable(producerDmaapUserName).filter(isEmpty.negate()) - .orElse(dmaapPublisherConfiguration.dmaapUserName())) - .dmaapUserPassword( - Optional.ofNullable(producerDmaapUserPassword).filter(isEmpty.negate()) - .orElse(dmaapPublisherConfiguration.dmaapUserPassword())) - .trustStorePath( - Optional.ofNullable(trustStorePath).filter(isEmpty.negate()) - .orElse(dmaapPublisherConfiguration.trustStorePath())) - .trustStorePasswordPath( - Optional.ofNullable(trustStorePasswordPath).filter(isEmpty.negate()) - .orElse(dmaapPublisherConfiguration.trustStorePasswordPath())) - .keyStorePath( - Optional.ofNullable(keyStorePath).filter(isEmpty.negate()) - .orElse(dmaapPublisherConfiguration.keyStorePath())) - .keyStorePasswordPath( - Optional.ofNullable(keyStorePasswordPath).filter(isEmpty.negate()) - .orElse(dmaapPublisherConfiguration.keyStorePasswordPath())) - .enableDmaapCertAuth( - Optional.ofNullable(enableDmaapCertAuth).filter(p -> !p.toString().isEmpty()) - .orElse(dmaapPublisherConfiguration.enableDmaapCertAuth())) - .build(); - } - - private boolean noFileConfiguration(Object object) { - return Objects.isNull(object); - } -} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigRefreshScheduler.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigRefreshScheduler.java new file mode 100644 index 00000000..d87f21ed --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfigRefreshScheduler.java @@ -0,0 +1,97 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.configuration; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.cloud.context.environment.EnvironmentChangeEvent; +import org.springframework.cloud.context.refresh.ContextRefresher; +import org.springframework.context.event.EventListener; +import org.springframework.core.env.Environment; +import org.springframework.stereotype.Component; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.time.Duration; + +@Component +public class CbsConfigRefreshScheduler { + + private static final Logger LOGGER = LoggerFactory.getLogger(CbsConfigRefreshScheduler.class); + private static final String CBS_UPDATES_INTERVAL_PROPERTY = "cbs.updates-interval"; + private static final Duration NO_UPDATES = Duration.ZERO; + + private final ContextRefresher contextRefresher; + private final Environment environment; + private final Scheduler scheduler; + private volatile Disposable refreshEventsStreamHandle; + + + public CbsConfigRefreshScheduler(ContextRefresher contextRefresher, Environment environment) { + this.contextRefresher = contextRefresher; + this.environment = environment; + this.scheduler = Schedulers.newElastic("conf-updates"); + } + + @PostConstruct + public void startPollingForCbsUpdates() { + startPollingForCbsUpdates(getCbsUpdatesInterval()); + } + + private void startPollingForCbsUpdates(Duration updatesInterval) { + if (!updatesInterval.equals(NO_UPDATES)) { + LOGGER.info("Configuring pulling for CBS updates in every {}", updatesInterval); + refreshEventsStreamHandle = Flux.interval(updatesInterval, scheduler) + .doOnNext(i -> { + LOGGER.debug("Requesting context refresh"); + contextRefresher.refresh(); + }) + .onErrorContinue((e, o) -> LOGGER.error("Failed fetching config updates from CBS", e)) + .subscribe(); + } + } + + @EventListener + public void onEnvironmentChanged(EnvironmentChangeEvent event) { + if (event.getKeys().contains(CBS_UPDATES_INTERVAL_PROPERTY)) { + LOGGER.info("CBS config polling interval changed to {}", environment.getProperty(CBS_UPDATES_INTERVAL_PROPERTY)); + stopPollingForCbsUpdates(); + startPollingForCbsUpdates(getCbsUpdatesInterval()); + } + } + + private Duration getCbsUpdatesInterval() { + return environment.getProperty(CBS_UPDATES_INTERVAL_PROPERTY, Duration.class, NO_UPDATES); + } + + @PreDestroy + private void stopPollingForCbsUpdates() { + if(refreshEventsStreamHandle != null) { + LOGGER.debug("Stopping pulling for CBS updates"); + refreshEventsStreamHandle.dispose(); + } + } + +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java new file mode 100644 index 00000000..c1226359 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java @@ -0,0 +1,93 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.configuration; + +import com.google.gson.JsonObject; +import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; + + +public class CbsConfiguration implements Config { + private static final Logger LOGGER = LoggerFactory.getLogger(CbsConfiguration.class); + private static final String CBS_CONFIG_MISSING = "CBS config missing"; + private AaiClientConfiguration aaiClientCBSConfiguration; + private MessageRouterPublisher messageRouterPublisher; + private MessageRouterSubscriber messageRouterSubscriber; + private MessageRouterPublishRequest messageRouterCBSPublishRequest; + private MessageRouterSubscribeRequest messageRouterCBSSubscribeRequest; + private MessageRouterPublishRequest messageRouterCBSUpdatePublishRequest; + + + public void parseCBSConfig(JsonObject jsonObject) { + LOGGER.info("Received application configuration: {}", jsonObject); + CbsContentParser consulConfigurationParser = new CbsContentParser(jsonObject); + + aaiClientCBSConfiguration = consulConfigurationParser.getAaiClientConfig(); + + messageRouterPublisher = DmaapClientFactory.createMessageRouterPublisher( + consulConfigurationParser.getMessageRouterPublisherConfig()); + messageRouterCBSPublishRequest = consulConfigurationParser.getMessageRouterPublishRequest(); + messageRouterCBSUpdatePublishRequest = consulConfigurationParser.getMessageRouterUpdatePublishRequest(); + + messageRouterSubscriber = DmaapClientFactory.createMessageRouterSubscriber( + consulConfigurationParser.getMessageRouterSubscriberConfig()); + messageRouterCBSSubscribeRequest = consulConfigurationParser.getMessageRouterSubscribeRequest(); + } + + + @Override + public MessageRouterPublisher getMessageRouterPublisher() { + return Optional.ofNullable(messageRouterPublisher).orElseThrow(() -> new RuntimeException(CBS_CONFIG_MISSING)); + } + + @Override + public MessageRouterSubscriber getMessageRouterSubscriber() { + return Optional.ofNullable(messageRouterSubscriber).orElseThrow(() -> new RuntimeException(CBS_CONFIG_MISSING)); + } + + @Override + public MessageRouterPublishRequest getMessageRouterPublishRequest() { + return Optional.ofNullable(messageRouterCBSPublishRequest).orElseThrow(() -> new RuntimeException(CBS_CONFIG_MISSING)); + } + + @Override + public MessageRouterPublishRequest getMessageRouterUpdatePublishRequest() { + return Optional.ofNullable(messageRouterCBSUpdatePublishRequest).orElseThrow(() -> new RuntimeException(CBS_CONFIG_MISSING)); + } + + @Override + public AaiClientConfiguration getAaiClientConfiguration() { + return Optional.ofNullable(aaiClientCBSConfiguration).orElseThrow(() -> new RuntimeException(CBS_CONFIG_MISSING)); + } + + @Override + public MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() { + return Optional.ofNullable(messageRouterCBSSubscribeRequest).orElseThrow(() -> new RuntimeException(CBS_CONFIG_MISSING)); + } +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java new file mode 100644 index 00000000..85b4c035 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsContentParser.java @@ -0,0 +1,142 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.configuration; + +import com.google.common.reflect.TypeToken; +import com.google.gson.Gson; +import com.google.gson.JsonObject; +import org.onap.dcaegen2.services.sdk.model.streams.RawDataStream; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink; +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource; +import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.ImmutableAaiClientConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams; +import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterPublisherConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig; +import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys; +import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore; +import org.onap.dcaegen2.services.sdk.security.ssl.Passwords; +import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys; + +import java.nio.file.Paths; +import java.time.Duration; +import java.util.Map; + +import static org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamWithName; + +/** + * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 8/21/18 + */ +class CbsContentParser { + private static final String SECURITY_TRUST_STORE_PATH = "security.trustStorePath"; + private static final String SECURITY_TRUST_STORE_PASS_PATH = "security.trustStorePasswordPath"; + private static final String SECURITY_KEY_STORE_PATH = "security.keyStorePath"; + private static final String SECURITY_KEY_STORE_PASS_PATH = "security.keyStorePasswordPath"; + private static final String CONFIG = "config"; + private static final String PNF_UPDATE = "pnf-update"; + private static final String PNF_READY = "pnf-ready"; + private static final String VES_REG_OUTPUT = "ves-reg-output"; + + private final JsonObject jsonObject; + + CbsContentParser(JsonObject jsonObject) { + this.jsonObject = jsonObject.getAsJsonObject(CONFIG); + } + + MessageRouterPublishRequest getMessageRouterPublishRequest() { + return getMessageRouterPublishRequest(PNF_READY); + } + + MessageRouterPublishRequest getMessageRouterUpdatePublishRequest() { + return getMessageRouterPublishRequest(PNF_UPDATE); + } + + private MessageRouterPublishRequest getMessageRouterPublishRequest(String streamName) { + RawDataStream<JsonObject> sink = DataStreams.namedSinks(jsonObject).find(streamWithName(streamName)).get(); + MessageRouterSink parsedSink = StreamFromGsonParsers.messageRouterSinkParser().unsafeParse(sink); + + return ImmutableMessageRouterPublishRequest.builder() + .sinkDefinition(parsedSink) + .build(); + } + + MessageRouterPublisherConfig getMessageRouterPublisherConfig() { + return ImmutableMessageRouterPublisherConfig.builder() + .securityKeys(isDmaapCertAuthEnabled(jsonObject) ? createSecurityKeys(jsonObject) : null) + .build(); + } + + MessageRouterSubscriberConfig getMessageRouterSubscriberConfig() { + return ImmutableMessageRouterSubscriberConfig.builder() + .securityKeys(isDmaapCertAuthEnabled(jsonObject) ? createSecurityKeys(jsonObject) : null) + .build(); + } + + private SecurityKeys createSecurityKeys(JsonObject config) { + return ImmutableSecurityKeys.builder() + .keyStore(ImmutableSecurityKeysStore.of(Paths.get(config.get(SECURITY_KEY_STORE_PATH).getAsString()))) + .keyStorePassword(Passwords.fromPath(Paths.get(config.get(SECURITY_KEY_STORE_PASS_PATH).getAsString()))) + .trustStore(ImmutableSecurityKeysStore.of(Paths.get(config.get(SECURITY_TRUST_STORE_PATH).getAsString()))) + .trustStorePassword(Passwords.fromPath(Paths.get(config.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString()))) + .build(); + } + + private boolean isDmaapCertAuthEnabled(JsonObject config) { + return config.get("security.enableDmaapCertAuth").getAsBoolean(); + } + + AaiClientConfiguration getAaiClientConfig() { + return new ImmutableAaiClientConfiguration.Builder() + .pnfUrl(jsonObject.get("aai.aaiClientConfiguration.pnfUrl").getAsString()) + .aaiUserName(jsonObject.get("aai.aaiClientConfiguration.aaiUserName").getAsString()) + .aaiServiceInstancePath(jsonObject.get("aai.aaiClientConfiguration.aaiServiceInstancePath").getAsString()) + .aaiIgnoreSslCertificateErrors( + jsonObject.get("aai.aaiClientConfiguration.aaiIgnoreSslCertificateErrors").getAsBoolean()) + .aaiUserPassword(jsonObject.get("aai.aaiClientConfiguration.aaiUserPassword").getAsString()) + .trustStorePath(jsonObject.get(SECURITY_TRUST_STORE_PATH).getAsString()) + .trustStorePasswordPath(jsonObject.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString()) + .keyStorePath(jsonObject.get(SECURITY_KEY_STORE_PATH).getAsString()) + .keyStorePasswordPath(jsonObject.get(SECURITY_KEY_STORE_PASS_PATH).getAsString()) + .enableAaiCertAuth(jsonObject.get("security.enableAaiCertAuth").getAsBoolean()) + .aaiHeaders(new Gson().fromJson(jsonObject.get("aai.aaiClientConfiguration.aaiHeaders"), + new TypeToken<Map<String, String>>(){}.getType())) + .build(); + } + + MessageRouterSubscribeRequest getMessageRouterSubscribeRequest() { + RawDataStream<JsonObject> source = DataStreams.namedSources(jsonObject).find(streamWithName(VES_REG_OUTPUT)).get(); + MessageRouterSource parsedSource = StreamFromGsonParsers.messageRouterSourceParser().unsafeParse(source); + + return ImmutableMessageRouterSubscribeRequest.builder() + .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString()) + .sourceDefinition(parsedSource) + .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString()) + .timeout(Duration.ofMillis(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMs").getAsLong())) + .build(); + } +}
\ No newline at end of file diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfigParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfigParser.java deleted file mode 100644 index 9d7b3396..00000000 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfigParser.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * PNF-REGISTRATION-HANDLER - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcaegen2.services.prh.configuration; - -import com.google.gson.JsonObject; -import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.ImmutableAaiClientConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.ImmutableDmaapPublisherConfiguration; - -/** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 8/21/18 - */ -class CloudConfigParser { - - private static final String SECURITY_TRUST_STORE_PATH = "security.trustStorePath"; - private static final String SECURITY_TRUST_STORE_PASS_PATH = "security.trustStorePasswordPath"; - private static final String SECURITY_KEY_STORE_PATH = "security.keyStorePath"; - private static final String SECURITY_KEY_STORE_PASS_PATH = "security.keyStorePasswordPath"; - private final JsonObject jsonObject; - - CloudConfigParser(JsonObject jsonObject) { - this.jsonObject = jsonObject; - } - - DmaapPublisherConfiguration getDmaapPublisherConfig() { - return new ImmutableDmaapPublisherConfiguration.Builder() - .dmaapTopicName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapTopicName").getAsString()) - .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString()) - .dmaapPortNumber(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapPortNumber").getAsInt()) - .dmaapProtocol(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapProtocol").getAsString()) - .dmaapContentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString()) - .dmaapHostName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapHostName").getAsString()) - .dmaapUserName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserName").getAsString()) - .trustStorePath(jsonObject.get(SECURITY_TRUST_STORE_PATH).getAsString()) - .trustStorePasswordPath(jsonObject.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString()) - .keyStorePath(jsonObject.get(SECURITY_KEY_STORE_PATH).getAsString()) - .keyStorePasswordPath(jsonObject.get(SECURITY_KEY_STORE_PASS_PATH).getAsString()) - .enableDmaapCertAuth(jsonObject.get("security.enableDmaapCertAuth").getAsBoolean()) - .build(); - } - - AaiClientConfiguration getAaiClientConfig() { - return new ImmutableAaiClientConfiguration.Builder() - .aaiHost(jsonObject.get("aai.aaiClientConfiguration.aaiHost").getAsString()) - .aaiPort(jsonObject.get("aai.aaiClientConfiguration.aaiHostPortNumber").getAsInt()) - .aaiUserName(jsonObject.get("aai.aaiClientConfiguration.aaiUserName").getAsString()) - .aaiPnfPath(jsonObject.get("aai.aaiClientConfiguration.aaiPnfPath").getAsString()) - .aaiIgnoreSslCertificateErrors( - jsonObject.get("aai.aaiClientConfiguration.aaiIgnoreSslCertificateErrors").getAsBoolean()) - .aaiUserPassword(jsonObject.get("aai.aaiClientConfiguration.aaiUserPassword").getAsString()) - .aaiProtocol(jsonObject.get("aai.aaiClientConfiguration.aaiProtocol").getAsString()) - .aaiBasePath(jsonObject.get("aai.aaiClientConfiguration.aaiBasePath").getAsString()) - .trustStorePath(jsonObject.get(SECURITY_TRUST_STORE_PATH).getAsString()) - .trustStorePasswordPath(jsonObject.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString()) - .keyStorePath(jsonObject.get(SECURITY_KEY_STORE_PATH).getAsString()) - .keyStorePasswordPath(jsonObject.get(SECURITY_KEY_STORE_PASS_PATH).getAsString()) - .enableAaiCertAuth(jsonObject.get("security.enableAaiCertAuth").getAsBoolean()) - .build(); - } - - DmaapConsumerConfiguration getDmaapConsumerConfig() { - return new ImmutableDmaapConsumerConfiguration.Builder() - .timeoutMs(jsonObject.get("dmaap.dmaapConsumerConfiguration.timeoutMs").getAsInt()) - .dmaapHostName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapHostName").getAsString()) - .dmaapUserName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserName").getAsString()) - .dmaapUserPassword(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapUserPassword").getAsString()) - .dmaapTopicName(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapTopicName").getAsString()) - .dmaapPortNumber(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapPortNumber").getAsInt()) - .dmaapContentType(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapContentType").getAsString()) - .messageLimit(jsonObject.get("dmaap.dmaapConsumerConfiguration.messageLimit").getAsInt()) - .dmaapProtocol(jsonObject.get("dmaap.dmaapConsumerConfiguration.dmaapProtocol").getAsString()) - .consumerId(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerId").getAsString()) - .consumerGroup(jsonObject.get("dmaap.dmaapConsumerConfiguration.consumerGroup").getAsString()) - .trustStorePath(jsonObject.get(SECURITY_TRUST_STORE_PATH).getAsString()) - .trustStorePasswordPath(jsonObject.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString()) - .keyStorePath(jsonObject.get(SECURITY_KEY_STORE_PATH).getAsString()) - .keyStorePasswordPath(jsonObject.get(SECURITY_KEY_STORE_PASS_PATH).getAsString()) - .enableDmaapCertAuth(jsonObject.get("security.enableDmaapCertAuth").getAsBoolean()) - .build(); - } -}
\ No newline at end of file diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java deleted file mode 100644 index 08c99621..00000000 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * PNF-REGISTRATION-HANDLER - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcaegen2.services.prh.configuration; - -import com.google.gson.JsonObject; -import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.ImmutableAaiClientConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers.CloudConfigurationClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.*; -import org.springframework.scheduling.annotation.EnableScheduling; -import reactor.core.publisher.Flux; -import reactor.core.scheduler.Schedulers; - -import java.util.Optional; -import java.util.Properties; - -/** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 8/9/18 - */ -@Configuration -@ComponentScan("org.onap.dcaegen2.services.sdk.rest.services.cbs.client.providers") -@EnableConfigurationProperties -@EnableScheduling -@Primary -public class CloudConfiguration extends AppConfig { - - private static final Logger LOGGER = LoggerFactory.getLogger(CloudConfiguration.class); - private CloudConfigurationClient prhConfigurationProvider; - - private AaiClientConfiguration aaiClientCloudConfiguration; - private DmaapPublisherConfiguration dmaapPublisherCloudConfiguration; - private DmaapConsumerConfiguration dmaapConsumerCloudConfiguration; - - @Value("#{systemEnvironment}") - private Properties systemEnvironment; - - - @Autowired - public void setThreadPoolTaskScheduler(CloudConfigurationClient prhConfigurationProvider) { - this.prhConfigurationProvider = prhConfigurationProvider; - } - - public void runTask() { - Flux.defer(() -> EnvironmentProcessor.evaluate(systemEnvironment)) - .subscribeOn(Schedulers.parallel()) - .subscribe(this::parsingConfigSuccess, this::parsingConfigError); - } - - private void parsingConfigError(Throwable throwable) { - LOGGER.warn("Failed to process system environments", throwable); - } - - private void cloudConfigError(Throwable throwable) { - LOGGER.warn("Failed to gather configuration from ConfigBindingService/Consul", throwable); - } - - private void parsingConfigSuccess(EnvProperties envProperties) { - LOGGER.debug("Fetching PRH configuration from ConfigBindingService/Consul"); - prhConfigurationProvider.callForServiceConfigurationReactive(envProperties) - .subscribe(this::parseCloudConfig, this::cloudConfigError); - } - - private void parseCloudConfig(JsonObject jsonObject) { - LOGGER.info("Received application configuration: {}", jsonObject); - CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject); - dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig(); - aaiClientCloudConfiguration = ImmutableAaiClientConfiguration.copyOf(cloudConfigParser.getAaiClientConfig()) - .withAaiHeaders(aaiClientConfiguration.aaiHeaders()); - dmaapConsumerCloudConfiguration = cloudConfigParser.getDmaapConsumerConfig(); - } - - @Override - public DmaapPublisherConfiguration getDmaapPublisherConfiguration() { - return Optional.ofNullable(dmaapPublisherCloudConfiguration).orElse(super.getDmaapPublisherConfiguration()); - } - - @Override - public AaiClientConfiguration getAaiClientConfiguration() { - return Optional.ofNullable(aaiClientCloudConfiguration).orElse(super.getAaiClientConfiguration()); - } - - @Override - public DmaapConsumerConfiguration getDmaapConsumerConfiguration() { - return Optional.ofNullable(dmaapConsumerCloudConfiguration).orElse(super.getDmaapConsumerConfiguration()); - } -} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java index 613e9a83..5fe6d703 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * PNF-REGISTRATION-HANDLER * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * Copyright (C) 2018-2019 NOKIA Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,19 +21,25 @@ package org.onap.dcaegen2.services.prh.configuration; import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/25/18 */ public interface Config { - DmaapConsumerConfiguration getDmaapConsumerConfiguration(); + MessageRouterSubscribeRequest getMessageRouterSubscribeRequest(); AaiClientConfiguration getAaiClientConfiguration(); - DmaapPublisherConfiguration getDmaapPublisherConfiguration(); + MessageRouterPublishRequest getMessageRouterPublishRequest(); - void initFileStreamReader(); + MessageRouterPublishRequest getMessageRouterUpdatePublishRequest(); + + MessageRouterPublisher getMessageRouterPublisher(); + + MessageRouterSubscriber getMessageRouterSubscriber(); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/DmaapPublisherTaskConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/DmaapPublisherTaskConfig.java new file mode 100644 index 00000000..e5eb1e66 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/DmaapPublisherTaskConfig.java @@ -0,0 +1,44 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.configuration; + +import org.onap.dcaegen2.services.prh.tasks.DmaapPublisherTask; +import org.onap.dcaegen2.services.prh.tasks.DmaapPublisherTaskImpl; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class DmaapPublisherTaskConfig { + @Bean(name = "ReadyPublisherTask") + @Autowired + public DmaapPublisherTask getReadyPublisherTask(Config config) { + return new DmaapPublisherTaskImpl( + config::getMessageRouterPublishRequest, config::getMessageRouterPublisher); + } + + @Bean(name = "UpdatePublisherTask") + @Autowired + public DmaapPublisherTask getUpdatePublisherTask(Config config) { + return new DmaapPublisherTaskImpl( + config::getMessageRouterUpdatePublishRequest, config::getMessageRouterPublisher); + } +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java deleted file mode 100644 index 793fcc27..00000000 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/EnvironmentProcessor.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * PNF-REGISTRATION-HANDLER - * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.dcaegen2.services.prh.configuration; - -import org.onap.dcaegen2.services.prh.exceptions.EnvironmentLoaderException; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.EnvProperties; -import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.http.configuration.ImmutableEnvProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import reactor.core.publisher.Mono; - -import java.util.Optional; -import java.util.Properties; - -/** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 8/10/18 - */ -final class EnvironmentProcessor { - - private static final Logger LOGGER = LoggerFactory.getLogger(EnvironmentProcessor.class); - private static final int DEFAULT_CONSUL_PORT = 8500; - - private EnvironmentProcessor() { - } - - static Mono<EnvProperties> evaluate(Properties systemEnvironment) { - LOGGER.debug("Loading configuration from system environment variables"); - EnvProperties envProperties; - try { - envProperties = ImmutableEnvProperties.builder().consulHost(getConsulHost(systemEnvironment)) - .consulPort(getConsultPort(systemEnvironment)).cbsName(getConfigBindingService(systemEnvironment)) - .appName(getService(systemEnvironment)).build(); - } catch (EnvironmentLoaderException e) { - return Mono.error(e); - } - LOGGER.info("Evaluated system environment variables: {}", envProperties); - return Mono.just(envProperties); - } - - private static String getConsulHost(Properties systemEnvironments) throws EnvironmentLoaderException { - return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_HOST")) - .orElseThrow(() -> new EnvironmentLoaderException("$CONSUL_HOST environment has not been defined")); - } - - private static Integer getConsultPort(Properties systemEnvironments) { - return Optional.ofNullable(systemEnvironments.getProperty("CONSUL_PORT")).map(Integer::valueOf) - .orElseGet(EnvironmentProcessor::getDefaultPortOfConsul); - } - - private static String getConfigBindingService(Properties systemEnvironments) throws EnvironmentLoaderException { - return Optional.ofNullable(systemEnvironments.getProperty("CONFIG_BINDING_SERVICE")) - .orElseThrow( - () -> new EnvironmentLoaderException("$CONFIG_BINDING_SERVICE environment has not been defined")); - } - - private static String getService(Properties systemEnvironments) throws EnvironmentLoaderException { - return Optional.ofNullable(Optional.ofNullable(systemEnvironments.getProperty("HOSTNAME")) - .orElse(systemEnvironments.getProperty("SERVICE_NAME"))) - .orElseThrow(() -> new EnvironmentLoaderException( - "Neither $HOSTNAME/$SERVICE_NAME have not been defined as system environment")); - } - - private static Integer getDefaultPortOfConsul() { - LOGGER.warn("$CONSUL_PORT environment has not been defined, using default port: {}", DEFAULT_CONSUL_PORT); - return DEFAULT_CONSUL_PORT; - } -} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java index 612fab48..c21fd400 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * PNF-REGISTRATION-HANDLER * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * Copyright (C) 2018-2019 NOKIA Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,136 +20,38 @@ package org.onap.dcaegen2.services.prh.configuration; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.JsonSyntaxException; -import com.google.gson.TypeAdapterFactory; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; -import java.util.ServiceLoader; -import javax.validation.constraints.NotNull; -import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapConsumerConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Configuration; +import org.springframework.boot.context.event.ApplicationStartedEvent; +import org.springframework.context.event.EventListener; import org.springframework.core.io.Resource; +import org.springframework.stereotype.Component; +import org.springframework.util.StreamUtils; + +import java.io.IOException; +import java.nio.charset.Charset; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/9/18 */ -@Configuration -@EnableConfigurationProperties -@ConfigurationProperties("app") -public abstract class PrhAppConfig implements Config { - - private static final String CONFIG = "configs"; - private static final String AAI = "aai"; - private static final String DMAAP = "dmaap"; - private static final String AAI_CONFIG = "aaiClientConfiguration"; - private static final String DMAAP_PRODUCER = "dmaapProducerConfiguration"; - private static final String DMAAP_CONSUMER = "dmaapConsumerConfiguration"; - private static final String SECURITY = "security"; - +@Component +public class PrhAppConfig { private static final Logger LOGGER = LoggerFactory.getLogger(PrhAppConfig.class); - AaiClientConfiguration aaiClientConfiguration; - - DmaapConsumerConfiguration dmaapConsumerConfiguration; - - DmaapPublisherConfiguration dmaapPublisherConfiguration; - - @Value("classpath:prh_endpoints.json") - private Resource resourceFile; - - @Override - public DmaapConsumerConfiguration getDmaapConsumerConfiguration() { - return dmaapConsumerConfiguration; - } - - @Override - public AaiClientConfiguration getAaiClientConfiguration() { - return aaiClientConfiguration; - } - - @Override - public DmaapPublisherConfiguration getDmaapPublisherConfiguration() { - return dmaapPublisherConfiguration; - } - - @Override - public void initFileStreamReader() { + @Value("classpath:git_info.json") + private Resource gitInfo; - GsonBuilder gsonBuilder = new GsonBuilder(); - ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); - JsonParser parser = new JsonParser(); - - try (InputStream inputStream = resourceFile.getInputStream()) { - JsonElement rootElement = getJsonElement(parser, inputStream); - if (rootElement.isJsonObject()) { - deserializeAaiConfiguration(gsonBuilder, rootElement); - deserializeDmaapConsumerConfiguration(gsonBuilder, rootElement); - deserializeDmaapPublisherConfiguration(gsonBuilder, rootElement); - } - } - catch (IOException e) { - LOGGER.warn("Problem with file loading, file ", e); - } - } - - private void deserializeDmaapPublisherConfiguration(GsonBuilder gsonBuilder, JsonElement rootElement) { - dmaapPublisherConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects( - rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(DMAAP) - .getAsJsonObject(DMAAP_PRODUCER), - rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)), - DmaapPublisherConfiguration.class); - } - - private void deserializeDmaapConsumerConfiguration(GsonBuilder gsonBuilder, JsonElement rootElement) { - dmaapConsumerConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects( - rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(DMAAP) - .getAsJsonObject(DMAAP_CONSUMER), - rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)), - DmaapConsumerConfiguration.class); - } - - private void deserializeAaiConfiguration(GsonBuilder gsonBuilder, JsonElement rootElement) { - aaiClientConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects( - rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(AAI).getAsJsonObject(AAI_CONFIG), - rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)), - AaiClientConfiguration.class); - } - - JsonElement getJsonElement(JsonParser parser, InputStream inputStream) { - return parser.parse(new InputStreamReader(inputStream, StandardCharsets.UTF_8)); - } - - private JsonObject concatenateJsonObjects(JsonObject target, JsonObject source) { - source.entrySet() - .forEach(entry -> target.add(entry.getKey(), entry.getValue())); - return target; - } - private <T> T deserializeType(@NotNull GsonBuilder gsonBuilder, @NotNull JsonObject jsonObject, - @NotNull Class<T> type) { - try { - return gsonBuilder.create().fromJson(jsonObject, type); - } catch (JsonSyntaxException e) { - LOGGER.warn("Problem with Json deserialization", e); - return null; + @EventListener(ApplicationStartedEvent.class) + public void onApplicationStarted() throws IOException { + if(LOGGER.isDebugEnabled()) { + LOGGER.debug("Git info={}", StreamUtils.copyToString(gitInfo.getInputStream(), Charset.defaultCharset())); } } - void setResourceFile(Resource resourceFile) { - this.resourceFile = resourceFile; + public Resource getGitInfo() { + return gitInfo; } -}
\ No newline at end of file +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/AppInfoController.java index c09cc945..7475814f 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/HeartbeatController.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/AppInfoController.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * PNF-REGISTRATION-HANDLER * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * Copyright (C) 2018-2019 NOKIA Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,12 +24,14 @@ import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; +import org.onap.dcaegen2.services.prh.configuration.PrhAppConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.core.io.Resource; import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Mono; @@ -37,29 +39,28 @@ import reactor.core.publisher.Mono; * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/19/18 */ @RestController -@Api(value = "HeartbeatController", description = "Check liveness of PRH service") -public class HeartbeatController { +@Api(value = "AppInfoController", description = "Provides basic information about application") +public class AppInfoController { - private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatController.class); + private static final Logger LOGGER = LoggerFactory.getLogger(AppInfoController.class); + private final PrhAppConfig config; - /** - * Endpoint for checking that PRH is alive. - * - * @return HTTP Status Code - */ - @RequestMapping(value = "heartbeat", method = RequestMethod.GET) - @ApiOperation(value = "Returns liveness of PRH service") - @ApiResponses(value = { - @ApiResponse(code = 200, message = "PRH sevice is living"), - @ApiResponse(code = 401, message = "You are not authorized to view the resource"), - @ApiResponse(code = 403, message = "Accessing the resource you were trying to reach is forbidden"), - @ApiResponse(code = 404, message = "The resource you were trying to reach is not found") - } - ) + public AppInfoController(PrhAppConfig config) { + this.config = config; + } + + @GetMapping(value = "heartbeat", produces = MediaType.TEXT_PLAIN_VALUE) + @ApiOperation("Returns liveness of PRH service") + @ApiResponses(@ApiResponse(code = 200, message = "Service is alive")) public Mono<ResponseEntity<String>> heartbeat() { - LOGGER.trace("Receiving heartbeat request"); - return Mono.defer(() -> - Mono.just(new ResponseEntity<>("alive", HttpStatus.OK)) + LOGGER.trace("Heartbeat request received"); + return Mono.defer(() -> Mono.just(new ResponseEntity<>("alive", HttpStatus.OK)) ); } -}
\ No newline at end of file + + @GetMapping(value = "version", produces = MediaType.APPLICATION_JSON_VALUE) + @ApiOperation("Returns version information") + public Mono<Resource> version() { + return Mono.defer(() -> Mono.just(config.getGitInfo())); + } +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java index aa913654..a0aa17e3 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java @@ -60,15 +60,18 @@ public class ScheduleController { @ApiOperation(value = "Receiving stop scheduling worker request") public Mono<ResponseEntity<String>> stopTask() { LOGGER.trace("Receiving stop scheduling worker request"); - return scheduledTasksRunner.getResponseFromCancellationOfTasks(); + return Mono.defer(() -> { + scheduledTasksRunner.cancelTasks(); + return Mono.just(new ResponseEntity<>("PRH Service has been stopped!", HttpStatus.OK)); + } + ); } - @ApiOperation(value = "Sends success or error response on starting task execution") private ResponseEntity<String> createStartTaskResponse(boolean wasScheduled) { if (wasScheduled) { return new ResponseEntity<>("PRH Service has been started!", HttpStatus.CREATED); } else { - return new ResponseEntity<>("PRH Service is still running!", HttpStatus.NOT_ACCEPTABLE); + return new ResponseEntity<>("PRH Service is already running!", HttpStatus.NOT_ACCEPTABLE); } } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/exceptions/EnvironmentLoaderException.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/exceptions/AaiFailureException.java index 5fef80d8..6741bb2d 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/exceptions/EnvironmentLoaderException.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/exceptions/AaiFailureException.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * PNF-REGISTRATION-HANDLER * ================================================================================ - * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,12 +20,9 @@ package org.onap.dcaegen2.services.prh.exceptions; -/** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 8/10/18 - */ -public class EnvironmentLoaderException extends Exception { +public class AaiFailureException extends PrhTaskException { - public EnvironmentLoaderException(String message) { + public AaiFailureException(String message) { super(message); } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java index 4749b520..11939b53 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/DmaapConsumerJsonParser.java @@ -23,32 +23,57 @@ package org.onap.dcaegen2.services.prh.service; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; -import java.util.Optional; -import java.util.stream.StreamSupport; +import io.vavr.collection.List; import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.ImmutableConsumerDmaapModel; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.util.Optional; +import java.util.stream.StreamSupport; + +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.ADDITIONAL_FIELDS; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_EVENT_HEADER; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_JSON_OBJECT; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.COMMON_FORMAT_FOR_STRING; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.CORRELATION_ID; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_MODEL; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_TYPE; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EQUIP_VENDOR; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.EVENT; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.NF_ROLE; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.OAM_IPV_4_ADDRESS; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.OAM_IPV_6_ADDRESS; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.PNF_REGISTRATION_FIELDS; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SERIAL_NUMBER; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SOURCE_NAME; +import static org.onap.dcaegen2.services.prh.service.PnfRegistrationFields.SW_VERSION; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 5/8/18 */ +@Component public class DmaapConsumerJsonParser { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerJsonParser.class); - private static final String EVENT = "event"; - private static final String COMMON_EVENT_HEADER = "commonEventHeader"; - private static final String PNF_REGISTRATION_FIELDS = "pnfRegistrationFields"; - private static final String OAM_IPV_4_ADDRESS = "oamV4IpAddress"; - private static final String OAM_IPV_6_ADDRESS = "oamV6IpAddress"; - private static final String SOURCE_NAME = "sourceName"; - private static final String CORRELATION_ID = "correlationId"; + private String pnfSourceName; + private String pnfOamIpv4Address; + private String pnfOamIpv6Address; + private String pnfSerialNumberOptionalField; + private String pnfEquipVendorOptionalField; + private String pnfEquipModelOptionalField; + private String pnfEquipTypeOptionalField; + private String pnfNfRoleOptionalField; + private String pnfSwVersionOptionalField; + private JsonObject pnfAdditionalFields; /** * Extract info from string and create @see {@link org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel}. @@ -56,31 +81,24 @@ public class DmaapConsumerJsonParser { * @param monoMessage - results from DMaaP * @return reactive DMaaPModel */ - public Flux<ConsumerDmaapModel> getJsonObject(Mono<String> monoMessage) { - return monoMessage - .flatMapMany(this::getJsonParserMessage) - .flatMap(this::createJsonConsumerModel); + public Flux<ConsumerDmaapModel> getJsonObject(Mono<MessageRouterSubscribeResponse> monoMessage) { + return monoMessage.flatMapMany(msgRouterResponse -> getConsumerDmaapModelFromJsonArray(msgRouterResponse.items())); } - private Mono<JsonElement> getJsonParserMessage(String message) { - return StringUtils.isEmpty(message) ? logErrorAndReturnMonoEmpty("DmaaP response is empty") - : Mono.fromCallable(() -> new JsonParser().parse(message)); - } + private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(List<JsonElement> items) { + LOGGER.debug("DmaapConsumerJsonParser input for parsing: {}", items); - private Flux<ConsumerDmaapModel> createJsonConsumerModel(JsonElement jsonElement) { - return jsonElement.isJsonObject() - ? create(Flux.defer(() -> Flux.just(jsonElement.getAsJsonObject()))) - : getConsumerDmaapModelFromJsonArray(jsonElement); - } - - private Flux<ConsumerDmaapModel> getConsumerDmaapModelFromJsonArray(JsonElement jsonElement) { + if (items.size() == 0) { + LOGGER.debug("Nothing to consume from DMaaP"); + return Flux.empty(); + } return create( - Flux.defer(() -> Flux.fromStream(StreamSupport.stream(jsonElement.getAsJsonArray().spliterator(), false) - .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) - .orElseGet(JsonObject::new))))); + Flux.defer(() -> Flux.fromStream(StreamSupport.stream(items.spliterator(), false) + .map(jsonElementFromArray -> getJsonObjectFromAnArray(jsonElementFromArray) + .orElseGet(JsonObject::new))))); } - public Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { + Optional<JsonObject> getJsonObjectFromAnArray(JsonElement element) { JsonParser jsonParser = new JsonParser(); return element.isJsonPrimitive() ? Optional.of(jsonParser.parse(element.getAsString()).getAsJsonObject()) : Optional.of(jsonParser.parse(element.toString()).getAsJsonObject()); @@ -94,43 +112,63 @@ public class DmaapConsumerJsonParser { } private Mono<ConsumerDmaapModel> transform(JsonObject responseFromDmaap) { - JsonObject commonEventHeader = responseFromDmaap.getAsJsonObject(EVENT) .getAsJsonObject(COMMON_EVENT_HEADER); JsonObject pnfRegistrationFields = responseFromDmaap.getAsJsonObject(EVENT) .getAsJsonObject(PNF_REGISTRATION_FIELDS); - String pnfSourceName = getValueFromJson(commonEventHeader, SOURCE_NAME); - String pnfOamIpv4Address = getValueFromJson(pnfRegistrationFields, OAM_IPV_4_ADDRESS); - String pnfOamIpv6Address = getValueFromJson(pnfRegistrationFields, OAM_IPV_6_ADDRESS); - - return (StringUtils.isEmpty(pnfSourceName) || !ipPropertiesNotEmpty(pnfOamIpv4Address, pnfOamIpv6Address)) + this.pnfSourceName = getValueFromJson(commonEventHeader, SOURCE_NAME); + this.pnfNfRoleOptionalField = getValueFromJson(commonEventHeader, NF_ROLE); + this.pnfOamIpv4Address = getValueFromJson(pnfRegistrationFields, OAM_IPV_4_ADDRESS); + this.pnfOamIpv6Address = getValueFromJson(pnfRegistrationFields, OAM_IPV_6_ADDRESS); + this.pnfSerialNumberOptionalField = getValueFromJson(pnfRegistrationFields, SERIAL_NUMBER); + this.pnfEquipVendorOptionalField = getValueFromJson(pnfRegistrationFields, EQUIP_VENDOR); + this.pnfEquipModelOptionalField = getValueFromJson(pnfRegistrationFields, EQUIP_MODEL); + this.pnfEquipTypeOptionalField = getValueFromJson(pnfRegistrationFields, EQUIP_TYPE); + this.pnfSwVersionOptionalField = getValueFromJson(pnfRegistrationFields, SW_VERSION); + this.pnfAdditionalFields = pnfRegistrationFields.getAsJsonObject(ADDITIONAL_FIELDS); + + return (StringUtils.isEmpty(pnfSourceName)) ? logErrorAndReturnMonoEmpty("Incorrect json, consumerDmaapModel can not be created: " - + printMessage(pnfSourceName, pnfOamIpv4Address, pnfOamIpv6Address)) : + + printMessage()) : Mono.just(ImmutableConsumerDmaapModel.builder() .correlationId(pnfSourceName) .ipv4(pnfOamIpv4Address) - .ipv6(pnfOamIpv6Address).build()); + .ipv6(pnfOamIpv6Address) + .serialNumber(pnfSerialNumberOptionalField) + .equipVendor(pnfEquipVendorOptionalField) + .equipModel(pnfEquipModelOptionalField) + .equipType(pnfEquipTypeOptionalField) + .nfRole(pnfNfRoleOptionalField) + .swVersion(pnfSwVersionOptionalField) + .additionalFields(pnfAdditionalFields).build()); } private String getValueFromJson(JsonObject jsonObject, String jsonKey) { return jsonObject.has(jsonKey) ? jsonObject.get(jsonKey).getAsString() : ""; } - private boolean ipPropertiesNotEmpty(String ipv4, String ipv6) { - return (!StringUtils.isEmpty(ipv4)) || !(StringUtils.isEmpty(ipv6)); - } - private boolean containsHeader(JsonObject jsonObject) { return jsonObject.has(EVENT) && jsonObject.getAsJsonObject(EVENT).has(PNF_REGISTRATION_FIELDS); } - private String printMessage(String sourceName, String oamIpv4Address, String oamIpv6Address) { + private String printMessage() { return String.format("%n{" - + "\"" + CORRELATION_ID + "\": \"%s\"," - + "\"" + OAM_IPV_4_ADDRESS + "\": \"%s\"," - + "\"" + OAM_IPV_6_ADDRESS + "\": \"%s\"" - + "%n}", sourceName, oamIpv4Address, oamIpv6Address); + + "\"" + CORRELATION_ID + COMMON_FORMAT_FOR_STRING + "," + + "\"" + OAM_IPV_4_ADDRESS + COMMON_FORMAT_FOR_STRING + "," + + "\"" + OAM_IPV_6_ADDRESS + COMMON_FORMAT_FOR_STRING + "," + + "\"" + SERIAL_NUMBER + COMMON_FORMAT_FOR_STRING + "," + + "\"" + EQUIP_VENDOR + COMMON_FORMAT_FOR_STRING + "," + + "\"" + EQUIP_MODEL + COMMON_FORMAT_FOR_STRING + "," + + "\"" + EQUIP_TYPE + COMMON_FORMAT_FOR_STRING + "," + + "\"" + NF_ROLE + COMMON_FORMAT_FOR_STRING + "," + + "\"" + SW_VERSION + COMMON_FORMAT_FOR_STRING + "," + + "\"" + ADDITIONAL_FIELDS + COMMON_FORMAT_FOR_JSON_OBJECT + + "%n}", this.pnfSourceName, this.pnfOamIpv4Address, this.pnfOamIpv6Address, + this.pnfSerialNumberOptionalField, this.pnfEquipVendorOptionalField, + this.pnfEquipModelOptionalField, this.pnfEquipTypeOptionalField, + this.pnfNfRoleOptionalField, this.pnfSwVersionOptionalField, this.pnfAdditionalFields + ); } private <T> Mono<T> logErrorAndReturnMonoEmpty(String messageForLogger) { diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PnfRegistrationFields.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PnfRegistrationFields.java new file mode 100644 index 00000000..a40795b1 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/service/PnfRegistrationFields.java @@ -0,0 +1,46 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + + +package org.onap.dcaegen2.services.prh.service; + +class PnfRegistrationFields { + + static final String COMMON_FORMAT_FOR_STRING = "\": \"%s\""; + static final String COMMON_FORMAT_FOR_JSON_OBJECT = "\": %s"; + static final String EVENT = "event"; + static final String COMMON_EVENT_HEADER = "commonEventHeader"; + static final String PNF_REGISTRATION_FIELDS = "pnfRegistrationFields"; + static final String OAM_IPV_4_ADDRESS = "oamV4IpAddress"; + static final String OAM_IPV_6_ADDRESS = "oamV6IpAddress"; + static final String SOURCE_NAME = "sourceName"; + static final String CORRELATION_ID = "correlationId"; + + // optional fields + static final String SERIAL_NUMBER = "serialNumber"; + static final String EQUIP_VENDOR = "vendorName"; + static final String EQUIP_MODEL = "modelNumber"; + static final String EQUIP_TYPE = "unitType"; + static final String NF_ROLE = "nfNamingCode"; + static final String SW_VERSION = "softwareVersion"; + static final String ADDITIONAL_FIELDS = "additionalFields"; + + private PnfRegistrationFields() {} +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java index 8e31807a..123eb5a3 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java @@ -20,34 +20,11 @@ package org.onap.dcaegen2.services.prh.tasks; -import javax.net.ssl.SSLException; -import org.onap.dcaegen2.services.prh.exceptions.AaiNotFoundException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.AaiReactiveWebClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.patch.AaiReactiveHttpPatchClient; - -import org.onap.dcaegen2.services.sdk.rest.services.ssl.SslFactory; -import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; - -/** - * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 - */ -public abstract class AaiProducerTask { - - abstract Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel message) throws AaiNotFoundException; - - abstract AaiReactiveHttpPatchClient resolveClient() throws SSLException; - - protected abstract AaiClientConfiguration resolveConfiguration(); - - protected abstract Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) - throws PrhTaskException, SSLException; - - WebClient buildWebClient() throws SSLException { - return new AaiReactiveWebClientFactory(new SslFactory(), resolveConfiguration()).build(); - } +@FunctionalInterface +interface AaiProducerTask { + Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException; } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java index 905eb72b..7aaff47d 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java @@ -20,17 +20,14 @@ package org.onap.dcaegen2.services.prh.tasks; -import javax.net.ssl.SSLException; -import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.exceptions.AaiNotFoundException; import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.prh.model.JsonBodyBuilderImpl; import org.onap.dcaegen2.services.prh.model.utils.HttpUtils; -import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration; -import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.patch.AaiReactiveHttpPatchClient; - +import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.AaiHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; +import org.onap.dcaegen2.services.sdk.rest.services.model.AaiModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -42,48 +39,33 @@ import reactor.core.publisher.Mono; * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 */ @Component -public class AaiProducerTaskImpl extends AaiProducerTask { +public class AaiProducerTaskImpl implements AaiProducerTask { private static final Logger LOGGER = LoggerFactory.getLogger(AaiProducerTaskImpl.class); - private final Config config; - private AaiReactiveHttpPatchClient aaiReactiveHttpPatchClient; + private final AaiHttpClient<AaiModel, HttpResponse> aaiHttpPatchClient; @Autowired - public AaiProducerTaskImpl(Config config) { - this.config = config; - } - - @Override - Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel consumerDmaapModel) { - LOGGER.info("Publish to AAI DmaapModel"); - return aaiReactiveHttpPatchClient.getAaiProducerResponse(consumerDmaapModel) - .flatMap(response -> { - if (HttpUtils.isSuccessfulResponseCode(response.statusCode().value())) { - return Mono.just(consumerDmaapModel); - } - return Mono - .error(new AaiNotFoundException("Incorrect response code for continuation of tasks workflow")); - }); + public AaiProducerTaskImpl(final AaiHttpClient<AaiModel, HttpResponse> aaiHttpPatchClient) { + this.aaiHttpPatchClient = aaiHttpPatchClient; } - @Override - AaiReactiveHttpPatchClient resolveClient() throws SSLException { - return new AaiReactiveHttpPatchClient(resolveConfiguration(), new JsonBodyBuilderImpl()).createAaiWebClient(buildWebClient()); - } - - @Override - protected AaiClientConfiguration resolveConfiguration() { - return config.getAaiClientConfiguration(); + private Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel consumerDmaapModel) { + Mono<HttpResponse> response = aaiHttpPatchClient.getAaiResponse(consumerDmaapModel); + return response.flatMap(r -> { + if (HttpUtils.isSuccessfulResponseCode(r.statusCode())) { + return Mono.just(consumerDmaapModel); + } + return Mono + .error(new AaiNotFoundException("Incorrect response code for continuation of tasks workflow" + r.statusCode())); + }); } @Override - protected Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) - throws PrhTaskException, SSLException { + public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException { if (consumerDmaapModel == null) { throw new DmaapNotFoundException("Invoked null object to DMaaP task"); } - aaiReactiveHttpPatchClient = resolveClient(); LOGGER.debug("Method called with arg {}", consumerDmaapModel); return publish(consumerDmaapModel); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTask.java new file mode 100644 index 00000000..f1b900ac --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTask.java @@ -0,0 +1,29 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.tasks; + +import org.onap.dcaegen2.services.sdk.rest.services.model.AaiModel; +import reactor.core.publisher.Mono; + +@FunctionalInterface +public interface AaiQueryTask { + Mono<Boolean> execute(final AaiModel aaiModel); +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTaskImpl.java new file mode 100644 index 00000000..ed0cbcd9 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTaskImpl.java @@ -0,0 +1,107 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.tasks; + +import org.onap.dcaegen2.services.prh.model.*; +import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.AaiHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.model.AaiModel; +import org.onap.dcaegen2.services.sdk.rest.services.model.AaiServiceInstanceQueryModel; +import org.onap.dcaegen2.services.sdk.rest.services.model.ImmutableAaiServiceInstanceQueryModel; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Mono; + +import java.util.List; +import java.util.Optional; +import java.util.stream.Stream; + +@Component +public class AaiQueryTaskImpl implements AaiQueryTask { + static final String ACTIVE_STATUS = "Active"; + static final String RELATED_TO = "service-instance"; + static final String CUSTOMER = "customer.global-customer-id"; + static final String SERVICE_TYPE = "service-subscription.service-type"; + static final String SERVICE_INSTANCE_ID = "service-instance.service-instance-id"; + + private final AaiHttpClient<AaiModel, AaiPnfResultModel> getPnfModelClient; + private final AaiHttpClient<AaiServiceInstanceQueryModel, AaiServiceInstanceResultModel> getServiceClient; + + @Autowired + public AaiQueryTaskImpl( + final AaiHttpClient<AaiModel, AaiPnfResultModel> getPnfModelClient, + final AaiHttpClient<AaiServiceInstanceQueryModel, AaiServiceInstanceResultModel> getServiceClient) { + this.getPnfModelClient = getPnfModelClient; + this.getServiceClient = getServiceClient; + } + + @Override + public Mono<Boolean> execute(AaiModel aaiModel) { + return getPnfModelClient + .getAaiResponse(aaiModel) + .flatMap(this::checkIfPnfHasRelationToService) + .flatMap(getServiceClient::getAaiResponse) + .map(this::checkIfRelatedServiceInstanceIsActive) + .defaultIfEmpty(false); + } + + private Mono<AaiServiceInstanceQueryModel> checkIfPnfHasRelationToService(final AaiPnfResultModel model) { + return Mono + .justOrEmpty(model.getRelationshipList()) + .map(this::findRelatedTo) + .flatMap(Mono::justOrEmpty) + .map(RelationshipDict::getRelationshipData) + .flatMap(x -> { + final Optional<String> customer = findValue(x, CUSTOMER); + final Optional<String> serviceType = findValue(x, SERVICE_TYPE); + final Optional<String> serviceInstanceId= findValue(x, SERVICE_INSTANCE_ID); + + return customer.isPresent() && serviceType.isPresent() && serviceInstanceId.isPresent() + ? Mono.just(ImmutableAaiServiceInstanceQueryModel + .builder() + .customerId(customer.get()) + .serviceType(serviceType.get()) + .serviceInstanceId(serviceInstanceId.get()) + .build()) + : Mono.empty(); + }); + } + + private Boolean checkIfRelatedServiceInstanceIsActive(final AaiServiceInstanceResultModel model) { + return ACTIVE_STATUS.equalsIgnoreCase(model.getOrchestrationStatus()); + } + + private Optional<RelationshipDict> findRelatedTo(final Relationship data) { + return Optional.ofNullable(data.getRelationship()) + .map(Stream::of) + .orElseGet(Stream::empty) + .flatMap(List::stream) + .filter(x -> RELATED_TO.equals(x.getRelatedTo())) + .findFirst(); + } + + private Optional<String> findValue(final List<RelationshipData> data, final String key) { + return data + .stream() + .filter(y -> key.equals(y.getRelationshipKey())) + .findFirst() + .map(RelationshipData::getRelationshipValue); + } +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTask.java new file mode 100644 index 00000000..12e13140 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTask.java @@ -0,0 +1,29 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.tasks; + +import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; +import reactor.core.publisher.Mono; + +@FunctionalInterface +public interface BbsActionsTask { + Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel); +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java new file mode 100644 index 00000000..8b893211 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java @@ -0,0 +1,208 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.tasks; + +import static io.netty.handler.codec.http.HttpHeaders.Values.APPLICATION_JSON; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.http.HttpHeaders.CONTENT_TYPE; +import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod.DELETE; +import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod.GET; +import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod.PUT; + +import com.google.gson.JsonObject; +import io.vavr.collection.HashMap; +import io.vavr.collection.Map; +import org.onap.dcaegen2.services.prh.configuration.Config; +import org.onap.dcaegen2.services.prh.exceptions.AaiFailureException; +import org.onap.dcaegen2.services.prh.model.AaiPnfResultModel; +import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; +import org.onap.dcaegen2.services.prh.model.ImmutableRelationshipDict; +import org.onap.dcaegen2.services.prh.model.Relationship; +import org.onap.dcaegen2.services.prh.model.RelationshipDict; +import org.onap.dcaegen2.services.prh.model.bbs.ImmutableLogicalLink; +import org.onap.dcaegen2.services.prh.model.bbs.LogicalLink; +import org.onap.dcaegen2.services.prh.model.utils.HttpUtils; +import org.onap.dcaegen2.services.prh.model.utils.PrhModelAwareGsonBuilder; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClientFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@Component +public class BbsActionsTaskImpl implements BbsActionsTask { + + private static final Logger LOGGER = LoggerFactory.getLogger(BbsActionsTaskImpl.class); + private static final String ATTACHMENT_POINT = "attachment-point"; + private static final String LOGICAL_LINK_URI = "/network/logical-links/logical-link"; + private static final String PNF_URI = "/network/pnfs/pnf"; + private static final String LINK_KEY = "logical-link.link-name"; + private static final String ERROR_PREFIX = "Incorrect response when performing BBS-related actions: "; + private static final String LOGICAL_LINK = "logical-link"; + + private final Config config; + private final RxHttpClient httpClient; + + @Autowired + BbsActionsTaskImpl(Config config) { + this(config, RxHttpClientFactory.createInsecure()); + } + + BbsActionsTaskImpl(Config config, RxHttpClient httpClient) { + this.config = config; + this.httpClient = httpClient; + } + + public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) { + JsonObject additionalFields = consumerDmaapModel.getAdditionalFields(); + if (additionalFields == null || !additionalFields.has(ATTACHMENT_POINT)) { + return Mono.just(consumerDmaapModel); + } + String linkName = additionalFields.get(ATTACHMENT_POINT).getAsString(); + if (linkName.isEmpty()) { + LOGGER.warn("Attachment point is empty! Ignore related actions."); + return Mono.just(consumerDmaapModel); + } + String pnfName = consumerDmaapModel.getCorrelationId(); + + return getLinksByPnf(pnfName) + .flatMap(x -> Flux.fromIterable(x.getRelationshipData())) + .filter(x -> LINK_KEY.equals(x.getRelationshipKey())) + .map(x -> x.getRelationshipValue()) + .flatMap(oldLinkName -> getLogicalLink(oldLinkName)) + .filter(oldLink -> oldLink.getLinkType().equals(ATTACHMENT_POINT)) + .flatMap(oldLink -> deleteLogicalLinkInAai(oldLink.getLinkName(), oldLink.getResourceVersion())) + .then(createLogicalLinkInAai(linkName, pnfName)) + .flatMap(response -> handleFinalResponse(response, consumerDmaapModel)); + } + + private Flux<RelationshipDict> getLinksByPnf(String pnfName) { + return httpClient.call(buildGetRequest(PNF_URI + "/" + pnfName)) + .flatMap(response -> handleResponse(response, "GET PNF request. Pnf name: " + pnfName)) + .map(httpResponse -> httpResponse.bodyAsJson(UTF_8, + PrhModelAwareGsonBuilder.createGson(), AaiPnfResultModel.class)) + .flatMapMany(pnfModel -> Flux.fromStream(pnfModel.getRelationshipList().getRelationship().stream())) + .filter(x -> LOGICAL_LINK.equals(x.getRelatedTo())); + } + + private Mono<LogicalLink> getLogicalLink(String linkName) { + ImmutableHttpRequest request = buildGetRequest(LOGICAL_LINK_URI + "/" + linkName); + return httpClient.call(request) + .flatMap(response -> handleResponse(response, "GET LogicalLink request. Link name: " + linkName)) + .map(httpResponse -> httpResponse.bodyAsJson(UTF_8, + PrhModelAwareGsonBuilder.createGson(), LogicalLink.class)); + } + + private Mono<HttpResponse> createLogicalLinkInAai(String linkName, String pnfName) { + ImmutableHttpRequest request = buildLogicalLinkPutRequest(linkName, pnfName); + LOGGER.debug("Creating logical link in AAI {} ", request); + return httpClient.call(request) + .flatMap(response -> handleResponse(response, "PUT LogicalLink request. Link name: " + linkName)); + } + + private Mono<HttpResponse> deleteLogicalLinkInAai(String linkName, String resourceVersion) { + ImmutableHttpRequest request = buildLogicalLinkDeleteRequest(linkName, resourceVersion); + LOGGER.debug("Deleting logical link in AAI {} ", request); + return httpClient.call(request) + .flatMap(response -> handleResponse(response, "DELETE LogicalLink request. Link name: " + linkName)); + } + + private ImmutableHttpRequest buildGetRequest(String path) { + String uri = buildUri(path); + Map<String, String> aaiHeaders = HashMap + .ofAll(config.getAaiClientConfiguration().aaiHeaders()); + + return ImmutableHttpRequest + .builder() + .method(GET) + .url(uri) + .customHeaders(aaiHeaders) + .build(); + } + + private ImmutableHttpRequest buildLogicalLinkPutRequest(String linkName, String pnfName) { + String uri = buildUri(LOGICAL_LINK_URI + "/" + linkName); + ImmutableLogicalLink logicalLink = prepareModelBuilder(linkName, pnfName).build(); + RequestBody requestBody = RequestBody.fromString(PrhModelAwareGsonBuilder.createGson().toJson(logicalLink)); + + // FIXME: AAI headers for PUT are different than PATCH (taken from prh_endpoints.json) + Map<String, String> aaiHeaders = HashMap + .ofAll(config.getAaiClientConfiguration().aaiHeaders()) + .put(CONTENT_TYPE, APPLICATION_JSON); + + return ImmutableHttpRequest + .builder() + .method(PUT) + .url(uri) + .body(requestBody) + .customHeaders(aaiHeaders) + .build(); + } + + private ImmutableHttpRequest buildLogicalLinkDeleteRequest(String linkName, String resourceVersion) { + String uri = buildUri(LOGICAL_LINK_URI + "/" + linkName + "?resource-version=" + resourceVersion); + + Map<String, String> aaiHeaders = HashMap + .ofAll(config.getAaiClientConfiguration().aaiHeaders()) + .put(CONTENT_TYPE, APPLICATION_JSON); + + return ImmutableHttpRequest + .builder() + .method(DELETE) + .url(uri) + .customHeaders(aaiHeaders) + .build(); + } + + private ImmutableLogicalLink.Builder prepareModelBuilder(String linkName, String pnfName) { + Relationship relationship = org.onap.dcaegen2.services.prh.model.ImmutableRelationship.builder() + .addRelationship( + ImmutableRelationshipDict.builder().relatedLink(PNF_URI + "/" + pnfName).build()).build(); + + return ImmutableLogicalLink + .builder() + .linkName(linkName) + .linkType(ATTACHMENT_POINT) + .relationshipList(relationship); + } + + private Mono<HttpResponse> handleResponse(HttpResponse response, String msg) { + return HttpUtils.isSuccessfulResponseCode(response.statusCode()) ? Mono.just(response) : + Mono.error(new AaiFailureException(ERROR_PREFIX + response.statusCode() + ". Occurred in " + msg)); + } + + private Mono<? extends ConsumerDmaapModel> handleFinalResponse( + HttpResponse response, ConsumerDmaapModel consumerDmaapModel) { + return HttpUtils.isSuccessfulResponseCode(response.statusCode()) + ? Mono.just(consumerDmaapModel) : Mono.error(new AaiFailureException(ERROR_PREFIX + response.statusCode())); + } + + private String buildUri(String path) { + return config.getAaiClientConfiguration().pnfUrl().replace(PNF_URI, path); + } +} + diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java index e3ea8962..2deafd8d 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTask.java @@ -20,22 +20,14 @@ package org.onap.dcaegen2.services.prh.tasks; -import javax.net.ssl.SSLException; - import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; - import reactor.core.publisher.Flux; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 */ +@FunctionalInterface interface DmaapConsumerTask { - - void initConfigs(); - - Flux<ConsumerDmaapModel> execute(String object) throws SSLException; - - DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException; + Flux<ConsumerDmaapModel> execute(); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java index fd7bca1e..a990e502 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapConsumerTaskImpl.java @@ -20,19 +20,15 @@ package org.onap.dcaegen2.services.prh.tasks; -import javax.net.ssl.SSLException; import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.service.DmaapConsumerJsonParser; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.ConsumerReactiveHttpClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPConsumerReactiveHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.consumer.DMaaPReactiveWebClientFactory; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** @@ -41,39 +37,21 @@ import reactor.core.publisher.Flux; @Component public class DmaapConsumerTaskImpl implements DmaapConsumerTask { - private static final Logger LOGGER = LoggerFactory.getLogger(DmaapConsumerTaskImpl.class); private final Config config; private final DmaapConsumerJsonParser dmaapConsumerJsonParser; - private final ConsumerReactiveHttpClientFactory httpClientFactory; - @Autowired - public DmaapConsumerTaskImpl(Config config) { - this(config, new DmaapConsumerJsonParser(), - new ConsumerReactiveHttpClientFactory(new DMaaPReactiveWebClientFactory())); - } - DmaapConsumerTaskImpl(Config prhAppConfig, - DmaapConsumerJsonParser dmaapConsumerJsonParser, - ConsumerReactiveHttpClientFactory httpClientFactory) { + public DmaapConsumerTaskImpl(Config prhAppConfig, DmaapConsumerJsonParser dmaapConsumerJsonParser) { this.config = prhAppConfig; this.dmaapConsumerJsonParser = dmaapConsumerJsonParser; - this.httpClientFactory = httpClientFactory; } @Override - public void initConfigs() { - config.initFileStreamReader(); + public Flux<ConsumerDmaapModel> execute() { + MessageRouterSubscriber messageRouterSubscriber = config.getMessageRouterSubscriber(); + MessageRouterSubscribeRequest messageRouterSubscribeRequest = config.getMessageRouterSubscribeRequest(); + Mono<MessageRouterSubscribeResponse> response = messageRouterSubscriber.get(messageRouterSubscribeRequest); + return dmaapConsumerJsonParser.getJsonObject(response); } - @Override - public Flux<ConsumerDmaapModel> execute(String object) throws SSLException { - DMaaPConsumerReactiveHttpClient dmaaPConsumerReactiveHttpClient = resolveClient(); - LOGGER.debug("Method called with arg {}", object); - return dmaapConsumerJsonParser.getJsonObject(dmaaPConsumerReactiveHttpClient.getDMaaPConsumerResponse()); - } - - @Override - public DMaaPConsumerReactiveHttpClient resolveClient() throws SSLException { - return httpClientFactory.create(config.getDmaapConsumerConfiguration()); - } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java index 4d6c0f87..0cdc40ea 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java @@ -22,17 +22,13 @@ package org.onap.dcaegen2.services.prh.tasks; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient; - -import org.springframework.http.ResponseEntity; -import reactor.core.publisher.Mono; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import reactor.core.publisher.Flux; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 */ -interface DmaapPublisherTask { - - Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException; - - DMaaPPublisherReactiveHttpClient resolveClient(); +@FunctionalInterface +public interface DmaapPublisherTask { + Flux<MessageRouterPublishResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException; } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java index 63e01c12..2890d195 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java @@ -20,54 +20,46 @@ package org.onap.dcaegen2.services.prh.tasks; -import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; - import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.prh.model.JsonBodyBuilderImpl; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory; -import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory; - +import org.onap.dcaegen2.services.prh.model.PnfReadyJsonBodyBuilder; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.ResponseEntity; -import org.springframework.stereotype.Component; -import reactor.core.publisher.Mono; +import reactor.core.publisher.Flux; + +import java.util.function.Supplier; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 */ -@Component public class DmaapPublisherTaskImpl implements DmaapPublisherTask { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class); - private final Config config; - private final PublisherReactiveHttpClientFactory httpClientFactory; - @Autowired - public DmaapPublisherTaskImpl(Config config) { - this(config, new PublisherReactiveHttpClientFactory(new DmaaPRestTemplateFactory(),new JsonBodyBuilderImpl())); - } + private final Supplier<MessageRouterPublishRequest> publishRequestSupplier; + private final Supplier<MessageRouterPublisher> publisherSupplier; + private final PnfReadyJsonBodyBuilder pnfReadyJsonBodyBuilder = new PnfReadyJsonBodyBuilder(); - DmaapPublisherTaskImpl(Config config, PublisherReactiveHttpClientFactory httpClientFactory) { - this.config = config; - this.httpClientFactory = httpClientFactory; + + public DmaapPublisherTaskImpl(Supplier<MessageRouterPublishRequest> publishRequestSupplier, + Supplier<MessageRouterPublisher> publisherSupplier) { + this.publishRequestSupplier = publishRequestSupplier; + this.publisherSupplier = publisherSupplier; } @Override - public Mono<ResponseEntity<String>> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException { + public Flux<MessageRouterPublishResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException { if (consumerDmaapModel == null) { throw new DmaapNotFoundException("Invoked null object to DMaaP task"); } - DMaaPPublisherReactiveHttpClient dmaapPublisherReactiveHttpClient = resolveClient(); LOGGER.info("Method called with arg {}", consumerDmaapModel); - return dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel); - } - - @Override - public DMaaPPublisherReactiveHttpClient resolveClient() { - return httpClientFactory.create(config.getDmaapPublisherConfiguration()); + MessageRouterPublisher messageRouterPublisher = publisherSupplier.get(); + MessageRouterPublishRequest messageRouterPublishRequest = publishRequestSupplier.get(); + return messageRouterPublisher.put( + messageRouterPublishRequest, + Flux.just(pnfReadyJsonBodyBuilder.createJsonBody(consumerDmaapModel))); } }
\ No newline at end of file diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index 2924225b..74c6c426 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -20,139 +20,176 @@ package org.onap.dcaegen2.services.prh.tasks; -import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.INSTANCE_UUID; -import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.RESPONSE_CODE; - -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.function.Predicate; -import javax.net.ssl.SSLException; import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.prh.model.logging.MdcVariables; - +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishResponse; +import org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.slf4j.Marker; import org.slf4j.MarkerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.ResponseEntity; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.http.HttpStatus; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.function.Predicate; + +import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.INSTANCE_UUID; +import static org.onap.dcaegen2.services.sdk.rest.services.model.logging.MdcVariables.RESPONSE_CODE; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 */ @Component public class ScheduledTasks { - private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTasks.class); private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); + private final DmaapConsumerTask dmaapConsumerTask; - private final DmaapPublisherTask dmaapProducerTask; + private final DmaapPublisherTask dmaapReadyProducerTask; + private final DmaapPublisherTask dmaapUpdateProducerTask; + private final AaiQueryTask aaiQueryTask; private final AaiProducerTask aaiProducerTask; + private final BbsActionsTask bbsActionsTask; private Map<String, String> mdcContextMap; /** * Constructor for tasks registration in PRHWorkflow. * - * @param dmaapConsumerTask - fist task - * @param dmaapPublisherTask - third task - * @param aaiPublisherTask - second task + * @param dmaapConsumerTask - fist task + * @param dmaapReadyPublisherTask - third task + * @param dmaapUpdatePublisherTask - fourth task + * @param aaiPublisherTask - second task */ @Autowired - public ScheduledTasks(DmaapConsumerTask dmaapConsumerTask, DmaapPublisherTask dmaapPublisherTask, - AaiProducerTask aaiPublisherTask, Map<String, String> mdcContextMap) { + public ScheduledTasks( + final DmaapConsumerTask dmaapConsumerTask, + @Qualifier("ReadyPublisherTask") final DmaapPublisherTask dmaapReadyPublisherTask, + @Qualifier("UpdatePublisherTask") final DmaapPublisherTask dmaapUpdatePublisherTask, + final AaiQueryTask aaiQueryTask, + final AaiProducerTask aaiPublisherTask, + final BbsActionsTask bbsActionsTask, + final Map<String, String> mdcContextMap) { this.dmaapConsumerTask = dmaapConsumerTask; - this.dmaapProducerTask = dmaapPublisherTask; + this.dmaapReadyProducerTask = dmaapReadyPublisherTask; + this.dmaapUpdateProducerTask = dmaapUpdatePublisherTask; + this.aaiQueryTask = aaiQueryTask; this.aaiProducerTask = aaiPublisherTask; + this.bbsActionsTask = bbsActionsTask; this.mdcContextMap = mdcContextMap; } + static class State { + public final ConsumerDmaapModel dmaapModel; + public final Boolean activationStatus; + + public State(final ConsumerDmaapModel dmaapModel, final Boolean activationStatus) { + this.dmaapModel = dmaapModel; + this.activationStatus = activationStatus; + } + } + /** * Main function for scheduling prhWorkflow. */ public void scheduleMainPrhEventTask() { MdcVariables.setMdcContextMap(mdcContextMap); try { - logger.trace("Execution of tasks was registered"); + LOGGER.trace("Execution of tasks was registered"); CountDownLatch mainCountDownLatch = new CountDownLatch(1); consumeFromDMaaPMessage() - .doOnError(DmaapEmptyResponseException.class, error -> - logger.warn("Nothing to consume from DMaaP") - ) - .flatMap(this::publishToAaiConfiguration) - .doOnError(exception -> - logger.warn("AAIProducerTask exception has been registered: ", exception)) - .onErrorResume(resumePrhPredicate(), exception -> Mono.empty()) - .flatMap(this::publishToDmaapConfiguration) - .doOnError(exception -> - logger.warn("DMaaPProducerTask exception has been registered: ", exception)) - .onErrorResume(resumePrhPredicate(), exception -> Mono.empty()) - .doOnTerminate(mainCountDownLatch::countDown) - .subscribe(this::onSuccess, this::onError, this::onComplete); + .doOnError(DmaapEmptyResponseException.class, error -> + LOGGER.warn("Nothing to consume from DMaaP") + ) + .flatMap(this::queryAaiForConfiguration) + .flatMap(this::publishToAaiConfiguration) + .flatMap(this::processAdditionalFields) + .flatMap(this::publishToDmaapConfiguration) + .onErrorResume(resumePrhPredicate(), exception -> Mono.empty()) + .doOnTerminate(mainCountDownLatch::countDown) + .subscribe(this::onSuccess, this::onError, this::onComplete); mainCountDownLatch.await(); } catch (InterruptedException e) { - logger.warn("Interruption problem on countDownLatch ", e); + LOGGER.warn("Interruption problem on countDownLatch ", e); Thread.currentThread().interrupt(); } } - private void onComplete() { - logger.info("PRH tasks have been completed"); + LOGGER.info("PRH tasks have been completed"); } - private void onSuccess(ResponseEntity<String> responseCode) { - MDC.put(RESPONSE_CODE, responseCode.getStatusCode().toString()); - logger.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", - responseCode.getStatusCode().value()); - MDC.remove(RESPONSE_CODE); + private void onSuccess(MessageRouterPublishResponse response) { + if (response.successful()) { + String statusCodeOk = HttpStatus.OK.name(); + MDC.put(RESPONSE_CODE, statusCodeOk); + LOGGER.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", statusCodeOk); + MDC.remove(RESPONSE_CODE); + } } private void onError(Throwable throwable) { if (!(throwable instanceof DmaapEmptyResponseException)) { - logger.warn("Chain of tasks have been aborted due to errors in PRH workflow", throwable); + LOGGER.warn("Chain of tasks have been aborted due to errors in PRH workflow", throwable); } } - private Flux<ConsumerDmaapModel> consumeFromDMaaPMessage() { return Flux.defer(() -> { MdcVariables.setMdcContextMap(mdcContextMap); MDC.put(INSTANCE_UUID, UUID.randomUUID().toString()); - logger.info(INVOKE, "Init configs"); - dmaapConsumerTask.initConfigs(); - return consumeFromDMaaP(); + LOGGER.info(INVOKE, "Init configs"); + return dmaapConsumerTask.execute(); }); } - private Flux<ConsumerDmaapModel> consumeFromDMaaP() { - try { - return dmaapConsumerTask.execute(""); - } catch (SSLException e) { - return Flux.error(e); - } + private Mono<State> queryAaiForConfiguration(final ConsumerDmaapModel monoDMaaPModel) { + return aaiQueryTask + .execute(monoDMaaPModel) + .map(x -> new State(monoDMaaPModel, x)); } - private Mono<ConsumerDmaapModel> publishToAaiConfiguration(ConsumerDmaapModel monoDMaaPModel) { + private Mono<State> publishToAaiConfiguration(final State state) { try { - return aaiProducerTask.execute(monoDMaaPModel); - } catch (PrhTaskException | SSLException e) { + return state.activationStatus + ? Mono.just(state) + : aaiProducerTask + .execute(state.dmaapModel) + .map(x -> state); + } catch (PrhTaskException e) { + LOGGER.warn("AAIProducerTask exception has been registered: ", e); return Mono.error(e); } } - private Mono<ResponseEntity<String>> publishToDmaapConfiguration(ConsumerDmaapModel monoAaiModel) { + private Mono<State> processAdditionalFields(final State state) { + if (state.activationStatus) { + LOGGER.debug("Re-registration - Logical links won't be updated."); + return Mono.just(state); + } + return bbsActionsTask.execute(state.dmaapModel).map(x -> state); + } + + private Flux<MessageRouterPublishResponse> publishToDmaapConfiguration(final State state) { try { - return dmaapProducerTask.execute(monoAaiModel); + if (state.activationStatus) { + LOGGER.debug("Re-registration - Using PNF_UPDATE DMaaP topic."); + return dmaapUpdateProducerTask.execute(state.dmaapModel); + } + return dmaapReadyProducerTask.execute(state.dmaapModel); } catch (PrhTaskException e) { - return Mono.error(e); + LOGGER.warn("DMaaPProducerTask exception has been registered: ", e); + return Flux.error(e); } } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java index 956ffead..25ed262e 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java @@ -20,26 +20,20 @@ package org.onap.dcaegen2.services.prh.tasks; -import io.swagger.annotations.ApiOperation; import java.time.Duration; -import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ScheduledFuture; -import javax.annotation.PostConstruct; -import org.onap.dcaegen2.services.prh.configuration.CloudConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.Marker; import org.slf4j.MarkerFactory; -import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.event.ApplicationStartedEvent; import org.springframework.context.annotation.Configuration; -import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; +import org.springframework.context.event.EventListener; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.EnableScheduling; -import reactor.core.publisher.Mono; /** @@ -48,38 +42,30 @@ import reactor.core.publisher.Mono; @Configuration @EnableScheduling public class ScheduledTasksRunner { - private static final int SCHEDULING_DELAY_FOR_PRH_TASKS = 10; - private static final int SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY = 5; private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledTasksRunner.class); private static final Marker ENTRY = MarkerFactory.getMarker("ENTRY"); private static volatile List<ScheduledFuture> scheduledPrhTaskFutureList = new ArrayList<>(); private final TaskScheduler taskScheduler; private final ScheduledTasks scheduledTask; - private final CloudConfiguration cloudConfiguration; - @Autowired - public ScheduledTasksRunner(TaskScheduler taskScheduler, - ScheduledTasks scheduledTask, - CloudConfiguration cloudConfiguration) { + public ScheduledTasksRunner(TaskScheduler taskScheduler, ScheduledTasks scheduledTask) { this.taskScheduler = taskScheduler; this.scheduledTask = scheduledTask; - this.cloudConfiguration = cloudConfiguration; + } + + @EventListener + public void onApplicationStartedEvent(ApplicationStartedEvent applicationStartedEvent) { + tryToStartTask(); } /** * Function which have to stop tasks execution. - * - * @return response entity about status of cancellation operation */ - @ApiOperation(value = "Get response on stopping task execution") - public synchronized Mono<ResponseEntity<String>> getResponseFromCancellationOfTasks() { + public synchronized void cancelTasks() { scheduledPrhTaskFutureList.forEach(x -> x.cancel(false)); scheduledPrhTaskFutureList.clear(); - return Mono.defer(() -> - Mono.just(new ResponseEntity<>("PRH Service has already been stopped!", HttpStatus.CREATED)) - ); } /** @@ -87,16 +73,10 @@ public class ScheduledTasksRunner { * * @return status of operation execution: true - started, false - not started */ - - @PostConstruct - @ApiOperation(value = "Start task if possible") public synchronized boolean tryToStartTask() { LOGGER.info(ENTRY, "Start scheduling PRH workflow"); if (scheduledPrhTaskFutureList.isEmpty()) { scheduledPrhTaskFutureList.add(taskScheduler - .scheduleAtFixedRate(cloudConfiguration::runTask, Instant.now(), - Duration.ofMinutes(SCHEDULING_REQUEST_FOR_CONFIGURATION_DELAY))); - scheduledPrhTaskFutureList.add(taskScheduler .scheduleWithFixedDelay(scheduledTask::scheduleMainPrhEventTask, Duration.ofSeconds(SCHEDULING_DELAY_FOR_PRH_TASKS))); return true; |