From 7c73254c00e5dc598f68e074d160b2d263872cda Mon Sep 17 00:00:00 2001 From: Lukasz Muszkieta Date: Wed, 25 Apr 2018 11:54:47 +0200 Subject: PnfReadyEventConsumer implementation Change-Id: Ic8d5814c555bad436bfcbe1b4e212637a6800947 Issue-ID: SO-466 Signed-off-by: Lukasz Muszkieta --- .../pnf/dmaap/PnfEventReadyConsumer.java | 173 --------------------- .../pnf/dmaap/PnfEventReadyDmaapClient.java | 173 +++++++++++++++++++++ 2 files changed, 173 insertions(+), 173 deletions(-) delete mode 100644 bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java create mode 100644 bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java (limited to 'bpmn/MSOInfrastructureBPMN/src/main/java/org') diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java deleted file mode 100644 index 1fd2de97f2..0000000000 --- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java +++ /dev/null @@ -1,173 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * ONAP - SO - * ================================================================================ - * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap; - -import com.google.common.annotations.VisibleForTesting; -import java.io.IOException; -import java.net.URI; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import javax.ws.rs.core.UriBuilder; -import org.apache.http.HttpResponse; -import org.apache.http.client.HttpClient; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.HttpClientBuilder; -import org.apache.http.util.EntityUtils; -import org.openecomp.mso.jsonpath.JsonPathUtil; -import org.openecomp.mso.logger.MsoLogger; - -public class PnfEventReadyConsumer implements DmaapClient { - - private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.RA); - - private static final String JSON_PATH_CORRELATION_ID = "$.pnfRegistrationFields.correlationId"; - private HttpClient httpClient; - private String dmaapHost; - private int dmaapPort; - private String dmaapProtocol; - private String dmaapUriPathPrefix; - private String dmaapTopicName; - private String consumerId; - private String consumerGroup; - private Map pnfCorrelationIdToThreadMap; - private HttpGet getRequest; - private ScheduledExecutorService executor; - private int dmaapClientDelayInSeconds; - private volatile boolean dmaapThreadListenerIsRunning; - - public PnfEventReadyConsumer() { - httpClient = HttpClientBuilder.create().build(); - pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>(); - executor = null; - } - - public void init() { - getRequest = new HttpGet(buildURI()); - } - - //TODO: extract this logic to separate class and test it there to avoid using VisibleForTesting - @VisibleForTesting - void sendRequest() { - try { - HttpResponse response = httpClient.execute(getRequest); - getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound); - } catch (IOException e) { - LOGGER.error("Exception caught during sending rest request to dmaap for listening event topic", e); - } - } - - @Override - public synchronized void registerForUpdate(String correlationId, Runnable informConsumer) { - pnfCorrelationIdToThreadMap.put(correlationId, informConsumer); - if (!dmaapThreadListenerIsRunning) { - startDmaapThreadListener(); - } - } - - @Override - public synchronized Runnable unregister(String correlationId) { - Runnable runnable = pnfCorrelationIdToThreadMap.remove(correlationId); - if (pnfCorrelationIdToThreadMap.isEmpty()) { - stopDmaapThreadListener(); - } - return runnable; - } - - private synchronized void startDmaapThreadListener() { - if (!dmaapThreadListenerIsRunning) { - executor = Executors.newScheduledThreadPool(1); - executor.scheduleWithFixedDelay(this::sendRequest, 0, - dmaapClientDelayInSeconds, TimeUnit.SECONDS); - dmaapThreadListenerIsRunning = true; - } - } - - private synchronized void stopDmaapThreadListener() { - if (dmaapThreadListenerIsRunning) { - executor.shutdownNow(); - dmaapThreadListenerIsRunning = false; - executor = null; - } - } - - private URI buildURI() { - return UriBuilder.fromUri(dmaapUriPathPrefix) - .scheme(dmaapProtocol) - .host(dmaapHost) - .port(dmaapPort).path(dmaapTopicName) - .path(consumerGroup).path(consumerId).build(); - } - - private Optional getCorrelationIdFromResponse(HttpResponse response) throws IOException { - if (response.getStatusLine().getStatusCode() == 200) { - String responseString = EntityUtils.toString(response.getEntity(), "UTF-8"); - if (responseString != null) { - return JsonPathUtil.getInstance().locateResult(responseString, JSON_PATH_CORRELATION_ID); - } - } - return Optional.empty(); - } - - - private synchronized void informAboutPnfReadyIfCorrelationIdFound(String correlationId) { - Runnable runnable = unregister(correlationId); - if (runnable != null) { - runnable.run(); - } - } - - public void setDmaapHost(String dmaapHost) { - this.dmaapHost = dmaapHost; - } - - public void setDmaapPort(int dmaapPort) { - this.dmaapPort = dmaapPort; - } - - public void setDmaapProtocol(String dmaapProtocol) { - this.dmaapProtocol = dmaapProtocol; - } - - public void setDmaapUriPathPrefix(String dmaapUriPathPrefix) { - this.dmaapUriPathPrefix = dmaapUriPathPrefix; - } - - public void setDmaapTopicName(String dmaapTopicName) { - this.dmaapTopicName = dmaapTopicName; - } - - public void setConsumerId(String consumerId) { - this.consumerId = consumerId; - } - - public void setConsumerGroup(String consumerGroup) { - this.consumerGroup = consumerGroup; - } - - public void setDmaapClientDelayInSeconds(int dmaapClientDelayInSeconds) { - this.dmaapClientDelayInSeconds = dmaapClientDelayInSeconds; - } - -} diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java new file mode 100644 index 0000000000..830574bad4 --- /dev/null +++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java @@ -0,0 +1,173 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - SO + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap; + +import java.io.IOException; +import java.net.URI; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import javax.ws.rs.core.UriBuilder; +import org.apache.http.HttpResponse; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.util.EntityUtils; +import org.openecomp.mso.jsonpath.JsonPathUtil; +import org.openecomp.mso.logger.MsoLogger; + +public class PnfEventReadyDmaapClient implements DmaapClient { + + private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.RA); + + private static final String JSON_PATH_CORRELATION_ID = "$.pnfRegistrationFields.correlationId"; + private HttpClient httpClient; + private String dmaapHost; + private int dmaapPort; + private String dmaapProtocol; + private String dmaapUriPathPrefix; + private String dmaapTopicName; + private String consumerId; + private String consumerGroup; + private Map pnfCorrelationIdToThreadMap; + private HttpGet getRequest; + private ScheduledExecutorService executor; + private int dmaapClientDelayInSeconds; + private volatile boolean dmaapThreadListenerIsRunning; + + public PnfEventReadyDmaapClient() { + httpClient = HttpClientBuilder.create().build(); + pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>(); + executor = null; + } + + public void init() { + getRequest = new HttpGet(buildURI()); + } + + @Override + public synchronized void registerForUpdate(String correlationId, Runnable informConsumer) { + pnfCorrelationIdToThreadMap.put(correlationId, informConsumer); + if (!dmaapThreadListenerIsRunning) { + startDmaapThreadListener(); + } + } + + @Override + public synchronized Runnable unregister(String correlationId) { + Runnable runnable = pnfCorrelationIdToThreadMap.remove(correlationId); + if (pnfCorrelationIdToThreadMap.isEmpty()) { + stopDmaapThreadListener(); + } + return runnable; + } + + private synchronized void startDmaapThreadListener() { + if (!dmaapThreadListenerIsRunning) { + executor = Executors.newScheduledThreadPool(1); + executor.scheduleWithFixedDelay(new DmaapTopicListenerThread(), 0, + dmaapClientDelayInSeconds, TimeUnit.SECONDS); + dmaapThreadListenerIsRunning = true; + } + } + + private synchronized void stopDmaapThreadListener() { + if (dmaapThreadListenerIsRunning) { + executor.shutdownNow(); + dmaapThreadListenerIsRunning = false; + executor = null; + } + } + + private URI buildURI() { + return UriBuilder.fromUri(dmaapUriPathPrefix) + .scheme(dmaapProtocol) + .host(dmaapHost) + .port(dmaapPort).path(dmaapTopicName) + .path(consumerGroup).path(consumerId).build(); + } + + public void setDmaapHost(String dmaapHost) { + this.dmaapHost = dmaapHost; + } + + public void setDmaapPort(int dmaapPort) { + this.dmaapPort = dmaapPort; + } + + public void setDmaapProtocol(String dmaapProtocol) { + this.dmaapProtocol = dmaapProtocol; + } + + public void setDmaapUriPathPrefix(String dmaapUriPathPrefix) { + this.dmaapUriPathPrefix = dmaapUriPathPrefix; + } + + public void setDmaapTopicName(String dmaapTopicName) { + this.dmaapTopicName = dmaapTopicName; + } + + public void setConsumerId(String consumerId) { + this.consumerId = consumerId; + } + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + public void setDmaapClientDelayInSeconds(int dmaapClientDelayInSeconds) { + this.dmaapClientDelayInSeconds = dmaapClientDelayInSeconds; + } + + class DmaapTopicListenerThread implements Runnable { + + @Override + public void run() { + try { + HttpResponse response = httpClient.execute(getRequest); + getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound); + } catch (IOException e) { + LOGGER.error("Exception caught during sending rest request to dmaap for listening event topic", e); + } + } + + private Optional getCorrelationIdFromResponse(HttpResponse response) throws IOException { + if (response.getStatusLine().getStatusCode() == 200) { + String responseString = EntityUtils.toString(response.getEntity(), "UTF-8"); + if (responseString != null) { + return JsonPathUtil.getInstance().locateResult(responseString, JSON_PATH_CORRELATION_ID); + } + } + return Optional.empty(); + } + + private synchronized void informAboutPnfReadyIfCorrelationIdFound(String correlationId) { + Runnable runnable = unregister(correlationId); + if (runnable != null) { + runnable.run(); + } + } + } + +} -- cgit 1.2.3-korg