diff options
author | Seshu Kumar M <seshu.kumar.m@huawei.com> | 2018-04-16 12:46:19 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2018-04-16 12:46:19 +0000 |
commit | 5304d1554c35486203ca86a353d9059996430ad3 (patch) | |
tree | e65dd2a46cefc5a26ba3727d6a5c233c9eb34bab /bpmn/MSOInfrastructureBPMN/src/main | |
parent | c2b037bece932832773ee93ff42e376d110909f1 (diff) | |
parent | b196509debdb43411abfcdc55b3ce87a0a30da6e (diff) |
Merge "PnfReadyEventConsumer implementation"
Diffstat (limited to 'bpmn/MSOInfrastructureBPMN/src/main')
3 files changed, 103 insertions, 21 deletions
diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java index e6019f73f0..6871665ba1 100644 --- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java +++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyConsumer.java @@ -22,17 +22,28 @@ package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap; import java.io.IOException; import java.net.URI; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import javax.ws.rs.core.UriBuilder; import org.apache.http.HttpResponse; import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.util.EntityUtils; +import org.openecomp.mso.bpmn.infrastructure.pnf.implementation.DmaapClient; +import org.openecomp.mso.jsonpath.JsonPathUtil; +import org.openecomp.mso.logger.MsoLogger; -public class PnfEventReadyConsumer { +public class PnfEventReadyConsumer implements Runnable, DmaapClient { + + private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.RA); private static final String JSON_PATH_CORRELATION_ID = "$.pnfRegistrationFields.correlationId"; private HttpClient httpClient; - private String dmaapHost; private int dmaapPort; private String dmaapProtocol; @@ -40,24 +51,57 @@ public class PnfEventReadyConsumer { private String dmaapTopicName; private String consumerId; private String consumerGroup; + private Map<String, Runnable> pnfCorrelationIdToThreadMap; + private HttpGet getRequest; + private ScheduledExecutorService executor; + private int dmaapClientInitialDelayInSeconds; + private int dmaapClientDelayInSeconds; + private boolean dmaapThreadListenerIsRunning; public PnfEventReadyConsumer() { httpClient = HttpClientBuilder.create().build(); + pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>(); + executor = null; + } + + public void init() { + getRequest = new HttpGet(buildURI()); + } + + @Override + public void run() { + try { + HttpResponse response = httpClient.execute(getRequest); + getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound); + } catch (IOException e) { + LOGGER.error("Exception caught during sending rest request to dmaap for listening event topic", e); + } + } + + @Override + public void registerForUpdate(String correlationId, Runnable informConsumer) { + pnfCorrelationIdToThreadMap.put(correlationId, informConsumer); + if (!dmaapThreadListenerIsRunning) { + startDmaapThreadListener(); + } } - public void notifyWhenPnfReady(String correlationId) - throws IOException { - HttpGet getRequest = new HttpGet(buildURI(consumerGroup, consumerId)); - HttpResponse response = httpClient.execute(getRequest); - checkIfResponseIsAccepted(response, correlationId); + private void startDmaapThreadListener() { + executor = Executors.newScheduledThreadPool(1); + executor.scheduleWithFixedDelay(this, dmaapClientInitialDelayInSeconds, + dmaapClientDelayInSeconds, TimeUnit.SECONDS); + dmaapThreadListenerIsRunning = true; } - private boolean checkIfResponseIsAccepted(HttpResponse response, String correlationId) { - // TODO parse response if contains proper correlationId - return false; + private void stopDmaapThreadListener() { + if (dmaapThreadListenerIsRunning) { + executor.shutdownNow(); + dmaapThreadListenerIsRunning = false; + executor = null; + } } - private URI buildURI(String consumerGroup, String consumerId) { + private URI buildURI() { return UriBuilder.fromUri(dmaapUriPathPrefix) .scheme(dmaapProtocol) .host(dmaapHost) @@ -65,6 +109,31 @@ public class PnfEventReadyConsumer { .path(consumerGroup).path(consumerId).build(); } + private Optional<String> getCorrelationIdFromResponse(HttpResponse response) throws IOException { + if (response.getStatusLine().getStatusCode() == 200) { + String responseString = EntityUtils.toString(response.getEntity(), "UTF-8"); + if (responseString != null) { + return JsonPathUtil.getInstance().locateResult(responseString, JSON_PATH_CORRELATION_ID); + } + } + return Optional.empty(); + } + + + private void informAboutPnfReadyIfCorrelationIdFound(String correlationId) { + pnfCorrelationIdToThreadMap.keySet().stream().filter(key -> key.equals(correlationId)).findAny() + .ifPresent(this::informAboutPnfReady); + } + + private void informAboutPnfReady(String correlationId) { + pnfCorrelationIdToThreadMap.get(correlationId).run(); + pnfCorrelationIdToThreadMap.remove(correlationId); + + if (pnfCorrelationIdToThreadMap.isEmpty()) { + stopDmaapThreadListener(); + } + } + public void setDmaapHost(String dmaapHost) { this.dmaapHost = dmaapHost; } @@ -93,4 +162,12 @@ public class PnfEventReadyConsumer { this.consumerGroup = consumerGroup; } + public void setDmaapClientInitialDelayInSeconds(int dmaapClientInitialDelayInSeconds) { + this.dmaapClientInitialDelayInSeconds = dmaapClientInitialDelayInSeconds; + } + + public void setDmaapClientDelayInSeconds(int dmaapClientDelayInSeconds) { + this.dmaapClientDelayInSeconds = dmaapClientDelayInSeconds; + } + } diff --git a/bpmn/MSOInfrastructureBPMN/src/main/resources/dmaap.properties b/bpmn/MSOInfrastructureBPMN/src/main/resources/dmaap.properties index 3c4dca49cf..5b1ffac571 100644 --- a/bpmn/MSOInfrastructureBPMN/src/main/resources/dmaap.properties +++ b/bpmn/MSOInfrastructureBPMN/src/main/resources/dmaap.properties @@ -1,7 +1,9 @@ -dmaapHost=HOSTNAME -dmaapPort=3905 -dmaapProtocol=http -dmaapUriPathPrefix = events +host=HOSTNAME +port=3905 +protocol=http +uriPathPrefix = events eventReadyTopicName=pnfEventReady consumerId=consumerId consumerGroup=group +clientThreadInitialDelayInSeconds=1 +clientThreadDelayInSeconds=5
\ No newline at end of file diff --git a/bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml b/bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml index ed1556b05d..6fe70ff614 100644 --- a/bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml +++ b/bpmn/MSOInfrastructureBPMN/src/main/webapp/WEB-INF/applicationContext.xml @@ -19,14 +19,17 @@ <!--<property name="dmaapClient" ref="dmaapClient"/>-->
</bean>
- <bean id="pnfEventReadyConsumer" class="org.openecomp.mso.bpmn.infrastructure.pnf.dmaap.PnfEventReadyConsumer">
- <property name="dmaapHost" value="${dmaapHost}" />
- <property name="dmaapPort" value="${dmaapPort}"/>
- <property name="dmaapProtocol" value="${dmaapProtocol}"/>
- <property name="dmaapUriPathPrefix" value="${dmaapUriPathPrefix}"/>
+ <bean id="pnfEventReadyConsumer" class="org.openecomp.mso.bpmn.infrastructure.pnf.dmaap.PnfEventReadyConsumer"
+ init-method="init">
+ <property name="dmaapHost" value="${host}"/>
+ <property name="dmaapPort" value="${port}"/>
+ <property name="dmaapProtocol" value="${protocol}"/>
+ <property name="dmaapUriPathPrefix" value="${uriPathPrefix}"/>
<property name="dmaapTopicName" value="${eventReadyTopicName}"/>
- <property name= "consumerGroup" value="${consumerGroup}"/>
+ <property name="consumerGroup" value="${consumerGroup}"/>
<property name="consumerId" value="${consumerId}"/>
+ <property name="dmaapClientInitialDelayInSeconds" value="${clientThreadInitialDelayInSeconds}"/>
+ <property name="dmaapClientDelayInSeconds" value="${clientThreadDelayInSeconds}"/>
</bean>
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
|