diff options
author | KAPIL SINGAL <ks220y@att.com> | 2020-10-09 13:23:34 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2020-10-09 13:23:34 +0000 |
commit | d0d1f2b91278124a9782d7b516ae7b784290f68f (patch) | |
tree | 76b74212c3cac676dace58688604968b685a7bf0 /a1-policy-management/src/main | |
parent | 5ada52af92483ebc031e72c9bb4708737a9a59f0 (diff) | |
parent | 49763b0cf15a58b804e19e1d2907cb46044fd552 (diff) |
Merge "Improvent of AsynchRestClient"
Diffstat (limited to 'a1-policy-management/src/main')
11 files changed, 225 insertions, 124 deletions
diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1ClientFactory.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1ClientFactory.java index cfb01d0b..f0f03cfe 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1ClientFactory.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/A1ClientFactory.java @@ -20,8 +20,6 @@ package org.onap.ccsdk.oran.a1policymanagementservice.clients; -import lombok.Getter; - import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1Client.A1ProtocolType; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ControllerConfig; @@ -40,12 +38,14 @@ public class A1ClientFactory { private static final Logger logger = LoggerFactory.getLogger(A1ClientFactory.class); - @Getter private final ApplicationConfig appConfig; + private final AsyncRestClientFactory restClientFactory; + @Autowired public A1ClientFactory(ApplicationConfig appConfig) { this.appConfig = appConfig; + this.restClientFactory = new AsyncRestClientFactory(appConfig.getWebClientConfig()); } /** @@ -72,15 +72,14 @@ public class A1ClientFactory { A1Client createClient(Ric ric, A1ProtocolType version) throws ServiceException { if (version == A1ProtocolType.STD_V1_1) { assertNoControllerConfig(ric, version); - return new StdA1ClientVersion1(ric.getConfig(), this.appConfig.getWebClientConfig()); + return new StdA1ClientVersion1(ric.getConfig(), this.restClientFactory); } else if (version == A1ProtocolType.OSC_V1) { assertNoControllerConfig(ric, version); - return new OscA1Client(ric.getConfig(), this.appConfig.getWebClientConfig()); + return new OscA1Client(ric.getConfig(), this.restClientFactory); } else if (version == A1ProtocolType.SDNC_OSC_STD_V1_1 || version == A1ProtocolType.SDNC_OSC_OSC_V1) { - return new SdncOscA1Client(version, ric.getConfig(), getControllerConfig(ric), - this.appConfig.getWebClientConfig()); + return new SdncOscA1Client(version, ric.getConfig(), getControllerConfig(ric), this.restClientFactory); } else if (version == A1ProtocolType.SDNC_ONAP) { - return new SdncOnapA1Client(ric.getConfig(), getControllerConfig(ric), this.appConfig.getWebClientConfig()); + return new SdncOnapA1Client(ric.getConfig(), getControllerConfig(ric), this.restClientFactory); } else { logger.error("Unhandled protocol: {}", version); throw new ServiceException("Unhandled protocol"); diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java index c1cea8b5..8409f45c 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClient.java @@ -22,36 +22,18 @@ package org.onap.ccsdk.oran.a1policymanagementservice.clients; import io.netty.channel.ChannelOption; import io.netty.handler.ssl.SslContext; -import io.netty.handler.ssl.SslContextBuilder; -import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.handler.timeout.WriteTimeoutHandler; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; import java.lang.invoke.MethodHandles; -import java.security.KeyStore; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.cert.Certificate; -import java.security.cert.CertificateException; -import java.security.cert.X509Certificate; -import java.util.Collections; -import java.util.List; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; -import javax.net.ssl.KeyManagerFactory; - -import org.onap.ccsdk.oran.a1policymanagementservice.configuration.WebClientConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.lang.Nullable; -import org.springframework.util.ResourceUtils; import org.springframework.web.reactive.function.client.ExchangeStrategies; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec; @@ -66,22 +48,25 @@ import reactor.netty.tcp.TcpClient; * Generic reactive REST client. */ public class AsyncRestClient { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private WebClient webClient = null; private final String baseUrl; private static final AtomicInteger sequenceNumber = new AtomicInteger(); - private final WebClientConfig clientConfig; - static KeyStore clientTrustStore = null; - private boolean sslEnabled = true; + private final SslContext sslContext; + /** + * Note that only http (not https) will work when this constructor is used. + * + * @param baseUrl + */ public AsyncRestClient(String baseUrl) { this(baseUrl, null); - this.sslEnabled = false; } - public AsyncRestClient(String baseUrl, WebClientConfig config) { + public AsyncRestClient(String baseUrl, SslContext sslContext) { this.baseUrl = baseUrl; - this.clientConfig = config; + this.sslContext = sslContext; } public Mono<ResponseEntity<String>> postForEntity(String uri, @Nullable String body) { @@ -215,65 +200,6 @@ public class AsyncRestClient { } } - private boolean isCertificateEntry(KeyStore trustStore, String alias) { - try { - return trustStore.isCertificateEntry(alias); - } catch (KeyStoreException e) { - logger.error("Error reading truststore {}", e.getMessage()); - return false; - } - } - - private Certificate getCertificate(KeyStore trustStore, String alias) { - try { - return trustStore.getCertificate(alias); - } catch (KeyStoreException e) { - logger.error("Error reading truststore {}", e.getMessage()); - return null; - } - } - - private static synchronized KeyStore getTrustStore(String trustStorePath, String trustStorePass) - throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException { - if (clientTrustStore == null) { - KeyStore store = KeyStore.getInstance(KeyStore.getDefaultType()); - store.load(new FileInputStream(ResourceUtils.getFile(trustStorePath)), trustStorePass.toCharArray()); - clientTrustStore = store; - } - return clientTrustStore; - } - - private SslContext createSslContextRejectingUntrustedPeers(String trustStorePath, String trustStorePass, - KeyManagerFactory keyManager) - throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException { - - final KeyStore trustStore = getTrustStore(trustStorePath, trustStorePass); - List<Certificate> certificateList = Collections.list(trustStore.aliases()).stream() // - .filter(alias -> isCertificateEntry(trustStore, alias)) // - .map(alias -> getCertificate(trustStore, alias)) // - .collect(Collectors.toList()); - final X509Certificate[] certificates = certificateList.toArray(new X509Certificate[certificateList.size()]); - - return SslContextBuilder.forClient() // - .keyManager(keyManager) // - .trustManager(certificates) // - .build(); - } - - private SslContext createSslContext(KeyManagerFactory keyManager) - throws NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException { - if (this.clientConfig.isTrustStoreUsed()) { - return createSslContextRejectingUntrustedPeers(this.clientConfig.trustStore(), - this.clientConfig.trustStorePassword(), keyManager); - } else { - // Trust anyone - return SslContextBuilder.forClient() // - .keyManager(keyManager) // - .trustManager(InsecureTrustManagerFactory.INSTANCE) // - .build(); - } - } - private TcpClient createTcpClientSecure(SslContext sslContext) { return TcpClient.create(ConnectionProvider.newConnection()) // .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10_000) // @@ -309,18 +235,7 @@ public class AsyncRestClient { private Mono<WebClient> getWebClient() { if (this.webClient == null) { try { - if (this.sslEnabled) { - final KeyManagerFactory keyManager = - KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); - final KeyStore keyStore = KeyStore.getInstance(this.clientConfig.keyStoreType()); - final String keyStoreFile = this.clientConfig.keyStore(); - final String keyStorePassword = this.clientConfig.keyStorePassword(); - final String keyPassword = this.clientConfig.keyPassword(); - try (final InputStream inputStream = new FileInputStream(keyStoreFile)) { - keyStore.load(inputStream, keyStorePassword.toCharArray()); - } - keyManager.init(keyStore, keyPassword.toCharArray()); - SslContext sslContext = createSslContext(keyManager); + if (this.sslContext != null) { TcpClient tcpClient = createTcpClientSecure(sslContext); this.webClient = createWebClient(this.baseUrl, tcpClient); } else { diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClientFactory.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClientFactory.java new file mode 100644 index 00000000..5b88fce4 --- /dev/null +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/AsyncRestClientFactory.java @@ -0,0 +1,179 @@ +/*- + * ========================LICENSE_START================================= + * ONAP : ccsdk oran + * ====================================================================== + * Copyright (C) 2019-2020 Nordix Foundation. All rights reserved. + * ====================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.onap.ccsdk.oran.a1policymanagementservice.clients; + +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.lang.invoke.MethodHandles; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.Certificate; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import javax.net.ssl.KeyManagerFactory; + +import org.onap.ccsdk.oran.a1policymanagementservice.configuration.WebClientConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.ResourceUtils; + +/** + * Factory for a generic reactive REST client. + */ +public class AsyncRestClientFactory { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private final SslContextFactory sslContextFactory; + + public AsyncRestClientFactory(WebClientConfig clientConfig) { + if (clientConfig != null) { + this.sslContextFactory = new CachingSslContextFactory(clientConfig); + } else { + this.sslContextFactory = null; + } + } + + public AsyncRestClient createRestClient(String baseUrl) { + if (this.sslContextFactory != null) { + try { + return new AsyncRestClient(baseUrl, this.sslContextFactory.createSslContext()); + } catch (Exception e) { + String exceptionString = e.toString(); + logger.error("Could not init SSL context, reason: {}", exceptionString); + } + } + return new AsyncRestClient(baseUrl); + } + + private class SslContextFactory { + private final WebClientConfig clientConfig; + + public SslContextFactory(WebClientConfig clientConfig) { + this.clientConfig = clientConfig; + } + + public SslContext createSslContext() throws UnrecoverableKeyException, NoSuchAlgorithmException, + CertificateException, KeyStoreException, IOException { + return this.createSslContext(createKeyManager()); + } + + private SslContext createSslContext(KeyManagerFactory keyManager) + throws NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException { + if (this.clientConfig.isTrustStoreUsed()) { + return createSslContextRejectingUntrustedPeers(this.clientConfig.trustStore(), + this.clientConfig.trustStorePassword(), keyManager); + } else { + // Trust anyone + return SslContextBuilder.forClient() // + .keyManager(keyManager) // + .trustManager(InsecureTrustManagerFactory.INSTANCE) // + .build(); + } + } + + private SslContext createSslContextRejectingUntrustedPeers(String trustStorePath, String trustStorePass, + KeyManagerFactory keyManager) + throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException { + + final KeyStore trustStore = getTrustStore(trustStorePath, trustStorePass); + List<Certificate> certificateList = Collections.list(trustStore.aliases()).stream() // + .filter(alias -> isCertificateEntry(trustStore, alias)) // + .map(alias -> getCertificate(trustStore, alias)) // + .collect(Collectors.toList()); + final X509Certificate[] certificates = certificateList.toArray(new X509Certificate[certificateList.size()]); + + return SslContextBuilder.forClient() // + .keyManager(keyManager) // + .trustManager(certificates) // + .build(); + } + + private boolean isCertificateEntry(KeyStore trustStore, String alias) { + try { + return trustStore.isCertificateEntry(alias); + } catch (KeyStoreException e) { + logger.error("Error reading truststore {}", e.getMessage()); + return false; + } + } + + private Certificate getCertificate(KeyStore trustStore, String alias) { + try { + return trustStore.getCertificate(alias); + } catch (KeyStoreException e) { + logger.error("Error reading truststore {}", e.getMessage()); + return null; + } + } + + private KeyManagerFactory createKeyManager() throws NoSuchAlgorithmException, CertificateException, IOException, + UnrecoverableKeyException, KeyStoreException { + final KeyManagerFactory keyManager = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + final KeyStore keyStore = KeyStore.getInstance(this.clientConfig.keyStoreType()); + final String keyStoreFile = this.clientConfig.keyStore(); + final String keyStorePassword = this.clientConfig.keyStorePassword(); + final String keyPassword = this.clientConfig.keyPassword(); + try (final InputStream inputStream = new FileInputStream(keyStoreFile)) { + keyStore.load(inputStream, keyStorePassword.toCharArray()); + } + keyManager.init(keyStore, keyPassword.toCharArray()); + return keyManager; + } + + private synchronized KeyStore getTrustStore(String trustStorePath, String trustStorePass) + throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException { + + KeyStore store = KeyStore.getInstance(KeyStore.getDefaultType()); + store.load(new FileInputStream(ResourceUtils.getFile(trustStorePath)), trustStorePass.toCharArray()); + return store; + } + } + + public class CachingSslContextFactory extends SslContextFactory { + private SslContext cachedContext = null; + + public CachingSslContextFactory(WebClientConfig clientConfig) { + super(clientConfig); + } + + @Override + public SslContext createSslContext() throws UnrecoverableKeyException, NoSuchAlgorithmException, + CertificateException, KeyStoreException, IOException { + if (this.cachedContext == null) { + this.cachedContext = super.createSslContext(); + } + return this.cachedContext; + + } + } + +} diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/OscA1Client.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/OscA1Client.java index fed218e8..741ab1b9 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/OscA1Client.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/OscA1Client.java @@ -25,7 +25,6 @@ import java.util.List; import org.json.JSONObject; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig; -import org.onap.ccsdk.oran.a1policymanagementservice.configuration.WebClientConfig; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,8 +116,8 @@ public class OscA1Client implements A1Client { private final AsyncRestClient restClient; private final UriBuilder uri; - public OscA1Client(RicConfig ricConfig, WebClientConfig clientConfig) { - this(ricConfig, new AsyncRestClient("", clientConfig)); + public OscA1Client(RicConfig ricConfig, AsyncRestClientFactory restClientFactory) { + this(ricConfig, restClientFactory.createRestClient("")); } public OscA1Client(RicConfig ricConfig, AsyncRestClient restClient) { diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncOnapA1Client.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncOnapA1Client.java index 2c564c3f..68b0ab18 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncOnapA1Client.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncOnapA1Client.java @@ -29,7 +29,6 @@ import org.immutables.gson.Gson; import org.immutables.value.Value; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ControllerConfig; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig; -import org.onap.ccsdk.oran.a1policymanagementservice.configuration.WebClientConfig; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,9 +63,10 @@ public class SdncOnapA1Client implements A1Client { private final RicConfig ricConfig; private final AsyncRestClient restClient; - public SdncOnapA1Client(RicConfig ricConfig, ControllerConfig controllerConfig, WebClientConfig clientConfig) { + public SdncOnapA1Client(RicConfig ricConfig, ControllerConfig controllerConfig, + AsyncRestClientFactory restClientFactory) { this(ricConfig, controllerConfig, - new AsyncRestClient(controllerConfig.baseUrl() + "/restconf/operations", clientConfig)); + restClientFactory.createRestClient(controllerConfig.baseUrl() + "/restconf/operations")); logger.debug("SdncOnapA1Client for ric: {}, a1ControllerBaseUrl: {}", ricConfig.ricId(), controllerConfig.baseUrl()); } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncOscA1Client.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncOscA1Client.java index 3647f225..3ba2ba7b 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncOscA1Client.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/SdncOscA1Client.java @@ -33,7 +33,6 @@ import org.immutables.value.Value; import org.json.JSONObject; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ControllerConfig; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig; -import org.onap.ccsdk.oran.a1policymanagementservice.configuration.WebClientConfig; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,9 +89,9 @@ public class SdncOscA1Client implements A1Client { * @throws IllegalArgumentException when the protocolType is wrong. */ public SdncOscA1Client(A1ProtocolType protocolType, RicConfig ricConfig, ControllerConfig controllerConfig, - WebClientConfig clientConfig) { + AsyncRestClientFactory restClientFactory) { this(protocolType, ricConfig, controllerConfig, - new AsyncRestClient(controllerConfig.baseUrl() + "/restconf/operations", clientConfig)); + restClientFactory.createRestClient(controllerConfig.baseUrl() + "/restconf/operations")); logger.debug("SdncOscA1Client for ric: {}, a1Controller: {}", ricConfig.ricId(), controllerConfig); } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion1.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion1.java index 0da9a267..d094e7a1 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion1.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/clients/StdA1ClientVersion1.java @@ -24,7 +24,6 @@ import java.util.Arrays; import java.util.List; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig; -import org.onap.ccsdk.oran.a1policymanagementservice.configuration.WebClientConfig; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy; import reactor.core.publisher.Flux; @@ -85,8 +84,8 @@ public class StdA1ClientVersion1 implements A1Client { private final AsyncRestClient restClient; private final UriBuilder uri; - public StdA1ClientVersion1(RicConfig ricConfig, WebClientConfig webClientConfig) { - this(new AsyncRestClient("", webClientConfig), ricConfig); + public StdA1ClientVersion1(RicConfig ricConfig, AsyncRestClientFactory restClientFactory) { + this(restClientFactory.createRestClient(""), ricConfig); } public StdA1ClientVersion1(AsyncRestClient restClient, RicConfig ricConfig) { diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java index bafa8453..3a365178 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/dmaap/DmaapMessageConsumer.java @@ -35,6 +35,7 @@ import java.util.Optional; import java.util.ServiceLoader; import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient; +import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClientFactory; import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException; import org.slf4j.Logger; @@ -71,6 +72,8 @@ public class DmaapMessageConsumer { private final Gson gson; + private final AsyncRestClientFactory restClientFactory; + @Value("${server.http-port}") private int localServerHttpPort; @@ -80,6 +83,7 @@ public class DmaapMessageConsumer { GsonBuilder gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); gson = gsonBuilder.create(); + this.restClientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig()); } /** @@ -187,9 +191,9 @@ public class DmaapMessageConsumer { protected DmaapMessageHandler getDmaapMessageHandler() { if (this.dmaapMessageHandler == null) { String pmsBaseUrl = "http://localhost:" + this.localServerHttpPort; - AsyncRestClient pmsClient = new AsyncRestClient(pmsBaseUrl, this.applicationConfig.getWebClientConfig()); - AsyncRestClient producer = new AsyncRestClient(this.applicationConfig.getDmaapProducerTopicUrl(), - this.applicationConfig.getWebClientConfig()); + AsyncRestClient pmsClient = restClientFactory.createRestClient(pmsBaseUrl); + AsyncRestClient producer = + restClientFactory.createRestClient(this.applicationConfig.getDmaapProducerTopicUrl()); this.dmaapMessageHandler = new DmaapMessageHandler(producer, pmsClient); } return this.dmaapMessageHandler; @@ -204,7 +208,7 @@ public class DmaapMessageConsumer { } protected AsyncRestClient getMessageRouterConsumer() { - return new AsyncRestClient("", this.applicationConfig.getWebClientConfig()); + return restClientFactory.createRestClient(""); } } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java index b63a8acd..84cfe24a 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RefreshConfigTask.java @@ -234,7 +234,7 @@ public class RefreshConfigTask { void runRicSynchronization(Ric ric) { RicSynchronizationTask synchronizationTask = - new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services); + new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services, appConfig); synchronizationTask.run(ric); } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java index 46059fd9..5ff6d191 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSupervision.java @@ -24,6 +24,7 @@ import java.util.Collection; import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1Client; import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1ClientFactory; +import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock.LockType; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies; @@ -58,6 +59,7 @@ public class RicSupervision { private final PolicyTypes policyTypes; private final A1ClientFactory a1ClientFactory; private final Services services; + private final ApplicationConfig config; private static class SynchStartedException extends ServiceException { private static final long serialVersionUID = 1L; @@ -83,12 +85,13 @@ public class RicSupervision { @Autowired public RicSupervision(Rics rics, Policies policies, A1ClientFactory a1ClientFactory, PolicyTypes policyTypes, - Services services) { + Services services, ApplicationConfig config) { this.rics = rics; this.policies = policies; this.a1ClientFactory = a1ClientFactory; this.policyTypes = policyTypes; this.services = services; + this.config = config; } /** @@ -207,6 +210,6 @@ public class RicSupervision { } RicSynchronizationTask createSynchronizationTask() { - return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services); + return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services, config); } } diff --git a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java index ccc57852..df0771cb 100644 --- a/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java +++ b/a1-policy-management/src/main/java/org/onap/ccsdk/oran/a1policymanagementservice/tasks/RicSynchronizationTask.java @@ -25,6 +25,8 @@ import static org.onap.ccsdk.oran.a1policymanagementservice.repository.Ric.RicSt import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1Client; import org.onap.ccsdk.oran.a1policymanagementservice.clients.A1ClientFactory; import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClient; +import org.onap.ccsdk.oran.a1policymanagementservice.clients.AsyncRestClientFactory; +import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig; import org.onap.ccsdk.oran.a1policymanagementservice.repository.ImmutablePolicyType; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Lock.LockType; import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies; @@ -64,13 +66,15 @@ public class RicSynchronizationTask { private final PolicyTypes policyTypes; private final Policies policies; private final Services services; + private final AsyncRestClientFactory restClientFactory; public RicSynchronizationTask(A1ClientFactory a1ClientFactory, PolicyTypes policyTypes, Policies policies, - Services services) { + Services services, ApplicationConfig config) { this.a1ClientFactory = a1ClientFactory; this.policyTypes = policyTypes; this.policies = policies; this.services = services; + this.restClientFactory = new AsyncRestClientFactory(config.getWebClientConfig()); } public void run(Ric ric) { @@ -161,7 +165,7 @@ public class RicSynchronizationTask { } AsyncRestClient createNotificationClient(final String url) { - return new AsyncRestClient(url, this.a1ClientFactory.getAppConfig().getWebClientConfig()); + return restClientFactory.createRestClient(url); } private Flux<PolicyType> synchronizePolicyTypes(Ric ric, A1Client a1Client) { |