summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/dcae/common
diff options
context:
space:
mode:
authors00370346 <swarup.nayak1@huawei.com>2019-03-14 15:06:40 +0530
committers00370346 <swarup.nayak1@huawei.com>2019-03-18 15:25:23 +0530
commitbe11dee889f5a740d584458b62804e5fd4296e53 (patch)
tree2cc5e14c29df1a40f8e7bbe3bb08fffa38e023b8 /src/main/java/org/onap/dcae/common
parentd76c2d0f61bfb4373b13fcdb6fc3317467dd19b4 (diff)
Issue-ID: DCAEGEN2-1055 Generic RestConfCollector
Change-Id: I1800affa2b34cbb7487c0d8411e078adec5a0c48 Signed-off-by: s00370346 <swarup.nayak1@huawei.com>
Diffstat (limited to 'src/main/java/org/onap/dcae/common')
-rw-r--r--src/main/java/org/onap/dcae/common/AdditionalHeaderWebTarget.java159
-rw-r--r--src/main/java/org/onap/dcae/common/AnyNode.java89
-rwxr-xr-xsrc/main/java/org/onap/dcae/common/AuthType.java43
-rwxr-xr-xsrc/main/java/org/onap/dcae/common/Constants.java48
-rwxr-xr-xsrc/main/java/org/onap/dcae/common/DataChangeEventListener.java86
-rw-r--r--src/main/java/org/onap/dcae/common/EventConnectionState.java43
-rw-r--r--src/main/java/org/onap/dcae/common/EventData.java42
-rw-r--r--src/main/java/org/onap/dcae/common/EventProcessor.java103
-rwxr-xr-xsrc/main/java/org/onap/dcae/common/Format.java38
-rwxr-xr-xsrc/main/java/org/onap/dcae/common/HttpMethod.java47
-rwxr-xr-xsrc/main/java/org/onap/dcae/common/HttpResponse.java30
-rwxr-xr-xsrc/main/java/org/onap/dcae/common/JsonParser.java92
-rwxr-xr-xsrc/main/java/org/onap/dcae/common/Parameters.java52
-rwxr-xr-xsrc/main/java/org/onap/dcae/common/RestConfContext.java51
-rwxr-xr-xsrc/main/java/org/onap/dcae/common/RestapiCallNode.java467
-rwxr-xr-xsrc/main/java/org/onap/dcae/common/RestapiCallNodeUtil.java194
-rw-r--r--src/main/java/org/onap/dcae/common/SSLContextCreator.java83
-rwxr-xr-xsrc/main/java/org/onap/dcae/common/XmlJsonUtil.java412
-rwxr-xr-xsrc/main/java/org/onap/dcae/common/XmlParser.java178
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java113
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java87
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersBuilder.java62
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java123
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/EventPublisher.java39
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java100
-rw-r--r--src/main/java/org/onap/dcae/common/publishing/VavrUtils.java62
26 files changed, 2843 insertions, 0 deletions
diff --git a/src/main/java/org/onap/dcae/common/AdditionalHeaderWebTarget.java b/src/main/java/org/onap/dcae/common/AdditionalHeaderWebTarget.java
new file mode 100644
index 0000000..fe61155
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/AdditionalHeaderWebTarget.java
@@ -0,0 +1,159 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.common;
+
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Configuration;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.UriBuilder;
+import java.net.URI;
+import java.util.Map;
+
+public class AdditionalHeaderWebTarget implements WebTarget {
+ private WebTarget base;
+ private String token;
+ private String headerName;
+
+ public AdditionalHeaderWebTarget(WebTarget target, String token, String headerName) {
+ base = target;
+ this.token = token;
+ this.headerName = headerName;
+ }
+
+ @Override
+ public Invocation.Builder request() {
+ return base.request().header(headerName, token);
+ }
+
+ @Override
+ public Invocation.Builder request(String... acceptedResponseTypes) {
+ return base.request().header(headerName, token);
+ }
+
+ @Override
+ public Invocation.Builder request(MediaType... acceptedResponseTypes) {
+ return base.request().header(headerName, token);
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return base.getConfiguration();
+ }
+
+ @Override
+ public URI getUri() {
+ return base.getUri();
+ }
+
+ @Override
+ public UriBuilder getUriBuilder() {
+ return base.getUriBuilder();
+ }
+
+ @Override
+ public WebTarget path(String path) {
+ return base.path(path);
+ }
+
+ @Override
+ public WebTarget resolveTemplate(String name, Object value) {
+ return base.resolveTemplate(name, value);
+ }
+
+ @Override
+ public WebTarget resolveTemplate(String name, Object value, boolean encodeSlashInPath) {
+ return base.resolveTemplate(name, value, encodeSlashInPath);
+ }
+
+ @Override
+ public WebTarget resolveTemplateFromEncoded(String name, Object value) {
+ return base.resolveTemplateFromEncoded(name, value);
+ }
+
+ @Override
+ public WebTarget resolveTemplates(Map<String, Object> templateValues) {
+ return base.resolveTemplates(templateValues);
+ }
+
+ @Override
+ public WebTarget resolveTemplates(Map<String, Object> templateValues, boolean encodeSlashInPath) {
+ return base.resolveTemplates(templateValues, encodeSlashInPath);
+ }
+
+ @Override
+ public WebTarget resolveTemplatesFromEncoded(Map<String, Object> templateValues) {
+ return base.resolveTemplatesFromEncoded(templateValues);
+ }
+
+ @Override
+ public WebTarget matrixParam(String name, Object... values) {
+ return base.matrixParam(name, values);
+ }
+
+ @Override
+ public WebTarget queryParam(String name, Object... values) {
+ return base.queryParam(name, values);
+ }
+
+ @Override
+ public WebTarget property(String name, Object value) {
+ return base.property(name, value);
+ }
+
+ @Override
+ public WebTarget register(Class<?> componentClass) {
+ return base.register(componentClass);
+ }
+
+ @Override
+ public WebTarget register(Class<?> componentClass, int priority) {
+ return base.register(componentClass, priority);
+ }
+
+ @Override
+ public WebTarget register(Class<?> componentClass, Class<?>... contracts) {
+ return base.register(componentClass, contracts);
+ }
+
+ @Override
+ public WebTarget register(Class<?> componentClass, Map<Class<?>, Integer> contracts) {
+ return base.register(componentClass, contracts);
+ }
+
+ @Override
+ public WebTarget register(Object component) {
+ return base.register(component);
+ }
+
+ @Override
+ public WebTarget register(Object component, int priority) {
+ return base.register(component, priority);
+ }
+
+ @Override
+ public WebTarget register(Object component, Class<?>... contracts) {
+ return base.register(component, contracts);
+ }
+
+ @Override
+ public WebTarget register(Object component, Map<Class<?>, Integer> contracts) {
+ return base.register(component, contracts);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/common/AnyNode.java b/src/main/java/org/onap/dcae/common/AnyNode.java
new file mode 100644
index 0000000..8980f1b
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/AnyNode.java
@@ -0,0 +1,89 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2018 Nokia Networks Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common;
+
+import io.vavr.collection.List;
+import io.vavr.collection.Set;
+import io.vavr.control.Option;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import java.util.stream.StreamSupport;
+
+import static io.vavr.API.Set;
+
+/**
+ * This class is a wrapper for 2 most used entities of org.json lib: JSONArray and JSONObject and comprises utility
+ * methods for fast access of json structures without need to explicitly coerce between them. While using this, bear in
+ * mind it does not contain exception handling - it is assumed that when using, the parsed json structure is known.
+ *
+ * @author koblosz
+ */
+public class AnyNode {
+
+ private Object obj;
+
+ private AnyNode(Object object) {
+ this.obj = object;
+ }
+
+ public static AnyNode fromString(String content) {
+ return new AnyNode(new JSONObject(content));
+ }
+
+ public Set<String> keys() {
+ return Set(asJsonObject().keySet().toArray(new String[]{}));
+ }
+
+ public AnyNode get(String key) {
+ return new AnyNode(asJsonObject().get(key));
+ }
+
+ public String toString() {
+ return this.obj.toString();
+ }
+
+ public Option<AnyNode> getAsOption(String key) {
+ try {
+ AnyNode value = get(key);
+ if ("null".equals(value.toString())) {
+ return Option.none();
+ }
+ return Option.some(value);
+ } catch (JSONException ex) {
+ return Option.none();
+ }
+ }
+
+ public List<AnyNode> toList() {
+ return List.ofAll(StreamSupport.stream(((JSONArray) this.obj).spliterator(), false).map(AnyNode::new));
+ }
+
+ public boolean has(String key) {
+ return !getAsOption(key).isEmpty();
+ }
+
+ private JSONObject asJsonObject() {
+ return (JSONObject) this.obj;
+ }
+
+}
diff --git a/src/main/java/org/onap/dcae/common/AuthType.java b/src/main/java/org/onap/dcae/common/AuthType.java
new file mode 100755
index 0000000..d87321e
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/AuthType.java
@@ -0,0 +1,43 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common;
+
+public enum AuthType {
+ NONE, BASIC, DIGEST, OAUTH, Unspecified;
+
+ public static AuthType fromString(String s) {
+ if ("basic".equalsIgnoreCase(s)) {
+ return BASIC;
+ }
+ if ("digest".equalsIgnoreCase(s)) {
+ return DIGEST;
+ }
+ if ("oauth".equalsIgnoreCase(s)) {
+ return OAUTH;
+ }
+ if ("none".equalsIgnoreCase(s)) {
+ return NONE;
+ }
+ if ("unspecified".equalsIgnoreCase(s)) {
+ return Unspecified;
+ }
+ throw new IllegalArgumentException("Invalid value for format: " + s);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/common/Constants.java b/src/main/java/org/onap/dcae/common/Constants.java
new file mode 100755
index 0000000..1fe5624
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/Constants.java
@@ -0,0 +1,48 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.common;
+
+public class Constants {
+ public static final String KDEFAULT_TEMP_FILENAME = "templateFileName";
+ public static final String KDEFAULT_REQUESTBODY = "requestBody";
+ public static final String KSETTING_REST_API_URL = "restapiUrl";
+ public static final String KSETTING_REST_UNAME = "restapiUser";
+ public static final String KSETTING_REST_PASSWORD = "restapiPassword";
+ public static final String KSETTING_HTTP_METHOD = "httpMethod";
+ public static final String KSETTING_RESP_PREFIX = "responsePrefix";
+ public static final String KSETTING_SKIP_SENDING = "skipSending";
+ public static final String KSETTING_FORMAT = "format";
+ public static final String KSETTING_SSE_CONNECT_URL = "sseConnectURL";
+ public static final String KSETTING_AUTH_TYPE = "authType";
+ public static final String KSETTING_CONTENT_TYPE = "contentType";
+ public static final String KSETTING_OAUTH_CONSUMER_KEY = "oAuthConsumerKey";
+ public static final String KSETTING_OAUTH_CONSUMER_SECRET = "oAuthConsumerSecret";
+ public static final String KSETTING_OAUTH_SIGNATURE_METHOD = "oAuthSignatureMethod";
+ public static final String KSETTING_OAUTH_VERSION = "oAuthVersion";
+ public static final String KSETTING_TOKENID = "tokenId";
+ public static final String KSETTING_CUSTOMHTTP_HEADER = "customHttpHeaders";
+ public static final String KSETTING_DUMP_HEADER = "dumpHeaders";
+ public static final String KSETTING_RETURN_REQUEST_PAYLOAD = "returnRequestPayload";
+ public static final String KSETTING_TRUST_STORE_FILENAME = "trustStoreFileName";
+ public static final String KSETTING_TRUST_STORE_PASSWORD = "trustStorePassword";
+ public static final String KSETTING_KEY_STORE_FILENAME = "keyStoreFileName";
+ public static final String KSETTING_KEY_STORE_PASSWORD = "keyStorePassword";
+}
diff --git a/src/main/java/org/onap/dcae/common/DataChangeEventListener.java b/src/main/java/org/onap/dcae/common/DataChangeEventListener.java
new file mode 100755
index 0000000..6e13f73
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/DataChangeEventListener.java
@@ -0,0 +1,86 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.common;
+
+import org.glassfish.jersey.media.sse.EventListener;
+import org.glassfish.jersey.media.sse.InboundEvent;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.onap.dcae.RestConfCollector;
+import org.onap.dcae.controller.PersistentEventConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataChangeEventListener implements EventListener {
+ private static final Logger log = LoggerFactory.getLogger(DataChangeEventListener.class);
+ private PersistentEventConnection conn;
+
+ public DataChangeEventListener(PersistentEventConnection conn) {
+ this.conn = conn;
+ }
+
+ @Override
+ public void onEvent(InboundEvent event) {
+ try {
+ log.info("SSE Event is received");
+ String s = event.readData();
+ jsonType type = isJSONValid(s);
+ if (type == jsonType.OBJECT) {
+ JSONObject jsonObj = new JSONObject(s);
+ EventData ev = new EventData(this.conn, jsonObj);
+ log.info("SSE Event in json " + jsonObj.toString());
+ RestConfCollector.handleEvents(ev);
+ } else if (type == jsonType.ARRAY) {
+ JSONArray jsonArr = new JSONArray(s);
+ for (int j = 0; j < jsonArr.length(); j++) {
+ JSONObject jsonObj = jsonArr.getJSONObject(j);
+ EventData ev = new EventData(this.conn, jsonObj);
+ log.info("SSE Event in json " + jsonObj.toString());
+ RestConfCollector.handleEvents(ev);
+ }
+ } else {
+ log.info("Received heart beat ");
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private enum jsonType {
+ OBJECT, ARRAY, NONE;
+ }
+
+ public jsonType isJSONValid(String test) {
+ try {
+ new JSONObject(test);
+ log.info("Received a Json object");
+ } catch (JSONException ex) {
+ try {
+ new JSONArray(test);
+ return jsonType.ARRAY;
+ } catch (JSONException ex1) {
+ return jsonType.NONE;
+ }
+ }
+ return jsonType.OBJECT;
+ }
+}
diff --git a/src/main/java/org/onap/dcae/common/EventConnectionState.java b/src/main/java/org/onap/dcae/common/EventConnectionState.java
new file mode 100644
index 0000000..3e53247
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/EventConnectionState.java
@@ -0,0 +1,43 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.common;
+
+public enum EventConnectionState {
+
+ INIT, SUBSCRIBED, UNSUBSCRIBED, Unspecified;
+
+ public static EventConnectionState fromString(String s) {
+ if ("init".equalsIgnoreCase(s)) {
+ return INIT;
+ }
+ if ("subscribed".equalsIgnoreCase(s)) {
+ return SUBSCRIBED;
+ }
+ if ("unsubscribed".equalsIgnoreCase(s)) {
+ return UNSUBSCRIBED;
+ }
+ if ("unspecified".equalsIgnoreCase(s)) {
+ return Unspecified;
+ }
+ throw new IllegalArgumentException("Invalid value for format: " + s);
+ }
+
+}
diff --git a/src/main/java/org/onap/dcae/common/EventData.java b/src/main/java/org/onap/dcae/common/EventData.java
new file mode 100644
index 0000000..b97d8e4
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/EventData.java
@@ -0,0 +1,42 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common;
+
+import org.json.JSONObject;
+import org.onap.dcae.controller.PersistentEventConnection;
+
+public class EventData {
+ private PersistentEventConnection conn;
+ private JSONObject eventObj;
+
+
+ public EventData(PersistentEventConnection conn, JSONObject eventObj) {
+ this.conn = conn;
+ this.eventObj = eventObj;
+ }
+
+ public PersistentEventConnection getConn() {
+ return conn;
+ }
+
+ public JSONObject getEventObj() {
+ return eventObj;
+ }
+}
diff --git a/src/main/java/org/onap/dcae/common/EventProcessor.java b/src/main/java/org/onap/dcae/common/EventProcessor.java
new file mode 100644
index 0000000..bb0f095
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/EventProcessor.java
@@ -0,0 +1,103 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.common;
+
+
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.onap.dcae.RestConfCollector;
+import org.onap.dcae.common.publishing.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class EventProcessor implements Runnable {
+ private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
+
+ private Map<String, String[]> streamidHash = new HashMap<>();
+ public EventData ev;
+ private EventPublisher eventPublisher;
+
+ public EventProcessor(EventPublisher eventPublisher, Map<String, String[]> streamidHash) {
+ this.eventPublisher = eventPublisher;
+ this.streamidHash.putAll(streamidHash);
+ }
+
+
+ @Override
+ public void run() {
+ try {
+
+ while (true) {
+ ev = RestConfCollector.fProcessingInputQueue.take();
+
+ // As long as the producer is running we remove elements from
+ // the queue.
+ log.info("QueueSize:" + RestConfCollector.fProcessingInputQueue.size() + "\tEventProcessor\tRemoving element: " +
+ ev.getEventObj());
+ /*@TODO: Right now all event publish to single domain and consume by VES collector. Later maybe send to specific domain */
+ String[] streamIdList = streamidHash.get("notification");
+ log.info("streamIdList:" + Arrays.toString(streamIdList));
+
+ if (streamIdList.length == 0) {
+ log.error("No StreamID defined for publish - Message dropped" + ev.getEventObj());
+ } else {
+ sendEventsToStreams(streamIdList, ev);
+ }
+ log.info("Event published" + ev.getEventObj());
+ }
+ } catch (Exception e) {
+ log.error("EventProcessor InterruptedException" + e.getMessage());
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ private void sendEventsToStreams(String[] streamIdList, EventData ev) {
+ for (String aStreamIdList : streamIdList) {
+ log.info("Invoking publisher for streamId:" + aStreamIdList);
+ if (!ev.getConn().getEvent_ruleId().equals("")) {
+ JSONObject customHeader = new JSONObject();
+ customHeader.put("rule-id", ev.getConn().getEvent_ruleId());
+ eventPublisher.sendEvent(overrideEvent(customHeader, ev.getEventObj()), aStreamIdList);
+ } else {
+ eventPublisher.sendEvent(ev.getEventObj(), aStreamIdList);
+ }
+ }
+ }
+
+ private static JSONObject overrideEvent(JSONObject json1, JSONObject json2) {
+ JSONObject mergedJSON;
+ try {
+ mergedJSON = new JSONObject(json1, JSONObject.getNames(json1));
+ for (String key : JSONObject.getNames(json2)) {
+ mergedJSON.put(key, json2.get(key));
+ }
+
+ } catch (JSONException e) {
+ throw new RuntimeException("JSON Exception" + e);
+ }
+ log.info("Merged json " + mergedJSON);
+ return mergedJSON;
+ }
+}
diff --git a/src/main/java/org/onap/dcae/common/Format.java b/src/main/java/org/onap/dcae/common/Format.java
new file mode 100755
index 0000000..3539684
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/Format.java
@@ -0,0 +1,38 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.common;
+
+public enum Format {
+ JSON, XML, NONE;
+
+ public static Format fromString(String s) {
+ if ("json".equalsIgnoreCase(s)) {
+ return JSON;
+ }
+ if ("xml".equalsIgnoreCase(s)) {
+ return XML;
+ }
+ if ("none".equalsIgnoreCase(s)) {
+ return NONE;
+ }
+ throw new IllegalArgumentException("Invalid value for format: " + s);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/common/HttpMethod.java b/src/main/java/org/onap/dcae/common/HttpMethod.java
new file mode 100755
index 0000000..730ff2d
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/HttpMethod.java
@@ -0,0 +1,47 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.common;
+
+public enum HttpMethod {
+ GET, POST, PUT, DELETE, PATCH;
+
+ public static HttpMethod fromString(String s) {
+ if (s == null) {
+ return null;
+ }
+ if ("get".equalsIgnoreCase(s)) {
+ return GET;
+ }
+ if ("post".equalsIgnoreCase(s)) {
+ return POST;
+ }
+ if ("put".equalsIgnoreCase(s)) {
+ return PUT;
+ }
+ if ("delete".equalsIgnoreCase(s)) {
+ return DELETE;
+ }
+ if ("patch".equalsIgnoreCase(s)) {
+ return PATCH;
+ }
+ throw new IllegalArgumentException("Invalid value for HTTP Method: " + s);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/common/HttpResponse.java b/src/main/java/org/onap/dcae/common/HttpResponse.java
new file mode 100755
index 0000000..3d69ec8
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/HttpResponse.java
@@ -0,0 +1,30 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.common;
+
+import javax.ws.rs.core.MultivaluedMap;
+
+public class HttpResponse {
+ public int code;
+ public String message;
+ public String body;
+ public MultivaluedMap<String, String> headers;
+}
diff --git a/src/main/java/org/onap/dcae/common/JsonParser.java b/src/main/java/org/onap/dcae/common/JsonParser.java
new file mode 100755
index 0000000..6ce02f2
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/JsonParser.java
@@ -0,0 +1,92 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.common;
+
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class JsonParser {
+
+ private static final Logger log = LoggerFactory.getLogger(JsonParser.class);
+
+ private JsonParser() {
+ // Preventing instantiation of the same.
+ }
+
+ @SuppressWarnings("unchecked")
+ public static Map<String, String> convertToProperties(String s)
+ throws Exception {
+
+ checkNotNull(s, "Input should not be null.");
+
+ try {
+ JSONObject json = new JSONObject(s);
+ Map<String, Object> wm = new HashMap<>();
+ Iterator<String> ii = json.keys();
+ while (ii.hasNext()) {
+ String key1 = ii.next();
+ wm.put(key1, json.get(key1));
+ }
+
+ Map<String, String> mm = new HashMap<>();
+
+ while (!wm.isEmpty())
+ for (String key : new ArrayList<>(wm.keySet())) {
+ Object o = wm.get(key);
+ wm.remove(key);
+
+ if (o instanceof Boolean || o instanceof Number || o instanceof String) {
+ mm.put(key, o.toString());
+
+ log.info("Added property: {} : {}", key, o.toString());
+ } else if (o instanceof JSONObject) {
+ JSONObject jo = (JSONObject) o;
+ Iterator<String> i = jo.keys();
+ while (i.hasNext()) {
+ String key1 = i.next();
+ wm.put(key + "." + key1, jo.get(key1));
+ }
+ } else if (o instanceof JSONArray) {
+ JSONArray ja = (JSONArray) o;
+ mm.put(key + "_length", String.valueOf(ja.length()));
+
+ log.info("Added property: {}_length: {}", key, String.valueOf(ja.length()));
+
+ for (int i = 0; i < ja.length(); i++)
+ wm.put(key + '[' + i + ']', ja.get(i));
+ }
+ }
+ return mm;
+ } catch (JSONException e) {
+ throw new Exception("Unable to convert JSON to properties" + e.getLocalizedMessage(), e);
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcae/common/Parameters.java b/src/main/java/org/onap/dcae/common/Parameters.java
new file mode 100755
index 0000000..5bc85a5
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/Parameters.java
@@ -0,0 +1,52 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.common;
+
+import java.util.Set;
+
+public class Parameters {
+ public String templateFileName;
+ public String restapiUrl;
+ public String restapiUser;
+ public String restapiPassword;
+ public Format format;
+ public String contentType;
+ public HttpMethod httpMethod;
+ public String responsePrefix;
+ public Set<String> listNameList;
+ public boolean skipSending;
+ public boolean convertResponse;
+ public String keyStoreFileName;
+ public String keyStorePassword;
+ public String trustStoreFileName;
+ public String trustStorePassword;
+ public boolean ssl;
+ public String customHttpHeaders;
+ public String partner;
+ public Boolean dumpHeaders;
+ public String requestBody;
+ public String oAuthConsumerKey;
+ public String oAuthConsumerSecret;
+ public String oAuthSignatureMethod;
+ public String oAuthVersion;
+ public AuthType authtype;
+ public Boolean returnRequestPayload;
+}
diff --git a/src/main/java/org/onap/dcae/common/RestConfContext.java b/src/main/java/org/onap/dcae/common/RestConfContext.java
new file mode 100755
index 0000000..0f95151
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/RestConfContext.java
@@ -0,0 +1,51 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.common;
+
+import java.util.HashMap;
+import java.util.Set;
+
+public class RestConfContext {
+ private HashMap<String, String> attributes;
+
+ public RestConfContext() {
+ attributes = new HashMap<>();
+ }
+
+ public String getAttribute(String name) {
+ return attributes.getOrDefault(name, null);
+ }
+
+ public void setAttribute(String name, String value) {
+ if (value == null) {
+ if (attributes.containsKey(name)) {
+ attributes.remove(name);
+ }
+ } else {
+ attributes.put(name, value);
+ }
+ }
+
+ public Set<String> getAttributeKeySet() {
+ return attributes.keySet();
+ }
+
+}
diff --git a/src/main/java/org/onap/dcae/common/RestapiCallNode.java b/src/main/java/org/onap/dcae/common/RestapiCallNode.java
new file mode 100755
index 0000000..af0245d
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/RestapiCallNode.java
@@ -0,0 +1,467 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.common;
+
+import com.sun.jersey.api.client.*;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.api.client.filter.HTTPBasicAuthFilter;
+import com.sun.jersey.api.client.filter.HTTPDigestAuthFilter;
+import com.sun.jersey.api.client.filter.LoggingFilter;
+import com.sun.jersey.client.urlconnection.HTTPSProperties;
+import com.sun.jersey.oauth.client.OAuthClientFilter;
+import com.sun.jersey.oauth.signature.OAuthParameters;
+import com.sun.jersey.oauth.signature.OAuthSecrets;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.ws.rs.core.EntityTag;
+import javax.ws.rs.core.MultivaluedMap;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.SocketException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.KeyStore;
+import java.util.*;
+
+import static org.onap.dcae.common.RestapiCallNodeUtil.getParameters;
+import static org.onap.dcae.common.RestapiCallNodeUtil.parseParam;
+
+public class RestapiCallNode {
+ private static final Logger log = LoggerFactory.getLogger(RestapiCallNode.class);
+
+ public void sendRequest(Map<String, String> paramMap, RestConfContext ctx, Integer retryCount) throws Exception {
+ HttpResponse r = new HttpResponse();
+ try {
+ Parameters p = getParameters(paramMap);
+ String pp = p.responsePrefix != null ? p.responsePrefix + '.' : "";
+ String req = null;
+ if (p.templateFileName != null) {
+ log.info("p.templateFileName " + p.templateFileName);
+ String reqTemplate = readFile(p.templateFileName);
+ req = buildXmlJsonRequest(ctx, reqTemplate, p.format);
+ } else if (p.requestBody != null) {
+ req = p.requestBody;
+ }
+
+ r = sendHttpRequest(req, p);
+ setResponseStatus(ctx, p.responsePrefix, r);
+
+ if (p.dumpHeaders && r.headers != null) {
+ for (Map.Entry<String, List<String>> a : r.headers.entrySet()) {
+ ctx.setAttribute(pp + "header." + a.getKey(), StringUtils.join(a.getValue(), ","));
+ }
+ }
+
+ if (p.returnRequestPayload && req != null) {
+ ctx.setAttribute(pp + "httpRequest", req);
+ }
+
+ if (r.body != null && r.body.trim().length() > 0) {
+ ctx.setAttribute(pp + "httpResponse", r.body);
+
+ if (p.convertResponse) {
+ Map<String, String> mm = null;
+ if (p.format == Format.XML) {
+ mm = XmlParser.convertToProperties(r.body, p.listNameList);
+ } else if (p.format == Format.JSON) {
+ mm = JsonParser.convertToProperties(r.body);
+ }
+
+ if (mm != null) {
+ for (Map.Entry<String, String> entry : mm.entrySet()) {
+ ctx.setAttribute(pp + entry.getKey(), entry.getValue());
+ log.info("ctx.setAttribute :=> {} value {} ", pp + entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ boolean shouldRetry = false;
+ if (e.getCause().getCause() instanceof SocketException) {
+ shouldRetry = true;
+ }
+
+ log.error("Error sending the request: " + e.getMessage(), e);
+ String prefix = parseParam(paramMap, "responsePrefix", false, null);
+ if (!shouldRetry || (retryCount == null) || (retryCount == 0)) {
+ setFailureResponseStatus(ctx, prefix, e.getMessage(), r);
+ } else {
+ try {
+ retryCount = retryCount - 1;
+ log.debug("This is retry attempt {} ", retryCount);
+ sendRequest(paramMap, ctx, retryCount);
+ } catch (Exception ex) {
+ log.error("Could not attempt retry.", ex);
+ String retryErrorMessage =
+ "Retry attempt has failed. No further retry shall be attempted, calling " +
+ "setFailureResponseStatus.";
+ setFailureResponseStatus(ctx, prefix, retryErrorMessage, r);
+ }
+ }
+ }
+
+ if (r != null && r.code >= 300) {
+ throw new Exception(String.valueOf(r.code) + ": " + r.message);
+ }
+ }
+
+ protected String buildXmlJsonRequest(RestConfContext ctx, String template, Format format) throws Exception {
+ log.info("Building {} started", format);
+ long t1 = System.currentTimeMillis();
+
+ template = expandRepeats(ctx, template, 1);
+
+ Map<String, String> mm = new HashMap<>();
+ for (String s : ctx.getAttributeKeySet()) {
+ mm.put(s, ctx.getAttribute(s));
+ }
+ StringBuilder ss = new StringBuilder();
+ int i = 0;
+ while (i < template.length()) {
+ int i1 = template.indexOf("${", i);
+ if (i1 < 0) {
+ ss.append(template.substring(i));
+ break;
+ }
+
+ int i2 = template.indexOf('}', i1 + 2);
+ if (i2 < 0) {
+ throw new Exception("Template error: Matching } not found");
+ }
+
+ String var1 = template.substring(i1 + 2, i2);
+ String value1 = format == Format.XML ? XmlJsonUtil.getXml(mm, var1) : XmlJsonUtil.getJson(mm, var1);
+ if (value1 == null || value1.trim().length() == 0) {
+ // delete the whole element (line)
+ int i3 = template.lastIndexOf('\n', i1);
+ if (i3 < 0) {
+ i3 = 0;
+ }
+ int i4 = template.indexOf('\n', i1);
+ if (i4 < 0) {
+ i4 = template.length();
+ }
+
+ if (i < i3) {
+ ss.append(template.substring(i, i3));
+ }
+ i = i4;
+ } else {
+ ss.append(template.substring(i, i1)).append(value1);
+ i = i2 + 1;
+ }
+ }
+
+ String req = format == Format.XML
+ ? XmlJsonUtil.removeEmptyStructXml(ss.toString()) : XmlJsonUtil.removeEmptyStructJson(ss.toString());
+
+ if (format == Format.JSON) {
+ req = XmlJsonUtil.removeLastCommaJson(req);
+ }
+
+ long t2 = System.currentTimeMillis();
+ log.info("Building {} completed. Time: {}", format, (t2 - t1));
+
+ return req;
+ }
+
+ protected String expandRepeats(RestConfContext ctx, String template, int level) throws Exception {
+ StringBuilder newTemplate = new StringBuilder();
+ int k = 0;
+ while (k < template.length()) {
+ int i1 = template.indexOf("${repeat:", k);
+ if (i1 < 0) {
+ newTemplate.append(template.substring(k));
+ break;
+ }
+
+ int i2 = template.indexOf(':', i1 + 9);
+ if (i2 < 0) {
+ throw new Exception(
+ "Template error: Context variable name followed by : is required after repeat");
+ }
+
+ // Find the closing }, store in i3
+ int nn = 1;
+ int i3 = -1;
+ int i = i2;
+ while (nn > 0 && i < template.length()) {
+ i3 = template.indexOf('}', i);
+ if (i3 < 0) {
+ throw new Exception("Template error: Matching } not found");
+ }
+ int i32 = template.indexOf('{', i);
+ if (i32 >= 0 && i32 < i3) {
+ nn++;
+ i = i32 + 1;
+ } else {
+ nn--;
+ i = i3 + 1;
+ }
+ }
+
+ String var1 = template.substring(i1 + 9, i2);
+ String value1 = ctx.getAttribute(var1);
+ log.info(" {}:{}", var1, value1);
+ int n = 0;
+ try {
+ n = Integer.parseInt(value1);
+ } catch (NumberFormatException e) {
+ log.info("value1 not set or not a number, n will remain set at zero");
+ }
+
+ newTemplate.append(template.substring(k, i1));
+
+ String rpt = template.substring(i2 + 1, i3);
+
+ for (int ii = 0; ii < n; ii++) {
+ String ss = rpt.replaceAll("\\[\\$\\{" + level + "\\}\\]", "[" + ii + "]");
+ if (ii == n - 1 && ss.trim().endsWith(",")) {
+ int i4 = ss.lastIndexOf(',');
+ if (i4 > 0) {
+ ss = ss.substring(0, i4) + ss.substring(i4 + 1);
+ }
+ }
+ newTemplate.append(ss);
+ }
+
+ k = i3 + 1;
+ }
+
+ if (k == 0) {
+ return newTemplate.toString();
+ }
+
+ return expandRepeats(ctx, newTemplate.toString(), level + 1);
+ }
+
+ protected String readFile(String fileName) throws Exception {
+ try {
+ byte[] encoded = Files.readAllBytes(Paths.get(fileName));
+ return new String(encoded, "UTF-8");
+ } catch (IOException | SecurityException e) {
+ throw new Exception("Unable to read file " + fileName + e.getLocalizedMessage(), e);
+ }
+ }
+
+ protected Client addAuthType(Client client, Parameters p) throws Exception {
+ if (p.authtype == AuthType.Unspecified) {
+ if (p.restapiUser != null && p.restapiPassword != null) {
+ client.addFilter(new HTTPBasicAuthFilter(p.restapiUser, p.restapiPassword));
+ } else if (p.oAuthConsumerKey != null && p.oAuthConsumerSecret != null
+ && p.oAuthSignatureMethod != null) {
+ OAuthParameters params = new OAuthParameters()
+ .signatureMethod(p.oAuthSignatureMethod)
+ .consumerKey(p.oAuthConsumerKey)
+ .version(p.oAuthVersion);
+
+ OAuthSecrets secrets = new OAuthSecrets()
+ .consumerSecret(p.oAuthConsumerSecret);
+ client.addFilter(new OAuthClientFilter(client.getProviders(), params, secrets));
+ }
+ } else {
+ if (p.authtype == AuthType.DIGEST) {
+ if (p.restapiUser != null && p.restapiPassword != null) {
+ client.addFilter(new HTTPDigestAuthFilter(p.restapiUser, p.restapiPassword));
+ } else {
+ throw new Exception("oAUTH authentication type selected but all restapiUser and restapiPassword " +
+ "parameters doesn't exist", new Throwable());
+ }
+ } else if (p.authtype == AuthType.BASIC) {
+ if (p.restapiUser != null && p.restapiPassword != null) {
+ client.addFilter(new HTTPBasicAuthFilter(p.restapiUser, p.restapiPassword));
+ } else {
+ throw new Exception("oAUTH authentication type selected but all restapiUser and restapiPassword " +
+ "parameters doesn't exist", new Throwable());
+ }
+ } else if (p.authtype == AuthType.OAUTH) {
+ if (p.oAuthConsumerKey != null && p.oAuthConsumerSecret != null && p.oAuthSignatureMethod != null) {
+ OAuthParameters params = new OAuthParameters()
+ .signatureMethod(p.oAuthSignatureMethod)
+ .consumerKey(p.oAuthConsumerKey)
+ .version(p.oAuthVersion);
+
+ OAuthSecrets secrets = new OAuthSecrets()
+ .consumerSecret(p.oAuthConsumerSecret);
+ client.addFilter(new OAuthClientFilter(client.getProviders(), params, secrets));
+ } else {
+ throw new Exception("oAUTH authentication type selected but all oAuthConsumerKey, voAuthConsumerSecret " +
+ "and oAuthSignatureMethod parameters doesn't exist", new Throwable());
+ }
+ }
+ }
+ return client;
+ }
+
+ protected HttpResponse sendHttpRequest(String request, Parameters p) throws Exception {
+ ClientConfig config = new DefaultClientConfig();
+ SSLContext ssl = null;
+ if (p.ssl && p.restapiUrl.startsWith("https")) {
+ ssl = createSSLContext(p);
+ }
+ if (ssl != null) {
+ HostnameVerifier hostnameVerifier = (hostname, session) -> true;
+
+ config.getProperties().put(HTTPSProperties.PROPERTY_HTTPS_PROPERTIES,
+ new HTTPSProperties(hostnameVerifier, ssl));
+ }
+
+ logProperties(config.getProperties());
+
+ Client client = Client.create(config);
+ client.setConnectTimeout(5000);
+ client.addFilter(new LoggingFilter());
+ WebResource webResource = addAuthType(client, p).resource(p.restapiUrl);
+
+ log.info("Sending request:");
+ log.info(request);
+ log.info("URL: " + p.restapiUrl + " method " + p.httpMethod.toString() + " Custome headr " + p.customHttpHeaders);
+
+ long t1 = System.currentTimeMillis();
+
+ HttpResponse r = new HttpResponse();
+ r.code = 200;
+
+ if (!p.skipSending) {
+ String tt = p.format == Format.XML ? "application/xml" : "application/json";
+ String tt1 = tt + ";charset=UTF-8";
+ if (p.contentType != null) {
+ tt = p.contentType;
+ tt1 = p.contentType;
+ }
+
+ WebResource.Builder webResourceBuilder = webResource.accept(tt).type(tt1);
+ if (p.format == Format.NONE) {
+ webResourceBuilder = webResource.header("", "");
+ }
+
+ if (p.customHttpHeaders != null && p.customHttpHeaders.length() > 0) {
+ String[] keyValuePairs = p.customHttpHeaders.split(",");
+ for (String singlePair : keyValuePairs) {
+ int equalPosition = singlePair.indexOf('=');
+ webResourceBuilder.header(singlePair.substring(0, equalPosition),
+ singlePair.substring(equalPosition + 1, singlePair.length()));
+ }
+ }
+
+ ClientResponse response;
+
+ try {
+ response = webResourceBuilder.method(p.httpMethod.toString(), ClientResponse.class, request);
+ } catch (UniformInterfaceException | ClientHandlerException e) {
+ throw new Exception("Exception while sending http request to client "
+ + e.getLocalizedMessage(), e);
+ }
+
+ r.code = response.getStatus();
+ r.headers = response.getHeaders();
+ EntityTag etag = response.getEntityTag();
+ if (etag != null) {
+ r.message = etag.getValue();
+ }
+ if (response.hasEntity() && r.code != 204) {
+ r.body = response.getEntity(String.class);
+ }
+ }
+
+ long t2 = System.currentTimeMillis();
+ log.info("Response received. Time: {}", (t2 - t1));
+ log.info("HTTP response code: {}", r.code);
+ log.info("HTTP response message: {}", r.message);
+ logHeaders(r.headers);
+ log.info("HTTP response: {}", r.body);
+
+ return r;
+ }
+
+ protected void setFailureResponseStatus(RestConfContext ctx, String prefix, String errorMessage,
+ HttpResponse resp) {
+ resp.code = 500;
+ resp.message = errorMessage;
+ String pp = prefix != null ? prefix + '.' : "";
+ ctx.setAttribute(pp + "response-code", String.valueOf(resp.code));
+ ctx.setAttribute(pp + "response-message", resp.message);
+ }
+
+ protected void setResponseStatus(RestConfContext ctx, String prefix, HttpResponse r) {
+ String pp = prefix != null ? prefix + '.' : "";
+ ctx.setAttribute(pp + "response-code", String.valueOf(r.code));
+ ctx.setAttribute(pp + "response-message", r.message);
+ }
+
+ protected SSLContext createSSLContext(Parameters p) {
+ try (FileInputStream in = new FileInputStream(p.keyStoreFileName)) {
+ System.setProperty("jsse.enableSNIExtension", "false");
+ System.setProperty("javax.net.ssl.trustStore", p.trustStoreFileName);
+ System.setProperty("javax.net.ssl.trustStorePassword", p.trustStorePassword);
+
+ HttpsURLConnection.setDefaultHostnameVerifier((string, ssls) -> true);
+
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ KeyStore ks = KeyStore.getInstance("PKCS12");
+ char[] pwd = p.keyStorePassword.toCharArray();
+ log.info("pwd " + pwd + " " + p.keyStorePassword);
+ ks.load(in, pwd);
+ kmf.init(ks, pwd);
+
+ SSLContext ctx = SSLContext.getInstance("TLS");
+ ctx.init(kmf.getKeyManagers(), null, null);
+ return ctx;
+ } catch (Exception e) {
+ log.error("Error creating SSLContext: {}", e.getMessage(), e);
+ }
+ return null;
+ }
+
+ protected void logProperties(Map<String, Object> mm) {
+ List<String> ll = new ArrayList<>();
+ for (Object o : mm.keySet())
+ ll.add((String) o);
+ Collections.sort(ll);
+
+ log.info("Properties:");
+ for (String name : ll)
+ log.info("--- {}:{}", name, String.valueOf(mm.get(name)));
+ }
+
+ protected void logHeaders(MultivaluedMap<String, String> mm) {
+ log.info("HTTP response headers:");
+
+ if (mm == null) {
+ return;
+ }
+
+ List<String> ll = new ArrayList<>();
+ for (Object o : mm.keySet())
+ ll.add((String) o);
+ Collections.sort(ll);
+
+ for (String name : ll)
+ log.info("--- {}:{}", name, String.valueOf(mm.get(name)));
+ }
+} \ No newline at end of file
diff --git a/src/main/java/org/onap/dcae/common/RestapiCallNodeUtil.java b/src/main/java/org/onap/dcae/common/RestapiCallNodeUtil.java
new file mode 100755
index 0000000..1ff00dd
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/RestapiCallNodeUtil.java
@@ -0,0 +1,194 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.common;
+
+import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
+import org.glassfish.jersey.client.oauth1.ConsumerCredentials;
+import org.glassfish.jersey.client.oauth1.OAuth1ClientSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.core.Feature;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class RestapiCallNodeUtil {
+
+ private static final Logger log = LoggerFactory.getLogger(RestapiCallNodeUtil.class);
+
+
+ private RestapiCallNodeUtil() {
+ // Preventing instantiation of the same.
+ }
+
+ public static String getUriMethod(Boolean authEnabled) {
+ /*@TODO: As per configuration */
+// String uri;
+// if (authEnabled) {
+// uri = "https://";
+// } else {
+// uri = "http://";
+// }
+ return "https://";
+ }
+
+ public static Parameters getParameters(Map<String, String> paramMap) throws Exception {
+ Parameters p = new Parameters();
+ p.templateFileName = parseParam(paramMap, "templateFileName", false, null);
+ p.requestBody = parseParam(paramMap, "requestBody", false, null);
+ p.restapiUrl = parseParam(paramMap, "restapiUrl", true, null);
+ validateUrl(p.restapiUrl);
+ p.restapiUser = null;//parseParam(paramMap, "restapiUser", false, null);
+ p.restapiPassword = null;//parseParam(paramMap, "restapiPassword", false, null);
+ p.oAuthConsumerKey = parseParam(paramMap, "oAuthConsumerKey", false, null);
+ p.oAuthConsumerSecret = parseParam(paramMap, "oAuthConsumerSecret", false, null);
+ p.oAuthSignatureMethod = parseParam(paramMap, "oAuthSignatureMethod", false, null);
+ p.oAuthVersion = parseParam(paramMap, "oAuthVersion", false, null);
+ p.contentType = parseParam(paramMap, "contentType", false, null);
+ p.format = Format.fromString(parseParam(paramMap, "format", false, "json"));
+ p.authtype = AuthType.fromString(parseParam(paramMap, "authType", false, "unspecified"));
+ p.httpMethod = HttpMethod.fromString(parseParam(paramMap, "httpMethod", false, "post"));
+ p.responsePrefix = parseParam(paramMap, "responsePrefix", false, null);
+ p.listNameList = getListNameList(paramMap);
+ String skipSendingStr = paramMap.get("skipSending");
+ p.skipSending = "true".equalsIgnoreCase(skipSendingStr);
+ p.convertResponse = Boolean.valueOf(parseParam(paramMap, "convertResponse", false, "true"));
+ p.trustStoreFileName = parseParam(paramMap, "trustStoreFileName", false, null);
+ p.trustStorePassword = parseParam(paramMap, "trustStorePassword", false, null);
+ p.keyStoreFileName = parseParam(paramMap, "keyStoreFileName", false, null);
+ p.keyStorePassword = parseParam(paramMap, "keyStorePassword", false, null);
+ p.ssl = p.trustStoreFileName != null && p.trustStorePassword != null && p.keyStoreFileName != null &&
+ p.keyStorePassword != null;
+ p.customHttpHeaders = parseParam(paramMap, "customHttpHeaders", false, null);
+ p.partner = parseParam(paramMap, "partner", false, null);
+ p.dumpHeaders = Boolean.valueOf(parseParam(paramMap, "dumpHeaders", false, null));
+ p.returnRequestPayload = Boolean.valueOf(parseParam(paramMap, "returnRequestPayload", false, null));
+ log.info(p.toString());
+ return p;
+ }
+
+
+ public static String parseParam(Map<String, String> paramMap, String name, boolean required, String def)
+ throws Exception {
+ String s = paramMap.get(name);
+
+ if (s == null || s.trim().length() == 0) {
+ if (!required) {
+ return def;
+ }
+ throw new Exception("Parameter " + name + " is required in RestapiCallNode");
+ }
+
+ s = s.trim();
+ StringBuilder value = new StringBuilder();
+ int i = 0;
+ int i1 = s.indexOf('%');
+ while (i1 >= 0) {
+ int i2 = s.indexOf('%', i1 + 1);
+ if (i2 < 0) {
+ break;
+ }
+
+ String varName = s.substring(i1 + 1, i2);
+ String varValue = System.getenv(varName);
+ if (varValue == null) {
+ varValue = "%" + varName + "%";
+ }
+
+ value.append(s.substring(i, i1));
+ value.append(varValue);
+
+ i = i2 + 1;
+ i1 = s.indexOf('%', i);
+ }
+ value.append(s.substring(i));
+
+ log.info("Parameter {}: [{}]", name, value);
+ return value.toString();
+ }
+
+ private static void validateUrl(String restapiUrl) throws Exception {
+ try {
+ URI.create(restapiUrl);
+ } catch (IllegalArgumentException e) {
+ throw new Exception("Invalid input of url " + e.getLocalizedMessage(), e);
+ }
+ }
+
+ private static Set<String> getListNameList(Map<String, String> paramMap) {
+ Set<String> ll = new HashSet<>();
+ for (Map.Entry<String, String> entry : paramMap.entrySet())
+ if (entry.getKey().startsWith("listName")) {
+ ll.add(entry.getValue());
+ }
+ return ll;
+ }
+
+ public static Client addAuthType(Client client, Parameters p) {
+ if (p != null) {
+ if (p.authtype == AuthType.Unspecified) {
+ if (p.restapiUser != null && p.restapiPassword != null) {
+ client.register(HttpAuthenticationFeature.basic(p.restapiUser, p.restapiPassword));
+ } else if (p.oAuthConsumerKey != null && p.oAuthConsumerSecret != null
+ && p.oAuthSignatureMethod != null) {
+ Feature oAuth1Feature = OAuth1ClientSupport
+ .builder(new ConsumerCredentials(p.oAuthConsumerKey, p.oAuthConsumerSecret))
+ .version(p.oAuthVersion).signatureMethod(p.oAuthSignatureMethod).feature().build();
+ client.register(oAuth1Feature);
+ }
+ } else {
+ if (p.authtype == AuthType.DIGEST) {
+ if (p.restapiUser != null && p.restapiPassword != null) {
+ client.register(HttpAuthenticationFeature.digest(p.restapiUser, p.restapiPassword));
+ } else {
+ throw new IllegalArgumentException(
+ "oAUTH authentication type selected but all restapiUser and restapiPassword " +
+ "parameters doesn't exist", new Throwable());
+ }
+ } else if (p.authtype == AuthType.BASIC) {
+ if (p.restapiUser != null && p.restapiPassword != null) {
+ client.register(HttpAuthenticationFeature.basic(p.restapiUser, p.restapiPassword));
+ } else {
+ throw new IllegalArgumentException(
+ "oAUTH authentication type selected but all restapiUser and restapiPassword " +
+ "parameters doesn't exist", new Throwable());
+ }
+ } else if (p.authtype == AuthType.OAUTH) {
+ if (p.oAuthConsumerKey != null && p.oAuthConsumerSecret != null && p.oAuthSignatureMethod != null) {
+ Feature oAuth1Feature = OAuth1ClientSupport
+ .builder(new ConsumerCredentials(p.oAuthConsumerKey, p.oAuthConsumerSecret))
+ .version(p.oAuthVersion).signatureMethod(p.oAuthSignatureMethod).feature().build();
+ client.register(oAuth1Feature);
+ } else {
+ throw new IllegalArgumentException(
+ "oAUTH authentication type selected but all oAuthConsumerKey, oAuthConsumerSecret " +
+ "and oAuthSignatureMethod parameters doesn't exist", new Throwable());
+ }
+ }
+ }
+ }
+ return client;
+ }
+}
+
diff --git a/src/main/java/org/onap/dcae/common/SSLContextCreator.java b/src/main/java/org/onap/dcae/common/SSLContextCreator.java
new file mode 100644
index 0000000..db3d123
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/SSLContextCreator.java
@@ -0,0 +1,83 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.common;
+
+import org.springframework.boot.web.server.Ssl;
+
+import java.nio.file.Path;
+
+public class SSLContextCreator {
+ private final String keyStorePassword;
+ private final String certAlias;
+ private final Path keyStoreFile;
+
+ private Path trustStoreFile;
+ private String trustStorePassword;
+ private boolean hasTlsClientAuthentication = false;
+
+ public static SSLContextCreator create(final Path keyStoreFile, final String certAlias, final String password) {
+ return new SSLContextCreator(keyStoreFile, certAlias, password);
+ }
+
+ private SSLContextCreator(final Path keyStoreFile, final String certAlias, final String password) {
+ this.certAlias = certAlias;
+ this.keyStoreFile = keyStoreFile;
+ this.keyStorePassword = password;
+ }
+
+ public SSLContextCreator withTlsClientAuthentication(final Path trustStoreFile, final String password) {
+ hasTlsClientAuthentication = true;
+ this.trustStoreFile = trustStoreFile;
+ this.trustStorePassword = password;
+
+ return this;
+ }
+
+ private void configureKeyStore(final Ssl ssl) {
+ final String keyStore = keyStoreFile.toAbsolutePath().toString();
+
+ ssl.setKeyStore(keyStore);
+ ssl.setKeyPassword(keyStorePassword);
+ ssl.setKeyAlias(certAlias);
+ }
+
+ private void configureTrustStore(final Ssl ssl) {
+ final String trustStore = trustStoreFile.toAbsolutePath().toString();
+
+ ssl.setTrustStore(trustStore);
+ ssl.setTrustStorePassword(trustStorePassword);
+ ssl.setClientAuth(Ssl.ClientAuth.NEED);
+ }
+
+ public Ssl build() {
+ final Ssl ssl = new Ssl();
+ ssl.setEnabled(true);
+
+ configureKeyStore(ssl);
+
+ if (hasTlsClientAuthentication) {
+ configureTrustStore(ssl);
+ }
+
+ return ssl;
+ }
+} \ No newline at end of file
diff --git a/src/main/java/org/onap/dcae/common/XmlJsonUtil.java b/src/main/java/org/onap/dcae/common/XmlJsonUtil.java
new file mode 100755
index 0000000..877f9e5
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/XmlJsonUtil.java
@@ -0,0 +1,412 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.common;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class XmlJsonUtil {
+
+ private static final Logger log = LoggerFactory.getLogger(XmlJsonUtil.class);
+
+ private XmlJsonUtil() {
+ // Preventing instantiation of the same.
+ }
+
+ public static String getXml(Map<String, String> varmap, String var) {
+ boolean escape = true;
+ if (var.startsWith("'")) {
+ var = var.substring(1);
+ escape = false;
+ }
+
+ Object o = createStructure(varmap, var);
+ return generateXml(o, 0, escape);
+ }
+
+ public static String getJson(Map<String, String> varmap, String var) {
+ boolean escape = true;
+ if (var.startsWith("'")) {
+ var = var.substring(1);
+ escape = false;
+ }
+
+ boolean quotes = true;
+ if (var.startsWith("\"")) {
+ var = var.substring(1);
+ quotes = false;
+ }
+
+ Object o = createStructure(varmap, var);
+ return generateJson(o, escape, quotes);
+ }
+
+ private static Object createStructure(Map<String, String> flatmap, String var) {
+ if (flatmap.containsKey(var)) {
+ if (var.endsWith("_length") || var.endsWith("].key")) {
+ return null;
+ }
+ return flatmap.get(var);
+ }
+
+ Map<String, Object> mm = new HashMap<>();
+ for (String k : flatmap.keySet())
+ if (k.startsWith(var + ".")) {
+ int i1 = k.indexOf('.', var.length() + 1);
+ int i2 = k.indexOf('[', var.length() + 1);
+ int i3 = k.length();
+ if (i1 > 0 && i1 < i3) {
+ i3 = i1;
+ }
+ if (i2 > 0 && i2 < i3) {
+ i3 = i2;
+ }
+ String k1 = k.substring(var.length() + 1, i3);
+ String var1 = k.substring(0, i3);
+ if (!mm.containsKey(k1)) {
+ Object str = createStructure(flatmap, var1);
+ if (str != null && (!(str instanceof String) || ((String) str).trim().length() > 0)) {
+ mm.put(k1, str);
+ }
+ }
+ }
+ if (!mm.isEmpty()) {
+ return mm;
+ }
+
+ boolean arrayFound = false;
+ for (String k : flatmap.keySet())
+ if (k.startsWith(var + "[")) {
+ arrayFound = true;
+ break;
+ }
+
+ if (arrayFound) {
+ List<Object> ll = new ArrayList<>();
+
+ int length = Integer.MAX_VALUE;
+ String lengthStr = flatmap.get(var + "_length");
+ if (lengthStr != null) {
+ try {
+ length = Integer.parseInt(lengthStr);
+ } catch (Exception e) {
+ log.warn("Invalid number for {}_length:{}", var, lengthStr, e);
+ }
+ }
+
+ for (int i = 0; i < length; i++) {
+ Object v = createStructure(flatmap, var + '[' + i + ']');
+ if (v == null) {
+ break;
+ }
+ ll.add(v);
+ }
+
+ if (!ll.isEmpty()) {
+ return ll;
+ }
+ }
+
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ private static String generateXml(Object o, int indent, boolean escape) {
+ if (o == null) {
+ return null;
+ }
+
+ if (o instanceof String) {
+ return escape ? escapeXml((String) o) : (String) o;
+ }
+ ;
+
+ if (o instanceof Map) {
+ StringBuilder ss = new StringBuilder();
+ Map<String, Object> mm = (Map<String, Object>) o;
+ for (Map.Entry<String, Object> entry : mm.entrySet()) {
+ Object v = entry.getValue();
+ String key = entry.getKey();
+ if (v instanceof String) {
+ String s = escape ? escapeXml((String) v) : (String) v;
+ ss.append(pad(indent)).append('<').append(key).append('>');
+ ss.append(s);
+ ss.append("</").append(key).append('>').append('\n');
+ } else if (v instanceof Map) {
+ ss.append(pad(indent)).append('<').append(key).append('>').append('\n');
+ ss.append(generateXml(v, indent + 1, escape));
+ ss.append(pad(indent)).append("</").append(key).append('>').append('\n');
+ } else if (v instanceof List) {
+ List<Object> ll = (List<Object>) v;
+ for (Object o1 : ll) {
+ ss.append(pad(indent)).append('<').append(key).append('>').append('\n');
+ ss.append(generateXml(o1, indent + 1, escape));
+ ss.append(pad(indent)).append("</").append(key).append('>').append('\n');
+ }
+ }
+ }
+ return ss.toString();
+ }
+
+ return null;
+ }
+
+ private static String generateJson(Object o, boolean escape, boolean quotes) {
+ if (o == null) {
+ return null;
+ }
+
+ StringBuilder ss = new StringBuilder();
+ generateJson(ss, o, 0, false, escape, quotes);
+ return ss.toString();
+ }
+
+ @SuppressWarnings("unchecked")
+ private static void generateJson(StringBuilder ss, Object o, int indent, boolean padFirst, boolean escape, boolean quotes) {
+ if (o instanceof String) {
+ String s = escape ? escapeJson((String) o) : (String) o;
+ if (padFirst) {
+ ss.append(pad(indent));
+ }
+ if (quotes) {
+ ss.append('"').append(s).append('"');
+ } else {
+ ss.append(s);
+ }
+ return;
+ }
+
+ if (o instanceof Map) {
+ Map<String, Object> mm = (Map<String, Object>) o;
+
+ if (padFirst) {
+ ss.append(pad(indent));
+ }
+ ss.append("{\n");
+
+ boolean first = true;
+ for (Map.Entry<String, Object> entry : mm.entrySet()) {
+ if (!first) {
+ ss.append(",\n");
+ }
+ first = false;
+ Object v = entry.getValue();
+ String key = entry.getKey();
+ ss.append(pad(indent + 1)).append('"').append(key).append("\": ");
+ generateJson(ss, v, indent + 1, false, escape, true);
+ }
+
+ ss.append("\n");
+ ss.append(pad(indent)).append('}');
+
+ return;
+ }
+
+ if (o instanceof List) {
+ List<Object> ll = (List<Object>) o;
+
+ if (padFirst) {
+ ss.append(pad(indent));
+ }
+ ss.append("[\n");
+
+ boolean first = true;
+ for (Object o1 : ll) {
+ if (!first) {
+ ss.append(",\n");
+ }
+ first = false;
+
+ generateJson(ss, o1, indent + 1, true, escape, quotes);
+ }
+
+ ss.append("\n");
+ ss.append(pad(indent)).append(']');
+ }
+ }
+
+ public static String removeLastCommaJson(String s) {
+ StringBuilder sb = new StringBuilder();
+ int k = 0;
+ int start = 0;
+ while (k < s.length()) {
+ int i11 = s.indexOf('}', k);
+ int i12 = s.indexOf(']', k);
+ int i1 = -1;
+ if (i11 < 0) {
+ i1 = i12;
+ } else if (i12 < 0) {
+ i1 = i11;
+ } else {
+ i1 = i11 < i12 ? i11 : i12;
+ }
+ if (i1 < 0) {
+ break;
+ }
+
+ int i2 = s.lastIndexOf(',', i1);
+ if (i2 < 0) {
+ k = i1 + 1;
+ continue;
+ }
+
+ String between = s.substring(i2 + 1, i1);
+ if (between.trim().length() > 0) {
+ k = i1 + 1;
+ continue;
+ }
+
+ sb.append(s.substring(start, i2));
+ start = i2 + 1;
+ k = i1 + 1;
+ }
+
+ sb.append(s.substring(start, s.length()));
+
+ return sb.toString();
+ }
+
+ public static String removeEmptyStructJson(String s) {
+ int k = 0;
+ while (k < s.length()) {
+ boolean curly = true;
+ int i11 = s.indexOf('{', k);
+ int i12 = s.indexOf('[', k);
+ int i1 = -1;
+ if (i11 < 0) {
+ i1 = i12;
+ curly = false;
+ } else if (i12 < 0) {
+ i1 = i11;
+ } else if (i11 < i12) {
+ i1 = i11;
+ } else {
+ i1 = i12;
+ curly = false;
+ }
+
+ if (i1 >= 0) {
+ int i2 = curly ? s.indexOf('}', i1) : s.indexOf(']', i1);
+ if (i2 > 0) {
+ String value = s.substring(i1 + 1, i2);
+ if (value.trim().length() == 0) {
+ int i4 = s.lastIndexOf('\n', i1);
+ if (i4 < 0) {
+ i4 = 0;
+ }
+ int i5 = s.indexOf('\n', i2);
+ if (i5 < 0) {
+ i5 = s.length();
+ }
+
+ s = s.substring(0, i4) + s.substring(i5);
+ k = 0;
+ } else {
+ k = i1 + 1;
+ }
+ } else {
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+
+ return s;
+ }
+
+ public static String removeEmptyStructXml(String s) {
+ int k = 0;
+ while (k < s.length()) {
+ int i1 = s.indexOf('<', k);
+ if (i1 < 0 || i1 == s.length() - 1) {
+ break;
+ }
+
+ char c1 = s.charAt(i1 + 1);
+ if (c1 == '?' || c1 == '!') {
+ k = i1 + 2;
+ continue;
+ }
+
+ int i2 = s.indexOf('>', i1);
+ if (i2 < 0) {
+ k = i1 + 1;
+ continue;
+ }
+
+ String closingTag = "</" + s.substring(i1 + 1, i2 + 1);
+ int i3 = s.indexOf(closingTag, i2 + 1);
+ if (i3 < 0) {
+ k = i2 + 1;
+ continue;
+ }
+
+ String value = s.substring(i2 + 1, i3);
+ if (value.trim().length() > 0) {
+ k = i2 + 1;
+ continue;
+ }
+
+ int i4 = s.lastIndexOf('\n', i1);
+ if (i4 < 0) {
+ i4 = 0;
+ }
+ int i5 = s.indexOf('\n', i3);
+ if (i5 < 0) {
+ i5 = s.length();
+ }
+
+ s = s.substring(0, i4) + s.substring(i5);
+ k = 0;
+ }
+
+ return s;
+ }
+
+ private static String escapeXml(String v) {
+ String s = v.replaceAll("&", "&amp;");
+ s = s.replaceAll("<", "&lt;");
+ s = s.replaceAll("'", "&apos;");
+ s = s.replaceAll("\"", "&quot;");
+ s = s.replaceAll(">", "&gt;");
+ return s;
+ }
+
+ private static String escapeJson(String v) {
+ String s = v.replaceAll("\\\\", "\\\\\\\\");
+ s = s.replaceAll("\"", "\\\\\"");
+ return s;
+ }
+
+ private static String pad(int n) {
+ StringBuilder s = new StringBuilder();
+ for (int i = 0; i < n; i++)
+ s.append(Character.toString('\t'));
+ return s.toString();
+ }
+}
+
diff --git a/src/main/java/org/onap/dcae/common/XmlParser.java b/src/main/java/org/onap/dcae/common/XmlParser.java
new file mode 100755
index 0000000..06a4a66
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/XmlParser.java
@@ -0,0 +1,178 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.collectors.restconf
+ * ================================================================================
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.common;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.Attributes;
+import org.xml.sax.SAXException;
+import org.xml.sax.helpers.DefaultHandler;
+
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.parsers.SAXParser;
+import javax.xml.parsers.SAXParserFactory;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class XmlParser {
+
+ private static final Logger log = LoggerFactory.getLogger(XmlParser.class);
+
+ private XmlParser() {
+ // Preventing instantiation of the same.
+ }
+
+ public static Map<String, String> convertToProperties(String s, Set<String> listNameList)
+ throws Exception {
+
+ checkNotNull(s, "Input should not be null.");
+
+ Handler handler = new Handler(listNameList);
+ try {
+ SAXParserFactory factory = SAXParserFactory.newInstance();
+ SAXParser saxParser = factory.newSAXParser();
+ InputStream in = new ByteArrayInputStream(s.getBytes());
+ saxParser.parse(in, handler);
+ } catch (ParserConfigurationException | IOException | SAXException | NumberFormatException e) {
+ throw new Exception("Unable to convert XML to properties" + e.getLocalizedMessage(), e);
+ }
+ return handler.getProperties();
+ }
+
+ private static class Handler extends DefaultHandler {
+
+ private Set<String> listNameList;
+
+ private Map<String, String> properties = new HashMap<>();
+
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ public Handler(Set<String> listNameList) {
+ super();
+ this.listNameList = listNameList;
+ if (this.listNameList == null) {
+ this.listNameList = new HashSet<>();
+ }
+ }
+
+ StringBuilder currentName = new StringBuilder();
+ StringBuilder currentValue = new StringBuilder();
+
+ @Override
+ public void startElement(String uri, String localName, String qName, Attributes attributes)
+ throws SAXException {
+ super.startElement(uri, localName, qName, attributes);
+
+ String name = localName;
+ if (name == null || name.trim().length() == 0) {
+ name = qName;
+ }
+ int i2 = name.indexOf(':');
+ if (i2 >= 0) {
+ name = name.substring(i2 + 1);
+ }
+
+ if (currentName.length() > 0) {
+ currentName.append(Character.toString('.'));
+ }
+ currentName.append(name);
+
+ String listName = removeIndexes(currentName.toString());
+
+ if (listNameList.contains(listName)) {
+ String n = currentName.toString() + "_length";
+ int len = getInt(properties, n);
+ properties.put(n, String.valueOf(len + 1));
+ currentName.append("[").append(len).append("]");
+ }
+ }
+
+ @Override
+ public void endElement(String uri, String localName, String qName) throws SAXException {
+ super.endElement(uri, localName, qName);
+
+ String name = localName;
+ if (name == null || name.trim().length() == 0) {
+ name = qName;
+ }
+ int i2 = name.indexOf(':');
+ if (i2 >= 0) {
+ name = name.substring(i2 + 1);
+ }
+
+ String s = currentValue.toString().trim();
+ if (s.length() > 0) {
+ properties.put(currentName.toString(), s);
+
+ log.info("Added property: {} : {}", currentName, s);
+ currentValue = new StringBuilder();
+ }
+
+ int i1 = currentName.lastIndexOf("." + name);
+ if (i1 <= 0) {
+ currentName = new StringBuilder();
+ } else {
+ currentName = new StringBuilder(currentName.substring(0, i1));
+ }
+ }
+
+ @Override
+ public void characters(char[] ch, int start, int length) throws SAXException {
+ super.characters(ch, start, length);
+
+ String value = new String(ch, start, length);
+ currentValue.append(value);
+ }
+
+ private static int getInt(Map<String, String> mm, String name) {
+ String s = mm.get(name);
+ if (s == null) {
+ return 0;
+ }
+ return Integer.parseInt(s);
+ }
+
+ private String removeIndexes(String currentName) {
+ StringBuilder b = new StringBuilder();
+ boolean add = true;
+ for (int i = 0; i < currentName.length(); i++) {
+ char c = currentName.charAt(i);
+ if (c == '[') {
+ add = false;
+ } else if (c == ']') {
+ add = true;
+ } else if (add) {
+ b.append(Character.toString(c));
+ }
+ }
+ return b.toString();
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java
new file mode 100644
index 0000000..dada578
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPConfigurationParser.java
@@ -0,0 +1,113 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+import io.vavr.collection.List;
+import io.vavr.collection.Map;
+import io.vavr.control.Option;
+import io.vavr.control.Try;
+import org.onap.dcae.common.AnyNode;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import org.json.JSONObject;
+
+import static io.vavr.API.*;
+import static org.onap.dcae.common.publishing.VavrUtils.enhanceError;
+import static org.onap.dcae.common.publishing.VavrUtils.f;
+
+/**
+ * @author Pawel Szalapski (pawel.szalapski@nokia.com)
+ */
+@SuppressWarnings("mapFailure takes a generic varargs, unchecked because of Javas type system limitation, actually safe to do")
+public final class DMaaPConfigurationParser {
+
+ public static Try<Map<String, PublisherConfig>> parseToDomainMapping(Path configLocation) {
+ return readFromFile(configLocation)
+ .flatMap(DMaaPConfigurationParser::toJSON)
+ .flatMap(DMaaPConfigurationParser::toConfigMap);
+ }
+
+ public static Try<Map<String, PublisherConfig>> parseToDomainMapping(JSONObject config) {
+ return toJSON(config.toString())
+ .flatMap(DMaaPConfigurationParser::toConfigMap);
+ }
+
+ private static Try<String> readFromFile(Path configLocation) {
+ return Try(() -> new String(Files.readAllBytes(configLocation)))
+ .mapFailure(enhanceError(f("Could not read DMaaP configuration from location: '%s'", configLocation)));
+ }
+
+ private static Try<AnyNode> toJSON(String config) {
+ return Try(() -> AnyNode.fromString(config))
+ .mapFailure(enhanceError(f("DMaaP configuration '%s' is not a valid JSON document", config)));
+ }
+
+ private static Try<Map<String, PublisherConfig>> toConfigMap(AnyNode config) {
+ return Try(() -> usesLegacyFormat(config) ? parseLegacyFormat(config) : parseNewFormat(config))
+ .mapFailure(enhanceError(f("Parsing DMaaP configuration: '%s' failed, probably it is in unexpected format", config)));
+ }
+
+ private static boolean usesLegacyFormat(AnyNode dMaaPConfig) {
+ return dMaaPConfig.has("channels");
+ }
+
+ private static Map<String, PublisherConfig> parseLegacyFormat(AnyNode root) {
+ return root.get("channels").toList().toMap(
+ channel -> channel.get("name").toString(),
+ channel -> {
+ String destinationsStr = channel.getAsOption("cambria.url")
+ .getOrElse(channel.getAsOption("cambria.hosts").get())
+ .toString();
+ String topic = channel.get("cambria.topic").toString();
+ Option<String> maybeUser = channel.getAsOption("basicAuthUsername").map(AnyNode::toString);
+ Option<String> maybePassword = channel.getAsOption("basicAuthPassword").map(AnyNode::toString);
+ List<String> destinations = List(destinationsStr.split(","));
+ return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations);
+ });
+ }
+
+ private static Map<String, PublisherConfig> parseNewFormat(AnyNode root) {
+ return root.keys().toMap(
+ channelName -> channelName,
+ channelName -> {
+ AnyNode channelConfig = root.get(channelName);
+ Option<String> maybeUser = channelConfig.getAsOption("aaf_username").map(AnyNode::toString);
+ Option<String> maybePassword = channelConfig.getAsOption("aaf_password").map(AnyNode::toString);
+ URL topicURL = unchecked(
+ () -> new URL(channelConfig.get("dmaap_info").get("topic_url").toString())).apply();
+ String[] pathSegments = topicURL.getPath().substring(1).split("/");
+ String topic = pathSegments[1];
+ String destination = "events".equals(pathSegments[0]) ? topicURL.getAuthority() : topicURL.getHost();
+ List<String> destinations = List(destination);
+ return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations);
+ });
+ }
+
+ private static PublisherConfig buildBasedOnAuth(Option<String> maybeUser, Option<String> maybePassword,
+ String topic, List<String> destinations) {
+ return maybeUser.flatMap(user -> maybePassword.map(password -> Tuple(user, password)))
+ .map(credentials -> new PublisherConfig(destinations, topic, credentials._1, credentials._2))
+ .getOrElse(new PublisherConfig(destinations, topic));
+ }
+}
diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
new file mode 100644
index 0000000..1209b38
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPEventPublisher.java
@@ -0,0 +1,87 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.dcae.common.publishing;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import io.vavr.collection.Map;
+import io.vavr.control.Try;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.onap.dcae.common.publishing.VavrUtils.f;
+
+/**
+ * @author Pawel Szalapski (pawel.szalapski@nokia.com)
+ */
+class DMaaPEventPublisher implements EventPublisher {
+ private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100;
+ private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class);
+ private final DMaaPPublishersCache publishersCache;
+ private final Logger outputLogger;
+
+ DMaaPEventPublisher(DMaaPPublishersCache publishersCache,
+ Logger outputLogger) {
+ this.publishersCache = publishersCache;
+ this.outputLogger = outputLogger;
+ }
+
+ @Override
+ public void sendEvent(JSONObject event, String domain) {
+
+ publishersCache.getPublisher(domain)
+ .onEmpty(() ->
+ log.warn(f("Could not find event publisher for domain: '%s', dropping message: '%s'", domain, event)))
+ .forEach(publisher -> sendEvent(event, domain, publisher));
+ }
+
+ @Override
+ public void reconfigure(Map<String, PublisherConfig> dMaaPConfig) {
+ log.info("reconfigure ");
+ publishersCache.reconfigure(dMaaPConfig);
+ }
+
+ private void sendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) {
+ Try.run(() -> uncheckedSendEvent(event, domain, publisher))
+ .onFailure(exc -> closePublisher(event, domain, exc));
+ }
+
+ private void uncheckedSendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher)
+ throws IOException {
+ int pendingMsgs = publisher.send("MyPartitionKey", event.toString());
+ if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) {
+ log.info("Pending messages count: " + pendingMsgs);
+ }
+ String infoMsg = f("Event: '%s' scheduled to be send asynchronously on domain: '%s'", event, domain);
+ log.info(infoMsg);
+ outputLogger.info(infoMsg);
+ }
+
+ private void closePublisher(JSONObject event, String domain, Throwable e) {
+ log.error(f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.",
+ event, domain), e);
+ publishersCache.closePublisherFor(domain);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersBuilder.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersBuilder.java
new file mode 100644
index 0000000..8020b60
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersBuilder.java
@@ -0,0 +1,62 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
+import io.vavr.control.Try;
+
+import static io.vavr.API.Try;
+import static org.onap.dcae.common.publishing.VavrUtils.enhanceError;
+import static org.onap.dcae.common.publishing.VavrUtils.f;
+
+/**
+ * @author Pawel Szalapski (pawel.szalapski@nokia.com)
+ */
+final class DMaaPPublishersBuilder {
+
+ static Try<CambriaBatchingPublisher> buildPublisher(PublisherConfig config) {
+ return Try(() -> builder(config).build())
+ .mapFailure(enhanceError(f("DMaaP client builder throws exception for this configuration: '%s'", config)));
+ }
+
+ private static PublisherBuilder builder(PublisherConfig config) {
+ if (config.isSecured()) {
+ return authenticatedBuilder(config);
+ } else {
+ return unAuthenticatedBuilder(config);
+ }
+ }
+
+ private static PublisherBuilder authenticatedBuilder(PublisherConfig config) {
+ return unAuthenticatedBuilder(config)
+ .usingHttps()
+ .authenticatedByHttp(config.userName().get(), config.password().get());
+ }
+
+ private static PublisherBuilder unAuthenticatedBuilder(PublisherConfig config) {
+ return new CambriaClientBuilders.PublisherBuilder()
+ .usingHosts(config.destinations().mkString(","))
+ .onTopic(config.topic())
+ .logSendFailuresAfter(5);
+ }
+}
diff --git a/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java b/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java
new file mode 100644
index 0000000..8f4c761
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/publishing/DMaaPPublishersCache.java
@@ -0,0 +1,123 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.google.common.cache.*;
+import io.vavr.collection.Map;
+import io.vavr.control.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static io.vavr.API.Option;
+import static org.onap.dcae.common.publishing.VavrUtils.f;
+
+/**
+ * @author Pawel Szalapski (pawel.szalapski@nokia.com)
+ */
+class DMaaPPublishersCache {
+
+ private static final Logger log = LoggerFactory.getLogger(DMaaPPublishersCache.class);
+ private final LoadingCache<String, CambriaBatchingPublisher> publishersCache;
+ private AtomicReference<Map<String, PublisherConfig>> dMaaPConfiguration;
+
+ DMaaPPublishersCache(Map<String, PublisherConfig> dMaaPConfiguration) {
+ this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration);
+ this.publishersCache = CacheBuilder.newBuilder()
+ .removalListener(new OnPublisherRemovalListener())
+ .build(new CambriaPublishersCacheLoader());
+ }
+
+ DMaaPPublishersCache(CambriaPublishersCacheLoader dMaaPPublishersCacheLoader,
+ OnPublisherRemovalListener onPublisherRemovalListener,
+ Map<String, PublisherConfig> dMaaPConfiguration) {
+ this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration);
+ this.publishersCache = CacheBuilder.newBuilder()
+ .removalListener(onPublisherRemovalListener)
+ .build(dMaaPPublishersCacheLoader);
+ }
+
+ Option<CambriaBatchingPublisher> getPublisher(String streamID) {
+ try {
+ return Option(publishersCache.getUnchecked(streamID));
+ } catch (Exception e) {
+ log.warn("Could not create / load Cambria Publisher for streamID", e);
+ return Option.none();
+ }
+ }
+
+ void closePublisherFor(String streamId) {
+ publishersCache.invalidate(streamId);
+ }
+
+ synchronized void reconfigure(Map<String, PublisherConfig> newConfig) {
+ log.info("reconfigure in DMaaPPublishersCache");
+ Map<String, PublisherConfig> currentConfig = dMaaPConfiguration.get();
+ Map<String, PublisherConfig> removedConfigurations = currentConfig
+ .filterKeys(domain -> !newConfig.containsKey(domain));
+ Map<String, PublisherConfig> changedConfigurations = newConfig
+ .filterKeys(e -> currentConfig.containsKey(e) && !currentConfig.get(e).equals(newConfig.get(e)));
+ dMaaPConfiguration.set(newConfig);
+ removedConfigurations.merge(changedConfigurations).forEach(e -> publishersCache.invalidate(e._1));
+ }
+
+ static class OnPublisherRemovalListener implements RemovalListener<String, CambriaBatchingPublisher> {
+
+ @Override
+ public void onRemoval(@Nonnull RemovalNotification<String, CambriaBatchingPublisher> notification) {
+ CambriaBatchingPublisher publisher = notification.getValue();
+ if (publisher != null) { // The value might get Garbage Collected at this moment, regardless of @Nonnull
+ try {
+ int timeout = 20;
+ TimeUnit unit = TimeUnit.SECONDS;
+ java.util.List<?> stuck = publisher.close(timeout, unit);
+ if (!stuck.isEmpty()) {
+ log.error(f("Publisher got stuck and did not manage to close in '%s' '%s', "
+ + "%s messages were dropped", stuck.size(), timeout, unit));
+ }
+ } catch (InterruptedException | IOException e) {
+ log.error("Could not close Cambria publisher, some messages might have been dropped", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+
+ class CambriaPublishersCacheLoader extends CacheLoader<String, CambriaBatchingPublisher> {
+
+ @Override
+ public CambriaBatchingPublisher load(@Nonnull String domain) {
+ return dMaaPConfiguration.get()
+ .get(domain)
+ .toTry(() -> new RuntimeException(
+ f("DMaaP configuration contains no configuration for domain: '%s'", domain)))
+ .flatMap(DMaaPPublishersBuilder::buildPublisher)
+ .get();
+ }
+ }
+
+}
diff --git a/src/main/java/org/onap/dcae/common/publishing/EventPublisher.java b/src/main/java/org/onap/dcae/common/publishing/EventPublisher.java
new file mode 100644
index 0000000..91736ec
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/publishing/EventPublisher.java
@@ -0,0 +1,39 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+import io.vavr.collection.Map;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+
+/**
+ * @author Pawel Szalapski (pawel.szalapski@nokia.com)
+ */
+public interface EventPublisher {
+
+ static EventPublisher createPublisher(Logger outputLogger, Map<String, PublisherConfig> dMaaPConfig) {
+ return new DMaaPEventPublisher(new DMaaPPublishersCache(dMaaPConfig), outputLogger);
+ }
+
+ void sendEvent(JSONObject event, String domain);
+
+ void reconfigure(Map<String, PublisherConfig> dMaaPConfig);
+}
diff --git a/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java b/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java
new file mode 100644
index 0000000..67aca1d
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/publishing/PublisherConfig.java
@@ -0,0 +1,100 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2018 Nokia. All rights reserved.
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+import io.vavr.collection.List;
+import io.vavr.control.Option;
+
+import java.util.Objects;
+
+/**
+ * @author Pawel Szalapski (pawel.szalapski@nokia.com)
+ */
+public final class PublisherConfig {
+
+ private final List<String> destinations;
+ private final String topic;
+ private String userName;
+ private String password;
+
+ PublisherConfig(List<String> destinations, String topic) {
+ this.destinations = destinations;
+ this.topic = topic;
+ }
+
+ PublisherConfig(List<String> destinations, String topic, String userName, String password) {
+ this.destinations = destinations;
+ this.topic = topic;
+ this.userName = userName;
+ this.password = password;
+ }
+
+ List<String> destinations() {
+ return destinations;
+ }
+
+ String topic() {
+ return topic;
+ }
+
+ Option<String> userName() {
+ return Option.of(userName);
+ }
+
+ Option<String> password() {
+ return Option.of(password);
+ }
+
+ boolean isSecured() {
+ return userName().isDefined() && password().isDefined();
+ }
+
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PublisherConfig that = (PublisherConfig) o;
+ return Objects.equals(destinations, that.destinations) &&
+ Objects.equals(topic, that.topic) &&
+ Objects.equals(userName, that.userName) &&
+ Objects.equals(password, that.password);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(destinations, topic, userName, password);
+ }
+
+ @Override
+ public String toString() {
+ return "PublisherConfig{" +
+ "destinations=" + destinations +
+ ", topic='" + topic + '\'' +
+ ", userName='" + userName + '\'' +
+ ", password='" + password + '\'' +
+ '}';
+ }
+}
diff --git a/src/main/java/org/onap/dcae/common/publishing/VavrUtils.java b/src/main/java/org/onap/dcae/common/publishing/VavrUtils.java
new file mode 100644
index 0000000..1db4e18
--- /dev/null
+++ b/src/main/java/org/onap/dcae/common/publishing/VavrUtils.java
@@ -0,0 +1,62 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * org.onap.dcaegen2.restconfcollector
+ * ================================================================================
+ * Copyright (C) 2018-2019 Huawei. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.common.publishing;
+
+import io.vavr.API;
+import io.vavr.API.Match.Case;
+import java.util.function.Consumer;
+import org.slf4j.Logger;
+
+import static io.vavr.API.$;
+
+/**
+ * @author Pawel Szalapski (pawel.szalapski@nokia.com)
+ */
+public final class VavrUtils {
+
+ private VavrUtils() {
+ // utils aggregator
+ }
+
+ /**
+ * Shortcut for 'string interpolation'
+ */
+ public static String f(String msg, Object... args) {
+ return String.format(msg, args);
+ }
+
+ /**
+ * Wrap failure with a more descriptive message of what has failed and chain original cause. Used to provide a
+ * context for errors instead of raw exception.
+ */
+ public static Case<Throwable, Throwable> enhanceError(String msg) {
+ return API.Case($(), e -> new RuntimeException(msg, e));
+ }
+
+ public static Case<Throwable, Throwable> enhanceError(String pattern, Object... arguments) {
+ return API.Case($(), e -> new RuntimeException(f(pattern, arguments), e));
+ }
+
+ public static Consumer<Throwable> logError(Logger withLogger) {
+ return e -> withLogger.error(e.getMessage(), e);
+ }
+
+
+}