From 552af85d4daa66dff8d4b920db61fe6010eee388 Mon Sep 17 00:00:00 2001 From: Ram Krishna Verma Date: Fri, 19 Jun 2020 17:49:21 -0400 Subject: Parallel execution of Client Health check Issue-ID: POLICY-2390 Signed-off-by: puthuparambil.aditya Change-Id: I42f443c64bcf6652adb9795ee8e71e37d8fa8c71 --- .../rest/PolicyComponentsHealthCheckProvider.java | 89 ++++++++++++++-------- .../policy/pap/main/startstop/PapActivator.java | 8 ++ .../TestPolicyComponentsHealthCheckProvider.java | 32 ++++---- 3 files changed, 85 insertions(+), 44 deletions(-) diff --git a/main/src/main/java/org/onap/policy/pap/main/rest/PolicyComponentsHealthCheckProvider.java b/main/src/main/java/org/onap/policy/pap/main/rest/PolicyComponentsHealthCheckProvider.java index 2da354da..09e148b4 100644 --- a/main/src/main/java/org/onap/policy/pap/main/rest/PolicyComponentsHealthCheckProvider.java +++ b/main/src/main/java/org/onap/policy/pap/main/rest/PolicyComponentsHealthCheckProvider.java @@ -21,12 +21,20 @@ package org.onap.policy.pap.main.rest; import java.net.HttpURLConnection; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; import org.apache.commons.lang3.tuple.Pair; @@ -34,12 +42,12 @@ import org.onap.policy.common.endpoints.event.comm.bus.internal.BusTopicParams; import org.onap.policy.common.endpoints.http.client.HttpClient; import org.onap.policy.common.endpoints.http.client.HttpClientConfigException; import org.onap.policy.common.endpoints.http.client.HttpClientFactory; -import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance; import org.onap.policy.common.endpoints.parameters.RestServerParameters; import org.onap.policy.common.endpoints.report.HealthCheckReport; import org.onap.policy.common.parameters.ParameterService; import org.onap.policy.common.utils.services.Registry; import org.onap.policy.models.base.PfModelException; +import org.onap.policy.models.base.PfModelRuntimeException; import org.onap.policy.models.pdp.concepts.Pdp; import org.onap.policy.models.pdp.concepts.PdpGroup; import org.onap.policy.models.pdp.concepts.PdpSubGroup; @@ -63,31 +71,25 @@ public class PolicyComponentsHealthCheckProvider { private static final String HEALTH_STATUS = "healthy"; private static final Pattern IP_REPLACEMENT_PATTERN = Pattern.compile("//(\\S+):"); private static final String POLICY_PAP_HEALTHCHECK_URI = "/policy/pap/v1/healthcheck"; - private PapParameterGroup papParameterGroup = ParameterService.get(PAP_GROUP_PARAMS_NAME); - private List clients = new ArrayList<>(); + private static List clients = new ArrayList<>(); + private static ExecutorService clientHealthCheckExecutorService; - /** - * Constructs the object. - * - * @throws HttpClientConfigException if creating http client failed - */ - public PolicyComponentsHealthCheckProvider() throws HttpClientConfigException { - this(HttpClientFactoryInstance.getClientFactory()); - } + private PapParameterGroup papParameterGroup = ParameterService.get(PAP_GROUP_PARAMS_NAME); /** - * Constructs the object with provided http client factory. - * - *

This constructor is for unit test to use a mock {@link HttpClientFactory}. - * - * @param clientFactory factory used to construct http client - * @throws HttpClientConfigException if creating http client failed + * This method is used to initialize clients and executor. + * @param papParameterGroup + * @{link PapParameterGroup} contains the Pap Parameters set during startup + * @param clientFactory + * @{link HttpClientFactory} contains the client details */ - PolicyComponentsHealthCheckProvider(HttpClientFactory clientFactory) throws HttpClientConfigException { + public static void initializeClientHealthCheckExecutorService(PapParameterGroup papParameterGroup, + HttpClientFactory clientFactory) throws HttpClientConfigException { for (BusTopicParams params : papParameterGroup.getHealthCheckRestClientParameters()) { params.setManaged(false); clients.add(clientFactory.build(params)); } + clientHealthCheckExecutorService = Executors.newFixedThreadPool(clients.isEmpty() ? 1 : clients.size()); } /** @@ -100,19 +102,37 @@ public class PolicyComponentsHealthCheckProvider { Map result = new HashMap<>(); // Check remote components + List>> tasks = new ArrayList<>(); + for (HttpClient client : clients) { - HealthCheckReport report = fetchPolicyComponentHealthStatus(client); - if (!report.isHealthy()) { - isHealthy = false; - } - result.put(client.getName(), report); + tasks.add(() -> new AbstractMap.SimpleEntry<>(client.getName(), fetchPolicyComponentHealthStatus(client))); + } + + try { + List>> futures = clientHealthCheckExecutorService.invokeAll(tasks); + result = futures.stream().map(entryFuture -> { + try { + return entryFuture.get(); + } catch (ExecutionException e) { + throw new PfModelRuntimeException(Status.BAD_REQUEST, "Client Health check Failed ", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PfModelRuntimeException(Status.BAD_REQUEST, "Client Health check interrupted ", e); + } + }).collect(Collectors.toMap(Entry::getKey, Entry::getValue)); + //true when all the clients health status is true + isHealthy = result.values().stream().allMatch(o -> ((HealthCheckReport) o).isHealthy()); + } catch (InterruptedException exp) { + Thread.currentThread().interrupt(); + throw new PfModelRuntimeException(Status.BAD_REQUEST, "Client Health check interrupted ", exp); } // Check PAP itself HealthCheckReport papReport = new HealthCheckProvider().performHealthCheck(); RestServerParameters restServerParameters = papParameterGroup.getRestServerParameters(); - papReport.setUrl((restServerParameters.isHttps() ? "https://" : "http://") + papReport.getUrl() + ":" - + restServerParameters.getPort() + POLICY_PAP_HEALTHCHECK_URI); + papReport.setUrl( + (restServerParameters.isHttps() ? "https://" : "http://") + papReport.getUrl() + ":" + restServerParameters + .getPort() + POLICY_PAP_HEALTHCHECK_URI); if (!papReport.isHealthy()) { isHealthy = false; } @@ -133,13 +153,13 @@ public class PolicyComponentsHealthCheckProvider { result.put(HEALTH_STATUS, isHealthy); LOGGER.debug("Policy Components HealthCheck Response - {}", result); - return Pair.of(Response.Status.OK, result); + return Pair.of(Status.OK, result); } private Map> fetchPdpsHealthStatus() throws PfModelException { Map> pdpListWithType = new HashMap<>(); - final PolicyModelsProviderFactoryWrapper modelProviderWrapper = - Registry.get(PapConstants.REG_PAP_DAO_FACTORY, PolicyModelsProviderFactoryWrapper.class); + final PolicyModelsProviderFactoryWrapper modelProviderWrapper = Registry + .get(PapConstants.REG_PAP_DAO_FACTORY, PolicyModelsProviderFactoryWrapper.class); try (PolicyModelsProvider databaseProvider = modelProviderWrapper.create()) { final List groups = databaseProvider.getPdpGroups(null); for (final PdpGroup group : groups) { @@ -156,8 +176,7 @@ public class PolicyComponentsHealthCheckProvider { HealthCheckReport clientReport; try { Response resp = httpClient.get(); - clientReport = replaceIpWithHostname( - resp.readEntity(HealthCheckReport.class), httpClient.getBaseUrl()); + clientReport = replaceIpWithHostname(resp.readEntity(HealthCheckReport.class), httpClient.getBaseUrl()); // A health report is read successfully when HTTP status is not OK, it is also not healthy // even in the report it says healthy. @@ -191,4 +210,12 @@ public class PolicyComponentsHealthCheckProvider { } return report; } -} + + /** + * This method clears clients {@link List} and clientHealthCheckExecutorService {@link ExecutorService}. + */ + public static void cleanup() { + clients.clear(); + clientHealthCheckExecutorService.shutdown(); + } +} \ No newline at end of file diff --git a/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java b/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java index fd13c232..4272e491 100644 --- a/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java +++ b/main/src/main/java/org/onap/policy/pap/main/startstop/PapActivator.java @@ -25,6 +25,7 @@ import java.util.Arrays; import java.util.concurrent.atomic.AtomicReference; import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager; import org.onap.policy.common.endpoints.event.comm.TopicSource; +import org.onap.policy.common.endpoints.http.client.HttpClientFactoryInstance; import org.onap.policy.common.endpoints.http.server.RestServer; import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher; import org.onap.policy.common.endpoints.listeners.RequestIdDispatcher; @@ -57,6 +58,7 @@ import org.onap.policy.pap.main.rest.PdpGroupHealthCheckControllerV1; import org.onap.policy.pap.main.rest.PdpGroupQueryControllerV1; import org.onap.policy.pap.main.rest.PdpGroupStateChangeControllerV1; import org.onap.policy.pap.main.rest.PolicyComponentsHealthCheckControllerV1; +import org.onap.policy.pap.main.rest.PolicyComponentsHealthCheckProvider; import org.onap.policy.pap.main.rest.PolicyStatusControllerV1; import org.onap.policy.pap.main.rest.PolicyUndeployerImpl; import org.onap.policy.pap.main.rest.StatisticsRestControllerV1; @@ -236,6 +238,12 @@ public class PapActivator extends ServiceManagerContainer { .build()), () -> Registry.unregister(PapConstants.REG_PDP_TRACKER)); + addAction("PAP client executor", + () -> + PolicyComponentsHealthCheckProvider.initializeClientHealthCheckExecutorService(papParameterGroup, + HttpClientFactoryInstance.getClientFactory()), + PolicyComponentsHealthCheckProvider::cleanup); + addAction("REST server", () -> { RestServer server = new RestServer(papParameterGroup.getRestServerParameters(), PapAafFilter.class, diff --git a/main/src/test/java/org/onap/policy/pap/main/rest/TestPolicyComponentsHealthCheckProvider.java b/main/src/test/java/org/onap/policy/pap/main/rest/TestPolicyComponentsHealthCheckProvider.java index 9d2fa63f..bdf4700c 100644 --- a/main/src/test/java/org/onap/policy/pap/main/rest/TestPolicyComponentsHealthCheckProvider.java +++ b/main/src/test/java/org/onap/policy/pap/main/rest/TestPolicyComponentsHealthCheckProvider.java @@ -65,6 +65,7 @@ public class TestPolicyComponentsHealthCheckProvider { private static final String CLIENT_1 = "client1"; private static final String PDP_GROUP_DATA_FILE = "rest/pdpGroup.json"; private static final String PAP_GROUP_PARAMS_NAME = "PapGroup"; + private static final String HEALTHY = "healthy"; @Mock private PolicyModelsProvider dao; @@ -135,6 +136,9 @@ public class TestPolicyComponentsHealthCheckProvider { List params = papParameterGroup.getHealthCheckRestClientParameters(); when(clientFactory.build(params.get(0))).thenReturn(client1); when(clientFactory.build(params.get(1))).thenReturn(client2); + + PolicyComponentsHealthCheckProvider.initializeClientHealthCheckExecutorService(papParameterGroup, + clientFactory); } /** @@ -147,56 +151,58 @@ public class TestPolicyComponentsHealthCheckProvider { } else { ParameterService.deregister(PAP_GROUP_PARAMS_NAME); } + PolicyComponentsHealthCheckProvider.cleanup(); } + @Test - public void testFetchPolicyComponentsHealthStatus_allHealthy() throws Exception { - PolicyComponentsHealthCheckProvider provider = new PolicyComponentsHealthCheckProvider(clientFactory); + public void testFetchPolicyComponentsHealthStatus_allHealthy() { + PolicyComponentsHealthCheckProvider provider = new PolicyComponentsHealthCheckProvider(); Pair> ret = provider.fetchPolicyComponentsHealthStatus(); assertEquals(Response.Status.OK, ret.getLeft()); - assertTrue((Boolean) ret.getRight().get("healthy")); + assertTrue((Boolean) ret.getRight().get(HEALTHY)); } @Test - public void testFetchPolicyComponentsHealthStatus_unhealthyClient() throws Exception { + public void testFetchPolicyComponentsHealthStatus_unhealthyClient() { when(response1.getStatus()).thenReturn(HttpURLConnection.HTTP_INTERNAL_ERROR); when(response1.readEntity(HealthCheckReport.class)) - .thenReturn(createReport(HttpURLConnection.HTTP_INTERNAL_ERROR, false)); + .thenReturn(createReport(HttpURLConnection.HTTP_INTERNAL_ERROR, false)); Map result = callFetchPolicyComponentsHealthStatus(); - assertFalse((Boolean) result.get("healthy")); + assertFalse((Boolean) result.get(HEALTHY)); HealthCheckReport report = (HealthCheckReport) result.get(CLIENT_1); assertFalse(report.isHealthy()); when(response1.getStatus()).thenReturn(HttpURLConnection.HTTP_OK); when(response1.readEntity(HealthCheckReport.class)).thenReturn(createReport(HttpURLConnection.HTTP_OK, false)); Map result2 = callFetchPolicyComponentsHealthStatus(); - assertFalse((Boolean) result2.get("healthy")); + assertFalse((Boolean) result2.get(HEALTHY)); HealthCheckReport report2 = (HealthCheckReport) result.get(CLIENT_1); assertFalse(report2.isHealthy()); } @SuppressWarnings("unchecked") @Test - public void testFetchPolicyComponentsHealthStatus_unhealthyPdps() throws Exception { + public void testFetchPolicyComponentsHealthStatus_unhealthyPdps() { // Get a PDP and set it unhealthy groups.get(0).getPdpSubgroups().get(0).getPdpInstances().get(0).setHealthy(PdpHealthStatus.NOT_HEALTHY); Map result = callFetchPolicyComponentsHealthStatus(); Map> pdpListWithType = (Map>) result.get(PapConstants.POLICY_PDPS); assertEquals(2, pdpListWithType.size()); - assertFalse((Boolean) result.get("healthy")); + assertFalse((Boolean) result.get(HEALTHY)); } @Test - public void testFetchPolicyComponentsHealthStatus_unhealthyPap() throws Exception { + public void testFetchPolicyComponentsHealthStatus_unhealthyPap() { when(papActivator.isAlive()).thenReturn(false); Map result = callFetchPolicyComponentsHealthStatus(); - assertFalse((Boolean) result.get("healthy")); + assertFalse((Boolean) result.get(HEALTHY)); HealthCheckReport report = (HealthCheckReport) result.get(PapConstants.POLICY_PAP); assertFalse(report.isHealthy()); } - private Map callFetchPolicyComponentsHealthStatus() throws Exception { - PolicyComponentsHealthCheckProvider provider = new PolicyComponentsHealthCheckProvider(clientFactory); + private Map callFetchPolicyComponentsHealthStatus() { + PolicyComponentsHealthCheckProvider provider = new PolicyComponentsHealthCheckProvider(); return provider.fetchPolicyComponentsHealthStatus().getRight(); } -- cgit 1.2.3-korg