summaryrefslogtreecommitdiffstats
path: root/src/main/java
diff options
context:
space:
mode:
authorJessica Wagantall <jwagantall@linuxfoundation.org>2018-08-13 23:42:40 +0000
committerRamaSubbaReddy <rama.subba.reddy.s@huawei.com>2018-10-03 12:08:26 +0530
commit11a3345cf03c2ad820fa40440dbe4c89eb963b26 (patch)
tree24a5be90c240d49d553ec79c267729c9e976e1dc /src/main/java
parentf9cbcb5f93fa07852ea06cffe6eac6bc09c53ce1 (diff)
Add RestConf Collector
Issue-ID: DCAEGEN2-612 1. Instantiated to support CCVPN Close Loop Use Case 2. In general, this supports data collection from all PNF or devices that supports RestConf protocol Change-Id: I6311ad618e8d68badc5423a63d7781a19dc62829 Signed-off-by: rama-huawei <rama.subba.reddy.s@huawei.com>
Diffstat (limited to 'src/main/java')
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/AnyNode.java127
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/AuthType.java43
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/Constants.java38
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/DataChangeEventListener.java51
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/EventProcessor.java88
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/Format.java37
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/HttpMethod.java48
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/HttpResponse.java30
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/JsonParser.java92
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/Parameters.java52
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/RestConfCollector.java66
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/RestConfContext.java51
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/RestConfProc.java224
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/RestapiCallNode.java600
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/RetryException.java27
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/RetryPolicy.java58
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/RetryPolicyStore.java53
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/XmlJsonUtil.java412
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/XmlParser.java178
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPConfigurationParser.java107
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPEventPublisher.java75
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersBuilder.java63
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersCache.java110
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/EventPublisher.java35
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/PublisherConfig.java95
-rwxr-xr-xsrc/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/VavrUtils.java48
26 files changed, 2808 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/AnyNode.java b/src/main/java/org/onap/dcae/collectors/restconf/common/AnyNode.java
new file mode 100755
index 0000000..044a9cf
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/AnyNode.java
@@ -0,0 +1,127 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+import io.vavr.collection.List;
+import io.vavr.collection.Set;
+import io.vavr.control.Option;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import java.util.stream.StreamSupport;
+
+import static io.vavr.API.Set;
+
+public class AnyNode {
+ private Object obj;
+
+ private AnyNode(Object object) {
+ this.obj = object;
+ }
+
+ public static AnyNode fromString(String content) {
+ return new AnyNode(new JSONObject(content));
+ }
+
+ /**
+ * Returns key set of underlying object. It is assumed that underlying object is of type org.json.JSONObject.
+ *
+ * @return key set of underlying objects
+ */
+ public Set<String> keys() {
+ return Set(asJsonObject().keySet().toArray(new String[]{}));
+ }
+
+ /**
+ * Returns value associated with specified key wrapped with AnyValue object. It is assumed that this is of type
+ * org.json.JSONObject.
+ *
+ * @param key for querying value from jsonobject
+ * @return value associated with specified key
+ */
+ public AnyNode get(String key) {
+ return new AnyNode(asJsonObject().get(key));
+ }
+
+ /**
+ * Returns string representation of this. If it happens to have null, the value is treated as
+ * org.json.JSONObject.NULL and "null" string is returned then.
+ *
+ * @return string representation of this
+ */
+ public String toString() {
+ return this.obj.toString();
+ }
+
+ /**
+ * Returns optional of object under specified key, wrapped with AnyNode object.
+ * If underlying object is not of type org.json.JSONObject
+ * or underlying object has no given key
+ * or given key is null
+ * then Optional.empty will be returned.
+ *
+ * @param key for querying value from AnyNode object
+ * @return optional of object under specified key
+ */
+ public Option<AnyNode> getAsOption(String key) {
+ try {
+ AnyNode value = get(key);
+ if (value.toString().equals("null")) {
+ return Option.none();
+ }
+ return Option.some(value);
+ } catch (JSONException ex) {
+ return Option.none();
+ }
+ }
+
+ /**
+ * Converts underlying object to map representation with map values wrapped with AnyNode object. It is assumed that
+ * underlying object is of type org.json.JSONObject.
+ *
+ * @return converts underlying object to map representation
+ */
+ public List<AnyNode> toList() {
+ return List.ofAll(StreamSupport.stream(((JSONArray) this.obj).spliterator(), false).map(AnyNode::new));
+ }
+
+ /**
+ * Checks if specified key is present in this. It is assumed that this is of type JSONObject.
+ *
+ * @param key is used to check presence in anynode object
+ * @return true if specified key is present in this
+ */
+ public boolean has(String key) {
+ return !getAsOption(key).isEmpty();
+ }
+
+ /**
+ * Returns as JSONObject.
+ *
+ * @return jsonobject
+ */
+ private JSONObject asJsonObject() {
+ return (JSONObject) this.obj;
+ }
+
+
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/AuthType.java b/src/main/java/org/onap/dcae/collectors/restconf/common/AuthType.java
new file mode 100755
index 0000000..47ae8ac
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/AuthType.java
@@ -0,0 +1,43 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.restconf.common;
+
+public enum AuthType {
+ NONE, BASIC, DIGEST, OAUTH, Unspecified;
+
+ public static AuthType fromString(String s) {
+ if ("basic".equalsIgnoreCase(s)) {
+ return BASIC;
+ }
+ if ("digest".equalsIgnoreCase(s)) {
+ return DIGEST;
+ }
+ if ("oauth".equalsIgnoreCase(s)) {
+ return OAUTH;
+ }
+ if ("none".equalsIgnoreCase(s)) {
+ return NONE;
+ }
+ if ("unspecified".equalsIgnoreCase(s)) {
+ return Unspecified;
+ }
+ throw new IllegalArgumentException("Invalid value for format: " + s);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/Constants.java b/src/main/java/org/onap/dcae/collectors/restconf/common/Constants.java
new file mode 100755
index 0000000..4845bfc
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/Constants.java
@@ -0,0 +1,38 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+public class Constants {
+ public static final String KDEFAULT_TEMP_FILENAME = "templateFileName";
+ public static final String KSETTING_REST_API_URL = "restapiUrl";
+ public static final String KSETTING_HTTP_METHOD = "httpMethod";
+ public static final String KSETTING_RESP_PREFIX = "responsePrefix";
+ public static final String KSETTING_SKIP_SENDING = "skipSending";
+ public static final String KSETTING_FORMAT = "format";
+ public static final String KSETTING_DMAAPCONFIGS = "collector.dmaapfile";
+ public static final String[] KDEFAULT_DMAAPCONFIGS = new String[]{"./etc/DmaapConfig.json"};
+ public static final String KSETTING_SSE_CONNECT_URL = "sseConnectURL";
+ public static final int KDEFAULT_MAXQUEUEDEVENTS = 1024 * 4;
+ public static final String RESPONSE_CODE = "restapi-result.response-code";
+ public static final String OUTPUT_IDENTIFIER = "restapi-result.ietf-subscribed-notifications:output.identifier";
+ public static final String RESPONSE_CODE_200 = "200";
+ public static final String KCONFIG = "c";
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/DataChangeEventListener.java b/src/main/java/org/onap/dcae/collectors/restconf/common/DataChangeEventListener.java
new file mode 100755
index 0000000..97ee623
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/DataChangeEventListener.java
@@ -0,0 +1,51 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+import org.glassfish.jersey.media.sse.EventListener;
+import org.glassfish.jersey.media.sse.InboundEvent;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataChangeEventListener implements EventListener {
+ private static final Logger log = LoggerFactory.getLogger(DataChangeEventListener.class);
+ private RestConfContext ctx;
+
+ public DataChangeEventListener(RestConfContext ctx) {
+ this.ctx = ctx;
+ }
+
+ @Override
+ public void onEvent(InboundEvent event) {
+ JSONArray jsonArrayMod;
+ log.info("On SSE Event is received");
+ String s = event.readData();
+ JSONObject jsonObj = new JSONObject(s);
+ jsonArrayMod = new JSONArray().put(jsonObj);
+ try {
+ RestConfProc.handleEvents(jsonArrayMod);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/EventProcessor.java b/src/main/java/org/onap/dcae/collectors/restconf/common/EventProcessor.java
new file mode 100755
index 0000000..3409f9c
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/EventProcessor.java
@@ -0,0 +1,88 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+
+import org.json.JSONObject;
+import org.onap.dcae.collectors.restconf.common.event.publishing.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class EventProcessor implements Runnable {
+ private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
+
+ static Map<String, String[]> streamidHash = new HashMap<>();
+ public JSONObject event;
+ private EventPublisher eventPublisher;
+
+ public EventProcessor(EventPublisher eventPublisher) {
+ this.eventPublisher = eventPublisher;
+ streamidHash = parseStreamIdToStreamHashMapping(new RestConfProc().streamID);
+ }
+
+ private Map<String, String[]> parseStreamIdToStreamHashMapping(String streamId) {
+ Map<String, String[]> streamidHash = new HashMap<>();
+ String[] list = streamId.split("\\|");
+ for (String aList : list) {
+ String domain = aList.split("=")[0];
+ String[] streamIdList = aList.substring(aList.indexOf('=') + 1).split(",");
+ streamidHash.put(domain, streamIdList);
+ }
+ return streamidHash;
+ }
+
+ @Override
+ public void run() {
+ try {
+
+ while (true) {
+ event = RestConfProc.fProcessingInputQueue.take();
+ // As long as the producer is running we remove elements from
+ // the queue.
+ log.info("QueueSize:" + RestConfProc.fProcessingInputQueue.size() + "\tEventProcessor\tRemoving element: " +
+ event);
+ String[] streamIdList = streamidHash.get("route");
+ log.debug("streamIdList:" + Arrays.toString(streamIdList));
+
+ if (streamIdList.length == 0) {
+ log.error("No StreamID defined for publish - Message dropped" + event);
+ } else {
+ sendEventsToStreams(streamIdList);
+ }
+ log.debug("Event published" + event);
+ }
+ } catch (Exception e) {
+ log.error("EventProcessor InterruptedException" + e.getMessage());
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private void sendEventsToStreams(String[] streamIdList) {
+ for (String aStreamIdList : streamIdList) {
+ log.info("Invoking publisher for streamId:" + aStreamIdList);
+ eventPublisher.sendEvent(event, aStreamIdList);
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/Format.java b/src/main/java/org/onap/dcae/collectors/restconf/common/Format.java
new file mode 100755
index 0000000..94344d6
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/Format.java
@@ -0,0 +1,37 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.restconf.common;
+
+public enum Format {
+ JSON, XML, NONE;
+
+ public static Format fromString(String s) {
+ if ("json".equalsIgnoreCase(s)) {
+ return JSON;
+ }
+ if ("xml".equalsIgnoreCase(s)) {
+ return XML;
+ }
+ if ("none".equalsIgnoreCase(s)) {
+ return NONE;
+ }
+ throw new IllegalArgumentException("Invalid value for format: " + s);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/HttpMethod.java b/src/main/java/org/onap/dcae/collectors/restconf/common/HttpMethod.java
new file mode 100755
index 0000000..d85d5f5
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/HttpMethod.java
@@ -0,0 +1,48 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+
+package org.onap.dcae.collectors.restconf.common;
+
+public enum HttpMethod {
+ GET, POST, PUT, DELETE, PATCH;
+
+ public static HttpMethod fromString(String s) {
+ if (s == null) {
+ return null;
+ }
+ if (s.equalsIgnoreCase("get")) {
+ return GET;
+ }
+ if (s.equalsIgnoreCase("post")) {
+ return POST;
+ }
+ if (s.equalsIgnoreCase("put")) {
+ return PUT;
+ }
+ if (s.equalsIgnoreCase("delete")) {
+ return DELETE;
+ }
+ if (s.equalsIgnoreCase("patch")) {
+ return PATCH;
+ }
+ throw new IllegalArgumentException("Invalid value for HTTP Method: " + s);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/HttpResponse.java b/src/main/java/org/onap/dcae/collectors/restconf/common/HttpResponse.java
new file mode 100755
index 0000000..e1b97da
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/HttpResponse.java
@@ -0,0 +1,30 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+import javax.ws.rs.core.MultivaluedMap;
+
+public class HttpResponse {
+ public int code;
+ public String message;
+ public String body;
+ public MultivaluedMap<String, String> headers;
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/JsonParser.java b/src/main/java/org/onap/dcae/collectors/restconf/common/JsonParser.java
new file mode 100755
index 0000000..f29bbc3
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/JsonParser.java
@@ -0,0 +1,92 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class JsonParser {
+
+ private static final Logger log = LoggerFactory.getLogger(JsonParser.class);
+
+ private JsonParser() {
+ // Preventing instantiation of the same.
+ }
+
+ @SuppressWarnings("unchecked")
+ public static Map<String, String> convertToProperties(String s)
+ throws Exception {
+
+ checkNotNull(s, "Input should not be null.");
+
+ try {
+ JSONObject json = new JSONObject(s);
+ Map<String, Object> wm = new HashMap<>();
+ Iterator<String> ii = json.keys();
+ while (ii.hasNext()) {
+ String key1 = ii.next();
+ wm.put(key1, json.get(key1));
+ }
+
+ Map<String, String> mm = new HashMap<>();
+
+ while (!wm.isEmpty())
+ for (String key : new ArrayList<>(wm.keySet())) {
+ Object o = wm.get(key);
+ wm.remove(key);
+
+ if (o instanceof Boolean || o instanceof Number || o instanceof String) {
+ mm.put(key, o.toString());
+
+ log.info("Added property: {} : {}", key, o.toString());
+ } else if (o instanceof JSONObject) {
+ JSONObject jo = (JSONObject) o;
+ Iterator<String> i = jo.keys();
+ while (i.hasNext()) {
+ String key1 = i.next();
+ wm.put(key + "." + key1, jo.get(key1));
+ }
+ } else if (o instanceof JSONArray) {
+ JSONArray ja = (JSONArray) o;
+ mm.put(key + "_length", String.valueOf(ja.length()));
+
+ log.info("Added property: {}_length: {}", key, String.valueOf(ja.length()));
+
+ for (int i = 0; i < ja.length(); i++)
+ wm.put(key + '[' + i + ']', ja.get(i));
+ }
+ }
+ return mm;
+ } catch (JSONException e) {
+ throw new Exception("Unable to convert JSON to properties" + e.getLocalizedMessage(), e);
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/Parameters.java b/src/main/java/org/onap/dcae/collectors/restconf/common/Parameters.java
new file mode 100755
index 0000000..02fd68b
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/Parameters.java
@@ -0,0 +1,52 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+import java.util.Set;
+
+public class Parameters {
+ public String templateFileName;
+ public String restapiUrl;
+ public String restapiUser;
+ public String restapiPassword;
+ public Format format;
+ public String contentType;
+ public HttpMethod httpMethod;
+ public String responsePrefix;
+ public Set<String> listNameList;
+ public boolean skipSending;
+ public boolean convertResponse;
+ public String keyStoreFileName;
+ public String keyStorePassword;
+ public String trustStoreFileName;
+ public String trustStorePassword;
+ public boolean ssl;
+ public String customHttpHeaders;
+ public String partner;
+ public Boolean dumpHeaders;
+ public String requestBody;
+ public String oAuthConsumerKey;
+ public String oAuthConsumerSecret;
+ public String oAuthSignatureMethod;
+ public String oAuthVersion;
+ public AuthType authtype;
+ public Boolean returnRequestPayload;
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfCollector.java b/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfCollector.java
new file mode 100755
index 0000000..754c73d
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfCollector.java
@@ -0,0 +1,66 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+import com.att.nsa.cmdLine.NsaCommandLineUtil;
+import com.att.nsa.drumlin.service.framework.DrumlinServlet;
+import com.att.nsa.drumlin.till.nv.impl.nvPropertiesFile;
+import com.att.nsa.drumlin.till.nv.impl.nvReadableStack;
+import com.att.nsa.drumlin.till.nv.impl.nvReadableTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.Map;
+
+public class RestConfCollector {
+
+ public static final Logger eplog = LoggerFactory.getLogger("org.onap.restconf.common.error");
+
+ public static void main(String[] args) {
+ try {
+ final Map<String, String> argMap = NsaCommandLineUtil.processCmdLine(args, true);
+ final String config = NsaCommandLineUtil.getSetting(argMap, Constants.KCONFIG, "collector.properties");
+ final URL settingStream = DrumlinServlet.findStream(config, RestConfCollector.class);
+
+ final nvReadableStack settings = new nvReadableStack();
+ settings.push(new nvPropertiesFile(settingStream));
+ settings.push(new nvReadableTable(argMap));
+
+ RestConfProc restConfProc = new RestConfProc(settings);
+ Map<String, String> paraMap = restConfProc.getParaMap();
+ String restApiURL = paraMap.get(Constants.KSETTING_REST_API_URL);
+ String sseEventsURL = paraMap.get(Constants.KSETTING_SSE_CONNECT_URL);
+ String[] listRestApiURL = restApiURL.split(";");
+ String[] listSseEventsURL = sseEventsURL.split(";");
+ for (int i = 0; i < listRestApiURL.length; i++) {
+ paraMap.put(Constants.KSETTING_REST_API_URL, "http://" + listRestApiURL[i] +
+ "/RestConfServer/rest/operations/establish-subscription");
+ paraMap.put(Constants.KSETTING_SSE_CONNECT_URL, listSseEventsURL[i]);
+ restConfProc.establishSubscription(paraMap, restConfProc.getCtx());
+ }
+
+ } catch (Exception e) {
+ RestConfCollector.eplog.error("Fatal error during application startup", e);
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfContext.java b/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfContext.java
new file mode 100755
index 0000000..4cb16ae
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfContext.java
@@ -0,0 +1,51 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+import java.util.HashMap;
+import java.util.Set;
+
+public class RestConfContext {
+ private HashMap<String, String> attributes;
+
+ public RestConfContext() {
+ attributes = new HashMap<>();
+ }
+
+ public String getAttribute(String name) {
+ return attributes.getOrDefault(name, null);
+ }
+
+ public void setAttribute(String name, String value) {
+ if (value == null) {
+ if (attributes.containsKey(name)) {
+ attributes.remove(name);
+ }
+ } else {
+ attributes.put(name, value);
+ }
+ }
+
+ public Set<String> getAttributeKeySet() {
+ return attributes.keySet();
+ }
+
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfProc.java b/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfProc.java
new file mode 100755
index 0000000..67ea1fb
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfProc.java
@@ -0,0 +1,224 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+import com.att.nsa.drumlin.till.nv.rrNvReadable;
+import org.glassfish.jersey.media.sse.EventSource;
+import org.glassfish.jersey.media.sse.SseFeature;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.onap.dcae.collectors.restconf.common.event.publishing.DMaaPConfigurationParser;
+import org.onap.dcae.collectors.restconf.common.event.publishing.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.WebTarget;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+
+public class RestConfProc {
+
+ private static final Logger log = LoggerFactory.getLogger(RestConfProc.class);
+
+ public static String format;
+
+ private static RestConfContext ctx = new RestConfContext();
+
+ private static final Logger oplog = LoggerFactory.getLogger("org.onap.restconf.common.output");
+
+ private Map<String, PersistentConnection> runnableInfo = new ConcurrentHashMap<>();
+
+ private final Map<String, String> paraMap = new HashMap<>();
+ private static String cambriaConfigFile;
+
+ public static LinkedBlockingQueue<JSONObject> fProcessingInputQueue;
+
+ public static String streamID;
+ private ExecutorService executor = Executors.newCachedThreadPool();
+
+ public RestConfProc() {
+ }
+
+ private void parseInputParameters(rrNvReadable settings) {
+ String tempFileName;
+ String restApiUrl;
+ String httpMetthod;
+ String respPrefix;
+ String skipSending;
+ String sseConnectUrl;
+ String[] currentConfigFile;
+
+ currentConfigFile = settings.getStrings(Constants.KSETTING_DMAAPCONFIGS, Constants.KDEFAULT_DMAAPCONFIGS);
+ cambriaConfigFile = currentConfigFile[0];
+
+ tempFileName = settings.getString(Constants.KDEFAULT_TEMP_FILENAME, null);
+ restApiUrl = settings.getString(Constants.KSETTING_REST_API_URL, null);
+ httpMetthod = settings.getString(Constants.KSETTING_HTTP_METHOD, null);
+ respPrefix = settings.getString(Constants.KSETTING_RESP_PREFIX, null);
+ skipSending = settings.getString(Constants.KSETTING_SKIP_SENDING, null);
+ sseConnectUrl = settings.getString(Constants.KSETTING_SSE_CONNECT_URL, null);
+ format = settings.getString(Constants.KSETTING_FORMAT, null);
+ streamID = "route=route_failure";
+
+ paraMap.put(Constants.KDEFAULT_TEMP_FILENAME, tempFileName);
+ paraMap.put(Constants.KSETTING_REST_API_URL, restApiUrl);
+ paraMap.put(Constants.KSETTING_HTTP_METHOD, httpMetthod);
+ paraMap.put(Constants.KSETTING_RESP_PREFIX, respPrefix);
+ paraMap.put(Constants.KSETTING_SKIP_SENDING, skipSending);
+ paraMap.put(Constants.KSETTING_SSE_CONNECT_URL, sseConnectUrl);
+ paraMap.put(Constants.KSETTING_FORMAT, format);
+
+ ctx.setAttribute("prop.encoding-json", "encoding-json");
+ ctx.setAttribute("restapi-result.response-code", "200");
+ ctx.setAttribute("restapi-result.ietf-subscribed-notifications:output.identifier", "100");
+ }
+
+ public RestConfProc(rrNvReadable settings) {
+
+ parseInputParameters(settings);
+
+ fProcessingInputQueue = new LinkedBlockingQueue<>(Constants.KDEFAULT_MAXQUEUEDEVENTS);
+
+ EventProcessor ep = new EventProcessor(EventPublisher.createPublisher(oplog,
+ DMaaPConfigurationParser
+ .parseToDomainMapping(Paths.get(cambriaConfigFile))
+ .get()));
+ ExecutorService executor = Executors.newFixedThreadPool(20);
+ for (int i = 0; i < 20; ++i) {
+ executor.execute(ep);
+ }
+ }
+
+ /**
+ * To establish a subscription with controller by sending HTTP request
+ *
+ * @param paramMap holds the input configuration
+ * @param ctx restconf context
+ * @throws Exception exception
+ */
+ public void establishSubscription(Map<String, String> paramMap, RestConfContext ctx) throws Exception {
+
+ RestapiCallNode restApiCallNode = new RestapiCallNode();
+
+ restApiCallNode.sendRequest(paramMap, ctx, null);
+
+ establishPersistentConnection(paramMap, ctx);
+ }
+
+ /**
+ * To establish persistent connection after receiving successful subscription response from controller
+ *
+ * @param paramMap holds the input configuration
+ * @param ctx restconf context
+ */
+ public void establishPersistentConnection(Map<String, String> paramMap, RestConfContext ctx) {
+
+ // check whether response is ok
+ if (ctx.getAttribute(Constants.RESPONSE_CODE).equals(Constants.RESPONSE_CODE_200)) {
+
+ String id = ctx.getAttribute(Constants.OUTPUT_IDENTIFIER);
+
+ String url = paramMap.get(Constants.KSETTING_SSE_CONNECT_URL);
+
+ PersistentConnection connection = new PersistentConnection(url, ctx);
+ runnableInfo.put(id, connection);
+ executor.execute(connection);
+ } else {
+ // error response is already updated in ctx
+ log.info("Failed to subscribe");
+ }
+ }
+
+ /**
+ * Get input parameter map
+ *
+ * @return input parameters map
+ */
+ public Map<String, String> getParaMap() {
+ return paraMap;
+ }
+
+
+ /**
+ * Get restConf context which has information about message encoding type
+ *
+ * @return restconf context
+ */
+ public RestConfContext getCtx() {
+ return ctx;
+ }
+
+ public class PersistentConnection implements Runnable {
+ private String url;
+ private RestConfContext ctx;
+ private volatile boolean running = true;
+
+ public PersistentConnection(String url, RestConfContext ctx) {
+ this.url = url;
+ this.ctx = ctx;
+ }
+
+ @Override
+ public void run() {
+ Client client = ClientBuilder.newBuilder()
+ .register(SseFeature.class).build();
+ WebTarget target = client.target(url);
+ EventSource eventSource = EventSource.target(target).build();
+ eventSource.register(new DataChangeEventListener(ctx));
+ eventSource.open();
+ log.info("Connected to SSE source");
+ while (running) {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException ie) {
+ log.info("Exception: " + ie.getMessage());
+ }
+ }
+ eventSource.close();
+ log.info("Closed connection to SSE source");
+ }
+ }
+
+ /**
+ * To process the array of events which are received from controller
+ *
+ * @param a JSONArray
+ * @throws Exception exception
+ */
+ public static void handleEvents(JSONArray a) throws Exception {
+ for (int i = 0; i < a.length(); i++) {
+ if (!fProcessingInputQueue.offer(a.getJSONObject(i))) {
+ throw new Exception();
+ }
+ }
+ log.debug("RestConfCollector.handleEvents:EVENTS has been published successfully!");
+ }
+}
+
+
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/RestapiCallNode.java b/src/main/java/org/onap/dcae/collectors/restconf/common/RestapiCallNode.java
new file mode 100755
index 0000000..a324e87
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/RestapiCallNode.java
@@ -0,0 +1,600 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.api.client.filter.HTTPBasicAuthFilter;
+import com.sun.jersey.api.client.filter.HTTPDigestAuthFilter;
+import com.sun.jersey.client.urlconnection.HTTPSProperties;
+import com.sun.jersey.oauth.client.OAuthClientFilter;
+import com.sun.jersey.oauth.signature.OAuthParameters;
+import com.sun.jersey.oauth.signature.OAuthSecrets;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.ws.rs.core.EntityTag;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.UriBuilder;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.SocketException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.KeyStore;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class RestapiCallNode {
+ private static final Logger log = LoggerFactory.getLogger(RestapiCallNode.class);
+
+ public void sendRequest(Map<String, String> paramMap, RestConfContext ctx, Integer retryCount) throws Exception {
+ RetryPolicy retryPolicy = null;
+ HttpResponse r = new HttpResponse();
+ try {
+ Parameters p = getParameters(paramMap);
+ String pp = p.responsePrefix != null ? p.responsePrefix + '.' : "";
+ String req = null;
+ if (p.templateFileName != null) {
+ String reqTemplate = readFile(p.templateFileName);
+ req = buildXmlJsonRequest(ctx, reqTemplate, p.format);
+ } else if (p.requestBody != null) {
+ req = p.requestBody;
+ }
+
+ r = sendHttpRequest(req, p);
+ setResponseStatus(ctx, p.responsePrefix, r);
+
+ if (p.dumpHeaders && r.headers != null) {
+ for (Map.Entry<String, List<String>> a : r.headers.entrySet()) {
+ ctx.setAttribute(pp + "header." + a.getKey(), StringUtils.join(a.getValue(), ","));
+ }
+ }
+
+ if (p.returnRequestPayload && req != null) {
+ ctx.setAttribute(pp + "httpRequest", req);
+ }
+
+ if (r.body != null && r.body.trim().length() > 0) {
+ ctx.setAttribute(pp + "httpResponse", r.body);
+
+ if (p.convertResponse) {
+ Map<String, String> mm = null;
+ if (p.format == Format.XML) {
+ mm = XmlParser.convertToProperties(r.body, p.listNameList);
+ } else if (p.format == Format.JSON) {
+ mm = JsonParser.convertToProperties(r.body);
+ }
+
+ if (mm != null) {
+ for (Map.Entry<String, String> entry : mm.entrySet())
+ ctx.setAttribute(pp + entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ } catch (Exception e) {
+ boolean shouldRetry = false;
+ if (e.getCause().getCause() instanceof SocketException) {
+ shouldRetry = true;
+ }
+
+ log.error("Error sending the request: " + e.getMessage(), e);
+ String prefix = parseParam(paramMap, "responsePrefix", false, null);
+ if (retryPolicy == null || shouldRetry == false) {
+ setFailureResponseStatus(ctx, prefix, e.getMessage(), r);
+ } else {
+ if (retryCount == null) {
+ retryCount = 0;
+ }
+ String retryMessage = retryCount + " attempts were made out of " + retryPolicy.getMaximumRetries() +
+ " maximum retries.";
+ log.debug(retryMessage);
+ try {
+ retryCount = retryCount + 1;
+ if (retryCount < retryPolicy.getMaximumRetries() + 1) {
+ URI uri = new URI(paramMap.get("restapiUrl"));
+ String hostname = uri.getHost();
+ String retryString = retryPolicy.getNextHostName(uri.toString());
+ URI uriTwo = new URI(retryString);
+ URI retryUri = UriBuilder.fromUri(uri).host(uriTwo.getHost()).port(uriTwo.getPort()).scheme(
+ uriTwo.getScheme()).build();
+ paramMap.put("restapiUrl", retryUri.toString());
+ log.debug("URL was set to {}", retryUri.toString());
+ log.debug("Failed to communicate with host {}. Request will be re-attempted using the host {}.",
+ hostname, retryString);
+ log.debug("This is retry attempt {} out of {}", retryCount, retryPolicy.getMaximumRetries());
+ sendRequest(paramMap, ctx, retryCount);
+ } else {
+ log.debug("Maximum retries reached, calling setFailureResponseStatus.");
+ setFailureResponseStatus(ctx, prefix, e.getMessage(), r);
+ }
+ } catch (Exception ex) {
+ log.error("Could not attempt retry.", ex);
+ String retryErrorMessage =
+ "Retry attempt has failed. No further retry shall be attempted, calling " +
+ "setFailureResponseStatus.";
+ setFailureResponseStatus(ctx, prefix, retryErrorMessage, r);
+ }
+ }
+ }
+
+ if (r != null && r.code >= 300) {
+ throw new Exception(String.valueOf(r.code) + ": " + r.message);
+ }
+ }
+
+ protected Parameters getParameters(Map<String, String> paramMap) throws Exception {
+ Parameters p = new Parameters();
+ p.templateFileName = parseParam(paramMap, "templateFileName", false, null);
+ p.requestBody = parseParam(paramMap, "requestBody", false, null);
+ p.restapiUrl = parseParam(paramMap, "restapiUrl", true, null);
+ validateUrl(p.restapiUrl);
+ p.restapiUser = parseParam(paramMap, "restapiUser", false, null);
+ p.restapiPassword = parseParam(paramMap, "restapiPassword", false, null);
+ p.oAuthConsumerKey = parseParam(paramMap, "oAuthConsumerKey", false, null);
+ p.oAuthConsumerSecret = parseParam(paramMap, "oAuthConsumerSecret", false, null);
+ p.oAuthSignatureMethod = parseParam(paramMap, "oAuthSignatureMethod", false, null);
+ p.oAuthVersion = parseParam(paramMap, "oAuthVersion", false, null);
+ p.contentType = parseParam(paramMap, "contentType", false, null);
+ p.format = Format.fromString(parseParam(paramMap, "format", false, "json"));
+ p.authtype = AuthType.fromString(parseParam(paramMap, "authType", false, "unspecified"));
+ p.httpMethod = HttpMethod.fromString(parseParam(paramMap, "httpMethod", false, "post"));
+ p.responsePrefix = parseParam(paramMap, "responsePrefix", false, null);
+ p.listNameList = getListNameList(paramMap);
+ String skipSendingStr = paramMap.get("skipSending");
+ p.skipSending = "true".equalsIgnoreCase(skipSendingStr);
+ p.convertResponse = Boolean.valueOf(parseParam(paramMap, "convertResponse", false, "true"));
+ p.trustStoreFileName = parseParam(paramMap, "trustStoreFileName", false, null);
+ p.trustStorePassword = parseParam(paramMap, "trustStorePassword", false, null);
+ p.keyStoreFileName = parseParam(paramMap, "keyStoreFileName", false, null);
+ p.keyStorePassword = parseParam(paramMap, "keyStorePassword", false, null);
+ p.ssl = p.trustStoreFileName != null && p.trustStorePassword != null && p.keyStoreFileName != null &&
+ p.keyStorePassword != null;
+ p.customHttpHeaders = parseParam(paramMap, "customHttpHeaders", false, null);
+ p.partner = parseParam(paramMap, "partner", false, null);
+ p.dumpHeaders = Boolean.valueOf(parseParam(paramMap, "dumpHeaders", false, null));
+ p.returnRequestPayload = Boolean.valueOf(parseParam(paramMap, "returnRequestPayload", false, null));
+ return p;
+ }
+
+ private void validateUrl(String restapiUrl) throws Exception {
+ try {
+ URI.create(restapiUrl);
+ } catch (IllegalArgumentException e) {
+ throw new Exception("Invalid input of url " + e.getLocalizedMessage(), e);
+ }
+ }
+
+ protected Set<String> getListNameList(Map<String, String> paramMap) {
+ Set<String> ll = new HashSet<>();
+ for (Map.Entry<String, String> entry : paramMap.entrySet())
+ if (entry.getKey().startsWith("listName")) {
+ ll.add(entry.getValue());
+ }
+ return ll;
+ }
+
+ protected String parseParam(Map<String, String> paramMap, String name, boolean required, String def)
+ throws Exception {
+ String s = paramMap.get(name);
+
+ if (s == null || s.trim().length() == 0) {
+ if (!required) {
+ return def;
+ }
+ throw new Exception("Parameter " + name + " is required in RestapiCallNode");
+ }
+
+ s = s.trim();
+ StringBuilder value = new StringBuilder();
+ int i = 0;
+ int i1 = s.indexOf('%');
+ while (i1 >= 0) {
+ int i2 = s.indexOf('%', i1 + 1);
+ if (i2 < 0) {
+ break;
+ }
+
+ String varName = s.substring(i1 + 1, i2);
+ String varValue = System.getenv(varName);
+ if (varValue == null) {
+ varValue = "%" + varName + "%";
+ }
+
+ value.append(s.substring(i, i1));
+ value.append(varValue);
+
+ i = i2 + 1;
+ i1 = s.indexOf('%', i);
+ }
+ value.append(s.substring(i));
+
+ log.info("Parameter {}: [{}]", name, value);
+ return value.toString();
+ }
+
+ protected String buildXmlJsonRequest(RestConfContext ctx, String template, Format format) throws Exception {
+ log.info("Building {} started", format);
+ long t1 = System.currentTimeMillis();
+
+ template = expandRepeats(ctx, template, 1);
+
+ Map<String, String> mm = new HashMap<>();
+ for (String s : ctx.getAttributeKeySet())
+ mm.put(s, ctx.getAttribute(s));
+ StringBuilder ss = new StringBuilder();
+ int i = 0;
+ while (i < template.length()) {
+ int i1 = template.indexOf("${", i);
+ if (i1 < 0) {
+ ss.append(template.substring(i));
+ break;
+ }
+
+ int i2 = template.indexOf('}', i1 + 2);
+ if (i2 < 0) {
+ throw new Exception("Template error: Matching } not found");
+ }
+
+ String var1 = template.substring(i1 + 2, i2);
+ String value1 = format == Format.XML ? XmlJsonUtil.getXml(mm, var1) : XmlJsonUtil.getJson(mm, var1);
+ // log.info(" " + var1 + ": " + value1);
+ if (value1 == null || value1.trim().length() == 0) {
+ // delete the whole element (line)
+ int i3 = template.lastIndexOf('\n', i1);
+ if (i3 < 0) {
+ i3 = 0;
+ }
+ int i4 = template.indexOf('\n', i1);
+ if (i4 < 0) {
+ i4 = template.length();
+ }
+
+ if (i < i3) {
+ ss.append(template.substring(i, i3));
+ }
+ i = i4;
+ } else {
+ ss.append(template.substring(i, i1)).append(value1);
+ i = i2 + 1;
+ }
+ }
+
+ String req = format == Format.XML
+ ? XmlJsonUtil.removeEmptyStructXml(ss.toString()) : XmlJsonUtil.removeEmptyStructJson(ss.toString());
+
+ if (format == Format.JSON) {
+ req = XmlJsonUtil.removeLastCommaJson(req);
+ }
+
+ long t2 = System.currentTimeMillis();
+ log.info("Building {} completed. Time: {}", format, (t2 - t1));
+
+ return req;
+ }
+
+ protected String expandRepeats(RestConfContext ctx, String template, int level) throws Exception {
+ StringBuilder newTemplate = new StringBuilder();
+ int k = 0;
+ while (k < template.length()) {
+ int i1 = template.indexOf("${repeat:", k);
+ if (i1 < 0) {
+ newTemplate.append(template.substring(k));
+ break;
+ }
+
+ int i2 = template.indexOf(':', i1 + 9);
+ if (i2 < 0) {
+ throw new Exception(
+ "Template error: Context variable name followed by : is required after repeat");
+ }
+
+ // Find the closing }, store in i3
+ int nn = 1;
+ int i3 = -1;
+ int i = i2;
+ while (nn > 0 && i < template.length()) {
+ i3 = template.indexOf('}', i);
+ if (i3 < 0) {
+ throw new Exception("Template error: Matching } not found");
+ }
+ int i32 = template.indexOf('{', i);
+ if (i32 >= 0 && i32 < i3) {
+ nn++;
+ i = i32 + 1;
+ } else {
+ nn--;
+ i = i3 + 1;
+ }
+ }
+
+ String var1 = template.substring(i1 + 9, i2);
+ String value1 = ctx.getAttribute(var1);
+ log.info(" {}:{}", var1, value1);
+ int n = 0;
+ try {
+ n = Integer.parseInt(value1);
+ } catch (NumberFormatException e) {
+ log.info("value1 not set or not a number, n will remain set at zero");
+ }
+
+ newTemplate.append(template.substring(k, i1));
+
+ String rpt = template.substring(i2 + 1, i3);
+
+ for (int ii = 0; ii < n; ii++) {
+ String ss = rpt.replaceAll("\\[\\$\\{" + level + "\\}\\]", "[" + ii + "]");
+ if (ii == n - 1 && ss.trim().endsWith(",")) {
+ int i4 = ss.lastIndexOf(',');
+ if (i4 > 0) {
+ ss = ss.substring(0, i4) + ss.substring(i4 + 1);
+ }
+ }
+ newTemplate.append(ss);
+ }
+
+ k = i3 + 1;
+ }
+
+ if (k == 0) {
+ return newTemplate.toString();
+ }
+
+ return expandRepeats(ctx, newTemplate.toString(), level + 1);
+ }
+
+ protected String readFile(String fileName) throws Exception {
+ try {
+ byte[] encoded = Files.readAllBytes(Paths.get(fileName));
+ return new String(encoded, "UTF-8");
+ } catch (IOException | SecurityException e) {
+ throw new Exception("Unable to read file " + fileName + e.getLocalizedMessage(), e);
+ }
+ }
+
+ protected Client addAuthType(Client client, Parameters p) throws Exception {
+ if (p.authtype == AuthType.Unspecified) {
+ if (p.restapiUser != null && p.restapiPassword != null) {
+ client.addFilter(new HTTPBasicAuthFilter(p.restapiUser, p.restapiPassword));
+ } else if (p.oAuthConsumerKey != null && p.oAuthConsumerSecret != null
+ && p.oAuthSignatureMethod != null) {
+ OAuthParameters params = new OAuthParameters()
+ .signatureMethod(p.oAuthSignatureMethod)
+ .consumerKey(p.oAuthConsumerKey)
+ .version(p.oAuthVersion);
+
+ OAuthSecrets secrets = new OAuthSecrets()
+ .consumerSecret(p.oAuthConsumerSecret);
+ client.addFilter(new OAuthClientFilter(client.getProviders(), params, secrets));
+ }
+ } else {
+ if (p.authtype == AuthType.DIGEST) {
+ if (p.restapiUser != null && p.restapiPassword != null) {
+ client.addFilter(new HTTPDigestAuthFilter(p.restapiUser, p.restapiPassword));
+ } else {
+ throw new Exception("oAUTH authentication type selected but all restapiUser and restapiPassword " +
+ "parameters doesn't exist", new Throwable());
+ }
+ } else if (p.authtype == AuthType.BASIC) {
+ if (p.restapiUser != null && p.restapiPassword != null) {
+ client.addFilter(new HTTPBasicAuthFilter(p.restapiUser, p.restapiPassword));
+ } else {
+ throw new Exception("oAUTH authentication type selected but all restapiUser and restapiPassword " +
+ "parameters doesn't exist", new Throwable());
+ }
+ } else if (p.authtype == AuthType.OAUTH) {
+ if (p.oAuthConsumerKey != null && p.oAuthConsumerSecret != null && p.oAuthSignatureMethod != null) {
+ OAuthParameters params = new OAuthParameters()
+ .signatureMethod(p.oAuthSignatureMethod)
+ .consumerKey(p.oAuthConsumerKey)
+ .version(p.oAuthVersion);
+
+ OAuthSecrets secrets = new OAuthSecrets()
+ .consumerSecret(p.oAuthConsumerSecret);
+ client.addFilter(new OAuthClientFilter(client.getProviders(), params, secrets));
+ } else {
+ throw new Exception("oAUTH authentication type selected but all oAuthConsumerKey, oAuthConsumerSecret " +
+ "and oAuthSignatureMethod parameters doesn't exist", new Throwable());
+ }
+ }
+ }
+ return client;
+ }
+
+ protected HttpResponse sendHttpRequest(String request, Parameters p) throws Exception {
+
+ ClientConfig config = new DefaultClientConfig();
+ SSLContext ssl = null;
+ if (p.ssl && p.restapiUrl.startsWith("https")) {
+ ssl = createSSLContext(p);
+ }
+ if (ssl != null) {
+ HostnameVerifier hostnameVerifier = (hostname, session) -> true;
+
+ config.getProperties().put(HTTPSProperties.PROPERTY_HTTPS_PROPERTIES,
+ new HTTPSProperties(hostnameVerifier, ssl));
+ }
+
+ logProperties(config.getProperties());
+
+ Client client = Client.create(config);
+ client.setConnectTimeout(5000);
+ WebResource webResource = addAuthType(client, p).resource(p.restapiUrl);
+
+ log.info("Sending request:");
+ log.info(request);
+ long t1 = System.currentTimeMillis();
+
+ HttpResponse r = new HttpResponse();
+ r.code = 200;
+
+ if (!p.skipSending) {
+ String tt = p.format == Format.XML ? "application/xml" : "application/json";
+ String tt1 = tt + ";charset=UTF-8";
+ if (p.contentType != null) {
+ tt = p.contentType;
+ tt1 = p.contentType;
+ }
+
+ WebResource.Builder webResourceBuilder = webResource.accept(tt).type(tt1);
+ if (p.format == Format.NONE) {
+ webResourceBuilder = webResource.header("", "");
+ }
+
+ if (p.customHttpHeaders != null && p.customHttpHeaders.length() > 0) {
+ String[] keyValuePairs = p.customHttpHeaders.split(",");
+ for (String singlePair : keyValuePairs) {
+ int equalPosition = singlePair.indexOf('=');
+ webResourceBuilder.header(singlePair.substring(0, equalPosition),
+ singlePair.substring(equalPosition + 1, singlePair.length()));
+ }
+ }
+
+ webResourceBuilder.header("X-ECOMP-RequestID", org.slf4j.MDC.get("X-ECOMP-RequestID"));
+
+ ClientResponse response;
+
+ try {
+ response = webResourceBuilder.method(p.httpMethod.toString(), ClientResponse.class, request);
+ } catch (UniformInterfaceException | ClientHandlerException e) {
+ throw new Exception("Exception while sending http request to client "
+ + e.getLocalizedMessage(), e);
+ }
+
+ r.code = response.getStatus();
+ r.headers = response.getHeaders();
+ EntityTag etag = response.getEntityTag();
+ if (etag != null) {
+ r.message = etag.getValue();
+ }
+ if (response.hasEntity() && r.code != 204) {
+ r.body = response.getEntity(String.class);
+ }
+ }
+
+ long t2 = System.currentTimeMillis();
+ log.info("Response received. Time: {}", (t2 - t1));
+ log.info("HTTP response code: {}", r.code);
+ log.info("HTTP response message: {}", r.message);
+ logHeaders(r.headers);
+ log.info("HTTP response: {}", r.body);
+
+ return r;
+ }
+
+ protected void setFailureResponseStatus(RestConfContext ctx, String prefix, String errorMessage,
+ HttpResponse resp) {
+ resp.code = 500;
+ resp.message = errorMessage;
+ String pp = prefix != null ? prefix + '.' : "";
+ ctx.setAttribute(pp + "response-code", String.valueOf(resp.code));
+ ctx.setAttribute(pp + "response-message", resp.message);
+ }
+
+ protected void setResponseStatus(RestConfContext ctx, String prefix, HttpResponse r) {
+ String pp = prefix != null ? prefix + '.' : "";
+ ctx.setAttribute(pp + "response-code", String.valueOf(r.code));
+ ctx.setAttribute(pp + "response-message", r.message);
+ }
+
+ protected SSLContext createSSLContext(Parameters p) {
+ try (FileInputStream in = new FileInputStream(p.keyStoreFileName)) {
+ System.setProperty("jsse.enableSNIExtension", "false");
+ System.setProperty("javax.net.ssl.trustStore", p.trustStoreFileName);
+ System.setProperty("javax.net.ssl.trustStorePassword", p.trustStorePassword);
+
+ HttpsURLConnection.setDefaultHostnameVerifier((string, ssls) -> true);
+
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ KeyStore ks = KeyStore.getInstance("PKCS12");
+ char[] pwd = p.keyStorePassword.toCharArray();
+ ks.load(in, pwd);
+ kmf.init(ks, pwd);
+
+ SSLContext ctx = SSLContext.getInstance("TLS");
+ ctx.init(kmf.getKeyManagers(), null, null);
+ return ctx;
+ } catch (Exception e) {
+ log.error("Error creating SSLContext: {}", e.getMessage(), e);
+ }
+ return null;
+ }
+
+ protected void logProperties(Map<String, Object> mm) {
+ List<String> ll = new ArrayList<>();
+ for (Object o : mm.keySet())
+ ll.add((String) o);
+ Collections.sort(ll);
+
+ log.info("Properties:");
+ for (String name : ll)
+ log.info("--- {}:{}", name, String.valueOf(mm.get(name)));
+ }
+
+ protected void logHeaders(MultivaluedMap<String, String> mm) {
+ log.info("HTTP response headers:");
+
+ if (mm == null) {
+ return;
+ }
+
+ List<String> ll = new ArrayList<>();
+ for (Object o : mm.keySet())
+ ll.add((String) o);
+ Collections.sort(ll);
+
+ for (String name : ll)
+ log.info("--- {}:{}", name, String.valueOf(mm.get(name)));
+ }
+
+ private static class FileParam {
+
+ public String fileName;
+ public String url;
+ public String user;
+ public String password;
+ public HttpMethod httpMethod;
+ public String responsePrefix;
+ public boolean skipSending;
+ public String oAuthConsumerKey;
+ public String oAuthConsumerSecret;
+ public String oAuthSignatureMethod;
+ public String oAuthVersion;
+ public AuthType authtype;
+ }
+
+} \ No newline at end of file
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/RetryException.java b/src/main/java/org/onap/dcae/collectors/restconf/common/RetryException.java
new file mode 100755
index 0000000..91c0a9c
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/RetryException.java
@@ -0,0 +1,27 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+public class RetryException extends Exception {
+ public RetryException(String message) {
+ super(message);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/RetryPolicy.java b/src/main/java/org/onap/dcae/collectors/restconf/common/RetryPolicy.java
new file mode 100755
index 0000000..7bc0759
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/RetryPolicy.java
@@ -0,0 +1,58 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+public class RetryPolicy {
+ private String[] hostnames;
+ private Integer maximumRetries;
+
+ public Integer getMaximumRetries() {
+ return maximumRetries;
+ }
+
+ public String getNextHostName(String uri) throws RetryException {
+ Integer position = null;
+
+ for (int i = 0; i < hostnames.length; i++) {
+ if (uri.contains(hostnames[i])) {
+ position = i;
+ break;
+ }
+ }
+
+ if (position == null) {
+ throw new RetryException("No match found for the provided uri[" + uri + "] " +
+ "so the next host name could not be retreived");
+ }
+ position++;
+
+ if (position > hostnames.length - 1) {
+ position = 0;
+ }
+ return hostnames[position];
+ }
+
+ public RetryPolicy(String[] hostnames, Integer maximumRetries) {
+ this.hostnames = hostnames;
+ this.maximumRetries = maximumRetries;
+ }
+
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/RetryPolicyStore.java b/src/main/java/org/onap/dcae/collectors/restconf/common/RetryPolicyStore.java
new file mode 100755
index 0000000..10b0e00
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/RetryPolicyStore.java
@@ -0,0 +1,53 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+
+public class RetryPolicyStore {
+ private static final Logger log = LoggerFactory.getLogger(RetryPolicyStore.class);
+
+ HashMap<String, RetryPolicy> retryPolicies;
+ public String proxyServers;
+
+ public String getProxyServers() {
+ return proxyServers;
+ }
+
+ public void setProxyServers(String admServers) {
+ this.proxyServers = admServers;
+ String[] adminServersArray = admServers.split(",");
+ RetryPolicy adminPortalRetry = new RetryPolicy(adminServersArray, adminServersArray.length);
+ retryPolicies.put("dme2proxy", adminPortalRetry);
+ }
+
+ public RetryPolicyStore() {
+ retryPolicies = new HashMap<>();
+ }
+
+ public RetryPolicy getRetryPolicy(String policyName) {
+ return (this.retryPolicies.get(policyName));
+ }
+
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/XmlJsonUtil.java b/src/main/java/org/onap/dcae/collectors/restconf/common/XmlJsonUtil.java
new file mode 100755
index 0000000..382ef4d
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/XmlJsonUtil.java
@@ -0,0 +1,412 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class XmlJsonUtil {
+
+ private static final Logger log = LoggerFactory.getLogger(XmlJsonUtil.class);
+
+ private XmlJsonUtil() {
+ // Preventing instantiation of the same.
+ }
+
+ public static String getXml(Map<String, String> varmap, String var) {
+ boolean escape = true;
+ if (var.startsWith("'")) {
+ var = var.substring(1);
+ escape = false;
+ }
+
+ Object o = createStructure(varmap, var);
+ return generateXml(o, 0, escape);
+ }
+
+ public static String getJson(Map<String, String> varmap, String var) {
+ boolean escape = true;
+ if (var.startsWith("'")) {
+ var = var.substring(1);
+ escape = false;
+ }
+
+ boolean quotes = true;
+ if (var.startsWith("\"")) {
+ var = var.substring(1);
+ quotes = false;
+ }
+
+ Object o = createStructure(varmap, var);
+ return generateJson(o, escape, quotes);
+ }
+
+ private static Object createStructure(Map<String, String> flatmap, String var) {
+ if (flatmap.containsKey(var)) {
+ if (var.endsWith("_length") || var.endsWith("].key")) {
+ return null;
+ }
+ return flatmap.get(var);
+ }
+
+ Map<String, Object> mm = new HashMap<>();
+ for (String k : flatmap.keySet())
+ if (k.startsWith(var + ".")) {
+ int i1 = k.indexOf('.', var.length() + 1);
+ int i2 = k.indexOf('[', var.length() + 1);
+ int i3 = k.length();
+ if (i1 > 0 && i1 < i3) {
+ i3 = i1;
+ }
+ if (i2 > 0 && i2 < i3) {
+ i3 = i2;
+ }
+ String k1 = k.substring(var.length() + 1, i3);
+ String var1 = k.substring(0, i3);
+ if (!mm.containsKey(k1)) {
+ Object str = createStructure(flatmap, var1);
+ if (str != null && (!(str instanceof String) || ((String) str).trim().length() > 0)) {
+ mm.put(k1, str);
+ }
+ }
+ }
+ if (!mm.isEmpty()) {
+ return mm;
+ }
+
+ boolean arrayFound = false;
+ for (String k : flatmap.keySet())
+ if (k.startsWith(var + "[")) {
+ arrayFound = true;
+ break;
+ }
+
+ if (arrayFound) {
+ List<Object> ll = new ArrayList<>();
+
+ int length = Integer.MAX_VALUE;
+ String lengthStr = flatmap.get(var + "_length");
+ if (lengthStr != null) {
+ try {
+ length = Integer.parseInt(lengthStr);
+ } catch (Exception e) {
+ log.warn("Invalid number for {}_length:{}", var, lengthStr, e);
+ }
+ }
+
+ for (int i = 0; i < length; i++) {
+ Object v = createStructure(flatmap, var + '[' + i + ']');
+ if (v == null) {
+ break;
+ }
+ ll.add(v);
+ }
+
+ if (!ll.isEmpty()) {
+ return ll;
+ }
+ }
+
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ private static String generateXml(Object o, int indent, boolean escape) {
+ if (o == null) {
+ return null;
+ }
+
+ if (o instanceof String) {
+ return escape ? escapeXml((String) o) : (String) o;
+ }
+ ;
+
+ if (o instanceof Map) {
+ StringBuilder ss = new StringBuilder();
+ Map<String, Object> mm = (Map<String, Object>) o;
+ for (Map.Entry<String, Object> entry : mm.entrySet()) {
+ Object v = entry.getValue();
+ String key = entry.getKey();
+ if (v instanceof String) {
+ String s = escape ? escapeXml((String) v) : (String) v;
+ ss.append(pad(indent)).append('<').append(key).append('>');
+ ss.append(s);
+ ss.append("</").append(key).append('>').append('\n');
+ } else if (v instanceof Map) {
+ ss.append(pad(indent)).append('<').append(key).append('>').append('\n');
+ ss.append(generateXml(v, indent + 1, escape));
+ ss.append(pad(indent)).append("</").append(key).append('>').append('\n');
+ } else if (v instanceof List) {
+ List<Object> ll = (List<Object>) v;
+ for (Object o1 : ll) {
+ ss.append(pad(indent)).append('<').append(key).append('>').append('\n');
+ ss.append(generateXml(o1, indent + 1, escape));
+ ss.append(pad(indent)).append("</").append(key).append('>').append('\n');
+ }
+ }
+ }
+ return ss.toString();
+ }
+
+ return null;
+ }
+
+ private static String generateJson(Object o, boolean escape, boolean quotes) {
+ if (o == null) {
+ return null;
+ }
+
+ StringBuilder ss = new StringBuilder();
+ generateJson(ss, o, 0, false, escape, quotes);
+ return ss.toString();
+ }
+
+ @SuppressWarnings("unchecked")
+ private static void generateJson(StringBuilder ss, Object o, int indent, boolean padFirst, boolean escape, boolean quotes) {
+ if (o instanceof String) {
+ String s = escape ? escapeJson((String) o) : (String) o;
+ if (padFirst) {
+ ss.append(pad(indent));
+ }
+ if (quotes) {
+ ss.append('"').append(s).append('"');
+ } else {
+ ss.append(s);
+ }
+ return;
+ }
+
+ if (o instanceof Map) {
+ Map<String, Object> mm = (Map<String, Object>) o;
+
+ if (padFirst) {
+ ss.append(pad(indent));
+ }
+ ss.append("{\n");
+
+ boolean first = true;
+ for (Map.Entry<String, Object> entry : mm.entrySet()) {
+ if (!first) {
+ ss.append(",\n");
+ }
+ first = false;
+ Object v = entry.getValue();
+ String key = entry.getKey();
+ ss.append(pad(indent + 1)).append('"').append(key).append("\": ");
+ generateJson(ss, v, indent + 1, false, escape, true);
+ }
+
+ ss.append("\n");
+ ss.append(pad(indent)).append('}');
+
+ return;
+ }
+
+ if (o instanceof List) {
+ List<Object> ll = (List<Object>) o;
+
+ if (padFirst) {
+ ss.append(pad(indent));
+ }
+ ss.append("[\n");
+
+ boolean first = true;
+ for (Object o1 : ll) {
+ if (!first) {
+ ss.append(",\n");
+ }
+ first = false;
+
+ generateJson(ss, o1, indent + 1, true, escape, quotes);
+ }
+
+ ss.append("\n");
+ ss.append(pad(indent)).append(']');
+ }
+ }
+
+ public static String removeLastCommaJson(String s) {
+ StringBuilder sb = new StringBuilder();
+ int k = 0;
+ int start = 0;
+ while (k < s.length()) {
+ int i11 = s.indexOf('}', k);
+ int i12 = s.indexOf(']', k);
+ int i1 = -1;
+ if (i11 < 0) {
+ i1 = i12;
+ } else if (i12 < 0) {
+ i1 = i11;
+ } else {
+ i1 = i11 < i12 ? i11 : i12;
+ }
+ if (i1 < 0) {
+ break;
+ }
+
+ int i2 = s.lastIndexOf(',', i1);
+ if (i2 < 0) {
+ k = i1 + 1;
+ continue;
+ }
+
+ String between = s.substring(i2 + 1, i1);
+ if (between.trim().length() > 0) {
+ k = i1 + 1;
+ continue;
+ }
+
+ sb.append(s.substring(start, i2));
+ start = i2 + 1;
+ k = i1 + 1;
+ }
+
+ sb.append(s.substring(start, s.length()));
+
+ return sb.toString();
+ }
+
+ public static String removeEmptyStructJson(String s) {
+ int k = 0;
+ while (k < s.length()) {
+ boolean curly = true;
+ int i11 = s.indexOf('{', k);
+ int i12 = s.indexOf('[', k);
+ int i1 = -1;
+ if (i11 < 0) {
+ i1 = i12;
+ curly = false;
+ } else if (i12 < 0) {
+ i1 = i11;
+ } else if (i11 < i12) {
+ i1 = i11;
+ } else {
+ i1 = i12;
+ curly = false;
+ }
+
+ if (i1 >= 0) {
+ int i2 = curly ? s.indexOf('}', i1) : s.indexOf(']', i1);
+ if (i2 > 0) {
+ String value = s.substring(i1 + 1, i2);
+ if (value.trim().length() == 0) {
+ int i4 = s.lastIndexOf('\n', i1);
+ if (i4 < 0) {
+ i4 = 0;
+ }
+ int i5 = s.indexOf('\n', i2);
+ if (i5 < 0) {
+ i5 = s.length();
+ }
+
+ s = s.substring(0, i4) + s.substring(i5);
+ k = 0;
+ } else {
+ k = i1 + 1;
+ }
+ } else {
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+
+ return s;
+ }
+
+ public static String removeEmptyStructXml(String s) {
+ int k = 0;
+ while (k < s.length()) {
+ int i1 = s.indexOf('<', k);
+ if (i1 < 0 || i1 == s.length() - 1) {
+ break;
+ }
+
+ char c1 = s.charAt(i1 + 1);
+ if (c1 == '?' || c1 == '!') {
+ k = i1 + 2;
+ continue;
+ }
+
+ int i2 = s.indexOf('>', i1);
+ if (i2 < 0) {
+ k = i1 + 1;
+ continue;
+ }
+
+ String closingTag = "</" + s.substring(i1 + 1, i2 + 1);
+ int i3 = s.indexOf(closingTag, i2 + 1);
+ if (i3 < 0) {
+ k = i2 + 1;
+ continue;
+ }
+
+ String value = s.substring(i2 + 1, i3);
+ if (value.trim().length() > 0) {
+ k = i2 + 1;
+ continue;
+ }
+
+ int i4 = s.lastIndexOf('\n', i1);
+ if (i4 < 0) {
+ i4 = 0;
+ }
+ int i5 = s.indexOf('\n', i3);
+ if (i5 < 0) {
+ i5 = s.length();
+ }
+
+ s = s.substring(0, i4) + s.substring(i5);
+ k = 0;
+ }
+
+ return s;
+ }
+
+ private static String escapeXml(String v) {
+ String s = v.replaceAll("&", "&amp;");
+ s = s.replaceAll("<", "&lt;");
+ s = s.replaceAll("'", "&apos;");
+ s = s.replaceAll("\"", "&quot;");
+ s = s.replaceAll(">", "&gt;");
+ return s;
+ }
+
+ private static String escapeJson(String v) {
+ String s = v.replaceAll("\\\\", "\\\\\\\\");
+ s = s.replaceAll("\"", "\\\\\"");
+ return s;
+ }
+
+ private static String pad(int n) {
+ StringBuilder s = new StringBuilder();
+ for (int i = 0; i < n; i++)
+ s.append(Character.toString('\t'));
+ return s.toString();
+ }
+}
+
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/XmlParser.java b/src/main/java/org/onap/dcae/collectors/restconf/common/XmlParser.java
new file mode 100755
index 0000000..e073cd6
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/XmlParser.java
@@ -0,0 +1,178 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.Attributes;
+import org.xml.sax.SAXException;
+import org.xml.sax.helpers.DefaultHandler;
+
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.parsers.SAXParser;
+import javax.xml.parsers.SAXParserFactory;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class XmlParser {
+
+ private static final Logger log = LoggerFactory.getLogger(XmlParser.class);
+
+ private XmlParser() {
+ // Preventing instantiation of the same.
+ }
+
+ public static Map<String, String> convertToProperties(String s, Set<String> listNameList)
+ throws Exception {
+
+ checkNotNull(s, "Input should not be null.");
+
+ Handler handler = new Handler(listNameList);
+ try {
+ SAXParserFactory factory = SAXParserFactory.newInstance();
+ SAXParser saxParser = factory.newSAXParser();
+ InputStream in = new ByteArrayInputStream(s.getBytes());
+ saxParser.parse(in, handler);
+ } catch (ParserConfigurationException | IOException | SAXException | NumberFormatException e) {
+ throw new Exception("Unable to convert XML to properties" + e.getLocalizedMessage(), e);
+ }
+ return handler.getProperties();
+ }
+
+ private static class Handler extends DefaultHandler {
+
+ private Set<String> listNameList;
+
+ private Map<String, String> properties = new HashMap<>();
+
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ public Handler(Set<String> listNameList) {
+ super();
+ this.listNameList = listNameList;
+ if (this.listNameList == null) {
+ this.listNameList = new HashSet<>();
+ }
+ }
+
+ StringBuilder currentName = new StringBuilder();
+ StringBuilder currentValue = new StringBuilder();
+
+ @Override
+ public void startElement(String uri, String localName, String qName, Attributes attributes)
+ throws SAXException {
+ super.startElement(uri, localName, qName, attributes);
+
+ String name = localName;
+ if (name == null || name.trim().length() == 0) {
+ name = qName;
+ }
+ int i2 = name.indexOf(':');
+ if (i2 >= 0) {
+ name = name.substring(i2 + 1);
+ }
+
+ if (currentName.length() > 0) {
+ currentName.append(Character.toString('.'));
+ }
+ currentName.append(name);
+
+ String listName = removeIndexes(currentName.toString());
+
+ if (listNameList.contains(listName)) {
+ String n = currentName.toString() + "_length";
+ int len = getInt(properties, n);
+ properties.put(n, String.valueOf(len + 1));
+ currentName.append("[").append(len).append("]");
+ }
+ }
+
+ @Override
+ public void endElement(String uri, String localName, String qName) throws SAXException {
+ super.endElement(uri, localName, qName);
+
+ String name = localName;
+ if (name == null || name.trim().length() == 0) {
+ name = qName;
+ }
+ int i2 = name.indexOf(':');
+ if (i2 >= 0) {
+ name = name.substring(i2 + 1);
+ }
+
+ String s = currentValue.toString().trim();
+ if (s.length() > 0) {
+ properties.put(currentName.toString(), s);
+
+ log.info("Added property: {} : {}", currentName, s);
+ currentValue = new StringBuilder();
+ }
+
+ int i1 = currentName.lastIndexOf("." + name);
+ if (i1 <= 0) {
+ currentName = new StringBuilder();
+ } else {
+ currentName = new StringBuilder(currentName.substring(0, i1));
+ }
+ }
+
+ @Override
+ public void characters(char[] ch, int start, int length) throws SAXException {
+ super.characters(ch, start, length);
+
+ String value = new String(ch, start, length);
+ currentValue.append(value);
+ }
+
+ private static int getInt(Map<String, String> mm, String name) {
+ String s = mm.get(name);
+ if (s == null) {
+ return 0;
+ }
+ return Integer.parseInt(s);
+ }
+
+ private String removeIndexes(String currentName) {
+ StringBuilder b = new StringBuilder();
+ boolean add = true;
+ for (int i = 0; i < currentName.length(); i++) {
+ char c = currentName.charAt(i);
+ if (c == '[') {
+ add = false;
+ } else if (c == ']') {
+ add = true;
+ } else if (add) {
+ b.append(Character.toString(c));
+ }
+ }
+ return b.toString();
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPConfigurationParser.java b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPConfigurationParser.java
new file mode 100755
index 0000000..04271e4
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPConfigurationParser.java
@@ -0,0 +1,107 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common.event.publishing;
+
+import io.vavr.collection.List;
+import io.vavr.collection.Map;
+import io.vavr.control.Option;
+import io.vavr.control.Try;
+import org.onap.dcae.collectors.restconf.common.AnyNode;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import static io.vavr.API.List;
+import static io.vavr.API.Try;
+import static io.vavr.API.Tuple;
+import static io.vavr.API.unchecked;
+import static org.onap.dcae.collectors.restconf.common.event.publishing.VavrUtils.enhanceError;
+import static org.onap.dcae.collectors.restconf.common.event.publishing.VavrUtils.f;
+
+public class DMaaPConfigurationParser {
+
+ public static Try<Map<String, PublisherConfig>> parseToDomainMapping(Path configLocation) {
+ return readFromFile(configLocation)
+ .flatMap(DMaaPConfigurationParser::toJSON)
+ .flatMap(DMaaPConfigurationParser::toConfigMap);
+ }
+
+ private static Try<String> readFromFile(Path configLocation) {
+ return Try(() -> new String(Files.readAllBytes(configLocation)))
+ .mapFailure(enhanceError(f("Could not read DMaaP configuration from location: '%s'", configLocation)));
+ }
+
+ private static Try<AnyNode> toJSON(String config) {
+ return Try(() -> AnyNode.fromString(config))
+ .mapFailure(enhanceError(f("DMaaP configuration '%s' is not a valid JSON document", config)));
+ }
+
+ private static Try<Map<String, PublisherConfig>> toConfigMap(AnyNode config) {
+ return Try(() -> usesLegacyFormat(config) ? parseLegacyFormat(config) : parseNewFormat(config))
+ .mapFailure(enhanceError(
+ f("Parsing DMaaP configuration: '%s' failed, probably it is in unexpected format", config)));
+ }
+
+ private static boolean usesLegacyFormat(AnyNode dMaaPConfig) {
+ return dMaaPConfig.has("channels");
+ }
+
+ private static Map<String, PublisherConfig> parseLegacyFormat(AnyNode root) {
+ return root.get("channels").toList().toMap(
+ channel -> channel.get("name").toString(),
+ channel -> {
+ String destinationsStr = channel.getAsOption("cambria.url")
+ .getOrElse(channel.getAsOption("cambria.hosts").get())
+ .toString();
+ String topic = channel.get("cambria.topic").toString();
+ Option<String> maybeUser = channel.getAsOption("basicAuthUsername").map(AnyNode::toString);
+ Option<String> maybePassword = channel.getAsOption("basicAuthPassword").map(AnyNode::toString);
+ List<String> destinations = List(destinationsStr.split(","));
+ return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations);
+ });
+ }
+
+ private static Map<String, PublisherConfig> parseNewFormat(AnyNode root) {
+ return root.keys().toMap(
+ channelName -> channelName,
+ channelName -> {
+ AnyNode channelConfig = root.get(channelName);
+ Option<String> maybeUser = channelConfig.getAsOption("aaf_username").map(AnyNode::toString);
+ Option<String> maybePassword = channelConfig.getAsOption("aaf_password").map(AnyNode::toString);
+ URL topicURL = unchecked(
+ () -> new URL(channelConfig.get("dmaap_info").get("topic_url").toString())).apply();
+ String[] pathSegments = topicURL.getPath().substring(1).split("/");
+ String topic = pathSegments[1];
+ String destination = "events".equals(pathSegments[0]) ? topicURL.getAuthority() : topicURL.getHost();
+ List<String> destinations = List(destination);
+ return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations);
+ });
+ }
+
+ private static PublisherConfig buildBasedOnAuth(Option<String> maybeUser, Option<String> maybePassword,
+ String topic, List<String> destinations) {
+ return maybeUser.flatMap(user -> maybePassword.map(password -> Tuple(user, password)))
+ .map(credentials -> new PublisherConfig(destinations, topic, credentials._1, credentials._2))
+ .getOrElse(new PublisherConfig(destinations, topic));
+ }
+
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPEventPublisher.java
new file mode 100755
index 0000000..36e950f
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPEventPublisher.java
@@ -0,0 +1,75 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common.event.publishing;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import io.vavr.control.Try;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class DMaaPEventPublisher implements EventPublisher {
+
+ private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100;
+ private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class);
+ private final DMaaPPublishersCache publishersCache;
+ private final Logger outputLogger;
+
+ DMaaPEventPublisher(DMaaPPublishersCache DMaaPPublishersCache,
+ Logger outputLogger) {
+ this.publishersCache = DMaaPPublishersCache;
+ this.outputLogger = outputLogger;
+ }
+
+ @Override
+ public void sendEvent(JSONObject event, String domain) {
+ publishersCache.getPublisher(domain)
+ .onEmpty(() ->
+ log.warn(VavrUtils.f("Could not find event publisher for domain: '%s', dropping message: '%s'", domain, event)))
+ .forEach(publisher -> sendEvent(event, domain, publisher));
+ }
+
+ private void sendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) {
+ Try.run(() -> uncheckedSendEvent(event, domain, publisher))
+ .onFailure(exc -> closePublisher(event, domain, exc));
+ }
+
+ private void uncheckedSendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher)
+ throws IOException {
+ System.out.println("printing publisher information" + publisher);
+ int pendingMsgs = publisher.send("MyPartitionKey", event.toString());
+ if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) {
+ log.info("Pending messages count: " + pendingMsgs);
+ }
+ String infoMsg = VavrUtils.f("Event: '%s' scheduled to be send asynchronously on domain: '%s'", event, domain);
+ log.info(infoMsg);
+ outputLogger.info(infoMsg);
+ }
+
+ private void closePublisher(JSONObject event, String domain, Throwable e) {
+ log.error(VavrUtils.f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.",
+ event, domain), e);
+ publishersCache.closePublisherFor(domain);
+ }
+
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersBuilder.java b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersBuilder.java
new file mode 100755
index 0000000..2f9b3ed
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersBuilder.java
@@ -0,0 +1,63 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common.event.publishing;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
+import io.vavr.control.Try;
+
+import static io.vavr.API.Try;
+import static org.onap.dcae.collectors.restconf.common.event.publishing.VavrUtils.enhanceError;
+import static org.onap.dcae.collectors.restconf.common.event.publishing.VavrUtils.f;
+
+/**
+ * @author Pawel Szalapski (pawel.szalapski@nokia.com)
+ */
+final class DMaaPPublishersBuilder {
+
+ @SuppressWarnings("mapFailure takes a generic varargs, unchecked because of Javas type system limitation, actually safe to do")
+ static Try<CambriaBatchingPublisher> buildPublisher(PublisherConfig config) {
+ return Try(() -> builder(config).build())
+ .mapFailure(enhanceError(f("DMaaP client builder throws exception for this configuration: '%s'", config)));
+ }
+
+ private static PublisherBuilder builder(PublisherConfig config) {
+ if (config.isSecured()) {
+ return authenticatedBuilder(config);
+ } else {
+ return unAuthenticatedBuilder(config);
+ }
+ }
+
+ private static PublisherBuilder authenticatedBuilder(PublisherConfig config) {
+ return unAuthenticatedBuilder(config)
+ .usingHttps()
+ .authenticatedByHttp(config.userName().get(), config.password().get());
+ }
+
+ private static PublisherBuilder unAuthenticatedBuilder(PublisherConfig config) {
+ return new CambriaClientBuilders.PublisherBuilder()
+ .usingHosts(config.destinations().mkString(","))
+ .onTopic(config.topic())
+ .logSendFailuresAfter(5);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersCache.java b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersCache.java
new file mode 100755
index 0000000..acad96d
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersCache.java
@@ -0,0 +1,110 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common.event.publishing;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import io.vavr.collection.Map;
+import io.vavr.control.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static io.vavr.API.Option;
+import static org.onap.dcae.collectors.restconf.common.event.publishing.VavrUtils.f;
+
+public class DMaaPPublishersCache {
+
+ private static final Logger log = LoggerFactory.getLogger(DMaaPPublishersCache.class);
+ private final LoadingCache<String, CambriaBatchingPublisher> publishersCache;
+ private AtomicReference<Map<String, PublisherConfig>> dMaaPConfiguration;
+
+ DMaaPPublishersCache(Map<String, PublisherConfig> dMaaPConfiguration) {
+ this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration);
+ this.publishersCache = CacheBuilder.newBuilder()
+ .removalListener(new OnPublisherRemovalListener())
+ .build(new CambriaPublishersCacheLoader());
+ }
+
+ DMaaPPublishersCache(CambriaPublishersCacheLoader dMaaPPublishersCacheLoader,
+ OnPublisherRemovalListener onPublisherRemovalListener,
+ Map<String, PublisherConfig> dMaaPConfiguration) {
+ this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration);
+ this.publishersCache = CacheBuilder.newBuilder()
+ .removalListener(onPublisherRemovalListener)
+ .build(dMaaPPublishersCacheLoader);
+ }
+
+ Option<CambriaBatchingPublisher> getPublisher(String streamID) {
+ try {
+ return Option(publishersCache.getUnchecked(streamID));
+ } catch (Exception e) {
+ log.warn("Could not create / load Cambria Publisher for streamID", e);
+ return Option.none();
+ }
+ }
+
+ void closePublisherFor(String streamId) {
+ publishersCache.invalidate(streamId);
+ }
+
+ static class OnPublisherRemovalListener implements RemovalListener<String, CambriaBatchingPublisher> {
+
+ @Override
+ public void onRemoval(@Nonnull RemovalNotification<String, CambriaBatchingPublisher> notification) {
+ CambriaBatchingPublisher publisher = notification.getValue();
+ if (publisher != null) { // The value might get Garbage Collected at this moment, regardless of @Nonnull
+ try {
+ int timeout = 20;
+ TimeUnit unit = TimeUnit.SECONDS;
+ java.util.List<?> stuck = publisher.close(timeout, unit);
+ if (!stuck.isEmpty()) {
+ log.error(f("Publisher got stuck and did not manage to close in '%s' '%s', "
+ + "%s messages were dropped", stuck.size(), timeout, unit));
+ }
+ } catch (InterruptedException | IOException e) {
+ log.error("Could not close Cambria publisher, some messages might have been dropped", e);
+ }
+ }
+ }
+ }
+
+ class CambriaPublishersCacheLoader extends CacheLoader<String, CambriaBatchingPublisher> {
+
+ @Override
+ public CambriaBatchingPublisher load(@Nonnull String domain) {
+ return dMaaPConfiguration.get()
+ .get(domain)
+ .toTry(() -> new RuntimeException(
+ f("DMaaP configuration contains no configuration for domain: '%s'", domain)))
+ .flatMap(DMaaPPublishersBuilder::buildPublisher)
+ .get();
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/EventPublisher.java b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/EventPublisher.java
new file mode 100755
index 0000000..3d339b3
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/EventPublisher.java
@@ -0,0 +1,35 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.restconf.common.event.publishing;
+
+
+import io.vavr.collection.Map;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+
+public interface EventPublisher {
+
+ static EventPublisher createPublisher(Logger outputLogger, Map<String, PublisherConfig> dMaaPConfig) {
+ return new DMaaPEventPublisher(new DMaaPPublishersCache(dMaaPConfig), outputLogger);
+ }
+
+ void sendEvent(JSONObject event, String domain);
+
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/PublisherConfig.java b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/PublisherConfig.java
new file mode 100755
index 0000000..33fa15b
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/PublisherConfig.java
@@ -0,0 +1,95 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.restconf.common.event.publishing;
+
+import io.vavr.collection.List;
+import io.vavr.control.Option;
+
+import java.util.Objects;
+
+public class PublisherConfig {
+ private final List<String> destinations;
+ private final String topic;
+ private String userName;
+ private String password;
+
+ PublisherConfig(List<String> destinations, String topic) {
+ this.destinations = destinations;
+ this.topic = topic;
+ }
+
+ PublisherConfig(List<String> destinations, String topic, String userName, String password) {
+ this.destinations = destinations;
+ this.topic = topic;
+ this.userName = userName;
+ this.password = password;
+ }
+
+ List<String> destinations() {
+ return destinations;
+ }
+
+ String topic() {
+ return topic;
+ }
+
+ Option<String> userName() {
+ return Option.of(userName);
+ }
+
+ Option<String> password() {
+ return Option.of(password);
+ }
+
+ boolean isSecured() {
+ return userName().isDefined() && password().isDefined();
+ }
+
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PublisherConfig that = (PublisherConfig) o;
+ return Objects.equals(destinations, that.destinations) &&
+ Objects.equals(topic, that.topic) &&
+ Objects.equals(userName, that.userName) &&
+ Objects.equals(password, that.password);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(destinations, topic, userName, password);
+ }
+
+ @Override
+ public String toString() {
+ return "PublisherConfig{" +
+ "destinations=" + destinations +
+ ", topic='" + topic + '\'' +
+ ", userName='" + userName + '\'' +
+ ", password='" + password + '\'' +
+ '}';
+ }
+}
diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/VavrUtils.java b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/VavrUtils.java
new file mode 100755
index 0000000..77a3052
--- /dev/null
+++ b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/VavrUtils.java
@@ -0,0 +1,48 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.collectors.restconf.common.event.publishing;
+
+import io.vavr.API;
+import io.vavr.API.Match.Case;
+
+import static io.vavr.API.$;
+
+
+public class VavrUtils {
+ private VavrUtils() {
+ // utils aggregator
+ }
+
+ /**
+ * Shortcut for 'string interpolation'
+ */
+ static String f(String msg, Object... args) {
+ return String.format(msg, args);
+ }
+
+ /**
+ * Wrap failure with a more descriptive message of what has failed and chain original cause. Used to provide a
+ * context for errors instead of raw exception.
+ */
+ static Case<Throwable, Throwable> enhanceError(String msg) {
+ return API.Case($(), e -> new RuntimeException(msg, e));
+ }
+}