diff options
author | andrzejszukuc <andrzej.szukuc@nokia.com> | 2019-03-28 16:48:55 +0100 |
---|---|---|
committer | andrzejszukuc <andrzej.szukuc@nokia.com> | 2019-03-29 18:14:06 +0100 |
commit | a9dfad25c62ec8b4ed1bd04f4b51a59581431a85 (patch) | |
tree | 07da8b640412e2fa822d08db39bd2a70936b4057 /prh-app-server/src/main/java | |
parent | 3a3f19a74d62567a703d2b9cf01de0cde463a557 (diff) |
PNF re-registration is supported
Change-Id: I3a70c610e075bcfbab8cee62ae229ce06cfc5e5d
Signed-off-by: andrzejszukuc <andrzej.szukuc@nokia.com>
Issue-ID: DCAEGEN2-1059
Diffstat (limited to 'prh-app-server/src/main/java')
16 files changed, 665 insertions, 256 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AaiHttpClientConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AaiHttpClientConfig.java new file mode 100644 index 00000000..c90fd9e3 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AaiHttpClientConfig.java @@ -0,0 +1,75 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.configuration; + +import org.onap.dcaegen2.services.prh.model.AaiJsonBodyBuilderImpl; +import org.onap.dcaegen2.services.prh.model.AaiPnfResultModel; +import org.onap.dcaegen2.services.prh.model.AaiServiceInstanceResultModel; +import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration; +import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.AaiHttpClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.AaiHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.get.AaiGetServiceInstanceClient; +import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.get.AaiHttpGetClient; +import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.patch.AaiHttpPatchClient; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; +import org.onap.dcaegen2.services.sdk.rest.services.model.AaiModel; +import org.onap.dcaegen2.services.sdk.rest.services.model.AaiServiceInstanceQueryModel; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.function.BiFunction; + +@Configuration +public class AaiHttpClientConfig { + @Autowired + private CloudConfiguration cloudConfig; + + @Bean + public AaiHttpClient<AaiModel, HttpResponse> getPatchClientFactory() { + return createLazyConfigClient( + (config, client) -> new AaiHttpPatchClient(config, new AaiJsonBodyBuilderImpl(), client)); + } + + @Bean + public AaiHttpClient<AaiServiceInstanceQueryModel, AaiServiceInstanceResultModel> getServiceInstanceClient() { + return createLazyConfigClient( + (config, client) -> new AaiGetServiceInstanceClient(config, client) + .map(x -> x.bodyAsJson(AaiServiceInstanceResultModel.class))); + } + + @Bean + public AaiHttpClient<AaiModel, AaiPnfResultModel> getGetClient() { + return createLazyConfigClient( + (config, client) -> new AaiHttpGetClient(config, client) + .map(x -> x.bodyAsJson(AaiPnfResultModel.class))); + } + + private <T, U> AaiHttpClient<T, U> createLazyConfigClient( + final BiFunction<AaiClientConfiguration, CloudHttpClient, AaiHttpClient<T, U>> factoryMethod) { + + return x -> factoryMethod.apply( + cloudConfig.getAaiClientConfiguration(), + new AaiHttpClientFactory(cloudConfig.getAaiClientConfiguration()).build() + ).getAaiResponse(x); + } +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AppConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AppConfig.java index 99886302..2a7661a9 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AppConfig.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/AppConfig.java @@ -96,6 +96,27 @@ public class AppConfig extends PrhAppConfig { @Value("${dmaap.dmaapProducerConfiguration.dmaapContentType:}") public String producerDmaapContentType; + @Value("${dmaap.dmaapUpdateProducerConfiguration.dmaapHostName:}") + public String updateProducerDmaapHostName; + + @Value("${dmaap.dmaapUpdateProducerConfiguration.dmaapPortNumber:}") + public Integer updateProducerDmaapPortNumber; + + @Value("${dmaap.dmaapUpdateProducerConfiguration.dmaapTopicName:}") + public String updateProducerDmaapTopicName; + + @Value("${dmaap.dmaapUpdateProducerConfiguration.dmaapProtocol:}") + public String updateProducerDmaapProtocol; + + @Value("${dmaap.dmaapUpdateProducerConfiguration.dmaapUserName:}") + public String updateProducerDmaapUserName; + + @Value("${dmaap.dmaapUpdateProducerConfiguration.dmaapUserPassword:}") + public String updateProducerDmaapUserPassword; + + @Value("${dmaap.dmaapUpdateProducerConfiguration.dmaapContentType:}") + public String updateProducerDmaapContentType; + @Value("${aai.aaiClientConfiguration.aaiHost:}") public String aaiHost; @@ -120,6 +141,9 @@ public class AppConfig extends PrhAppConfig { @Value("${aai.aaiClientConfiguration.aaiPnfPath:}") public String aaiPnfPath; + @Value("${aai.aaiClientConfiguration.aaiServiceInstancePath:}") + public String aaiServiceInstancePath; + @Value("${security.trustStorePath:}") public String trustStorePath; @@ -215,6 +239,8 @@ public class AppConfig extends PrhAppConfig { .orElse(aaiClientConfiguration.aaiBasePath())) .aaiPnfPath( Optional.ofNullable(aaiPnfPath).filter(isEmpty.negate()).orElse(aaiClientConfiguration.aaiPnfPath())) + .aaiServiceInstancePath( + Optional.ofNullable(aaiServiceInstancePath).filter(isEmpty.negate()).orElse(aaiClientConfiguration.aaiServiceInstancePath())) .aaiHeaders(aaiClientConfiguration.aaiHeaders()) .trustStorePath( Optional.ofNullable(trustStorePath).filter(isEmpty.negate()) @@ -279,6 +305,51 @@ public class AppConfig extends PrhAppConfig { .build(); } + @Override + public DmaapPublisherConfiguration getDmaapUpdatePublisherConfiguration() { + if (noFileConfiguration(dmaapUpdatePublisherConfiguration)) { + return null; + } + return new ImmutableDmaapPublisherConfiguration.Builder() + .dmaapContentType( + Optional.ofNullable(updateProducerDmaapContentType).filter(isEmpty.negate()) + .orElse(dmaapUpdatePublisherConfiguration.dmaapContentType())) + .dmaapHostName( + Optional.ofNullable(updateProducerDmaapHostName).filter(isEmpty.negate()) + .orElse(dmaapUpdatePublisherConfiguration.dmaapHostName())) + .dmaapPortNumber( + Optional.ofNullable(updateProducerDmaapPortNumber).filter(p -> !p.toString().isEmpty()) + .orElse(dmaapPublisherConfiguration.dmaapPortNumber())) + .dmaapProtocol( + Optional.ofNullable(updateProducerDmaapProtocol).filter(isEmpty.negate()) + .orElse(dmaapUpdatePublisherConfiguration.dmaapProtocol())) + .dmaapTopicName( + Optional.ofNullable(updateProducerDmaapTopicName).filter(isEmpty.negate()) + .orElse(dmaapUpdatePublisherConfiguration.dmaapTopicName())) + .dmaapUserName( + Optional.ofNullable(updateProducerDmaapUserName).filter(isEmpty.negate()) + .orElse(dmaapUpdatePublisherConfiguration.dmaapUserName())) + .dmaapUserPassword( + Optional.ofNullable(updateProducerDmaapUserPassword).filter(isEmpty.negate()) + .orElse(dmaapUpdatePublisherConfiguration.dmaapUserPassword())) + .trustStorePath( + Optional.ofNullable(trustStorePath).filter(isEmpty.negate()) + .orElse(dmaapUpdatePublisherConfiguration.trustStorePath())) + .trustStorePasswordPath( + Optional.ofNullable(trustStorePasswordPath).filter(isEmpty.negate()) + .orElse(dmaapUpdatePublisherConfiguration.trustStorePasswordPath())) + .keyStorePath( + Optional.ofNullable(keyStorePath).filter(isEmpty.negate()) + .orElse(dmaapUpdatePublisherConfiguration.keyStorePath())) + .keyStorePasswordPath( + Optional.ofNullable(keyStorePasswordPath).filter(isEmpty.negate()) + .orElse(dmaapUpdatePublisherConfiguration.keyStorePasswordPath())) + .enableDmaapCertAuth( + Optional.ofNullable(enableDmaapCertAuth).filter(p -> !p.toString().isEmpty()) + .orElse(dmaapUpdatePublisherConfiguration.enableDmaapCertAuth())) + .build(); + } + private boolean noFileConfiguration(Object object) { return Objects.isNull(object); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfigParser.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfigParser.java index 9d7b3396..2360c075 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfigParser.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfigParser.java @@ -52,6 +52,25 @@ class CloudConfigParser { .dmaapContentType(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapContentType").getAsString()) .dmaapHostName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapHostName").getAsString()) .dmaapUserName(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserName").getAsString()) + .dmaapUserPassword(jsonObject.get("dmaap.dmaapProducerConfiguration.dmaapUserPassword").getAsString()) + .trustStorePath(jsonObject.get(SECURITY_TRUST_STORE_PATH).getAsString()) + .trustStorePasswordPath(jsonObject.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString()) + .keyStorePath(jsonObject.get(SECURITY_KEY_STORE_PATH).getAsString()) + .keyStorePasswordPath(jsonObject.get(SECURITY_KEY_STORE_PASS_PATH).getAsString()) + .enableDmaapCertAuth(jsonObject.get("security.enableDmaapCertAuth").getAsBoolean()) + .build(); + } + + DmaapPublisherConfiguration getDmaapUpdatePublisherConfig() { + return new ImmutableDmaapPublisherConfiguration.Builder() + .dmaapTopicName(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapTopicName").getAsString()) + .dmaapUserPassword(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapUserPassword").getAsString()) + .dmaapPortNumber(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapPortNumber").getAsInt()) + .dmaapProtocol(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapProtocol").getAsString()) + .dmaapContentType(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapContentType").getAsString()) + .dmaapHostName(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapHostName").getAsString()) + .dmaapUserName(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapUserName").getAsString()) + .dmaapUserPassword(jsonObject.get("dmaap.dmaapUpdateProducerConfiguration.dmaapUserPassword").getAsString()) .trustStorePath(jsonObject.get(SECURITY_TRUST_STORE_PATH).getAsString()) .trustStorePasswordPath(jsonObject.get(SECURITY_TRUST_STORE_PASS_PATH).getAsString()) .keyStorePath(jsonObject.get(SECURITY_KEY_STORE_PATH).getAsString()) @@ -66,6 +85,7 @@ class CloudConfigParser { .aaiPort(jsonObject.get("aai.aaiClientConfiguration.aaiHostPortNumber").getAsInt()) .aaiUserName(jsonObject.get("aai.aaiClientConfiguration.aaiUserName").getAsString()) .aaiPnfPath(jsonObject.get("aai.aaiClientConfiguration.aaiPnfPath").getAsString()) + .aaiServiceInstancePath(jsonObject.get("aai.aaiClientConfiguration.aaiServiceInstancePath").getAsString()) .aaiIgnoreSslCertificateErrors( jsonObject.get("aai.aaiClientConfiguration.aaiIgnoreSslCertificateErrors").getAsBoolean()) .aaiUserPassword(jsonObject.get("aai.aaiClientConfiguration.aaiUserPassword").getAsString()) diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java index d2849500..10ece50b 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CloudConfiguration.java @@ -56,6 +56,7 @@ public class CloudConfiguration extends AppConfig { private AaiClientConfiguration aaiClientCloudConfiguration; private DmaapPublisherConfiguration dmaapPublisherCloudConfiguration; + private DmaapPublisherConfiguration dmaapUpdatePublisherCloudConfiguration; private DmaapConsumerConfiguration dmaapConsumerCloudConfiguration; @Value("#{systemEnvironment}") @@ -90,6 +91,7 @@ public class CloudConfiguration extends AppConfig { LOGGER.info("Received application configuration: {}", jsonObject); CloudConfigParser cloudConfigParser = new CloudConfigParser(jsonObject); dmaapPublisherCloudConfiguration = cloudConfigParser.getDmaapPublisherConfig(); + dmaapUpdatePublisherCloudConfiguration = cloudConfigParser.getDmaapUpdatePublisherConfig(); aaiClientCloudConfiguration = ImmutableAaiClientConfiguration.copyOf(cloudConfigParser.getAaiClientConfig()) .withAaiHeaders(aaiClientConfiguration.aaiHeaders()); dmaapConsumerCloudConfiguration = cloudConfigParser.getDmaapConsumerConfig(); @@ -101,6 +103,11 @@ public class CloudConfiguration extends AppConfig { } @Override + public DmaapPublisherConfiguration getDmaapUpdatePublisherConfiguration() { + return Optional.ofNullable(dmaapUpdatePublisherCloudConfiguration).orElse(super.getDmaapPublisherConfiguration()); + } + + @Override public AaiClientConfiguration getAaiClientConfiguration() { return Optional.ofNullable(aaiClientCloudConfiguration).orElse(super.getAaiClientConfiguration()); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java index d26fbd81..88f20205 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/Config.java @@ -38,5 +38,7 @@ public interface Config { DmaapPublisherConfiguration getDmaapPublisherConfiguration(); + DmaapPublisherConfiguration getDmaapUpdatePublisherConfiguration(); + void initFileStreamReader(); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/DmaapPublisherTaskConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/DmaapPublisherTaskConfig.java new file mode 100644 index 00000000..7355cf48 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/DmaapPublisherTaskConfig.java @@ -0,0 +1,42 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.configuration; + +import org.onap.dcaegen2.services.prh.tasks.DmaapPublisherTask; +import org.onap.dcaegen2.services.prh.tasks.DmaapPublisherTaskImpl; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class DmaapPublisherTaskConfig { + @Bean(name = "ReadyPublisherTask") + @Autowired + public DmaapPublisherTask getReadyPublisherTask(final Config config) { + return new DmaapPublisherTaskImpl(config::getDmaapPublisherConfiguration); + } + + @Bean(name = "UpdatePublisherTask") + @Autowired + public DmaapPublisherTask getUpdatePublisherTask(final Config config) { + return new DmaapPublisherTaskImpl(config::getDmaapUpdatePublisherConfiguration); + } +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java index 85f7e983..108a3551 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/PrhAppConfig.java @@ -37,6 +37,8 @@ import org.springframework.util.StreamUtils; import javax.annotation.PostConstruct; import javax.validation.constraints.NotNull; import java.io.*; +import java.net.URI; +import java.net.URL; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ServiceLoader; @@ -55,6 +57,7 @@ public abstract class PrhAppConfig implements Config { private static final String DMAAP = "dmaap"; private static final String AAI_CONFIG = "aaiClientConfiguration"; private static final String DMAAP_PRODUCER = "dmaapProducerConfiguration"; + private static final String DMAAP_UPDATE_PRODUCER = "dmaapUpdateProducerConfiguration"; private static final String DMAAP_CONSUMER = "dmaapConsumerConfiguration"; private static final String SECURITY = "security"; @@ -64,6 +67,8 @@ public abstract class PrhAppConfig implements Config { DmaapPublisherConfiguration dmaapPublisherConfiguration; + DmaapPublisherConfiguration dmaapUpdatePublisherConfiguration; + @Value("classpath:prh_endpoints.json") private Resource prhEndpoints; @@ -96,6 +101,11 @@ public abstract class PrhAppConfig implements Config { } @Override + public DmaapPublisherConfiguration getDmaapUpdatePublisherConfiguration() { + return dmaapUpdatePublisherConfiguration; + } + + @Override public void initFileStreamReader() { GsonBuilder gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); @@ -106,17 +116,23 @@ public abstract class PrhAppConfig implements Config { if (rootElement.isJsonObject()) { deserializeAaiConfiguration(gsonBuilder, rootElement); deserializeDmaapConsumerConfiguration(gsonBuilder, rootElement); - deserializeDmaapPublisherConfiguration(gsonBuilder, rootElement); + dmaapPublisherConfiguration = + deserializeDmaapPublisherConfiguration(DMAAP_PRODUCER, gsonBuilder, rootElement); + dmaapUpdatePublisherConfiguration = + deserializeDmaapPublisherConfiguration(DMAAP_UPDATE_PRODUCER, gsonBuilder, rootElement); } } catch (IOException e) { LOGGER.warn("Failed to load/parse file", e); } } - private void deserializeDmaapPublisherConfiguration(GsonBuilder gsonBuilder, JsonElement rootElement) { - dmaapPublisherConfiguration = deserializeType(gsonBuilder, concatenateJsonObjects( + private DmaapPublisherConfiguration deserializeDmaapPublisherConfiguration( + final String dmaapProducerType, + final GsonBuilder gsonBuilder, + final JsonElement rootElement) { + return deserializeType(gsonBuilder, concatenateJsonObjects( rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(DMAAP) - .getAsJsonObject(DMAAP_PRODUCER), + .getAsJsonObject(dmaapProducerType), rootElement.getAsJsonObject().getAsJsonObject(CONFIG).getAsJsonObject(SECURITY)), DmaapPublisherConfiguration.class); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java index 705b085b..f49723e2 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTask.java @@ -20,20 +20,13 @@ package org.onap.dcaegen2.services.prh.tasks; -import javax.net.ssl.SSLException; import org.onap.dcaegen2.services.prh.exceptions.AaiNotFoundException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration; - - -import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.AaiHttpClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.patch.AaiHttpPatchClient; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.ssl.SslFactory; -import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Mono; +import javax.net.ssl.SSLException; + /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 @@ -42,14 +35,6 @@ public abstract class AaiProducerTask { abstract Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel message) throws AaiNotFoundException; - abstract AaiHttpPatchClient resolveClient(); - - protected abstract AaiClientConfiguration resolveConfiguration(); - protected abstract Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) - throws PrhTaskException, SSLException; - - CloudHttpClient buildHttpClient() { - return new AaiHttpClientFactory(resolveConfiguration()).build(); - } + throws PrhTaskException, SSLException; } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java index d3bee5ee..8a6fbf04 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiProducerTaskImpl.java @@ -29,7 +29,11 @@ import org.onap.dcaegen2.services.prh.model.AaiJsonBodyBuilderImpl; import org.onap.dcaegen2.services.prh.model.utils.HttpUtils; import org.onap.dcaegen2.services.sdk.rest.services.aai.client.config.AaiClientConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.AaiHttpClientFactory; +import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.AaiHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.patch.AaiHttpPatchClient; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.CloudHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; +import org.onap.dcaegen2.services.sdk.rest.services.model.AaiModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -46,43 +50,30 @@ public class AaiProducerTaskImpl extends AaiProducerTask { private static final Logger LOGGER = LoggerFactory.getLogger(AaiProducerTaskImpl.class); - private final Config config; - private AaiHttpPatchClient aaiHttpPatchClient; + private final AaiHttpClient<AaiModel, HttpResponse> aaiHttpPatchClient; @Autowired - public AaiProducerTaskImpl(Config config) { - this.config = config; + public AaiProducerTaskImpl(final AaiHttpClient<AaiModel, HttpResponse> aaiHttpPatchClient) { + this.aaiHttpPatchClient = aaiHttpPatchClient; } @Override Mono<ConsumerDmaapModel> publish(ConsumerDmaapModel consumerDmaapModel) { - Mono<HttpClientResponse> resposne = aaiHttpPatchClient.getAaiResponse(consumerDmaapModel); + Mono<HttpResponse> resposne = aaiHttpPatchClient.getAaiResponse(consumerDmaapModel); return resposne.flatMap(response -> { - if (HttpUtils.isSuccessfulResponseCode(response.status().code())) { + if (HttpUtils.isSuccessfulResponseCode(response.statusCode())) { return Mono.just(consumerDmaapModel); } return Mono - .error(new AaiNotFoundException("Incorrect response code for continuation of tasks workflow" + response.status().code())); + .error(new AaiNotFoundException("Incorrect response code for continuation of tasks workflow" + response.statusCode())); }); } @Override - AaiHttpPatchClient resolveClient() { - return new AaiHttpPatchClient(resolveConfiguration(), - new AaiJsonBodyBuilderImpl(), new AaiHttpClientFactory(resolveConfiguration()).build()); - } - - @Override - protected AaiClientConfiguration resolveConfiguration() { - return config.getAaiClientConfiguration(); - } - - @Override protected Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException { if (consumerDmaapModel == null) { throw new DmaapNotFoundException("Invoked null object to DMaaP task"); } - aaiHttpPatchClient = resolveClient(); LOGGER.debug("Method called with arg {}", consumerDmaapModel); return publish(consumerDmaapModel); } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTask.java new file mode 100644 index 00000000..dec783ff --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTask.java @@ -0,0 +1,28 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.tasks; + +import org.onap.dcaegen2.services.sdk.rest.services.model.AaiModel; +import reactor.core.publisher.Mono; + +public interface AaiQueryTask { + Mono<Boolean> execute(final AaiModel aaiModel); +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTaskImpl.java new file mode 100644 index 00000000..b30d199f --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/AaiQueryTaskImpl.java @@ -0,0 +1,107 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.tasks; + +import org.onap.dcaegen2.services.prh.model.*; +import org.onap.dcaegen2.services.sdk.rest.services.aai.client.service.http.AaiHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.model.AaiModel; +import org.onap.dcaegen2.services.sdk.rest.services.model.AaiServiceInstanceQueryModel; +import org.onap.dcaegen2.services.sdk.rest.services.model.ImmutableAaiServiceInstanceQueryModel; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Mono; + +import java.util.List; +import java.util.Optional; +import java.util.stream.Stream; + +@Component +public class AaiQueryTaskImpl implements AaiQueryTask { + public final static String ACTIVE_STATUS = "Active"; + public final static String RELATED_TO = "service-instance"; + public final static String CUSTOMER = "customer.global-customer-id"; + public final static String SERVICE_TYPE = "service-subscription.service-type"; + public final static String SERVICE_INSTANCE_ID = "service-instance.service-instance-id"; + + private final AaiHttpClient<AaiModel, AaiPnfResultModel> getPnfModelClient; + private final AaiHttpClient<AaiServiceInstanceQueryModel, AaiServiceInstanceResultModel> getServiceClient; + + @Autowired + public AaiQueryTaskImpl( + final AaiHttpClient<AaiModel, AaiPnfResultModel> getPnfModelClient, + final AaiHttpClient<AaiServiceInstanceQueryModel, AaiServiceInstanceResultModel> getServiceClient) { + this.getPnfModelClient = getPnfModelClient; + this.getServiceClient = getServiceClient; + } + + @Override + public Mono<Boolean> execute(AaiModel aaiModel) { + return getPnfModelClient + .getAaiResponse(aaiModel) + .flatMap(this::checkIfPnfHasRelationToService) + .flatMap(getServiceClient::getAaiResponse) + .map(this::checkIfRelatedServiceInstanceIsActive) + .defaultIfEmpty(false); + } + + private Mono<AaiServiceInstanceQueryModel> checkIfPnfHasRelationToService(final AaiPnfResultModel model) { + return Mono + .justOrEmpty(model.getRelationshipList()) + .map(this::findRelatedTo) + .flatMap(Mono::justOrEmpty) + .map(RelationshipDict::getRelationshipData) + .flatMap(x -> { + final Optional<String> customer = findValue(x, CUSTOMER); + final Optional<String> serviceType = findValue(x, SERVICE_TYPE); + final Optional<String> serviceInstanceId= findValue(x, SERVICE_INSTANCE_ID); + + return customer.isPresent() && serviceType.isPresent() && serviceInstanceId.isPresent() + ? Mono.just(ImmutableAaiServiceInstanceQueryModel + .builder() + .customerId(customer.get()) + .serviceType(serviceType.get()) + .serviceInstanceId(serviceInstanceId.get()) + .build()) + : Mono.empty(); + }); + } + + private Boolean checkIfRelatedServiceInstanceIsActive(final AaiServiceInstanceResultModel model) { + return ACTIVE_STATUS.equalsIgnoreCase(model.getOrchestrationStatus()); + } + + private Optional<RelationshipDict> findRelatedTo(final Relationship data) { + return Optional.ofNullable(data.getRelationship()) + .map(Stream::of) + .orElseGet(Stream::empty) + .flatMap(List::stream) + .filter(x -> RELATED_TO.equals(x.getRelatedTo())) + .findFirst(); + } + + private Optional<String> findValue(final List<RelationshipData> data, final String key) { + return data + .stream() + .filter(y -> key.equals(y.getRelationshipKey())) + .findFirst() + .map(RelationshipData::getRelationshipValue); + } +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTask.java index 7485de8a..d0b8187c 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTask.java @@ -20,134 +20,9 @@ package org.onap.dcaegen2.services.prh.tasks; -import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod.PUT; - -import com.google.gson.JsonObject; -import io.netty.buffer.ByteBuf; -import io.vavr.collection.HashMap; -import io.vavr.collection.Map; -import java.util.Arrays; -import java.util.List; -import java.util.function.Function; -import org.onap.dcaegen2.services.prh.configuration.Config; -import org.onap.dcaegen2.services.prh.exceptions.AaiFailureException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; -import org.onap.dcaegen2.services.prh.model.bbs.ImmutableLogicalLink; -import org.onap.dcaegen2.services.prh.model.bbs.ImmutableRelationship; -import org.onap.dcaegen2.services.prh.model.bbs.ImmutableRelationshipWrapper; -import org.onap.dcaegen2.services.prh.model.bbs.RelationshipWrapper; -import org.onap.dcaegen2.services.prh.model.utils.GsonSerializer; -import org.onap.dcaegen2.services.prh.model.utils.HttpUtils; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody; -import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; -import org.onap.dcaegen2.services.sdk.rest.services.uri.URI.URIBuilder; -import org.onap.dcaegen2.services.sdk.security.ssl.SslFactory; -import org.reactivestreams.Publisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; -@Component -public class BbsActionsTask { - - private static final Logger LOGGER = LoggerFactory.getLogger(BbsActionsTask.class); - private static final String ATTACHMENT_POINT = "attachmentPoint"; - private static final String LOGICAL_LINK_URI = "/network/logical-links/logical-link/"; - private static final String PNF_URI = "/network/pnfs/pnf/"; - - private final RxHttpClient httpClient; - private final Config config; - - @Autowired - BbsActionsTask(Config config) { - this(config, RxHttpClient.create(new SslFactory().createInsecureClientContext())); - } - - BbsActionsTask(Config config, RxHttpClient httpClient) { - this.config = config; - this.httpClient = httpClient; - } - - public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) { - config.initFileStreamReader(); - - JsonObject additionalFields = consumerDmaapModel.getAdditionalFields(); - if (additionalFields == null || !additionalFields.has(ATTACHMENT_POINT)) { - return Mono.just(consumerDmaapModel); - } - String linkName = additionalFields.get(ATTACHMENT_POINT).getAsString(); - if (linkName.isEmpty()) { - LOGGER.warn("Attachment point is empty! Ignore related actions."); - return Mono.just(consumerDmaapModel); - } - String pnfName = consumerDmaapModel.getCorrelationId(); - return createLogicalLinkInAai(linkName, pnfName).flatMap(handleResponse(consumerDmaapModel)); - } - - private Function<HttpResponse, Mono<ConsumerDmaapModel>> handleResponse(ConsumerDmaapModel consumerDmaapModel) { - return response -> HttpUtils.isSuccessfulResponseCode(response.statusCode()) - ? Mono.just(consumerDmaapModel) - : Mono.error(new AaiFailureException( - "Incorrect response when performing BBS-related actions: " + response.statusCode())); - } - - private Mono<HttpResponse> createLogicalLinkInAai(String linkName, String pnfName) { - ImmutableHttpRequest request = buildLogicalLinkRequest(linkName, pnfName); - - return httpClient.call(request); - } - - private ImmutableHttpRequest buildLogicalLinkRequest(String linkName, String pnfName) { - String uri = buildLogicalLinkUri(linkName); - ImmutableLogicalLink logicalLink = buildModel(linkName, pnfName); - Publisher<ByteBuf> jsonPayload = RequestBody.fromString(GsonSerializer.createJsonBody(logicalLink)); - - // FIXME: AAI headers for PUT are different than PATCH (taken from prh_endpoints.json) - Map<String, String> aaiHeaders = HashMap - .ofAll(config.getAaiClientConfiguration().aaiHeaders()) - .replaceValue("Content-Type", "application/json"); - - return ImmutableHttpRequest - .builder() - .method(PUT) - .url(uri) - .body(jsonPayload) - .customHeaders(aaiHeaders) - .build(); - } - - private ImmutableLogicalLink buildModel(String linkName, String pnfName) { - List<RelationshipWrapper> relationships = buildRelationLink(pnfName); - - return ImmutableLogicalLink - .builder() - .linkName(linkName) - .linkType(ATTACHMENT_POINT) - .relationshipList(relationships) - .build(); - } - - private List<RelationshipWrapper> buildRelationLink(String pnfName) { - return Arrays.asList(ImmutableRelationshipWrapper - .builder() - .relationship(ImmutableRelationship - .builder() - .relatedLink(PNF_URI + pnfName) - .build()) - .build()); - } - - private String buildLogicalLinkUri(String linkName) { - return new URIBuilder() - .scheme(config.getAaiClientConfiguration().aaiProtocol()) - .host(config.getAaiClientConfiguration().aaiHost()) - .port(config.getAaiClientConfiguration().aaiPort()) - .path(config.getAaiClientConfiguration().aaiBasePath() + LOGICAL_LINK_URI + linkName) - .build() - .toString(); - } -}
\ No newline at end of file +public interface BbsActionsTask { + Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel); +} diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java new file mode 100644 index 00000000..9648efdf --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/BbsActionsTaskImpl.java @@ -0,0 +1,154 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2019 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.tasks; + +import com.google.gson.JsonObject; +import io.netty.buffer.ByteBuf; +import io.vavr.collection.HashMap; +import io.vavr.collection.Map; +import org.onap.dcaegen2.services.prh.configuration.Config; +import org.onap.dcaegen2.services.prh.exceptions.AaiFailureException; +import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; +import org.onap.dcaegen2.services.prh.model.bbs.ImmutableLogicalLink; +import org.onap.dcaegen2.services.prh.model.bbs.ImmutableRelationship; +import org.onap.dcaegen2.services.prh.model.bbs.ImmutableRelationshipWrapper; +import org.onap.dcaegen2.services.prh.model.bbs.RelationshipWrapper; +import org.onap.dcaegen2.services.prh.model.utils.GsonSerializer; +import org.onap.dcaegen2.services.prh.model.utils.HttpUtils; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.ImmutableHttpRequest; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RequestBody; +import org.onap.dcaegen2.services.sdk.rest.services.adapters.http.RxHttpClient; +import org.onap.dcaegen2.services.sdk.rest.services.uri.URI.URIBuilder; +import org.onap.dcaegen2.services.sdk.security.ssl.SslFactory; +import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Mono; + +import java.util.Arrays; +import java.util.List; +import java.util.function.Function; + +import static org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpMethod.PUT; + +@Component +public class BbsActionsTaskImpl implements BbsActionsTask { + + private static final Logger LOGGER = LoggerFactory.getLogger(BbsActionsTaskImpl.class); + private static final String ATTACHMENT_POINT = "attachmentPoint"; + private static final String LOGICAL_LINK_URI = "/network/logical-links/logical-link/"; + private static final String PNF_URI = "/network/pnfs/pnf/"; + + private final Config config; + private final RxHttpClient httpClient; + + @Autowired + BbsActionsTaskImpl(Config config) { + this(config, RxHttpClient.create(new SslFactory().createInsecureClientContext())); + } + + BbsActionsTaskImpl(Config config, RxHttpClient httpClient) { + this.config = config; + this.httpClient = httpClient; + } + + public Mono<ConsumerDmaapModel> execute(ConsumerDmaapModel consumerDmaapModel) { + config.initFileStreamReader(); + + JsonObject additionalFields = consumerDmaapModel.getAdditionalFields(); + if (additionalFields == null || !additionalFields.has(ATTACHMENT_POINT)) { + return Mono.just(consumerDmaapModel); + } + String linkName = additionalFields.get(ATTACHMENT_POINT).getAsString(); + if (linkName.isEmpty()) { + LOGGER.warn("Attachment point is empty! Ignore related actions."); + return Mono.just(consumerDmaapModel); + } + String pnfName = consumerDmaapModel.getCorrelationId(); + return createLogicalLinkInAai(linkName, pnfName).flatMap(handleResponse(consumerDmaapModel)); + } + + private Function<HttpResponse, Mono<ConsumerDmaapModel>> handleResponse(ConsumerDmaapModel consumerDmaapModel) { + return response -> HttpUtils.isSuccessfulResponseCode(response.statusCode()) + ? Mono.just(consumerDmaapModel) + : Mono.error(new AaiFailureException( + "Incorrect response when performing BBS-related actions: " + response.statusCode())); + } + + private Mono<HttpResponse> createLogicalLinkInAai(String linkName, String pnfName) { + ImmutableHttpRequest request = buildLogicalLinkRequest(linkName, pnfName); + + return httpClient.call(request); + } + + private ImmutableHttpRequest buildLogicalLinkRequest(String linkName, String pnfName) { + String uri = buildLogicalLinkUri(linkName); + ImmutableLogicalLink logicalLink = buildModel(linkName, pnfName); + Publisher<ByteBuf> jsonPayload = RequestBody.fromString(GsonSerializer.createJsonBody(logicalLink)); + + // FIXME: AAI headers for PUT are different than PATCH (taken from prh_endpoints.json) + Map<String, String> aaiHeaders = HashMap + .ofAll(config.getAaiClientConfiguration().aaiHeaders()) + .put("Content-Type", "application/json"); + + return ImmutableHttpRequest + .builder() + .method(PUT) + .url(uri) + .body(jsonPayload) + .customHeaders(aaiHeaders) + .build(); + } + + private ImmutableLogicalLink buildModel(String linkName, String pnfName) { + List<RelationshipWrapper> relationships = buildRelationLink(pnfName); + + return ImmutableLogicalLink + .builder() + .linkName(linkName) + .linkType(ATTACHMENT_POINT) + .relationshipList(relationships) + .build(); + } + + private List<RelationshipWrapper> buildRelationLink(String pnfName) { + return Arrays.asList(ImmutableRelationshipWrapper + .builder() + .relationship(ImmutableRelationship + .builder() + .relatedLink(PNF_URI + pnfName) + .build()) + .build()); + } + + private String buildLogicalLinkUri(String linkName) { + return new URIBuilder() + .scheme(config.getAaiClientConfiguration().aaiProtocol()) + .host(config.getAaiClientConfiguration().aaiHost()) + .port(config.getAaiClientConfiguration().aaiPort()) + .path(config.getAaiClientConfiguration().aaiBasePath() + LOGICAL_LINK_URI + linkName) + .build() + .toString(); + } +}
\ No newline at end of file diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java index e2a91f78..7fc596c1 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTask.java @@ -24,14 +24,12 @@ import javax.net.ssl.SSLException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient; -import reactor.netty.http.client.HttpClientResponse; import reactor.core.publisher.Mono; -import org.apache.http.HttpResponse; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 */ -interface DmaapPublisherTask { +public interface DmaapPublisherTask { /** * @@ -39,9 +37,10 @@ interface DmaapPublisherTask { * As WA plesae use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel); * */ @Deprecated - Mono<HttpClientResponse> execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException, SSLException; + Mono<org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse> + execute(ConsumerDmaapModel consumerDmaapModel) throws PrhTaskException, SSLException; - Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel); + Mono<org.apache.http.HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel); DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException;; } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java index 1a9abf0f..55a8bb58 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/DmaapPublisherTaskImpl.java @@ -20,52 +20,52 @@ package org.onap.dcaegen2.services.prh.tasks; -import java.util.Optional; -import javax.net.ssl.SSLException; import org.apache.http.HttpResponse; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.DefaultHttpClient; -import org.onap.dcaegen2.services.prh.configuration.Config; import org.onap.dcaegen2.services.prh.exceptions.DmaapNotFoundException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.PnfReadyJsonBodyBuilderImpl; +import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.config.DmaapPublisherConfiguration; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DMaaPPublisherReactiveHttpClient; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.DmaaPRestTemplateFactory; import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.service.producer.PublisherReactiveHttpClientFactory; -import org.onap.dcaegen2.services.sdk.rest.services.uri.URI.URIBuilder; +import org.onap.dcaegen2.services.sdk.rest.services.uri.URI; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; import reactor.core.publisher.Mono; -import reactor.netty.http.client.HttpClientResponse; + +import javax.net.ssl.SSLException; +import java.util.Optional; +import java.util.function.Supplier; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 4/13/18 */ -@Component public class DmaapPublisherTaskImpl implements DmaapPublisherTask { private static final Logger LOGGER = LoggerFactory.getLogger(DmaapPublisherTaskImpl.class); + private final Supplier<DmaapPublisherConfiguration> config; private final PnfReadyJsonBodyBuilderImpl pnfReadyJsonBodyBuilder = new PnfReadyJsonBodyBuilderImpl(); - private Config config; private final PublisherReactiveHttpClientFactory httpClientFactory; - @Autowired - public DmaapPublisherTaskImpl(Config config) { - this(config, - new PublisherReactiveHttpClientFactory(new DmaaPRestTemplateFactory(), new PnfReadyJsonBodyBuilderImpl())); + public DmaapPublisherTaskImpl(final Supplier<DmaapPublisherConfiguration> config) { + this(config, new PublisherReactiveHttpClientFactory( + new DmaaPRestTemplateFactory(), + new PnfReadyJsonBodyBuilderImpl())); } - DmaapPublisherTaskImpl(Config config, PublisherReactiveHttpClientFactory httpClientFactory) { + DmaapPublisherTaskImpl( + Supplier<DmaapPublisherConfiguration> config, + PublisherReactiveHttpClientFactory httpClientFactory) { this.config = config; this.httpClientFactory = httpClientFactory; } @Override - public Mono<HttpClientResponse> execute(ConsumerDmaapModel consumerDmaapModel) - throws DmaapNotFoundException, SSLException { + public Mono<org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse> + execute(ConsumerDmaapModel consumerDmaapModel) throws DmaapNotFoundException, SSLException { if (consumerDmaapModel == null) { throw new DmaapNotFoundException("Invoked null object to DMaaP task"); } @@ -74,10 +74,9 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask { return dmaapPublisherReactiveHttpClient.getDMaaPProducerResponse(consumerDmaapModel, Optional.empty()); } - @Override public DMaaPPublisherReactiveHttpClient resolveClient() throws SSLException { - return httpClientFactory.create(config.getDmaapPublisherConfiguration()); + return httpClientFactory.create(config.get()); } @@ -87,13 +86,13 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask { * As WA plesae use Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel); * */ @Override - public Mono<HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel) { + public Mono<org.apache.http.HttpResponse> executeWithApache(ConsumerDmaapModel consumerDmaapModel) { String json = pnfReadyJsonBodyBuilder.createJsonBody(consumerDmaapModel); DefaultHttpClient httpClient = new DefaultHttpClient(); HttpPost postRequest = new HttpPost(getUrl()); try { StringEntity input = new StringEntity(json); - input.setContentType(config.getDmaapPublisherConfiguration().dmaapContentType()); + input.setContentType(config.get().dmaapContentType()); postRequest.setEntity(input); HttpResponse response = httpClient.execute(postRequest); return Mono.just(response); @@ -104,15 +103,13 @@ public class DmaapPublisherTaskImpl implements DmaapPublisherTask { } private String getUrl() { - return (new URIBuilder()).scheme(config.getDmaapPublisherConfiguration().dmaapProtocol()) - .host(config.getDmaapPublisherConfiguration().dmaapHostName()) - .port(config.getDmaapPublisherConfiguration().dmaapPortNumber()).path(this.createRequestPath()).build() + return (new URI.URIBuilder()).scheme(config.get().dmaapProtocol()) + .host(config.get().dmaapHostName()) + .port(config.get().dmaapPortNumber()).path(this.createRequestPath()).build() .toString(); } private String createRequestPath() { - return "/" + config.getDmaapPublisherConfiguration().dmaapTopicName(); + return "/" + config.get().dmaapTopicName(); } - - }
\ No newline at end of file diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java index a7bf42d1..aae5bc77 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasks.java @@ -20,30 +20,27 @@ package org.onap.dcaegen2.services.prh.tasks; -import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.INSTANCE_UUID; -import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.RESPONSE_CODE; - -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.function.Predicate; -import javax.net.ssl.SSLException; - +import org.apache.http.HttpResponse; import org.onap.dcaegen2.services.prh.exceptions.DmaapEmptyResponseException; import org.onap.dcaegen2.services.prh.exceptions.PrhTaskException; import org.onap.dcaegen2.services.prh.model.ConsumerDmaapModel; import org.onap.dcaegen2.services.prh.model.logging.MdcVariables; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.MDC; -import org.slf4j.Marker; -import org.slf4j.MarkerFactory; +import org.slf4j.*; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClientResponse; -import org.apache.http.HttpResponse; + +import javax.net.ssl.SSLException; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.function.Predicate; + +import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.INSTANCE_UUID; +import static org.onap.dcaegen2.services.prh.model.logging.MdcVariables.RESPONSE_CODE; /** * @author <a href="mailto:przemyslaw.wasala@nokia.com">Przemysław Wąsala</a> on 3/23/18 @@ -54,7 +51,9 @@ public class ScheduledTasks { private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class); private static final Marker INVOKE = MarkerFactory.getMarker("INVOKE"); private final DmaapConsumerTask dmaapConsumerTask; - private final DmaapPublisherTask dmaapProducerTask; + private final DmaapPublisherTask dmaapReadyProducerTask; + private final DmaapPublisherTask dmaapUpdateProducerTask; + private final AaiQueryTask aaiQueryTask; private final AaiProducerTask aaiProducerTask; private final BbsActionsTask bbsActionsTask; private Map<String, String> mdcContextMap; @@ -62,24 +61,39 @@ public class ScheduledTasks { /** * Constructor for tasks registration in PRHWorkflow. * - * @param dmaapConsumerTask - fist task - * @param dmaapPublisherTask - third task - * @param aaiPublisherTask - second task + * @param dmaapConsumerTask - fist task + * @param dmaapReadyPublisherTask - third task + * @param dmaapUpdatePublisherTask - fourth task + * @param aaiPublisherTask - second task */ @Autowired public ScheduledTasks( - DmaapConsumerTask dmaapConsumerTask, - DmaapPublisherTask dmaapPublisherTask, - AaiProducerTask aaiPublisherTask, - BbsActionsTask bbsActionsTask, - Map<String, String> mdcContextMap) { + final DmaapConsumerTask dmaapConsumerTask, + @Qualifier("ReadyPublisherTask") final DmaapPublisherTask dmaapReadyPublisherTask, + @Qualifier("UpdatePublisherTask") final DmaapPublisherTask dmaapUpdatePublisherTask, + final AaiQueryTask aaiQueryTask, + final AaiProducerTask aaiPublisherTask, + final BbsActionsTask bbsActionsTask, + final Map<String, String> mdcContextMap) { this.dmaapConsumerTask = dmaapConsumerTask; - this.dmaapProducerTask = dmaapPublisherTask; + this.dmaapReadyProducerTask = dmaapReadyPublisherTask; + this.dmaapUpdateProducerTask = dmaapUpdatePublisherTask; + this.aaiQueryTask = aaiQueryTask; this.aaiProducerTask = aaiPublisherTask; this.bbsActionsTask = bbsActionsTask; this.mdcContextMap = mdcContextMap; } + static class State { + public final ConsumerDmaapModel DmaapModel; + public final Boolean ActivationStatus; + + public State(final ConsumerDmaapModel DmaapModel, final Boolean ActivationStatus) { + this.DmaapModel = DmaapModel; + this.ActivationStatus = ActivationStatus; + } + } + /** * Main function for scheduling prhWorkflow. */ @@ -89,22 +103,23 @@ public class ScheduledTasks { logger.trace("Execution of tasks was registered"); CountDownLatch mainCountDownLatch = new CountDownLatch(1); consumeFromDMaaPMessage() - .doOnError(DmaapEmptyResponseException.class, error -> - logger.warn("Nothing to consume from DMaaP") - ) - .flatMap(this::publishToAaiConfiguration) - .doOnError(exception -> - logger.warn("AAIProducerTask exception has been registered: ", exception)) - .onErrorResume(resumePrhPredicate(), exception -> Mono.empty()) - .flatMap(this::processAdditionalFields) - .doOnError(exception -> - logger.warn("BBSActionsTask exception has been registered: ", exception)) - .flatMap(this::publishToDmaapConfigurationWithApache) - .doOnError(exception -> - logger.warn("DMaaPProducerTask exception has been registered: ", exception)) - .onErrorResume(resumePrhPredicate(), exception -> Mono.empty()) - .doOnTerminate(mainCountDownLatch::countDown) - .subscribe(this::onSuccess, this::onError, this::onComplete); + .doOnError(DmaapEmptyResponseException.class, error -> + logger.warn("Nothing to consume from DMaaP") + ) + .flatMap(this::queryAaiForConfiguration) + .flatMap(this::publishToAaiConfiguration) + .doOnError(exception -> + logger.warn("AAIProducerTask exception has been registered: ", exception)) + .onErrorResume(resumePrhPredicate(), exception -> Mono.empty()) + .flatMap(this::processAdditionalFields) + .doOnError(exception -> + logger.warn("BBSActionsTask exception has been registered: ", exception)) + .flatMap(this::publishToDmaapConfigurationWithApache) + .doOnError(exception -> + logger.warn("DMaaPProducerTask exception has been registered: ", exception)) + .onErrorResume(resumePrhPredicate(), exception -> Mono.empty()) + .doOnTerminate(mainCountDownLatch::countDown) + .subscribe(this::onSuccess, this::onError, this::onComplete); mainCountDownLatch.await(); } catch (InterruptedException e) { @@ -119,13 +134,13 @@ public class ScheduledTasks { /** * Marked as deprecated due to problems with DMaaP MR, to be fixed in future - * */ + */ @Deprecated private void onSuccess(HttpClientResponse response) { String statusCode = Integer.toString(response.status().code()); MDC.put(RESPONSE_CODE, statusCode); logger.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", - statusCode); + statusCode); MDC.remove(RESPONSE_CODE); } @@ -133,12 +148,11 @@ public class ScheduledTasks { String statusCode = Integer.toString(response.getStatusLine().getStatusCode()); MDC.put(RESPONSE_CODE, statusCode); logger.info("Prh consumed tasks successfully. HTTP Response code from DMaaPProducer {}", - statusCode); + statusCode); MDC.remove(RESPONSE_CODE); } - private void onError(Throwable throwable) { if (!(throwable instanceof DmaapEmptyResponseException)) { logger.warn("Chain of tasks have been aborted due to errors in PRH workflow", throwable); @@ -163,40 +177,66 @@ public class ScheduledTasks { } } - private Mono<ConsumerDmaapModel> publishToAaiConfiguration(ConsumerDmaapModel monoDMaaPModel) { + private Mono<State> queryAaiForConfiguration(final ConsumerDmaapModel monoDMaaPModel) { + return aaiQueryTask + .execute(monoDMaaPModel) + .map(x -> new State(monoDMaaPModel, x)); + } + + private Mono<State> publishToAaiConfiguration(final State state) { try { - return aaiProducerTask.execute(monoDMaaPModel); + return state.ActivationStatus + ? Mono.just(state) + : aaiProducerTask + .execute(state.DmaapModel) + .map(x -> state); } catch (PrhTaskException | SSLException e) { return Mono.error(e); } } - private Mono<ConsumerDmaapModel> processAdditionalFields(ConsumerDmaapModel consumerDmaapModel) { - return bbsActionsTask.execute(consumerDmaapModel); + private Mono<State> processAdditionalFields(final State state) { + if (state.ActivationStatus) { + logger.debug("Re-registration - Logical links won't be updated."); + + return Mono.just(state); + } + + return bbsActionsTask.execute(state.DmaapModel).map(x -> state); } /** * Marked as deprecated due to problems with DMaaP MR, to be fixed in future - * */ + */ @Deprecated - private Mono<HttpClientResponse> publishToDmaapConfiguration(ConsumerDmaapModel monoAaiModel) { + private Mono<org.onap.dcaegen2.services.sdk.rest.services.adapters.http.HttpResponse> + publishToDmaapConfiguration(final State state) { try { - return dmaapProducerTask.execute(monoAaiModel); + if (state.ActivationStatus) { + logger.debug("Re-registration - Using PNF_UPDATE DMaaP topic."); + return dmaapUpdateProducerTask.execute(state.DmaapModel); + } + + return dmaapReadyProducerTask.execute(state.DmaapModel); } catch (PrhTaskException | SSLException e) { return Mono.error(e); } } - private Mono<HttpResponse> publishToDmaapConfigurationWithApache(ConsumerDmaapModel monoAaiModel) { + private Mono<org.apache.http.HttpResponse> + publishToDmaapConfigurationWithApache(final State state) { try { - return dmaapProducerTask.executeWithApache(monoAaiModel); + if (state.ActivationStatus) { + logger.debug("Re-registration - Using PNF_UPDATE DMaaP topic."); + return dmaapUpdateProducerTask.executeWithApache(state.DmaapModel); + } + + return dmaapReadyProducerTask.executeWithApache(state.DmaapModel); } catch (Exception e) { return Mono.error(e); } } - - private Predicate<Throwable> resumePrhPredicate() { return exception -> exception instanceof PrhTaskException; } |