From 11a3345cf03c2ad820fa40440dbe4c89eb963b26 Mon Sep 17 00:00:00 2001 From: Jessica Wagantall Date: Mon, 13 Aug 2018 23:42:40 +0000 Subject: Add RestConf Collector Issue-ID: DCAEGEN2-612 1. Instantiated to support CCVPN Close Loop Use Case 2. In general, this supports data collection from all PNF or devices that supports RestConf protocol Change-Id: I6311ad618e8d68badc5423a63d7781a19dc62829 Signed-off-by: rama-huawei --- etc/DmaapConfig.json | 12 + etc/collector.properties | 22 + etc/establish-subscription-input-template.json | 5 + etc/log4j.xml | 188 +++++++ etc/passwordfile | 1 + pom.xml | 332 ++++++++++++ restconf.iml | 107 ++++ src/assembly/dep.xml | 43 ++ .../dcae/collectors/restconf/common/AnyNode.java | 127 +++++ .../dcae/collectors/restconf/common/AuthType.java | 43 ++ .../dcae/collectors/restconf/common/Constants.java | 38 ++ .../restconf/common/DataChangeEventListener.java | 51 ++ .../collectors/restconf/common/EventProcessor.java | 88 +++ .../dcae/collectors/restconf/common/Format.java | 37 ++ .../collectors/restconf/common/HttpMethod.java | 48 ++ .../collectors/restconf/common/HttpResponse.java | 30 ++ .../collectors/restconf/common/JsonParser.java | 92 ++++ .../collectors/restconf/common/Parameters.java | 52 ++ .../restconf/common/RestConfCollector.java | 66 +++ .../restconf/common/RestConfContext.java | 51 ++ .../collectors/restconf/common/RestConfProc.java | 224 ++++++++ .../restconf/common/RestapiCallNode.java | 600 +++++++++++++++++++++ .../collectors/restconf/common/RetryException.java | 27 + .../collectors/restconf/common/RetryPolicy.java | 58 ++ .../restconf/common/RetryPolicyStore.java | 53 ++ .../collectors/restconf/common/XmlJsonUtil.java | 412 ++++++++++++++ .../dcae/collectors/restconf/common/XmlParser.java | 178 ++++++ .../event/publishing/DMaaPConfigurationParser.java | 107 ++++ .../event/publishing/DMaaPEventPublisher.java | 75 +++ .../event/publishing/DMaaPPublishersBuilder.java | 63 +++ .../event/publishing/DMaaPPublishersCache.java | 110 ++++ .../common/event/publishing/EventPublisher.java | 35 ++ .../common/event/publishing/PublisherConfig.java | 95 ++++ .../common/event/publishing/VavrUtils.java | 48 ++ src/main/scripts/docker_entry.sh | 6 + src/main/scripts/restConfCollector.sh | 100 ++++ .../publishing/DMaaPConfigurationParserTest.java | 111 ++++ .../event/publishing/DMaaPEventPublisherTest.java | 77 +++ .../event/publishing/DMaaPPublishersCacheTest.java | 92 ++++ .../restconf/restconftest/AnyNodeTest.java | 62 +++ .../restconf/restconftest/RestConfProcTest.java | 75 +++ .../restconf/restconftest/SseResource.java | 69 +++ .../restconftest/TestRestConfCollector.java | 66 +++ src/test/resources/RestConfEvent.json | 26 + .../resources/testParseDMaaPCredentialsGen2.json | 21 + .../resources/testParseDMaaPCredentialsLegacy.json | 26 + src/test/resources/testParseDMaaPGen2.json | 12 + src/test/resources/testParseDMaaPLegacy.json | 21 + 48 files changed, 4282 insertions(+) create mode 100755 etc/DmaapConfig.json create mode 100755 etc/collector.properties create mode 100755 etc/establish-subscription-input-template.json create mode 100755 etc/log4j.xml create mode 100755 etc/passwordfile create mode 100755 pom.xml create mode 100755 restconf.iml create mode 100755 src/assembly/dep.xml create mode 100755 src/main/java/org/onap/dcae/collectors/restconf/common/AnyNode.java create mode 100755 src/main/java/org/onap/dcae/collectors/restconf/common/AuthType.java create mode 100755 src/main/java/org/onap/dcae/collectors/restconf/common/Constants.java create mode 100755 src/main/java/org/onap/dcae/collectors/restconf/common/DataChangeEventListener.java create mode 100755 src/main/java/org/onap/dcae/collectors/restconf/common/EventProcessor.java create mode 100755 src/main/java/org/onap/dcae/collectors/restconf/common/Format.java create mode 100755 src/main/java/org/onap/dcae/collectors/restconf/common/HttpMethod.java create mode 100755 src/main/java/org/onap/dcae/collectors/restconf/common/HttpResponse.java create mode 100755 src/main/java/org/onap/dcae/collectors/restconf/common/JsonParser.java create mode 100755 src/main/java/org/onap/dcae/collectors/restconf/common/Parameters.java create mode 100755 src/main/java/org/onap/dcae/collectors/restconf/common/RestConfCollector.java create mode 100755 src/main/java/org/onap/dcae/collectors/restconf/common/RestConfContext.java create mode 100755 src/main/java/org/onap/dcae/collectors/restconf/common/RestConfProc.java create mode 100755 src/main/java/org/onap/dcae/collectors/restconf/common/RestapiCallNode.java create mode 100755 src/main/java/org/onap/dcae/collectors/restconf/common/RetryException.java create mode 100755 src/main/java/org/onap/dcae/collectors/restconf/common/RetryPolicy.java create mode 100755 src/main/java/org/onap/dcae/collectors/restconf/common/RetryPolicyStore.java create mode 100755 src/main/java/org/onap/dcae/collectors/restconf/common/XmlJsonUtil.java create mode 100755 src/main/java/org/onap/dcae/collectors/restconf/common/XmlParser.java create mode 100755 src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPConfigurationParser.java create mode 100755 src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPEventPublisher.java create mode 100755 src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersBuilder.java create mode 100755 src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersCache.java create mode 100755 src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/EventPublisher.java create mode 100755 src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/PublisherConfig.java create mode 100755 src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/VavrUtils.java create mode 100755 src/main/scripts/docker_entry.sh create mode 100755 src/main/scripts/restConfCollector.sh create mode 100755 src/test/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPConfigurationParserTest.java create mode 100755 src/test/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPEventPublisherTest.java create mode 100755 src/test/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersCacheTest.java create mode 100755 src/test/java/org/onap/dcae/collectors/restconf/restconftest/AnyNodeTest.java create mode 100755 src/test/java/org/onap/dcae/collectors/restconf/restconftest/RestConfProcTest.java create mode 100755 src/test/java/org/onap/dcae/collectors/restconf/restconftest/SseResource.java create mode 100755 src/test/java/org/onap/dcae/collectors/restconf/restconftest/TestRestConfCollector.java create mode 100755 src/test/resources/RestConfEvent.json create mode 100755 src/test/resources/testParseDMaaPCredentialsGen2.json create mode 100755 src/test/resources/testParseDMaaPCredentialsLegacy.json create mode 100755 src/test/resources/testParseDMaaPGen2.json create mode 100755 src/test/resources/testParseDMaaPLegacy.json diff --git a/etc/DmaapConfig.json b/etc/DmaapConfig.json new file mode 100755 index 0000000..5176cea --- /dev/null +++ b/etc/DmaapConfig.json @@ -0,0 +1,12 @@ +{ + "channels": [ + { + "name": "route_failure", + "cambria.topic": "unauthenticated.DCAE_RESTCONF_COL_OUTPUT", + "class": "HpCambriaOutputStream", + "stripHpId": "true", + "type": "out", + "cambria.hosts": "onap-message-router" + } + ] +} diff --git a/etc/collector.properties b/etc/collector.properties new file mode 100755 index 0000000..a013578 --- /dev/null +++ b/etc/collector.properties @@ -0,0 +1,22 @@ +############################################################################### +## +## Collector config +## +## - Default values are shown as commented settings. +## +############################################################################### +## Processing +collector.dmaapfile=./etc/DmaapConfig.json +############################################################################### +## +## Tomcat control +## +#tomcat.maxthreads=(tomcat default, which is usually 200) +# list all restconf collector parameters +templateFileName=./etc/establish-subscription-input-template.json +restapiUrl=10.0.4.1:8080;10.0.4.2:8080 +httpMethod=post +responsePrefix=restapi-result +skipSending=false +sseConnectURL=http://10.0.4.1:8080/RestConfServer/rest/ssevents; http://10.0.4.2:8080/RestConfServer/rest/ssevents +format=json diff --git a/etc/establish-subscription-input-template.json b/etc/establish-subscription-input-template.json new file mode 100755 index 0000000..c47ba01 --- /dev/null +++ b/etc/establish-subscription-input-template.json @@ -0,0 +1,5 @@ +{ + "ietf-subscribed-notification:input": { + "encoding": "encoding-json" + } +} diff --git a/etc/log4j.xml b/etc/log4j.xml new file mode 100755 index 0000000..3e3b132 --- /dev/null +++ b/etc/log4j.xml @@ -0,0 +1,188 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/etc/passwordfile b/etc/passwordfile new file mode 100755 index 0000000..702a4cb --- /dev/null +++ b/etc/passwordfile @@ -0,0 +1 @@ +collector diff --git a/pom.xml b/pom.xml new file mode 100755 index 0000000..f0efa8f --- /dev/null +++ b/pom.xml @@ -0,0 +1,332 @@ + + + 4.0.0 + + + org.onap.oparent + oparent + 1.1.0 + + + + org.onap.dcaegen2.collectors.restconf + restconfcollector + 1.0.0-SNAPSHOT + + dcaegen2-collectors-restconf + RestConfCollector + + + + UTF-8 + UTF-8 + 8 + restconfcollector + + + false + + + + + + + + maven-compiler-plugin + 3.7.0 + + + maven-source-plugin + 3.0.1 + + + maven-jar-plugin + 3.1.0 + + + maven-assembly-plugin + 3.1.0 + + + maven-javadoc-plugin + 3.0.1 + + + maven-project-info-reports-plugin + 2.9 + + + com.spotify + docker-maven-plugin + 1.1.1 + + + + + + + maven-compiler-plugin + + ${java.version} + ${java.version} + true + true + + + + + maven-source-plugin + + true + + + + attach-sources + verify + + jar-no-fork + + + + + + + maven-jar-plugin + + + + true + + + ${project.version} + + + + + + + maven-assembly-plugin + + + src/assembly/dep.xml + + false + false + true + + + + make-assembly + package + + single + + + + + + + maven-javadoc-plugin + + + true + false + false + + + + aggregate + site + + aggregate + + + + attach-javadoc + + jar + + + + + + + + + + + maven-project-info-reports-plugin + + + + dependencies + license + + + + + + + maven-javadoc-plugin + + false + org.umlgraph.doclet.UmlGraphDoc + + org.umlgraph + umlgraph + 5.6 + + -views + true + + + + + + + + + + com.googlecode.json-simple + json-simple + 1.1.1 + + + com.github.fge + json-schema-validator + 2.2.6 + + + com.github.fge + json-schema-core + 1.2.5 + + + com.google.code.gson + gson + 2.3.1 + + + org.json + json + 20160810 + + + + + com.att.nsa + nsaServerLibrary + 1.0.10 + + + com.fasterxml.jackson.core + jackson-databind + 2.8.11 + + + + + org.slf4j + slf4j-log4j12 + 1.7.21 + + + log4j + apache-log4j-extras + 1.2.17 + + + + + com.google.guava + guava + 18.0 + + + commons-collections + commons-collections + 3.2.2 + + + commons-configuration + commons-configuration + 1.10 + + + javax.mail + mail + 1.4.7 + + + io.vavr + vavr + 0.9.2 + + + + org.mockito + mockito-core + 2.18.0 + test + + + org.apache.commons + commons-lang3 + 3.7 + + + org.codehaus.jettison + jettison + 1.3.7 + + + org.glassfish.jersey.core + jersey-client + 2.27 + + + org.glassfish.jersey.containers + jersey-container-grizzly2-http + 2.27 + + + org.glassfish.jersey.inject + jersey-hk2 + 2.27 + + + org.glassfish.jersey.media + jersey-media-sse + 2.27 + + + com.sun.jersey.contribs.jersey-oauth + oauth-client + 1.19.1 + + + com.sun.jersey.contribs.jersey-oauth + oauth-signature + 1.19.1 + + + org.hamcrest + hamcrest-library + 1.3 + + + org.assertj + assertj-core + 3.8.0 + test + + + + + external-repository + https://oss.sonatype.org/content/repositories + + + \ No newline at end of file diff --git a/restconf.iml b/restconf.iml new file mode 100755 index 0000000..0267434 --- /dev/null +++ b/restconf.iml @@ -0,0 +1,107 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/assembly/dep.xml b/src/assembly/dep.xml new file mode 100755 index 0000000..472d966 --- /dev/null +++ b/src/assembly/dep.xml @@ -0,0 +1,43 @@ + + + bundle + false + + dir + + + + + src/main/scripts + bin + + **/*.* + + 0755 + unix + + + etc + etc + + + src/main/resources + + **/*.conf + + etc + + + + + + + runtime + true + false + lib + + + diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/AnyNode.java b/src/main/java/org/onap/dcae/collectors/restconf/common/AnyNode.java new file mode 100755 index 0000000..044a9cf --- /dev/null +++ b/src/main/java/org/onap/dcae/collectors/restconf/common/AnyNode.java @@ -0,0 +1,127 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.collectors.restconf.common; + +import io.vavr.collection.List; +import io.vavr.collection.Set; +import io.vavr.control.Option; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; + +import java.util.stream.StreamSupport; + +import static io.vavr.API.Set; + +public class AnyNode { + private Object obj; + + private AnyNode(Object object) { + this.obj = object; + } + + public static AnyNode fromString(String content) { + return new AnyNode(new JSONObject(content)); + } + + /** + * Returns key set of underlying object. It is assumed that underlying object is of type org.json.JSONObject. + * + * @return key set of underlying objects + */ + public Set keys() { + return Set(asJsonObject().keySet().toArray(new String[]{})); + } + + /** + * Returns value associated with specified key wrapped with AnyValue object. It is assumed that this is of type + * org.json.JSONObject. + * + * @param key for querying value from jsonobject + * @return value associated with specified key + */ + public AnyNode get(String key) { + return new AnyNode(asJsonObject().get(key)); + } + + /** + * Returns string representation of this. If it happens to have null, the value is treated as + * org.json.JSONObject.NULL and "null" string is returned then. + * + * @return string representation of this + */ + public String toString() { + return this.obj.toString(); + } + + /** + * Returns optional of object under specified key, wrapped with AnyNode object. + * If underlying object is not of type org.json.JSONObject + * or underlying object has no given key + * or given key is null + * then Optional.empty will be returned. + * + * @param key for querying value from AnyNode object + * @return optional of object under specified key + */ + public Option getAsOption(String key) { + try { + AnyNode value = get(key); + if (value.toString().equals("null")) { + return Option.none(); + } + return Option.some(value); + } catch (JSONException ex) { + return Option.none(); + } + } + + /** + * Converts underlying object to map representation with map values wrapped with AnyNode object. It is assumed that + * underlying object is of type org.json.JSONObject. + * + * @return converts underlying object to map representation + */ + public List toList() { + return List.ofAll(StreamSupport.stream(((JSONArray) this.obj).spliterator(), false).map(AnyNode::new)); + } + + /** + * Checks if specified key is present in this. It is assumed that this is of type JSONObject. + * + * @param key is used to check presence in anynode object + * @return true if specified key is present in this + */ + public boolean has(String key) { + return !getAsOption(key).isEmpty(); + } + + /** + * Returns as JSONObject. + * + * @return jsonobject + */ + private JSONObject asJsonObject() { + return (JSONObject) this.obj; + } + + +} diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/AuthType.java b/src/main/java/org/onap/dcae/collectors/restconf/common/AuthType.java new file mode 100755 index 0000000..47ae8ac --- /dev/null +++ b/src/main/java/org/onap/dcae/collectors/restconf/common/AuthType.java @@ -0,0 +1,43 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.restconf.common; + +public enum AuthType { + NONE, BASIC, DIGEST, OAUTH, Unspecified; + + public static AuthType fromString(String s) { + if ("basic".equalsIgnoreCase(s)) { + return BASIC; + } + if ("digest".equalsIgnoreCase(s)) { + return DIGEST; + } + if ("oauth".equalsIgnoreCase(s)) { + return OAUTH; + } + if ("none".equalsIgnoreCase(s)) { + return NONE; + } + if ("unspecified".equalsIgnoreCase(s)) { + return Unspecified; + } + throw new IllegalArgumentException("Invalid value for format: " + s); + } +} diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/Constants.java b/src/main/java/org/onap/dcae/collectors/restconf/common/Constants.java new file mode 100755 index 0000000..4845bfc --- /dev/null +++ b/src/main/java/org/onap/dcae/collectors/restconf/common/Constants.java @@ -0,0 +1,38 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.collectors.restconf.common; + +public class Constants { + public static final String KDEFAULT_TEMP_FILENAME = "templateFileName"; + public static final String KSETTING_REST_API_URL = "restapiUrl"; + public static final String KSETTING_HTTP_METHOD = "httpMethod"; + public static final String KSETTING_RESP_PREFIX = "responsePrefix"; + public static final String KSETTING_SKIP_SENDING = "skipSending"; + public static final String KSETTING_FORMAT = "format"; + public static final String KSETTING_DMAAPCONFIGS = "collector.dmaapfile"; + public static final String[] KDEFAULT_DMAAPCONFIGS = new String[]{"./etc/DmaapConfig.json"}; + public static final String KSETTING_SSE_CONNECT_URL = "sseConnectURL"; + public static final int KDEFAULT_MAXQUEUEDEVENTS = 1024 * 4; + public static final String RESPONSE_CODE = "restapi-result.response-code"; + public static final String OUTPUT_IDENTIFIER = "restapi-result.ietf-subscribed-notifications:output.identifier"; + public static final String RESPONSE_CODE_200 = "200"; + public static final String KCONFIG = "c"; +} diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/DataChangeEventListener.java b/src/main/java/org/onap/dcae/collectors/restconf/common/DataChangeEventListener.java new file mode 100755 index 0000000..97ee623 --- /dev/null +++ b/src/main/java/org/onap/dcae/collectors/restconf/common/DataChangeEventListener.java @@ -0,0 +1,51 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.collectors.restconf.common; + +import org.glassfish.jersey.media.sse.EventListener; +import org.glassfish.jersey.media.sse.InboundEvent; +import org.json.JSONArray; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DataChangeEventListener implements EventListener { + private static final Logger log = LoggerFactory.getLogger(DataChangeEventListener.class); + private RestConfContext ctx; + + public DataChangeEventListener(RestConfContext ctx) { + this.ctx = ctx; + } + + @Override + public void onEvent(InboundEvent event) { + JSONArray jsonArrayMod; + log.info("On SSE Event is received"); + String s = event.readData(); + JSONObject jsonObj = new JSONObject(s); + jsonArrayMod = new JSONArray().put(jsonObj); + try { + RestConfProc.handleEvents(jsonArrayMod); + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/EventProcessor.java b/src/main/java/org/onap/dcae/collectors/restconf/common/EventProcessor.java new file mode 100755 index 0000000..3409f9c --- /dev/null +++ b/src/main/java/org/onap/dcae/collectors/restconf/common/EventProcessor.java @@ -0,0 +1,88 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.collectors.restconf.common; + + +import org.json.JSONObject; +import org.onap.dcae.collectors.restconf.common.event.publishing.EventPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +public class EventProcessor implements Runnable { + private static final Logger log = LoggerFactory.getLogger(EventProcessor.class); + + static Map streamidHash = new HashMap<>(); + public JSONObject event; + private EventPublisher eventPublisher; + + public EventProcessor(EventPublisher eventPublisher) { + this.eventPublisher = eventPublisher; + streamidHash = parseStreamIdToStreamHashMapping(new RestConfProc().streamID); + } + + private Map parseStreamIdToStreamHashMapping(String streamId) { + Map streamidHash = new HashMap<>(); + String[] list = streamId.split("\\|"); + for (String aList : list) { + String domain = aList.split("=")[0]; + String[] streamIdList = aList.substring(aList.indexOf('=') + 1).split(","); + streamidHash.put(domain, streamIdList); + } + return streamidHash; + } + + @Override + public void run() { + try { + + while (true) { + event = RestConfProc.fProcessingInputQueue.take(); + // As long as the producer is running we remove elements from + // the queue. + log.info("QueueSize:" + RestConfProc.fProcessingInputQueue.size() + "\tEventProcessor\tRemoving element: " + + event); + String[] streamIdList = streamidHash.get("route"); + log.debug("streamIdList:" + Arrays.toString(streamIdList)); + + if (streamIdList.length == 0) { + log.error("No StreamID defined for publish - Message dropped" + event); + } else { + sendEventsToStreams(streamIdList); + } + log.debug("Event published" + event); + } + } catch (Exception e) { + log.error("EventProcessor InterruptedException" + e.getMessage()); + Thread.currentThread().interrupt(); + } + } + + private void sendEventsToStreams(String[] streamIdList) { + for (String aStreamIdList : streamIdList) { + log.info("Invoking publisher for streamId:" + aStreamIdList); + eventPublisher.sendEvent(event, aStreamIdList); + } + } +} diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/Format.java b/src/main/java/org/onap/dcae/collectors/restconf/common/Format.java new file mode 100755 index 0000000..94344d6 --- /dev/null +++ b/src/main/java/org/onap/dcae/collectors/restconf/common/Format.java @@ -0,0 +1,37 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.restconf.common; + +public enum Format { + JSON, XML, NONE; + + public static Format fromString(String s) { + if ("json".equalsIgnoreCase(s)) { + return JSON; + } + if ("xml".equalsIgnoreCase(s)) { + return XML; + } + if ("none".equalsIgnoreCase(s)) { + return NONE; + } + throw new IllegalArgumentException("Invalid value for format: " + s); + } +} diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/HttpMethod.java b/src/main/java/org/onap/dcae/collectors/restconf/common/HttpMethod.java new file mode 100755 index 0000000..d85d5f5 --- /dev/null +++ b/src/main/java/org/onap/dcae/collectors/restconf/common/HttpMethod.java @@ -0,0 +1,48 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + + +package org.onap.dcae.collectors.restconf.common; + +public enum HttpMethod { + GET, POST, PUT, DELETE, PATCH; + + public static HttpMethod fromString(String s) { + if (s == null) { + return null; + } + if (s.equalsIgnoreCase("get")) { + return GET; + } + if (s.equalsIgnoreCase("post")) { + return POST; + } + if (s.equalsIgnoreCase("put")) { + return PUT; + } + if (s.equalsIgnoreCase("delete")) { + return DELETE; + } + if (s.equalsIgnoreCase("patch")) { + return PATCH; + } + throw new IllegalArgumentException("Invalid value for HTTP Method: " + s); + } +} diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/HttpResponse.java b/src/main/java/org/onap/dcae/collectors/restconf/common/HttpResponse.java new file mode 100755 index 0000000..e1b97da --- /dev/null +++ b/src/main/java/org/onap/dcae/collectors/restconf/common/HttpResponse.java @@ -0,0 +1,30 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.collectors.restconf.common; + +import javax.ws.rs.core.MultivaluedMap; + +public class HttpResponse { + public int code; + public String message; + public String body; + public MultivaluedMap headers; +} diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/JsonParser.java b/src/main/java/org/onap/dcae/collectors/restconf/common/JsonParser.java new file mode 100755 index 0000000..f29bbc3 --- /dev/null +++ b/src/main/java/org/onap/dcae/collectors/restconf/common/JsonParser.java @@ -0,0 +1,92 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.collectors.restconf.common; + +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class JsonParser { + + private static final Logger log = LoggerFactory.getLogger(JsonParser.class); + + private JsonParser() { + // Preventing instantiation of the same. + } + + @SuppressWarnings("unchecked") + public static Map convertToProperties(String s) + throws Exception { + + checkNotNull(s, "Input should not be null."); + + try { + JSONObject json = new JSONObject(s); + Map wm = new HashMap<>(); + Iterator ii = json.keys(); + while (ii.hasNext()) { + String key1 = ii.next(); + wm.put(key1, json.get(key1)); + } + + Map mm = new HashMap<>(); + + while (!wm.isEmpty()) + for (String key : new ArrayList<>(wm.keySet())) { + Object o = wm.get(key); + wm.remove(key); + + if (o instanceof Boolean || o instanceof Number || o instanceof String) { + mm.put(key, o.toString()); + + log.info("Added property: {} : {}", key, o.toString()); + } else if (o instanceof JSONObject) { + JSONObject jo = (JSONObject) o; + Iterator i = jo.keys(); + while (i.hasNext()) { + String key1 = i.next(); + wm.put(key + "." + key1, jo.get(key1)); + } + } else if (o instanceof JSONArray) { + JSONArray ja = (JSONArray) o; + mm.put(key + "_length", String.valueOf(ja.length())); + + log.info("Added property: {}_length: {}", key, String.valueOf(ja.length())); + + for (int i = 0; i < ja.length(); i++) + wm.put(key + '[' + i + ']', ja.get(i)); + } + } + return mm; + } catch (JSONException e) { + throw new Exception("Unable to convert JSON to properties" + e.getLocalizedMessage(), e); + } + } +} diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/Parameters.java b/src/main/java/org/onap/dcae/collectors/restconf/common/Parameters.java new file mode 100755 index 0000000..02fd68b --- /dev/null +++ b/src/main/java/org/onap/dcae/collectors/restconf/common/Parameters.java @@ -0,0 +1,52 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.collectors.restconf.common; + +import java.util.Set; + +public class Parameters { + public String templateFileName; + public String restapiUrl; + public String restapiUser; + public String restapiPassword; + public Format format; + public String contentType; + public HttpMethod httpMethod; + public String responsePrefix; + public Set listNameList; + public boolean skipSending; + public boolean convertResponse; + public String keyStoreFileName; + public String keyStorePassword; + public String trustStoreFileName; + public String trustStorePassword; + public boolean ssl; + public String customHttpHeaders; + public String partner; + public Boolean dumpHeaders; + public String requestBody; + public String oAuthConsumerKey; + public String oAuthConsumerSecret; + public String oAuthSignatureMethod; + public String oAuthVersion; + public AuthType authtype; + public Boolean returnRequestPayload; +} diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfCollector.java b/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfCollector.java new file mode 100755 index 0000000..754c73d --- /dev/null +++ b/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfCollector.java @@ -0,0 +1,66 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.collectors.restconf.common; + +import com.att.nsa.cmdLine.NsaCommandLineUtil; +import com.att.nsa.drumlin.service.framework.DrumlinServlet; +import com.att.nsa.drumlin.till.nv.impl.nvPropertiesFile; +import com.att.nsa.drumlin.till.nv.impl.nvReadableStack; +import com.att.nsa.drumlin.till.nv.impl.nvReadableTable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URL; +import java.util.Map; + +public class RestConfCollector { + + public static final Logger eplog = LoggerFactory.getLogger("org.onap.restconf.common.error"); + + public static void main(String[] args) { + try { + final Map argMap = NsaCommandLineUtil.processCmdLine(args, true); + final String config = NsaCommandLineUtil.getSetting(argMap, Constants.KCONFIG, "collector.properties"); + final URL settingStream = DrumlinServlet.findStream(config, RestConfCollector.class); + + final nvReadableStack settings = new nvReadableStack(); + settings.push(new nvPropertiesFile(settingStream)); + settings.push(new nvReadableTable(argMap)); + + RestConfProc restConfProc = new RestConfProc(settings); + Map paraMap = restConfProc.getParaMap(); + String restApiURL = paraMap.get(Constants.KSETTING_REST_API_URL); + String sseEventsURL = paraMap.get(Constants.KSETTING_SSE_CONNECT_URL); + String[] listRestApiURL = restApiURL.split(";"); + String[] listSseEventsURL = sseEventsURL.split(";"); + for (int i = 0; i < listRestApiURL.length; i++) { + paraMap.put(Constants.KSETTING_REST_API_URL, "http://" + listRestApiURL[i] + + "/RestConfServer/rest/operations/establish-subscription"); + paraMap.put(Constants.KSETTING_SSE_CONNECT_URL, listSseEventsURL[i]); + restConfProc.establishSubscription(paraMap, restConfProc.getCtx()); + } + + } catch (Exception e) { + RestConfCollector.eplog.error("Fatal error during application startup", e); + throw new RuntimeException(e); + } + } +} diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfContext.java b/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfContext.java new file mode 100755 index 0000000..4cb16ae --- /dev/null +++ b/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfContext.java @@ -0,0 +1,51 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.collectors.restconf.common; + +import java.util.HashMap; +import java.util.Set; + +public class RestConfContext { + private HashMap attributes; + + public RestConfContext() { + attributes = new HashMap<>(); + } + + public String getAttribute(String name) { + return attributes.getOrDefault(name, null); + } + + public void setAttribute(String name, String value) { + if (value == null) { + if (attributes.containsKey(name)) { + attributes.remove(name); + } + } else { + attributes.put(name, value); + } + } + + public Set getAttributeKeySet() { + return attributes.keySet(); + } + +} diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfProc.java b/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfProc.java new file mode 100755 index 0000000..67ea1fb --- /dev/null +++ b/src/main/java/org/onap/dcae/collectors/restconf/common/RestConfProc.java @@ -0,0 +1,224 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.collectors.restconf.common; + +import com.att.nsa.drumlin.till.nv.rrNvReadable; +import org.glassfish.jersey.media.sse.EventSource; +import org.glassfish.jersey.media.sse.SseFeature; +import org.json.JSONArray; +import org.json.JSONObject; +import org.onap.dcae.collectors.restconf.common.event.publishing.DMaaPConfigurationParser; +import org.onap.dcae.collectors.restconf.common.event.publishing.EventPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.WebTarget; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; + + +public class RestConfProc { + + private static final Logger log = LoggerFactory.getLogger(RestConfProc.class); + + public static String format; + + private static RestConfContext ctx = new RestConfContext(); + + private static final Logger oplog = LoggerFactory.getLogger("org.onap.restconf.common.output"); + + private Map runnableInfo = new ConcurrentHashMap<>(); + + private final Map paraMap = new HashMap<>(); + private static String cambriaConfigFile; + + public static LinkedBlockingQueue fProcessingInputQueue; + + public static String streamID; + private ExecutorService executor = Executors.newCachedThreadPool(); + + public RestConfProc() { + } + + private void parseInputParameters(rrNvReadable settings) { + String tempFileName; + String restApiUrl; + String httpMetthod; + String respPrefix; + String skipSending; + String sseConnectUrl; + String[] currentConfigFile; + + currentConfigFile = settings.getStrings(Constants.KSETTING_DMAAPCONFIGS, Constants.KDEFAULT_DMAAPCONFIGS); + cambriaConfigFile = currentConfigFile[0]; + + tempFileName = settings.getString(Constants.KDEFAULT_TEMP_FILENAME, null); + restApiUrl = settings.getString(Constants.KSETTING_REST_API_URL, null); + httpMetthod = settings.getString(Constants.KSETTING_HTTP_METHOD, null); + respPrefix = settings.getString(Constants.KSETTING_RESP_PREFIX, null); + skipSending = settings.getString(Constants.KSETTING_SKIP_SENDING, null); + sseConnectUrl = settings.getString(Constants.KSETTING_SSE_CONNECT_URL, null); + format = settings.getString(Constants.KSETTING_FORMAT, null); + streamID = "route=route_failure"; + + paraMap.put(Constants.KDEFAULT_TEMP_FILENAME, tempFileName); + paraMap.put(Constants.KSETTING_REST_API_URL, restApiUrl); + paraMap.put(Constants.KSETTING_HTTP_METHOD, httpMetthod); + paraMap.put(Constants.KSETTING_RESP_PREFIX, respPrefix); + paraMap.put(Constants.KSETTING_SKIP_SENDING, skipSending); + paraMap.put(Constants.KSETTING_SSE_CONNECT_URL, sseConnectUrl); + paraMap.put(Constants.KSETTING_FORMAT, format); + + ctx.setAttribute("prop.encoding-json", "encoding-json"); + ctx.setAttribute("restapi-result.response-code", "200"); + ctx.setAttribute("restapi-result.ietf-subscribed-notifications:output.identifier", "100"); + } + + public RestConfProc(rrNvReadable settings) { + + parseInputParameters(settings); + + fProcessingInputQueue = new LinkedBlockingQueue<>(Constants.KDEFAULT_MAXQUEUEDEVENTS); + + EventProcessor ep = new EventProcessor(EventPublisher.createPublisher(oplog, + DMaaPConfigurationParser + .parseToDomainMapping(Paths.get(cambriaConfigFile)) + .get())); + ExecutorService executor = Executors.newFixedThreadPool(20); + for (int i = 0; i < 20; ++i) { + executor.execute(ep); + } + } + + /** + * To establish a subscription with controller by sending HTTP request + * + * @param paramMap holds the input configuration + * @param ctx restconf context + * @throws Exception exception + */ + public void establishSubscription(Map paramMap, RestConfContext ctx) throws Exception { + + RestapiCallNode restApiCallNode = new RestapiCallNode(); + + restApiCallNode.sendRequest(paramMap, ctx, null); + + establishPersistentConnection(paramMap, ctx); + } + + /** + * To establish persistent connection after receiving successful subscription response from controller + * + * @param paramMap holds the input configuration + * @param ctx restconf context + */ + public void establishPersistentConnection(Map paramMap, RestConfContext ctx) { + + // check whether response is ok + if (ctx.getAttribute(Constants.RESPONSE_CODE).equals(Constants.RESPONSE_CODE_200)) { + + String id = ctx.getAttribute(Constants.OUTPUT_IDENTIFIER); + + String url = paramMap.get(Constants.KSETTING_SSE_CONNECT_URL); + + PersistentConnection connection = new PersistentConnection(url, ctx); + runnableInfo.put(id, connection); + executor.execute(connection); + } else { + // error response is already updated in ctx + log.info("Failed to subscribe"); + } + } + + /** + * Get input parameter map + * + * @return input parameters map + */ + public Map getParaMap() { + return paraMap; + } + + + /** + * Get restConf context which has information about message encoding type + * + * @return restconf context + */ + public RestConfContext getCtx() { + return ctx; + } + + public class PersistentConnection implements Runnable { + private String url; + private RestConfContext ctx; + private volatile boolean running = true; + + public PersistentConnection(String url, RestConfContext ctx) { + this.url = url; + this.ctx = ctx; + } + + @Override + public void run() { + Client client = ClientBuilder.newBuilder() + .register(SseFeature.class).build(); + WebTarget target = client.target(url); + EventSource eventSource = EventSource.target(target).build(); + eventSource.register(new DataChangeEventListener(ctx)); + eventSource.open(); + log.info("Connected to SSE source"); + while (running) { + try { + Thread.sleep(5000); + } catch (InterruptedException ie) { + log.info("Exception: " + ie.getMessage()); + } + } + eventSource.close(); + log.info("Closed connection to SSE source"); + } + } + + /** + * To process the array of events which are received from controller + * + * @param a JSONArray + * @throws Exception exception + */ + public static void handleEvents(JSONArray a) throws Exception { + for (int i = 0; i < a.length(); i++) { + if (!fProcessingInputQueue.offer(a.getJSONObject(i))) { + throw new Exception(); + } + } + log.debug("RestConfCollector.handleEvents:EVENTS has been published successfully!"); + } +} + + diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/RestapiCallNode.java b/src/main/java/org/onap/dcae/collectors/restconf/common/RestapiCallNode.java new file mode 100755 index 0000000..a324e87 --- /dev/null +++ b/src/main/java/org/onap/dcae/collectors/restconf/common/RestapiCallNode.java @@ -0,0 +1,600 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.collectors.restconf.common; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientHandlerException; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.UniformInterfaceException; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.api.client.config.ClientConfig; +import com.sun.jersey.api.client.config.DefaultClientConfig; +import com.sun.jersey.api.client.filter.HTTPBasicAuthFilter; +import com.sun.jersey.api.client.filter.HTTPDigestAuthFilter; +import com.sun.jersey.client.urlconnection.HTTPSProperties; +import com.sun.jersey.oauth.client.OAuthClientFilter; +import com.sun.jersey.oauth.signature.OAuthParameters; +import com.sun.jersey.oauth.signature.OAuthSecrets; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.ws.rs.core.EntityTag; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.UriBuilder; +import java.io.FileInputStream; +import java.io.IOException; +import java.net.SocketException; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.security.KeyStore; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class RestapiCallNode { + private static final Logger log = LoggerFactory.getLogger(RestapiCallNode.class); + + public void sendRequest(Map paramMap, RestConfContext ctx, Integer retryCount) throws Exception { + RetryPolicy retryPolicy = null; + HttpResponse r = new HttpResponse(); + try { + Parameters p = getParameters(paramMap); + String pp = p.responsePrefix != null ? p.responsePrefix + '.' : ""; + String req = null; + if (p.templateFileName != null) { + String reqTemplate = readFile(p.templateFileName); + req = buildXmlJsonRequest(ctx, reqTemplate, p.format); + } else if (p.requestBody != null) { + req = p.requestBody; + } + + r = sendHttpRequest(req, p); + setResponseStatus(ctx, p.responsePrefix, r); + + if (p.dumpHeaders && r.headers != null) { + for (Map.Entry> a : r.headers.entrySet()) { + ctx.setAttribute(pp + "header." + a.getKey(), StringUtils.join(a.getValue(), ",")); + } + } + + if (p.returnRequestPayload && req != null) { + ctx.setAttribute(pp + "httpRequest", req); + } + + if (r.body != null && r.body.trim().length() > 0) { + ctx.setAttribute(pp + "httpResponse", r.body); + + if (p.convertResponse) { + Map mm = null; + if (p.format == Format.XML) { + mm = XmlParser.convertToProperties(r.body, p.listNameList); + } else if (p.format == Format.JSON) { + mm = JsonParser.convertToProperties(r.body); + } + + if (mm != null) { + for (Map.Entry entry : mm.entrySet()) + ctx.setAttribute(pp + entry.getKey(), entry.getValue()); + } + } + } + } catch (Exception e) { + boolean shouldRetry = false; + if (e.getCause().getCause() instanceof SocketException) { + shouldRetry = true; + } + + log.error("Error sending the request: " + e.getMessage(), e); + String prefix = parseParam(paramMap, "responsePrefix", false, null); + if (retryPolicy == null || shouldRetry == false) { + setFailureResponseStatus(ctx, prefix, e.getMessage(), r); + } else { + if (retryCount == null) { + retryCount = 0; + } + String retryMessage = retryCount + " attempts were made out of " + retryPolicy.getMaximumRetries() + + " maximum retries."; + log.debug(retryMessage); + try { + retryCount = retryCount + 1; + if (retryCount < retryPolicy.getMaximumRetries() + 1) { + URI uri = new URI(paramMap.get("restapiUrl")); + String hostname = uri.getHost(); + String retryString = retryPolicy.getNextHostName(uri.toString()); + URI uriTwo = new URI(retryString); + URI retryUri = UriBuilder.fromUri(uri).host(uriTwo.getHost()).port(uriTwo.getPort()).scheme( + uriTwo.getScheme()).build(); + paramMap.put("restapiUrl", retryUri.toString()); + log.debug("URL was set to {}", retryUri.toString()); + log.debug("Failed to communicate with host {}. Request will be re-attempted using the host {}.", + hostname, retryString); + log.debug("This is retry attempt {} out of {}", retryCount, retryPolicy.getMaximumRetries()); + sendRequest(paramMap, ctx, retryCount); + } else { + log.debug("Maximum retries reached, calling setFailureResponseStatus."); + setFailureResponseStatus(ctx, prefix, e.getMessage(), r); + } + } catch (Exception ex) { + log.error("Could not attempt retry.", ex); + String retryErrorMessage = + "Retry attempt has failed. No further retry shall be attempted, calling " + + "setFailureResponseStatus."; + setFailureResponseStatus(ctx, prefix, retryErrorMessage, r); + } + } + } + + if (r != null && r.code >= 300) { + throw new Exception(String.valueOf(r.code) + ": " + r.message); + } + } + + protected Parameters getParameters(Map paramMap) throws Exception { + Parameters p = new Parameters(); + p.templateFileName = parseParam(paramMap, "templateFileName", false, null); + p.requestBody = parseParam(paramMap, "requestBody", false, null); + p.restapiUrl = parseParam(paramMap, "restapiUrl", true, null); + validateUrl(p.restapiUrl); + p.restapiUser = parseParam(paramMap, "restapiUser", false, null); + p.restapiPassword = parseParam(paramMap, "restapiPassword", false, null); + p.oAuthConsumerKey = parseParam(paramMap, "oAuthConsumerKey", false, null); + p.oAuthConsumerSecret = parseParam(paramMap, "oAuthConsumerSecret", false, null); + p.oAuthSignatureMethod = parseParam(paramMap, "oAuthSignatureMethod", false, null); + p.oAuthVersion = parseParam(paramMap, "oAuthVersion", false, null); + p.contentType = parseParam(paramMap, "contentType", false, null); + p.format = Format.fromString(parseParam(paramMap, "format", false, "json")); + p.authtype = AuthType.fromString(parseParam(paramMap, "authType", false, "unspecified")); + p.httpMethod = HttpMethod.fromString(parseParam(paramMap, "httpMethod", false, "post")); + p.responsePrefix = parseParam(paramMap, "responsePrefix", false, null); + p.listNameList = getListNameList(paramMap); + String skipSendingStr = paramMap.get("skipSending"); + p.skipSending = "true".equalsIgnoreCase(skipSendingStr); + p.convertResponse = Boolean.valueOf(parseParam(paramMap, "convertResponse", false, "true")); + p.trustStoreFileName = parseParam(paramMap, "trustStoreFileName", false, null); + p.trustStorePassword = parseParam(paramMap, "trustStorePassword", false, null); + p.keyStoreFileName = parseParam(paramMap, "keyStoreFileName", false, null); + p.keyStorePassword = parseParam(paramMap, "keyStorePassword", false, null); + p.ssl = p.trustStoreFileName != null && p.trustStorePassword != null && p.keyStoreFileName != null && + p.keyStorePassword != null; + p.customHttpHeaders = parseParam(paramMap, "customHttpHeaders", false, null); + p.partner = parseParam(paramMap, "partner", false, null); + p.dumpHeaders = Boolean.valueOf(parseParam(paramMap, "dumpHeaders", false, null)); + p.returnRequestPayload = Boolean.valueOf(parseParam(paramMap, "returnRequestPayload", false, null)); + return p; + } + + private void validateUrl(String restapiUrl) throws Exception { + try { + URI.create(restapiUrl); + } catch (IllegalArgumentException e) { + throw new Exception("Invalid input of url " + e.getLocalizedMessage(), e); + } + } + + protected Set getListNameList(Map paramMap) { + Set ll = new HashSet<>(); + for (Map.Entry entry : paramMap.entrySet()) + if (entry.getKey().startsWith("listName")) { + ll.add(entry.getValue()); + } + return ll; + } + + protected String parseParam(Map paramMap, String name, boolean required, String def) + throws Exception { + String s = paramMap.get(name); + + if (s == null || s.trim().length() == 0) { + if (!required) { + return def; + } + throw new Exception("Parameter " + name + " is required in RestapiCallNode"); + } + + s = s.trim(); + StringBuilder value = new StringBuilder(); + int i = 0; + int i1 = s.indexOf('%'); + while (i1 >= 0) { + int i2 = s.indexOf('%', i1 + 1); + if (i2 < 0) { + break; + } + + String varName = s.substring(i1 + 1, i2); + String varValue = System.getenv(varName); + if (varValue == null) { + varValue = "%" + varName + "%"; + } + + value.append(s.substring(i, i1)); + value.append(varValue); + + i = i2 + 1; + i1 = s.indexOf('%', i); + } + value.append(s.substring(i)); + + log.info("Parameter {}: [{}]", name, value); + return value.toString(); + } + + protected String buildXmlJsonRequest(RestConfContext ctx, String template, Format format) throws Exception { + log.info("Building {} started", format); + long t1 = System.currentTimeMillis(); + + template = expandRepeats(ctx, template, 1); + + Map mm = new HashMap<>(); + for (String s : ctx.getAttributeKeySet()) + mm.put(s, ctx.getAttribute(s)); + StringBuilder ss = new StringBuilder(); + int i = 0; + while (i < template.length()) { + int i1 = template.indexOf("${", i); + if (i1 < 0) { + ss.append(template.substring(i)); + break; + } + + int i2 = template.indexOf('}', i1 + 2); + if (i2 < 0) { + throw new Exception("Template error: Matching } not found"); + } + + String var1 = template.substring(i1 + 2, i2); + String value1 = format == Format.XML ? XmlJsonUtil.getXml(mm, var1) : XmlJsonUtil.getJson(mm, var1); + // log.info(" " + var1 + ": " + value1); + if (value1 == null || value1.trim().length() == 0) { + // delete the whole element (line) + int i3 = template.lastIndexOf('\n', i1); + if (i3 < 0) { + i3 = 0; + } + int i4 = template.indexOf('\n', i1); + if (i4 < 0) { + i4 = template.length(); + } + + if (i < i3) { + ss.append(template.substring(i, i3)); + } + i = i4; + } else { + ss.append(template.substring(i, i1)).append(value1); + i = i2 + 1; + } + } + + String req = format == Format.XML + ? XmlJsonUtil.removeEmptyStructXml(ss.toString()) : XmlJsonUtil.removeEmptyStructJson(ss.toString()); + + if (format == Format.JSON) { + req = XmlJsonUtil.removeLastCommaJson(req); + } + + long t2 = System.currentTimeMillis(); + log.info("Building {} completed. Time: {}", format, (t2 - t1)); + + return req; + } + + protected String expandRepeats(RestConfContext ctx, String template, int level) throws Exception { + StringBuilder newTemplate = new StringBuilder(); + int k = 0; + while (k < template.length()) { + int i1 = template.indexOf("${repeat:", k); + if (i1 < 0) { + newTemplate.append(template.substring(k)); + break; + } + + int i2 = template.indexOf(':', i1 + 9); + if (i2 < 0) { + throw new Exception( + "Template error: Context variable name followed by : is required after repeat"); + } + + // Find the closing }, store in i3 + int nn = 1; + int i3 = -1; + int i = i2; + while (nn > 0 && i < template.length()) { + i3 = template.indexOf('}', i); + if (i3 < 0) { + throw new Exception("Template error: Matching } not found"); + } + int i32 = template.indexOf('{', i); + if (i32 >= 0 && i32 < i3) { + nn++; + i = i32 + 1; + } else { + nn--; + i = i3 + 1; + } + } + + String var1 = template.substring(i1 + 9, i2); + String value1 = ctx.getAttribute(var1); + log.info(" {}:{}", var1, value1); + int n = 0; + try { + n = Integer.parseInt(value1); + } catch (NumberFormatException e) { + log.info("value1 not set or not a number, n will remain set at zero"); + } + + newTemplate.append(template.substring(k, i1)); + + String rpt = template.substring(i2 + 1, i3); + + for (int ii = 0; ii < n; ii++) { + String ss = rpt.replaceAll("\\[\\$\\{" + level + "\\}\\]", "[" + ii + "]"); + if (ii == n - 1 && ss.trim().endsWith(",")) { + int i4 = ss.lastIndexOf(','); + if (i4 > 0) { + ss = ss.substring(0, i4) + ss.substring(i4 + 1); + } + } + newTemplate.append(ss); + } + + k = i3 + 1; + } + + if (k == 0) { + return newTemplate.toString(); + } + + return expandRepeats(ctx, newTemplate.toString(), level + 1); + } + + protected String readFile(String fileName) throws Exception { + try { + byte[] encoded = Files.readAllBytes(Paths.get(fileName)); + return new String(encoded, "UTF-8"); + } catch (IOException | SecurityException e) { + throw new Exception("Unable to read file " + fileName + e.getLocalizedMessage(), e); + } + } + + protected Client addAuthType(Client client, Parameters p) throws Exception { + if (p.authtype == AuthType.Unspecified) { + if (p.restapiUser != null && p.restapiPassword != null) { + client.addFilter(new HTTPBasicAuthFilter(p.restapiUser, p.restapiPassword)); + } else if (p.oAuthConsumerKey != null && p.oAuthConsumerSecret != null + && p.oAuthSignatureMethod != null) { + OAuthParameters params = new OAuthParameters() + .signatureMethod(p.oAuthSignatureMethod) + .consumerKey(p.oAuthConsumerKey) + .version(p.oAuthVersion); + + OAuthSecrets secrets = new OAuthSecrets() + .consumerSecret(p.oAuthConsumerSecret); + client.addFilter(new OAuthClientFilter(client.getProviders(), params, secrets)); + } + } else { + if (p.authtype == AuthType.DIGEST) { + if (p.restapiUser != null && p.restapiPassword != null) { + client.addFilter(new HTTPDigestAuthFilter(p.restapiUser, p.restapiPassword)); + } else { + throw new Exception("oAUTH authentication type selected but all restapiUser and restapiPassword " + + "parameters doesn't exist", new Throwable()); + } + } else if (p.authtype == AuthType.BASIC) { + if (p.restapiUser != null && p.restapiPassword != null) { + client.addFilter(new HTTPBasicAuthFilter(p.restapiUser, p.restapiPassword)); + } else { + throw new Exception("oAUTH authentication type selected but all restapiUser and restapiPassword " + + "parameters doesn't exist", new Throwable()); + } + } else if (p.authtype == AuthType.OAUTH) { + if (p.oAuthConsumerKey != null && p.oAuthConsumerSecret != null && p.oAuthSignatureMethod != null) { + OAuthParameters params = new OAuthParameters() + .signatureMethod(p.oAuthSignatureMethod) + .consumerKey(p.oAuthConsumerKey) + .version(p.oAuthVersion); + + OAuthSecrets secrets = new OAuthSecrets() + .consumerSecret(p.oAuthConsumerSecret); + client.addFilter(new OAuthClientFilter(client.getProviders(), params, secrets)); + } else { + throw new Exception("oAUTH authentication type selected but all oAuthConsumerKey, oAuthConsumerSecret " + + "and oAuthSignatureMethod parameters doesn't exist", new Throwable()); + } + } + } + return client; + } + + protected HttpResponse sendHttpRequest(String request, Parameters p) throws Exception { + + ClientConfig config = new DefaultClientConfig(); + SSLContext ssl = null; + if (p.ssl && p.restapiUrl.startsWith("https")) { + ssl = createSSLContext(p); + } + if (ssl != null) { + HostnameVerifier hostnameVerifier = (hostname, session) -> true; + + config.getProperties().put(HTTPSProperties.PROPERTY_HTTPS_PROPERTIES, + new HTTPSProperties(hostnameVerifier, ssl)); + } + + logProperties(config.getProperties()); + + Client client = Client.create(config); + client.setConnectTimeout(5000); + WebResource webResource = addAuthType(client, p).resource(p.restapiUrl); + + log.info("Sending request:"); + log.info(request); + long t1 = System.currentTimeMillis(); + + HttpResponse r = new HttpResponse(); + r.code = 200; + + if (!p.skipSending) { + String tt = p.format == Format.XML ? "application/xml" : "application/json"; + String tt1 = tt + ";charset=UTF-8"; + if (p.contentType != null) { + tt = p.contentType; + tt1 = p.contentType; + } + + WebResource.Builder webResourceBuilder = webResource.accept(tt).type(tt1); + if (p.format == Format.NONE) { + webResourceBuilder = webResource.header("", ""); + } + + if (p.customHttpHeaders != null && p.customHttpHeaders.length() > 0) { + String[] keyValuePairs = p.customHttpHeaders.split(","); + for (String singlePair : keyValuePairs) { + int equalPosition = singlePair.indexOf('='); + webResourceBuilder.header(singlePair.substring(0, equalPosition), + singlePair.substring(equalPosition + 1, singlePair.length())); + } + } + + webResourceBuilder.header("X-ECOMP-RequestID", org.slf4j.MDC.get("X-ECOMP-RequestID")); + + ClientResponse response; + + try { + response = webResourceBuilder.method(p.httpMethod.toString(), ClientResponse.class, request); + } catch (UniformInterfaceException | ClientHandlerException e) { + throw new Exception("Exception while sending http request to client " + + e.getLocalizedMessage(), e); + } + + r.code = response.getStatus(); + r.headers = response.getHeaders(); + EntityTag etag = response.getEntityTag(); + if (etag != null) { + r.message = etag.getValue(); + } + if (response.hasEntity() && r.code != 204) { + r.body = response.getEntity(String.class); + } + } + + long t2 = System.currentTimeMillis(); + log.info("Response received. Time: {}", (t2 - t1)); + log.info("HTTP response code: {}", r.code); + log.info("HTTP response message: {}", r.message); + logHeaders(r.headers); + log.info("HTTP response: {}", r.body); + + return r; + } + + protected void setFailureResponseStatus(RestConfContext ctx, String prefix, String errorMessage, + HttpResponse resp) { + resp.code = 500; + resp.message = errorMessage; + String pp = prefix != null ? prefix + '.' : ""; + ctx.setAttribute(pp + "response-code", String.valueOf(resp.code)); + ctx.setAttribute(pp + "response-message", resp.message); + } + + protected void setResponseStatus(RestConfContext ctx, String prefix, HttpResponse r) { + String pp = prefix != null ? prefix + '.' : ""; + ctx.setAttribute(pp + "response-code", String.valueOf(r.code)); + ctx.setAttribute(pp + "response-message", r.message); + } + + protected SSLContext createSSLContext(Parameters p) { + try (FileInputStream in = new FileInputStream(p.keyStoreFileName)) { + System.setProperty("jsse.enableSNIExtension", "false"); + System.setProperty("javax.net.ssl.trustStore", p.trustStoreFileName); + System.setProperty("javax.net.ssl.trustStorePassword", p.trustStorePassword); + + HttpsURLConnection.setDefaultHostnameVerifier((string, ssls) -> true); + + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + KeyStore ks = KeyStore.getInstance("PKCS12"); + char[] pwd = p.keyStorePassword.toCharArray(); + ks.load(in, pwd); + kmf.init(ks, pwd); + + SSLContext ctx = SSLContext.getInstance("TLS"); + ctx.init(kmf.getKeyManagers(), null, null); + return ctx; + } catch (Exception e) { + log.error("Error creating SSLContext: {}", e.getMessage(), e); + } + return null; + } + + protected void logProperties(Map mm) { + List ll = new ArrayList<>(); + for (Object o : mm.keySet()) + ll.add((String) o); + Collections.sort(ll); + + log.info("Properties:"); + for (String name : ll) + log.info("--- {}:{}", name, String.valueOf(mm.get(name))); + } + + protected void logHeaders(MultivaluedMap mm) { + log.info("HTTP response headers:"); + + if (mm == null) { + return; + } + + List ll = new ArrayList<>(); + for (Object o : mm.keySet()) + ll.add((String) o); + Collections.sort(ll); + + for (String name : ll) + log.info("--- {}:{}", name, String.valueOf(mm.get(name))); + } + + private static class FileParam { + + public String fileName; + public String url; + public String user; + public String password; + public HttpMethod httpMethod; + public String responsePrefix; + public boolean skipSending; + public String oAuthConsumerKey; + public String oAuthConsumerSecret; + public String oAuthSignatureMethod; + public String oAuthVersion; + public AuthType authtype; + } + +} \ No newline at end of file diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/RetryException.java b/src/main/java/org/onap/dcae/collectors/restconf/common/RetryException.java new file mode 100755 index 0000000..91c0a9c --- /dev/null +++ b/src/main/java/org/onap/dcae/collectors/restconf/common/RetryException.java @@ -0,0 +1,27 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.collectors.restconf.common; + +public class RetryException extends Exception { + public RetryException(String message) { + super(message); + } +} diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/RetryPolicy.java b/src/main/java/org/onap/dcae/collectors/restconf/common/RetryPolicy.java new file mode 100755 index 0000000..7bc0759 --- /dev/null +++ b/src/main/java/org/onap/dcae/collectors/restconf/common/RetryPolicy.java @@ -0,0 +1,58 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.collectors.restconf.common; + +public class RetryPolicy { + private String[] hostnames; + private Integer maximumRetries; + + public Integer getMaximumRetries() { + return maximumRetries; + } + + public String getNextHostName(String uri) throws RetryException { + Integer position = null; + + for (int i = 0; i < hostnames.length; i++) { + if (uri.contains(hostnames[i])) { + position = i; + break; + } + } + + if (position == null) { + throw new RetryException("No match found for the provided uri[" + uri + "] " + + "so the next host name could not be retreived"); + } + position++; + + if (position > hostnames.length - 1) { + position = 0; + } + return hostnames[position]; + } + + public RetryPolicy(String[] hostnames, Integer maximumRetries) { + this.hostnames = hostnames; + this.maximumRetries = maximumRetries; + } + +} diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/RetryPolicyStore.java b/src/main/java/org/onap/dcae/collectors/restconf/common/RetryPolicyStore.java new file mode 100755 index 0000000..10b0e00 --- /dev/null +++ b/src/main/java/org/onap/dcae/collectors/restconf/common/RetryPolicyStore.java @@ -0,0 +1,53 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.collectors.restconf.common; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; + +public class RetryPolicyStore { + private static final Logger log = LoggerFactory.getLogger(RetryPolicyStore.class); + + HashMap retryPolicies; + public String proxyServers; + + public String getProxyServers() { + return proxyServers; + } + + public void setProxyServers(String admServers) { + this.proxyServers = admServers; + String[] adminServersArray = admServers.split(","); + RetryPolicy adminPortalRetry = new RetryPolicy(adminServersArray, adminServersArray.length); + retryPolicies.put("dme2proxy", adminPortalRetry); + } + + public RetryPolicyStore() { + retryPolicies = new HashMap<>(); + } + + public RetryPolicy getRetryPolicy(String policyName) { + return (this.retryPolicies.get(policyName)); + } + +} diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/XmlJsonUtil.java b/src/main/java/org/onap/dcae/collectors/restconf/common/XmlJsonUtil.java new file mode 100755 index 0000000..382ef4d --- /dev/null +++ b/src/main/java/org/onap/dcae/collectors/restconf/common/XmlJsonUtil.java @@ -0,0 +1,412 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.collectors.restconf.common; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class XmlJsonUtil { + + private static final Logger log = LoggerFactory.getLogger(XmlJsonUtil.class); + + private XmlJsonUtil() { + // Preventing instantiation of the same. + } + + public static String getXml(Map varmap, String var) { + boolean escape = true; + if (var.startsWith("'")) { + var = var.substring(1); + escape = false; + } + + Object o = createStructure(varmap, var); + return generateXml(o, 0, escape); + } + + public static String getJson(Map varmap, String var) { + boolean escape = true; + if (var.startsWith("'")) { + var = var.substring(1); + escape = false; + } + + boolean quotes = true; + if (var.startsWith("\"")) { + var = var.substring(1); + quotes = false; + } + + Object o = createStructure(varmap, var); + return generateJson(o, escape, quotes); + } + + private static Object createStructure(Map flatmap, String var) { + if (flatmap.containsKey(var)) { + if (var.endsWith("_length") || var.endsWith("].key")) { + return null; + } + return flatmap.get(var); + } + + Map mm = new HashMap<>(); + for (String k : flatmap.keySet()) + if (k.startsWith(var + ".")) { + int i1 = k.indexOf('.', var.length() + 1); + int i2 = k.indexOf('[', var.length() + 1); + int i3 = k.length(); + if (i1 > 0 && i1 < i3) { + i3 = i1; + } + if (i2 > 0 && i2 < i3) { + i3 = i2; + } + String k1 = k.substring(var.length() + 1, i3); + String var1 = k.substring(0, i3); + if (!mm.containsKey(k1)) { + Object str = createStructure(flatmap, var1); + if (str != null && (!(str instanceof String) || ((String) str).trim().length() > 0)) { + mm.put(k1, str); + } + } + } + if (!mm.isEmpty()) { + return mm; + } + + boolean arrayFound = false; + for (String k : flatmap.keySet()) + if (k.startsWith(var + "[")) { + arrayFound = true; + break; + } + + if (arrayFound) { + List ll = new ArrayList<>(); + + int length = Integer.MAX_VALUE; + String lengthStr = flatmap.get(var + "_length"); + if (lengthStr != null) { + try { + length = Integer.parseInt(lengthStr); + } catch (Exception e) { + log.warn("Invalid number for {}_length:{}", var, lengthStr, e); + } + } + + for (int i = 0; i < length; i++) { + Object v = createStructure(flatmap, var + '[' + i + ']'); + if (v == null) { + break; + } + ll.add(v); + } + + if (!ll.isEmpty()) { + return ll; + } + } + + return null; + } + + @SuppressWarnings("unchecked") + private static String generateXml(Object o, int indent, boolean escape) { + if (o == null) { + return null; + } + + if (o instanceof String) { + return escape ? escapeXml((String) o) : (String) o; + } + ; + + if (o instanceof Map) { + StringBuilder ss = new StringBuilder(); + Map mm = (Map) o; + for (Map.Entry entry : mm.entrySet()) { + Object v = entry.getValue(); + String key = entry.getKey(); + if (v instanceof String) { + String s = escape ? escapeXml((String) v) : (String) v; + ss.append(pad(indent)).append('<').append(key).append('>'); + ss.append(s); + ss.append("').append('\n'); + } else if (v instanceof Map) { + ss.append(pad(indent)).append('<').append(key).append('>').append('\n'); + ss.append(generateXml(v, indent + 1, escape)); + ss.append(pad(indent)).append("').append('\n'); + } else if (v instanceof List) { + List ll = (List) v; + for (Object o1 : ll) { + ss.append(pad(indent)).append('<').append(key).append('>').append('\n'); + ss.append(generateXml(o1, indent + 1, escape)); + ss.append(pad(indent)).append("').append('\n'); + } + } + } + return ss.toString(); + } + + return null; + } + + private static String generateJson(Object o, boolean escape, boolean quotes) { + if (o == null) { + return null; + } + + StringBuilder ss = new StringBuilder(); + generateJson(ss, o, 0, false, escape, quotes); + return ss.toString(); + } + + @SuppressWarnings("unchecked") + private static void generateJson(StringBuilder ss, Object o, int indent, boolean padFirst, boolean escape, boolean quotes) { + if (o instanceof String) { + String s = escape ? escapeJson((String) o) : (String) o; + if (padFirst) { + ss.append(pad(indent)); + } + if (quotes) { + ss.append('"').append(s).append('"'); + } else { + ss.append(s); + } + return; + } + + if (o instanceof Map) { + Map mm = (Map) o; + + if (padFirst) { + ss.append(pad(indent)); + } + ss.append("{\n"); + + boolean first = true; + for (Map.Entry entry : mm.entrySet()) { + if (!first) { + ss.append(",\n"); + } + first = false; + Object v = entry.getValue(); + String key = entry.getKey(); + ss.append(pad(indent + 1)).append('"').append(key).append("\": "); + generateJson(ss, v, indent + 1, false, escape, true); + } + + ss.append("\n"); + ss.append(pad(indent)).append('}'); + + return; + } + + if (o instanceof List) { + List ll = (List) o; + + if (padFirst) { + ss.append(pad(indent)); + } + ss.append("[\n"); + + boolean first = true; + for (Object o1 : ll) { + if (!first) { + ss.append(",\n"); + } + first = false; + + generateJson(ss, o1, indent + 1, true, escape, quotes); + } + + ss.append("\n"); + ss.append(pad(indent)).append(']'); + } + } + + public static String removeLastCommaJson(String s) { + StringBuilder sb = new StringBuilder(); + int k = 0; + int start = 0; + while (k < s.length()) { + int i11 = s.indexOf('}', k); + int i12 = s.indexOf(']', k); + int i1 = -1; + if (i11 < 0) { + i1 = i12; + } else if (i12 < 0) { + i1 = i11; + } else { + i1 = i11 < i12 ? i11 : i12; + } + if (i1 < 0) { + break; + } + + int i2 = s.lastIndexOf(',', i1); + if (i2 < 0) { + k = i1 + 1; + continue; + } + + String between = s.substring(i2 + 1, i1); + if (between.trim().length() > 0) { + k = i1 + 1; + continue; + } + + sb.append(s.substring(start, i2)); + start = i2 + 1; + k = i1 + 1; + } + + sb.append(s.substring(start, s.length())); + + return sb.toString(); + } + + public static String removeEmptyStructJson(String s) { + int k = 0; + while (k < s.length()) { + boolean curly = true; + int i11 = s.indexOf('{', k); + int i12 = s.indexOf('[', k); + int i1 = -1; + if (i11 < 0) { + i1 = i12; + curly = false; + } else if (i12 < 0) { + i1 = i11; + } else if (i11 < i12) { + i1 = i11; + } else { + i1 = i12; + curly = false; + } + + if (i1 >= 0) { + int i2 = curly ? s.indexOf('}', i1) : s.indexOf(']', i1); + if (i2 > 0) { + String value = s.substring(i1 + 1, i2); + if (value.trim().length() == 0) { + int i4 = s.lastIndexOf('\n', i1); + if (i4 < 0) { + i4 = 0; + } + int i5 = s.indexOf('\n', i2); + if (i5 < 0) { + i5 = s.length(); + } + + s = s.substring(0, i4) + s.substring(i5); + k = 0; + } else { + k = i1 + 1; + } + } else { + break; + } + } else { + break; + } + } + + return s; + } + + public static String removeEmptyStructXml(String s) { + int k = 0; + while (k < s.length()) { + int i1 = s.indexOf('<', k); + if (i1 < 0 || i1 == s.length() - 1) { + break; + } + + char c1 = s.charAt(i1 + 1); + if (c1 == '?' || c1 == '!') { + k = i1 + 2; + continue; + } + + int i2 = s.indexOf('>', i1); + if (i2 < 0) { + k = i1 + 1; + continue; + } + + String closingTag = " 0) { + k = i2 + 1; + continue; + } + + int i4 = s.lastIndexOf('\n', i1); + if (i4 < 0) { + i4 = 0; + } + int i5 = s.indexOf('\n', i3); + if (i5 < 0) { + i5 = s.length(); + } + + s = s.substring(0, i4) + s.substring(i5); + k = 0; + } + + return s; + } + + private static String escapeXml(String v) { + String s = v.replaceAll("&", "&"); + s = s.replaceAll("<", "<"); + s = s.replaceAll("'", "'"); + s = s.replaceAll("\"", """); + s = s.replaceAll(">", ">"); + return s; + } + + private static String escapeJson(String v) { + String s = v.replaceAll("\\\\", "\\\\\\\\"); + s = s.replaceAll("\"", "\\\\\""); + return s; + } + + private static String pad(int n) { + StringBuilder s = new StringBuilder(); + for (int i = 0; i < n; i++) + s.append(Character.toString('\t')); + return s.toString(); + } +} + diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/XmlParser.java b/src/main/java/org/onap/dcae/collectors/restconf/common/XmlParser.java new file mode 100755 index 0000000..e073cd6 --- /dev/null +++ b/src/main/java/org/onap/dcae/collectors/restconf/common/XmlParser.java @@ -0,0 +1,178 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.collectors.restconf.common; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.xml.sax.Attributes; +import org.xml.sax.SAXException; +import org.xml.sax.helpers.DefaultHandler; + +import javax.xml.parsers.ParserConfigurationException; +import javax.xml.parsers.SAXParser; +import javax.xml.parsers.SAXParserFactory; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static com.google.common.base.Preconditions.checkNotNull; + +public class XmlParser { + + private static final Logger log = LoggerFactory.getLogger(XmlParser.class); + + private XmlParser() { + // Preventing instantiation of the same. + } + + public static Map convertToProperties(String s, Set listNameList) + throws Exception { + + checkNotNull(s, "Input should not be null."); + + Handler handler = new Handler(listNameList); + try { + SAXParserFactory factory = SAXParserFactory.newInstance(); + SAXParser saxParser = factory.newSAXParser(); + InputStream in = new ByteArrayInputStream(s.getBytes()); + saxParser.parse(in, handler); + } catch (ParserConfigurationException | IOException | SAXException | NumberFormatException e) { + throw new Exception("Unable to convert XML to properties" + e.getLocalizedMessage(), e); + } + return handler.getProperties(); + } + + private static class Handler extends DefaultHandler { + + private Set listNameList; + + private Map properties = new HashMap<>(); + + public Map getProperties() { + return properties; + } + + public Handler(Set listNameList) { + super(); + this.listNameList = listNameList; + if (this.listNameList == null) { + this.listNameList = new HashSet<>(); + } + } + + StringBuilder currentName = new StringBuilder(); + StringBuilder currentValue = new StringBuilder(); + + @Override + public void startElement(String uri, String localName, String qName, Attributes attributes) + throws SAXException { + super.startElement(uri, localName, qName, attributes); + + String name = localName; + if (name == null || name.trim().length() == 0) { + name = qName; + } + int i2 = name.indexOf(':'); + if (i2 >= 0) { + name = name.substring(i2 + 1); + } + + if (currentName.length() > 0) { + currentName.append(Character.toString('.')); + } + currentName.append(name); + + String listName = removeIndexes(currentName.toString()); + + if (listNameList.contains(listName)) { + String n = currentName.toString() + "_length"; + int len = getInt(properties, n); + properties.put(n, String.valueOf(len + 1)); + currentName.append("[").append(len).append("]"); + } + } + + @Override + public void endElement(String uri, String localName, String qName) throws SAXException { + super.endElement(uri, localName, qName); + + String name = localName; + if (name == null || name.trim().length() == 0) { + name = qName; + } + int i2 = name.indexOf(':'); + if (i2 >= 0) { + name = name.substring(i2 + 1); + } + + String s = currentValue.toString().trim(); + if (s.length() > 0) { + properties.put(currentName.toString(), s); + + log.info("Added property: {} : {}", currentName, s); + currentValue = new StringBuilder(); + } + + int i1 = currentName.lastIndexOf("." + name); + if (i1 <= 0) { + currentName = new StringBuilder(); + } else { + currentName = new StringBuilder(currentName.substring(0, i1)); + } + } + + @Override + public void characters(char[] ch, int start, int length) throws SAXException { + super.characters(ch, start, length); + + String value = new String(ch, start, length); + currentValue.append(value); + } + + private static int getInt(Map mm, String name) { + String s = mm.get(name); + if (s == null) { + return 0; + } + return Integer.parseInt(s); + } + + private String removeIndexes(String currentName) { + StringBuilder b = new StringBuilder(); + boolean add = true; + for (int i = 0; i < currentName.length(); i++) { + char c = currentName.charAt(i); + if (c == '[') { + add = false; + } else if (c == ']') { + add = true; + } else if (add) { + b.append(Character.toString(c)); + } + } + return b.toString(); + } + } +} diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPConfigurationParser.java b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPConfigurationParser.java new file mode 100755 index 0000000..04271e4 --- /dev/null +++ b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPConfigurationParser.java @@ -0,0 +1,107 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.collectors.restconf.common.event.publishing; + +import io.vavr.collection.List; +import io.vavr.collection.Map; +import io.vavr.control.Option; +import io.vavr.control.Try; +import org.onap.dcae.collectors.restconf.common.AnyNode; + +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; + +import static io.vavr.API.List; +import static io.vavr.API.Try; +import static io.vavr.API.Tuple; +import static io.vavr.API.unchecked; +import static org.onap.dcae.collectors.restconf.common.event.publishing.VavrUtils.enhanceError; +import static org.onap.dcae.collectors.restconf.common.event.publishing.VavrUtils.f; + +public class DMaaPConfigurationParser { + + public static Try> parseToDomainMapping(Path configLocation) { + return readFromFile(configLocation) + .flatMap(DMaaPConfigurationParser::toJSON) + .flatMap(DMaaPConfigurationParser::toConfigMap); + } + + private static Try readFromFile(Path configLocation) { + return Try(() -> new String(Files.readAllBytes(configLocation))) + .mapFailure(enhanceError(f("Could not read DMaaP configuration from location: '%s'", configLocation))); + } + + private static Try toJSON(String config) { + return Try(() -> AnyNode.fromString(config)) + .mapFailure(enhanceError(f("DMaaP configuration '%s' is not a valid JSON document", config))); + } + + private static Try> toConfigMap(AnyNode config) { + return Try(() -> usesLegacyFormat(config) ? parseLegacyFormat(config) : parseNewFormat(config)) + .mapFailure(enhanceError( + f("Parsing DMaaP configuration: '%s' failed, probably it is in unexpected format", config))); + } + + private static boolean usesLegacyFormat(AnyNode dMaaPConfig) { + return dMaaPConfig.has("channels"); + } + + private static Map parseLegacyFormat(AnyNode root) { + return root.get("channels").toList().toMap( + channel -> channel.get("name").toString(), + channel -> { + String destinationsStr = channel.getAsOption("cambria.url") + .getOrElse(channel.getAsOption("cambria.hosts").get()) + .toString(); + String topic = channel.get("cambria.topic").toString(); + Option maybeUser = channel.getAsOption("basicAuthUsername").map(AnyNode::toString); + Option maybePassword = channel.getAsOption("basicAuthPassword").map(AnyNode::toString); + List destinations = List(destinationsStr.split(",")); + return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations); + }); + } + + private static Map parseNewFormat(AnyNode root) { + return root.keys().toMap( + channelName -> channelName, + channelName -> { + AnyNode channelConfig = root.get(channelName); + Option maybeUser = channelConfig.getAsOption("aaf_username").map(AnyNode::toString); + Option maybePassword = channelConfig.getAsOption("aaf_password").map(AnyNode::toString); + URL topicURL = unchecked( + () -> new URL(channelConfig.get("dmaap_info").get("topic_url").toString())).apply(); + String[] pathSegments = topicURL.getPath().substring(1).split("/"); + String topic = pathSegments[1]; + String destination = "events".equals(pathSegments[0]) ? topicURL.getAuthority() : topicURL.getHost(); + List destinations = List(destination); + return buildBasedOnAuth(maybeUser, maybePassword, topic, destinations); + }); + } + + private static PublisherConfig buildBasedOnAuth(Option maybeUser, Option maybePassword, + String topic, List destinations) { + return maybeUser.flatMap(user -> maybePassword.map(password -> Tuple(user, password))) + .map(credentials -> new PublisherConfig(destinations, topic, credentials._1, credentials._2)) + .getOrElse(new PublisherConfig(destinations, topic)); + } + +} diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPEventPublisher.java b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPEventPublisher.java new file mode 100755 index 0000000..36e950f --- /dev/null +++ b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPEventPublisher.java @@ -0,0 +1,75 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.collectors.restconf.common.event.publishing; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import io.vavr.control.Try; +import org.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class DMaaPEventPublisher implements EventPublisher { + + private static final int PENDING_MESSAGE_LOG_THRESHOLD = 100; + private static final Logger log = LoggerFactory.getLogger(DMaaPEventPublisher.class); + private final DMaaPPublishersCache publishersCache; + private final Logger outputLogger; + + DMaaPEventPublisher(DMaaPPublishersCache DMaaPPublishersCache, + Logger outputLogger) { + this.publishersCache = DMaaPPublishersCache; + this.outputLogger = outputLogger; + } + + @Override + public void sendEvent(JSONObject event, String domain) { + publishersCache.getPublisher(domain) + .onEmpty(() -> + log.warn(VavrUtils.f("Could not find event publisher for domain: '%s', dropping message: '%s'", domain, event))) + .forEach(publisher -> sendEvent(event, domain, publisher)); + } + + private void sendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) { + Try.run(() -> uncheckedSendEvent(event, domain, publisher)) + .onFailure(exc -> closePublisher(event, domain, exc)); + } + + private void uncheckedSendEvent(JSONObject event, String domain, CambriaBatchingPublisher publisher) + throws IOException { + System.out.println("printing publisher information" + publisher); + int pendingMsgs = publisher.send("MyPartitionKey", event.toString()); + if (pendingMsgs > PENDING_MESSAGE_LOG_THRESHOLD) { + log.info("Pending messages count: " + pendingMsgs); + } + String infoMsg = VavrUtils.f("Event: '%s' scheduled to be send asynchronously on domain: '%s'", event, domain); + log.info(infoMsg); + outputLogger.info(infoMsg); + } + + private void closePublisher(JSONObject event, String domain, Throwable e) { + log.error(VavrUtils.f("Unable to schedule event: '%s' on domain: '%s'. Closing publisher and dropping message.", + event, domain), e); + publishersCache.closePublisherFor(domain); + } + +} diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersBuilder.java b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersBuilder.java new file mode 100755 index 0000000..2f9b3ed --- /dev/null +++ b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersBuilder.java @@ -0,0 +1,63 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.collectors.restconf.common.event.publishing; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.att.nsa.cambria.client.CambriaClientBuilders; +import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder; +import io.vavr.control.Try; + +import static io.vavr.API.Try; +import static org.onap.dcae.collectors.restconf.common.event.publishing.VavrUtils.enhanceError; +import static org.onap.dcae.collectors.restconf.common.event.publishing.VavrUtils.f; + +/** + * @author Pawel Szalapski (pawel.szalapski@nokia.com) + */ +final class DMaaPPublishersBuilder { + + @SuppressWarnings("mapFailure takes a generic varargs, unchecked because of Javas type system limitation, actually safe to do") + static Try buildPublisher(PublisherConfig config) { + return Try(() -> builder(config).build()) + .mapFailure(enhanceError(f("DMaaP client builder throws exception for this configuration: '%s'", config))); + } + + private static PublisherBuilder builder(PublisherConfig config) { + if (config.isSecured()) { + return authenticatedBuilder(config); + } else { + return unAuthenticatedBuilder(config); + } + } + + private static PublisherBuilder authenticatedBuilder(PublisherConfig config) { + return unAuthenticatedBuilder(config) + .usingHttps() + .authenticatedByHttp(config.userName().get(), config.password().get()); + } + + private static PublisherBuilder unAuthenticatedBuilder(PublisherConfig config) { + return new CambriaClientBuilders.PublisherBuilder() + .usingHosts(config.destinations().mkString(",")) + .onTopic(config.topic()) + .logSendFailuresAfter(5); + } +} diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersCache.java b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersCache.java new file mode 100755 index 0000000..acad96d --- /dev/null +++ b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersCache.java @@ -0,0 +1,110 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.collectors.restconf.common.event.publishing; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; +import io.vavr.collection.Map; +import io.vavr.control.Option; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static io.vavr.API.Option; +import static org.onap.dcae.collectors.restconf.common.event.publishing.VavrUtils.f; + +public class DMaaPPublishersCache { + + private static final Logger log = LoggerFactory.getLogger(DMaaPPublishersCache.class); + private final LoadingCache publishersCache; + private AtomicReference> dMaaPConfiguration; + + DMaaPPublishersCache(Map dMaaPConfiguration) { + this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration); + this.publishersCache = CacheBuilder.newBuilder() + .removalListener(new OnPublisherRemovalListener()) + .build(new CambriaPublishersCacheLoader()); + } + + DMaaPPublishersCache(CambriaPublishersCacheLoader dMaaPPublishersCacheLoader, + OnPublisherRemovalListener onPublisherRemovalListener, + Map dMaaPConfiguration) { + this.dMaaPConfiguration = new AtomicReference<>(dMaaPConfiguration); + this.publishersCache = CacheBuilder.newBuilder() + .removalListener(onPublisherRemovalListener) + .build(dMaaPPublishersCacheLoader); + } + + Option getPublisher(String streamID) { + try { + return Option(publishersCache.getUnchecked(streamID)); + } catch (Exception e) { + log.warn("Could not create / load Cambria Publisher for streamID", e); + return Option.none(); + } + } + + void closePublisherFor(String streamId) { + publishersCache.invalidate(streamId); + } + + static class OnPublisherRemovalListener implements RemovalListener { + + @Override + public void onRemoval(@Nonnull RemovalNotification notification) { + CambriaBatchingPublisher publisher = notification.getValue(); + if (publisher != null) { // The value might get Garbage Collected at this moment, regardless of @Nonnull + try { + int timeout = 20; + TimeUnit unit = TimeUnit.SECONDS; + java.util.List stuck = publisher.close(timeout, unit); + if (!stuck.isEmpty()) { + log.error(f("Publisher got stuck and did not manage to close in '%s' '%s', " + + "%s messages were dropped", stuck.size(), timeout, unit)); + } + } catch (InterruptedException | IOException e) { + log.error("Could not close Cambria publisher, some messages might have been dropped", e); + } + } + } + } + + class CambriaPublishersCacheLoader extends CacheLoader { + + @Override + public CambriaBatchingPublisher load(@Nonnull String domain) { + return dMaaPConfiguration.get() + .get(domain) + .toTry(() -> new RuntimeException( + f("DMaaP configuration contains no configuration for domain: '%s'", domain))) + .flatMap(DMaaPPublishersBuilder::buildPublisher) + .get(); + } + } +} diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/EventPublisher.java b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/EventPublisher.java new file mode 100755 index 0000000..3d339b3 --- /dev/null +++ b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/EventPublisher.java @@ -0,0 +1,35 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.restconf.common.event.publishing; + + +import io.vavr.collection.Map; +import org.json.JSONObject; +import org.slf4j.Logger; + +public interface EventPublisher { + + static EventPublisher createPublisher(Logger outputLogger, Map dMaaPConfig) { + return new DMaaPEventPublisher(new DMaaPPublishersCache(dMaaPConfig), outputLogger); + } + + void sendEvent(JSONObject event, String domain); + +} diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/PublisherConfig.java b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/PublisherConfig.java new file mode 100755 index 0000000..33fa15b --- /dev/null +++ b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/PublisherConfig.java @@ -0,0 +1,95 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.restconf.common.event.publishing; + +import io.vavr.collection.List; +import io.vavr.control.Option; + +import java.util.Objects; + +public class PublisherConfig { + private final List destinations; + private final String topic; + private String userName; + private String password; + + PublisherConfig(List destinations, String topic) { + this.destinations = destinations; + this.topic = topic; + } + + PublisherConfig(List destinations, String topic, String userName, String password) { + this.destinations = destinations; + this.topic = topic; + this.userName = userName; + this.password = password; + } + + List destinations() { + return destinations; + } + + String topic() { + return topic; + } + + Option userName() { + return Option.of(userName); + } + + Option password() { + return Option.of(password); + } + + boolean isSecured() { + return userName().isDefined() && password().isDefined(); + } + + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PublisherConfig that = (PublisherConfig) o; + return Objects.equals(destinations, that.destinations) && + Objects.equals(topic, that.topic) && + Objects.equals(userName, that.userName) && + Objects.equals(password, that.password); + } + + @Override + public int hashCode() { + return Objects.hash(destinations, topic, userName, password); + } + + @Override + public String toString() { + return "PublisherConfig{" + + "destinations=" + destinations + + ", topic='" + topic + '\'' + + ", userName='" + userName + '\'' + + ", password='" + password + '\'' + + '}'; + } +} diff --git a/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/VavrUtils.java b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/VavrUtils.java new file mode 100755 index 0000000..77a3052 --- /dev/null +++ b/src/main/java/org/onap/dcae/collectors/restconf/common/event/publishing/VavrUtils.java @@ -0,0 +1,48 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.collectors.restconf.common.event.publishing; + +import io.vavr.API; +import io.vavr.API.Match.Case; + +import static io.vavr.API.$; + + +public class VavrUtils { + private VavrUtils() { + // utils aggregator + } + + /** + * Shortcut for 'string interpolation' + */ + static String f(String msg, Object... args) { + return String.format(msg, args); + } + + /** + * Wrap failure with a more descriptive message of what has failed and chain original cause. Used to provide a + * context for errors instead of raw exception. + */ + static Case enhanceError(String msg) { + return API.Case($(), e -> new RuntimeException(msg, e)); + } +} diff --git a/src/main/scripts/docker_entry.sh b/src/main/scripts/docker_entry.sh new file mode 100755 index 0000000..78b83f6 --- /dev/null +++ b/src/main/scripts/docker_entry.sh @@ -0,0 +1,6 @@ +#!/bin/sh + +echo "INFO: USING RESTCONF CONTROLLER" + +/opt/app/restconfcollector/bin/restConfCollector.sh stop +/opt/app/restconfcollector/bin/restConfCollector.sh start \ No newline at end of file diff --git a/src/main/scripts/restConfCollector.sh b/src/main/scripts/restConfCollector.sh new file mode 100755 index 0000000..88893c0 --- /dev/null +++ b/src/main/scripts/restConfCollector.sh @@ -0,0 +1,100 @@ +#!/bin/sh + +usage() { + echo "restConfCollector.sh " +} + +BASEDIR=/opt/app/restconfcollector +rm -rf /opt/app/restconfcollector/logs +mkdir /opt/app/restconfcollector/logs +cd /opt/app/restconfcollector/logs +touch console.txt +cd - + +restConfCollector_start() { + echo `date +"%Y%m%d.%H%M%S%3N"` - restConfCollector_start | tee -a ${BASEDIR}/logs/console.txt + collectorPid=`pgrep -f org.onap.restconf.common` + + if [ ! -z "$collectorPid" ]; then + echo "WARNING: restConf Collector already running as PID $collectorPid" | tee -a ${BASEDIR}/logs/console.txt + echo "Startup Aborted!!!" | tee -a ${BASEDIR}/logs/console.txt + exit 1 + fi + + + # run java. The classpath is the etc dir for config files, and the lib dir + # for all the jars. + + cd ${BASEDIR} + echo "192.168.17.11 onap-message-router" >> /etc/hosts + nohup $JAVA -cp "etc${PATHSEP}lib/*" $JAVA_OPTS -Dhttps.protocols=TLSv1.1,TLSv1.2 $MAINCLASS $* & + if [ $? -ne 0 ]; then + echo "restConf Collector has been started!!!" | tee -a ${BASEDIR}/logs/console.txt + fi + + +} + +## Pre-setting +JAVA_HOME=/usr/bin/java + +# use JAVA_HOME if provided +if [ -z "$JAVA_HOME" ]; then + echo "ERROR: JAVA_HOME not setup" + echo "Startup Aborted!!" + exit 1 +else + JAVA=$JAVA_HOME +fi + +MAINCLASS=org.onap.dcae.collectors.restconf.common.RestConfCollector + +# determine a path separator that works for this platform +PATHSEP=":" +case "$(uname -s)" in + + Darwin) + ;; + + Linux) + ;; + + CYGWIN*|MINGW32*|MSYS*) + PATHSEP=";" + ;; + + *) + ;; +esac + +restConfCollector_stop() { + echo `date +"%Y%m%d.%H%M%S%3N"` - collector_stop + collectorPid=`pgrep -f org.onap.dcae.collectors.restconf.common` + if [ ! -z "$collectorPid" ]; then + echo "Stopping PID $collectorPid" + + kill -9 $collectorPid + sleep 5 + if [ ! "$(pgrep -f org.onap.restconf.common)" ]; then + echo "restConf Collector has been stopped!!!" + else + echo "restConf Collector is being stopped!!!" + fi + else + echo "WARNING: No restConf Collector instance is currently running"; + exit 1 + fi + +} + +case $1 in + "start") + restConfCollector_start | tee -a ${BASEDIR}/logs/console.txt + ;; + "stop") + restConfCollector_stop | tee -a ${BASEDIR}/logs/console.txt + ;; + *) + usage + ;; +esac diff --git a/src/test/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPConfigurationParserTest.java b/src/test/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPConfigurationParserTest.java new file mode 100755 index 0000000..1b709bd --- /dev/null +++ b/src/test/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPConfigurationParserTest.java @@ -0,0 +1,111 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.collectors.restconf.common.event.publishing; + +import io.vavr.collection.Map; +import io.vavr.control.Try; +import org.junit.Test; + +import java.nio.file.Path; +import java.nio.file.Paths; + +import static io.vavr.API.List; +import static org.assertj.core.api.Assertions.assertThat; +import static org.onap.dcae.collectors.restconf.common.event.publishing.DMaaPConfigurationParser.parseToDomainMapping; + +public class DMaaPConfigurationParserTest { + @Test + public void testParseCredentialsForGen2() { + Path path = Paths.get("src/test/resources/testParseDMaaPCredentialsGen2.json"); + Try> publisherConfigs = parseToDomainMapping(path); + + PublisherConfig authCredentialsNulls = publisherConfigs.get().get("auth-credentials-null").getOrNull(); + assertThat(authCredentialsNulls.userName().isEmpty()).isTrue(); + assertThat(authCredentialsNulls.password().isEmpty()).isTrue(); + assertThat(authCredentialsNulls.isSecured()).isFalse(); + + PublisherConfig authCredentialsPresent = publisherConfigs.get().get("auth-credentials-present").getOrNull(); + assertThat(authCredentialsPresent.userName().getOrNull()).isEqualTo("sampleUser"); + assertThat(authCredentialsPresent.password().getOrNull()).isEqualTo("samplePassword"); + assertThat(authCredentialsPresent.isSecured()).isTrue(); + + PublisherConfig authCredentialsKeysMissing = publisherConfigs.get().get("auth-credentials-missing").getOrNull(); + assertThat(authCredentialsKeysMissing.userName().isEmpty()).isTrue(); + assertThat(authCredentialsKeysMissing.password().isEmpty()).isTrue(); + assertThat(authCredentialsKeysMissing.isSecured()).isFalse(); + } + + + @Test + public void testParseCredentialsForLegacy() { + Path path = Paths.get("src/test/resources/testParseDMaaPCredentialsLegacy.json"); + Try> publisherConfigs = parseToDomainMapping(path); + + PublisherConfig authCredentialsNull = publisherConfigs.get().get("auth-credentials-null").getOrNull(); + assertThat(authCredentialsNull.userName().isEmpty()).isTrue(); + assertThat(authCredentialsNull.password().isEmpty()).isTrue(); + assertThat(authCredentialsNull.isSecured()).isFalse(); + + PublisherConfig authCredentialsPresent = publisherConfigs.get().get("auth-credentials-present").getOrNull(); + assertThat(authCredentialsPresent.userName().getOrNull()).isEqualTo("sampleUser"); + assertThat(authCredentialsPresent.password().getOrNull()).isEqualTo("samplePassword"); + assertThat(authCredentialsPresent.isSecured()).isTrue(); + + PublisherConfig authCredentialsMissing = publisherConfigs.get().get("auth-credentials-missing").getOrNull(); + assertThat(authCredentialsMissing.userName().isEmpty()).isTrue(); + assertThat(authCredentialsMissing.password().isEmpty()).isTrue(); + assertThat(authCredentialsMissing.isSecured()).isFalse(); + } + + + @Test + public void testParseGen2() { + Path path = Paths.get("src/test/resources/testParseDMaaPGen2.json"); + Try> publisherConfigs = parseToDomainMapping(path); + + PublisherConfig withEventsSegment = publisherConfigs.get().get("event-segments-with-port").getOrNull(); + assertThat(withEventsSegment.destinations()).isEqualTo(List("UEBHOST:3904")); + assertThat(withEventsSegment.topic()).isEqualTo("DCAE-RESTCONF-COLLECTOR-EVENTS-DEV"); + + PublisherConfig withOtherSegment = publisherConfigs.get().get("other-segments-without-ports").getOrNull(); + assertThat(withOtherSegment.destinations()).isEqualTo(List("UEBHOST")); + assertThat(withOtherSegment.topic()).isEqualTo("DCAE-RESTCONF-COLLECTOR-EVENTS-DEV"); + } + + @Test + public void testParseLegacy() { + Path exemplaryConfig = Paths.get("src/test/resources/testParseDMaaPLegacy.json"); + Try> publisherConfigs = + parseToDomainMapping(exemplaryConfig); + + PublisherConfig urlFirstThenHosts = publisherConfigs.get().get("url-precedes-hosts").getOrNull(); + assertThat(urlFirstThenHosts.destinations()).isEqualTo(List("127.0.0.1:3904")); + assertThat(urlFirstThenHosts.topic()).isEqualTo("DCAE-RESTCONF-COLLECTOR-EVENTS-DEV"); + + PublisherConfig urlKeyMissing = publisherConfigs.get().get("url-key-missing").getOrNull(); + assertThat(urlKeyMissing.destinations()).isEqualTo(List("h1.att.com", "h2.att.com")); + assertThat(urlKeyMissing.topic()).isEqualTo("DCAE-RESTCONF-COLLECTOR-EVENTS-DEV"); + + PublisherConfig urlIsMissing = publisherConfigs.get().get("url-is-null").getOrNull(); + assertThat(urlIsMissing.destinations()).isEqualTo(List("h1.att.com", "h2.att.com")); + assertThat(urlIsMissing.topic()).isEqualTo("DCAE-RESTCONF-COLLECTOR-EVENTS-DEV"); + } +} diff --git a/src/test/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPEventPublisherTest.java b/src/test/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPEventPublisherTest.java new file mode 100755 index 0000000..f66748c --- /dev/null +++ b/src/test/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPEventPublisherTest.java @@ -0,0 +1,77 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.restconf.common.event.publishing; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import org.json.JSONObject; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; + +import java.io.IOException; + +import static io.vavr.API.Option; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class DMaaPEventPublisherTest { + + private static final String STREAM_ID = "sampleStreamId"; + + private DMaaPEventPublisher eventPublisher; + private CambriaBatchingPublisher cambriaPublisher; + private DMaaPPublishersCache DMaaPPublishersCache; + + @Before + public void setUp() { + cambriaPublisher = mock(CambriaBatchingPublisher.class); + DMaaPPublishersCache = mock(DMaaPPublishersCache.class); + when(DMaaPPublishersCache.getPublisher(anyString())).thenReturn(Option(cambriaPublisher)); + eventPublisher = new DMaaPEventPublisher(DMaaPPublishersCache, mock(Logger.class)); + } + + @Test + public void shouldSendEventToTopic() throws Exception { + // given + JSONObject event = new JSONObject("{}"); + + // when + eventPublisher.sendEvent(event, STREAM_ID); + + // then + verify(cambriaPublisher).send("MyPartitionKey", event.toString()); + } + + @Test + public void shouldCloseConnectionWhenExceptionOccurred() throws Exception { + // given + JSONObject event = new JSONObject("{}"); + given(cambriaPublisher.send(anyString(), anyString())).willThrow(new IOException("epic fail")); + + // when + eventPublisher.sendEvent(event, STREAM_ID); + + // then + verify(DMaaPPublishersCache).closePublisherFor(STREAM_ID); + } +} diff --git a/src/test/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersCacheTest.java b/src/test/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersCacheTest.java new file mode 100755 index 0000000..49f37c3 --- /dev/null +++ b/src/test/java/org/onap/dcae/collectors/restconf/common/event/publishing/DMaaPPublishersCacheTest.java @@ -0,0 +1,92 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.restconf.common.event.publishing; + +import com.att.nsa.cambria.client.CambriaBatchingPublisher; +import io.vavr.collection.Map; +import io.vavr.control.Option; +import org.junit.Before; +import org.junit.Test; +import org.onap.dcae.collectors.restconf.common.event.publishing.DMaaPPublishersCache.OnPublisherRemovalListener; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import static io.vavr.API.List; +import static io.vavr.API.Map; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class DMaaPPublishersCacheTest { + + private String streamId1; + private Map dMaaPConfigs; + + @Before + public void setUp() { + streamId1 = "sampleStream1"; + dMaaPConfigs = Map("sampleStream1", new PublisherConfig(List("destination1"), "topic1")); + } + + @Test + public void shouldReturnTheSameCachedInstanceOnConsecutiveRetrievals() { + // given + DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(dMaaPConfigs); + + // when + Option firstPublisher = dMaaPPublishersCache.getPublisher(streamId1); + Option secondPublisher = dMaaPPublishersCache.getPublisher(streamId1); + + // then + assertSame("should return same instance", firstPublisher.get(), secondPublisher.get()); + } + + @Test + public void shouldCloseCambriaPublisherOnCacheInvalidate() throws IOException, InterruptedException { + // given + CambriaBatchingPublisher cambriaPublisherMock1 = mock(CambriaBatchingPublisher.class); + DMaaPPublishersCache.CambriaPublishersCacheLoader cacheLoaderMock = mock(DMaaPPublishersCache.CambriaPublishersCacheLoader.class); + DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(cacheLoaderMock, + new OnPublisherRemovalListener(), + dMaaPConfigs); + when(cacheLoaderMock.load(streamId1)).thenReturn(cambriaPublisherMock1); + + // when + dMaaPPublishersCache.getPublisher(streamId1); + dMaaPPublishersCache.closePublisherFor(streamId1); + + // then + verify(cambriaPublisherMock1).close(20, TimeUnit.SECONDS); + + } + + @Test + public void shouldReturnNoneIfThereIsNoDMaaPConfigurationForGivenStreamID() { + // given + DMaaPPublishersCache dMaaPPublishersCache = new DMaaPPublishersCache(dMaaPConfigs); + + // then + assertTrue("should not exist", dMaaPPublishersCache.getPublisher("non-existing").isEmpty()); + } + +} \ No newline at end of file diff --git a/src/test/java/org/onap/dcae/collectors/restconf/restconftest/AnyNodeTest.java b/src/test/java/org/onap/dcae/collectors/restconf/restconftest/AnyNodeTest.java new file mode 100755 index 0000000..746023d --- /dev/null +++ b/src/test/java/org/onap/dcae/collectors/restconf/restconftest/AnyNodeTest.java @@ -0,0 +1,62 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.collectors.restconf.restconftest; + +import com.google.common.collect.Sets; +import org.junit.BeforeClass; +import org.junit.Test; +import org.onap.dcae.collectors.restconf.common.AnyNode; + +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; + +public class AnyNodeTest { + + private static final String SAMPLE_JSON_FILEPATH = "{\n" + + " \"channels\": [{\n" + + " \"one\": \"number1\", \"two\": \"number2\", \"three\": \"number3\"}],\n" + + " \"sampleStrList\": [\"1\", \"2\", \"3\", \"4\", \"5\"],\n" + + " \"sampleNestedObject\": {\"a\": 1, \"b\": 2},\n" + + " \"sampleInt\": 1,\n" + + " \"sampleString\": \"str\",\n" + + " \"sampleNull\": null\n" + + "}\n"; + private static final Set EXPECTED_JSON_KEYS = Sets + .newHashSet("channels", "sampleStrList", "sampleNestedObject", "sampleInt", "sampleString", "sampleNull"); + private static AnyNode node; + + + @BeforeClass + public static void setUpClass() { + node = AnyNode.fromString(SAMPLE_JSON_FILEPATH); + } + + @Test + public void testShouldReturnJsonObjectKeySet() { + assertThat(node.keys()).containsOnlyElementsOf(EXPECTED_JSON_KEYS); + } + + @Test(expected = ClassCastException.class) + public void whenInvokedOnJsonObjInsteadOfJsonArrShouldRaiseRuntimeEx() { + node.toList(); + } +} \ No newline at end of file diff --git a/src/test/java/org/onap/dcae/collectors/restconf/restconftest/RestConfProcTest.java b/src/test/java/org/onap/dcae/collectors/restconf/restconftest/RestConfProcTest.java new file mode 100755 index 0000000..0084f40 --- /dev/null +++ b/src/test/java/org/onap/dcae/collectors/restconf/restconftest/RestConfProcTest.java @@ -0,0 +1,75 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.collectors.restconf.restconftest; + +import com.att.nsa.cmdLine.NsaCommandLineUtil; +import com.att.nsa.drumlin.service.framework.DrumlinServlet; +import com.att.nsa.drumlin.till.nv.impl.nvPropertiesFile; +import com.att.nsa.drumlin.till.nv.impl.nvReadableStack; +import com.att.nsa.drumlin.till.nv.impl.nvReadableTable; +import org.glassfish.jersey.grizzly2.httpserver.GrizzlyHttpServerFactory; +import org.glassfish.jersey.media.sse.SseFeature; +import org.glassfish.jersey.server.ResourceConfig; +import org.junit.Test; +import org.onap.dcae.collectors.restconf.common.Constants; +import org.onap.dcae.collectors.restconf.common.RestConfCollector; +import org.onap.dcae.collectors.restconf.common.RestConfContext; +import org.onap.dcae.collectors.restconf.common.RestConfProc; + +import java.net.URI; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; + +public class RestConfProcTest { + + private static final URI CONTEXT = URI.create("http://localhost:8080/"); + + @Test + public void testEstablishPersistentConnection() throws Exception { + + final Map argMap = new HashMap<>(); + final String config = NsaCommandLineUtil.getSetting(argMap, Constants.KCONFIG, "collector.properties"); + final URL settingStream = DrumlinServlet.findStream(config, RestConfCollector.class); + + final nvReadableStack settings = new nvReadableStack(); + + settings.push(new nvPropertiesFile(settingStream)); + settings.push(new nvReadableTable(argMap)); + + RestConfProc restConfProc = new RestConfProc(settings); + + final ResourceConfig resourceConfig = new ResourceConfig(SseResource.class, SseFeature.class); + GrizzlyHttpServerFactory.createHttpServer(CONTEXT, resourceConfig); + RestConfContext ctx = new RestConfContext(); + ctx.setAttribute("prop.encoding-json", "encoding-json"); + ctx.setAttribute("restapi-result.response-code", "200"); + ctx.setAttribute("restapi-result.ietf-subscribed-notifications:output.identifier", "100"); + + Map p = new HashMap<>(); + p.put("sseConnectURL", "http://localhost:8080/ssevents"); + p.put("subscriberId", "networkId"); + p.put("responsePrefix", "restapi-result"); + + restConfProc.establishPersistentConnection(p, ctx); + Thread.sleep(1000); + } +} diff --git a/src/test/java/org/onap/dcae/collectors/restconf/restconftest/SseResource.java b/src/test/java/org/onap/dcae/collectors/restconf/restconftest/SseResource.java new file mode 100755 index 0000000..db81886 --- /dev/null +++ b/src/test/java/org/onap/dcae/collectors/restconf/restconftest/SseResource.java @@ -0,0 +1,69 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.collectors.restconf.restconftest; + +import org.glassfish.jersey.media.sse.EventOutput; +import org.glassfish.jersey.media.sse.OutboundEvent; +import org.glassfish.jersey.media.sse.SseFeature; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import java.io.IOException; + +@Path("ssevents") +public class SseResource { + + @GET + @Produces(SseFeature.SERVER_SENT_EVENTS) + public EventOutput getServerSentEvents() throws IOException { + String data = "{" + + "\"ietf-notification:notification\" : {" + + " \"eventTime\" : \"2017-10-25T08:22:33.44Z\"," + + " \"ietf-yang-push:push-change-update\": {" + + "\"subscription-id\":\"89\"," + + "\"datastore-changes\": {" + + "\"ietf-yang-patch:yang-patch\":{" + + "\"patch-id\":\"1\"," + + "\"edit\":[{" + + "\"edit-id\":\"edit1\"," + + "\"operation\":\"merge\"," + + "\"target\":\"/ietf-interfaces:interfaces-state\"," + + "\"value\": {" + + "\"ietf-interfaces:interfaces-state\":{" + + "\"interface\": {" + + "\"name\":\"eth0\"," + + "\"oper-status\":\"down\"," + + "}" + + "}" + + "}" + + "}]" + + "}" + + "}" + + "}" + + "}" + + "}"; + final EventOutput result = new EventOutput(); + result.write(new OutboundEvent.Builder().data(String.class, data).build()); + result.close(); + return result; + } +} diff --git a/src/test/java/org/onap/dcae/collectors/restconf/restconftest/TestRestConfCollector.java b/src/test/java/org/onap/dcae/collectors/restconf/restconftest/TestRestConfCollector.java new file mode 100755 index 0000000..6bedc8e --- /dev/null +++ b/src/test/java/org/onap/dcae/collectors/restconf/restconftest/TestRestConfCollector.java @@ -0,0 +1,66 @@ +/*- + * ============LICENSE_START======================================================= + * org.onap.dcaegen2.collectors.restconf + * ================================================================================ + * Copyright (C) 2018 Nokia. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcae.collectors.restconf.restconftest; + +import com.att.nsa.cmdLine.NsaCommandLineUtil; +import com.att.nsa.drumlin.till.nv.impl.nvReadableStack; +import com.att.nsa.drumlin.till.nv.impl.nvReadableTable; +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; +import org.json.JSONArray; +import org.json.JSONObject; +import org.junit.Test; +import org.onap.dcae.collectors.restconf.common.Constants; +import org.onap.dcae.collectors.restconf.common.RestConfProc; + +import java.io.FileReader; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.junit.Assert.assertEquals; + +public class TestRestConfCollector { + @Test + public void testParseCLIArguments() { + // given + String args[] = {"-a", "aa"}; + Map argMap = NsaCommandLineUtil.processCmdLine(args, true); + // when + nvReadableStack settings = new nvReadableStack(); + settings.push(new nvReadableTable(argMap)); + + // then + assertEquals(settings.getString("a", "default"), "aa"); + } + + @Test + public void shouldPutValidRestConfEventOnProcessingQueueWithoutExceptions() throws Exception { + // given + RestConfProc.fProcessingInputQueue = new LinkedBlockingQueue<>( + Constants.KDEFAULT_MAXQUEUEDEVENTS); + JsonElement restConfEvent = new JsonParser().parse(new FileReader("src/test/resources/RestConfEvent.json")); + JSONObject validRestConfEvent = new JSONObject(restConfEvent.toString()); + JSONArray jsonArrayMod = new JSONArray().put(validRestConfEvent); + + // then + RestConfProc.handleEvents(jsonArrayMod); + } +} diff --git a/src/test/resources/RestConfEvent.json b/src/test/resources/RestConfEvent.json new file mode 100755 index 0000000..93d0b64 --- /dev/null +++ b/src/test/resources/RestConfEvent.json @@ -0,0 +1,26 @@ +{ + "ietf-notification:notification" : { + "eventTime" : "eventtime", + "ietf-yang-push:push-change-update": { + "subscription-id":"100", + "datastore-changes": { + "ietf-yang-patch:yang-patch":{ + "patch-id":"patch-id", + "edit":[{ + "edit-id":"edit-id", + "operation":"create", + "target":"target", + "value": { + "ietf-interfaces:interfaces-state":{ + "interface": { + "name":"eth0", + "oper-status":"up" + } + } + } + }] + } + } + } + } +} \ No newline at end of file diff --git a/src/test/resources/testParseDMaaPCredentialsGen2.json b/src/test/resources/testParseDMaaPCredentialsGen2.json new file mode 100755 index 0000000..23230c1 --- /dev/null +++ b/src/test/resources/testParseDMaaPCredentialsGen2.json @@ -0,0 +1,21 @@ +{ + "auth-credentials-null": { + "aaf_username": null, + "dmaap_info": { + "topic_url": "http://UEBHOST:3904/events/DCAE-RESTCONF-COLLECTOR-EVENTS-DEV" + }, + "aaf_password": null + }, + "auth-credentials-present": { + "aaf_username": "sampleUser", + "dmaap_info": { + "topic_url": "http://UEBHOST:3904/events/DCAE-RESTCONF-COLLECTOR-EVENTS-DEV" + }, + "aaf_password": "samplePassword" + }, + "auth-credentials-missing": { + "dmaap_info": { + "topic_url": "http://UEBHOST:3904/events/DCAE-RESTCONF-COLLECTOR-EVENTS-DEV" + } + } +} \ No newline at end of file diff --git a/src/test/resources/testParseDMaaPCredentialsLegacy.json b/src/test/resources/testParseDMaaPCredentialsLegacy.json new file mode 100755 index 0000000..ee215e2 --- /dev/null +++ b/src/test/resources/testParseDMaaPCredentialsLegacy.json @@ -0,0 +1,26 @@ +{ + "channels": [ + { + "name": "auth-credentials-null", + "cambria.url": "127.0.0.1:3904", + "cambria.hosts": "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com", + "cambria.topic": "DCAE-RESTCONF-COLLECTOR-EVENTS-DEV", + "basicAuthPassword": null, + "basicAuthUsername": null + }, + { + "name": "auth-credentials-present", + "cambria.url": "127.0.0.1:3904", + "cambria.hosts": "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com", + "cambria.topic": "DCAE-RESTCONF-COLLECTOR-EVENTS-DEV", + "basicAuthPassword": "samplePassword", + "basicAuthUsername": "sampleUser" + }, + { + "name": "auth-credentials-missing", + "cambria.url": "127.0.0.1:3904", + "cambria.hosts": "uebsb91kcdc.it.att.com,uebsb92kcdc.it.att.com,uebsb93kcdc.it.att.com", + "cambria.topic": "DCAE-RESTCONF-COLLECTOR-EVENTS-DEV" + } + ] +} \ No newline at end of file diff --git a/src/test/resources/testParseDMaaPGen2.json b/src/test/resources/testParseDMaaPGen2.json new file mode 100755 index 0000000..4ef04a2 --- /dev/null +++ b/src/test/resources/testParseDMaaPGen2.json @@ -0,0 +1,12 @@ +{ + "event-segments-with-port": { + "dmaap_info": { + "topic_url": "http://UEBHOST:3904/events/DCAE-RESTCONF-COLLECTOR-EVENTS-DEV" + } + }, + "other-segments-without-ports": { + "dmaap_info": { + "topic_url": "http://UEBHOST:3904/somethingHere/DCAE-RESTCONF-COLLECTOR-EVENTS-DEV" + } + } +} \ No newline at end of file diff --git a/src/test/resources/testParseDMaaPLegacy.json b/src/test/resources/testParseDMaaPLegacy.json new file mode 100755 index 0000000..fda9c60 --- /dev/null +++ b/src/test/resources/testParseDMaaPLegacy.json @@ -0,0 +1,21 @@ +{ + "channels": [ + { + "name": "url-precedes-hosts", + "cambria.url": "127.0.0.1:3904", + "cambria.hosts": "h1.att.com,h2.att.com", + "cambria.topic": "DCAE-RESTCONF-COLLECTOR-EVENTS-DEV" + }, + { + "name": "url-key-missing", + "cambria.hosts": "h1.att.com,h2.att.com", + "cambria.topic": "DCAE-RESTCONF-COLLECTOR-EVENTS-DEV" + }, + { + "name": "url-is-null", + "cambria.url": null, + "cambria.hosts": "h1.att.com,h2.att.com", + "cambria.topic": "DCAE-RESTCONF-COLLECTOR-EVENTS-DEV" + } + ] +} \ No newline at end of file -- cgit 1.2.3-korg