diff options
Diffstat (limited to 'bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp')
2 files changed, 88 insertions, 20 deletions
diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/JsonUtilForCorrelationId.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/JsonUtilForCorrelationId.java new file mode 100644 index 0000000000..2853899ae2 --- /dev/null +++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/JsonUtilForCorrelationId.java @@ -0,0 +1,71 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP - SO + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Spliterator; + +final class JsonUtilForCorrelationId { + + private static final String JSON_HEADER = "pnfRegistrationFields"; + private static final String JSON_CORRELATION_ID_FIELD_NAME = "correlationId"; + + static List<String> parseJsonToGelAllCorrelationId(String json) { + List<String> list = new ArrayList<>(); + JsonElement je = new JsonParser().parse(json); + if (je.isJsonObject()) { + getCorrelationIdFromJsonObject(je.getAsJsonObject()).ifPresent(corr -> list.add(corr)); + } else { + JsonArray array = je.getAsJsonArray(); + Spliterator<JsonElement> spliterator = array.spliterator(); + spliterator.forEachRemaining(jsonElement -> { + parseJsonElementToJsonObject(jsonElement) + .ifPresent(jsonObject -> getCorrelationIdFromJsonObject(jsonObject) + .ifPresent(correlationId -> list.add(correlationId))); + }); + } + return list; + } + + private static Optional<JsonObject> parseJsonElementToJsonObject(JsonElement jsonElement) { + if (jsonElement.isJsonObject()) { + return Optional.ofNullable(jsonElement.getAsJsonObject()); + } + return Optional.ofNullable(new JsonParser().parse(jsonElement.getAsString()).getAsJsonObject()); + } + + private static Optional<String> getCorrelationIdFromJsonObject(JsonObject jsonObject) { + if (jsonObject.has(JSON_HEADER)) { + JsonObject jo = jsonObject.getAsJsonObject(JSON_HEADER); + if (jo.has(JSON_CORRELATION_ID_FIELD_NAME)) { + return Optional.ofNullable(jo.get(JSON_CORRELATION_ID_FIELD_NAME).getAsString()); + } + } + return Optional.empty(); + } + +} diff --git a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java index 830574bad4..2c7309def4 100644 --- a/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java +++ b/bpmn/MSOInfrastructureBPMN/src/main/java/org/openecomp/mso/bpmn/infrastructure/pnf/dmaap/PnfEventReadyDmaapClient.java @@ -22,8 +22,9 @@ package org.openecomp.mso.bpmn.infrastructure.pnf.dmaap; import java.io.IOException; import java.net.URI; +import java.util.Collections; +import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -34,14 +35,14 @@ import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.util.EntityUtils; -import org.openecomp.mso.jsonpath.JsonPathUtil; +import org.openecomp.mso.bpmn.core.PropertyConfiguration; import org.openecomp.mso.logger.MsoLogger; +import org.openecomp.mso.logger.MsoLogger.Catalog; public class PnfEventReadyDmaapClient implements DmaapClient { - private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.RA); + private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(Catalog.GENERAL); - private static final String JSON_PATH_CORRELATION_ID = "$.pnfRegistrationFields.correlationId"; private HttpClient httpClient; private String dmaapHost; private int dmaapPort; @@ -56,18 +57,20 @@ public class PnfEventReadyDmaapClient implements DmaapClient { private int dmaapClientDelayInSeconds; private volatile boolean dmaapThreadListenerIsRunning; - public PnfEventReadyDmaapClient() { + public void init() { httpClient = HttpClientBuilder.create().build(); pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>(); + dmaapHost = PropertyConfiguration.getInstance().getProperties(PropertyConfiguration.MSO_BPMN_URN_PROPERTIES) + .get("dmaapHost"); + dmaapPort = Integer.parseInt(PropertyConfiguration.getInstance() + .getProperties(PropertyConfiguration.MSO_BPMN_URN_PROPERTIES).get("dmaapPort")); executor = null; - } - - public void init() { getRequest = new HttpGet(buildURI()); } @Override public synchronized void registerForUpdate(String correlationId, Runnable informConsumer) { + LOGGER.debug("registering for pnf ready dmaap event for correlation id: " + correlationId); pnfCorrelationIdToThreadMap.put(correlationId, informConsumer); if (!dmaapThreadListenerIsRunning) { startDmaapThreadListener(); @@ -76,6 +79,7 @@ public class PnfEventReadyDmaapClient implements DmaapClient { @Override public synchronized Runnable unregister(String correlationId) { + LOGGER.debug("unregistering from pnf ready dmaap event for correlation id: " + correlationId); Runnable runnable = pnfCorrelationIdToThreadMap.remove(correlationId); if (pnfCorrelationIdToThreadMap.isEmpty()) { stopDmaapThreadListener(); @@ -108,14 +112,6 @@ public class PnfEventReadyDmaapClient implements DmaapClient { .path(consumerGroup).path(consumerId).build(); } - public void setDmaapHost(String dmaapHost) { - this.dmaapHost = dmaapHost; - } - - public void setDmaapPort(int dmaapPort) { - this.dmaapPort = dmaapPort; - } - public void setDmaapProtocol(String dmaapProtocol) { this.dmaapProtocol = dmaapProtocol; } @@ -146,25 +142,26 @@ public class PnfEventReadyDmaapClient implements DmaapClient { public void run() { try { HttpResponse response = httpClient.execute(getRequest); - getCorrelationIdFromResponse(response).ifPresent(this::informAboutPnfReadyIfCorrelationIdFound); + getCorrelationIdListFromResponse(response).forEach(this::informAboutPnfReadyIfCorrelationIdFound); } catch (IOException e) { LOGGER.error("Exception caught during sending rest request to dmaap for listening event topic", e); } } - private Optional<String> getCorrelationIdFromResponse(HttpResponse response) throws IOException { + private List<String> getCorrelationIdListFromResponse(HttpResponse response) throws IOException { if (response.getStatusLine().getStatusCode() == 200) { String responseString = EntityUtils.toString(response.getEntity(), "UTF-8"); if (responseString != null) { - return JsonPathUtil.getInstance().locateResult(responseString, JSON_PATH_CORRELATION_ID); + return JsonUtilForCorrelationId.parseJsonToGelAllCorrelationId(responseString); } } - return Optional.empty(); + return Collections.emptyList(); } private synchronized void informAboutPnfReadyIfCorrelationIdFound(String correlationId) { Runnable runnable = unregister(correlationId); if (runnable != null) { + LOGGER.debug("pnf ready event got from dmaap for correlationId: " + correlationId); runnable.run(); } } |