From 11510b43c277b8e1dd7e58d79785544810118c8e Mon Sep 17 00:00:00 2001 From: Jessica Wagantall Date: Tue, 1 Dec 2020 11:26:31 -0800 Subject: Migrate sli-adaptor files Migrate sli-adaptor repo files into a new "adaptors" directory. Signed-off-by: Jessica Wagantall --- adaptors/message-router/README.md | 7 + adaptors/message-router/consumer/README.md | 7 + adaptors/message-router/consumer/api/pom.xml | 27 +++ .../messagerouter/consumer/api/ConsumerApi.java | 6 + .../consumer/api/PollingConsumer.java | 7 + .../consumer/api/PullingConsumer.java | 7 + .../messagerouter/consumer/api/RequestHandler.java | 5 + adaptors/message-router/consumer/installer/pom.xml | 128 +++++++++++++ .../src/assembly/assemble_installer_zip.xml | 59 ++++++ .../src/assembly/assemble_mvnrepo_zip.xml | 47 +++++ .../src/main/resources/scripts/install-feature.sh | 39 ++++ adaptors/message-router/consumer/pom.xml | 19 ++ adaptors/message-router/consumer/provider/pom.xml | 43 +++++ .../provider/impl/AbstractBaseConsumer.java | 207 +++++++++++++++++++++ .../consumer/provider/impl/ConsumerFactory.java | 202 ++++++++++++++++++++ .../provider/impl/PollingConsumerImpl.java | 100 ++++++++++ .../provider/impl/PullingConsumerImpl.java | 39 ++++ .../provider/impl/AbstractBaseConsumerTest.java | 91 +++++++++ .../provider/impl/ConsumerFactoryTest.java | 164 ++++++++++++++++ adaptors/message-router/pom.xml | 33 ++++ adaptors/message-router/publisher/README.md | 8 + adaptors/message-router/publisher/api/pom.xml | 27 +++ .../messagerouter/publisher/api/PublisherApi.java | 5 + .../message-router/publisher/installer/pom.xml | 133 +++++++++++++ .../src/assembly/assemble_installer_zip.xml | 59 ++++++ .../src/assembly/assemble_mvnrepo_zip.xml | 49 +++++ .../src/main/resources/scripts/install-feature.sh | 39 ++++ adaptors/message-router/publisher/pom.xml | 20 ++ adaptors/message-router/publisher/provider/pom.xml | 39 ++++ .../publisher/provider/impl/PublisherApiImpl.java | 178 ++++++++++++++++++ .../resources/OSGI-INF/blueprint/blueprint.xml | 14 ++ .../provider/impl/PublisherApiImplTest.java | 51 +++++ .../message-router/publisher/sample.client/pom.xml | 39 ++++ .../publisher/client/impl/ClientImpl.java | 58 ++++++ .../resources/OSGI-INF/blueprint/blueprint.xml | 16 ++ .../publisher/client/impl/ClientImplTest.java | 29 +++ 36 files changed, 2001 insertions(+) create mode 100755 adaptors/message-router/README.md create mode 100755 adaptors/message-router/consumer/README.md create mode 100755 adaptors/message-router/consumer/api/pom.xml create mode 100755 adaptors/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/ConsumerApi.java create mode 100755 adaptors/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/PollingConsumer.java create mode 100755 adaptors/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/PullingConsumer.java create mode 100755 adaptors/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/RequestHandler.java create mode 100755 adaptors/message-router/consumer/installer/pom.xml create mode 100755 adaptors/message-router/consumer/installer/src/assembly/assemble_installer_zip.xml create mode 100755 adaptors/message-router/consumer/installer/src/assembly/assemble_mvnrepo_zip.xml create mode 100755 adaptors/message-router/consumer/installer/src/main/resources/scripts/install-feature.sh create mode 100755 adaptors/message-router/consumer/pom.xml create mode 100755 adaptors/message-router/consumer/provider/pom.xml create mode 100755 adaptors/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/AbstractBaseConsumer.java create mode 100755 adaptors/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/ConsumerFactory.java create mode 100644 adaptors/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/PollingConsumerImpl.java create mode 100755 adaptors/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/PullingConsumerImpl.java create mode 100644 adaptors/message-router/consumer/provider/src/test/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/AbstractBaseConsumerTest.java create mode 100644 adaptors/message-router/consumer/provider/src/test/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/ConsumerFactoryTest.java create mode 100755 adaptors/message-router/pom.xml create mode 100755 adaptors/message-router/publisher/README.md create mode 100755 adaptors/message-router/publisher/api/pom.xml create mode 100755 adaptors/message-router/publisher/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/publisher/api/PublisherApi.java create mode 100644 adaptors/message-router/publisher/installer/pom.xml create mode 100644 adaptors/message-router/publisher/installer/src/assembly/assemble_installer_zip.xml create mode 100644 adaptors/message-router/publisher/installer/src/assembly/assemble_mvnrepo_zip.xml create mode 100644 adaptors/message-router/publisher/installer/src/main/resources/scripts/install-feature.sh create mode 100755 adaptors/message-router/publisher/pom.xml create mode 100755 adaptors/message-router/publisher/provider/pom.xml create mode 100755 adaptors/message-router/publisher/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/publisher/provider/impl/PublisherApiImpl.java create mode 100755 adaptors/message-router/publisher/provider/src/main/resources/OSGI-INF/blueprint/blueprint.xml create mode 100644 adaptors/message-router/publisher/provider/src/test/java/org/onap/ccsdk/sli/adaptors/messagerouter/publisher/provider/impl/PublisherApiImplTest.java create mode 100755 adaptors/message-router/publisher/sample.client/pom.xml create mode 100755 adaptors/message-router/publisher/sample.client/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/publisher/client/impl/ClientImpl.java create mode 100755 adaptors/message-router/publisher/sample.client/src/main/resources/OSGI-INF/blueprint/blueprint.xml create mode 100644 adaptors/message-router/publisher/sample.client/src/test/java/org/onap/ccsdk/sli/adaptors/messagerouter/publisher/client/impl/ClientImplTest.java (limited to 'adaptors/message-router') diff --git a/adaptors/message-router/README.md b/adaptors/message-router/README.md new file mode 100755 index 000000000..32848252b --- /dev/null +++ b/adaptors/message-router/README.md @@ -0,0 +1,7 @@ +# CCSDK Message Router Extension + +This maven module builds re-usable code that interacts with message router + +## High level modules +- **publisher** used for sending messages to a dmaap message router topic +- **consumer** not yet created, this will be used for pulling messages from a dmaap message router topic diff --git a/adaptors/message-router/consumer/README.md b/adaptors/message-router/consumer/README.md new file mode 100755 index 000000000..9d01256a7 --- /dev/null +++ b/adaptors/message-router/consumer/README.md @@ -0,0 +1,7 @@ +# Consumer + +## Modules +- api - exports the consumer interface for clients and providers to import +- features - used for managing the feature repository for consumer +- installer - provides a simple install script +- provider - provides an implementation of the consumer api \ No newline at end of file diff --git a/adaptors/message-router/consumer/api/pom.xml b/adaptors/message-router/consumer/api/pom.xml new file mode 100755 index 000000000..e77eb9793 --- /dev/null +++ b/adaptors/message-router/consumer/api/pom.xml @@ -0,0 +1,27 @@ + + + 4.0.0 + + + org.onap.ccsdk.sli.adaptors.messagerouter + consumer.aggregate + 1.1.1-SNAPSHOT + + + consumer.api + bundle + + + + + org.apache.felix + maven-bundle-plugin + + + org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api + + + + + + diff --git a/adaptors/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/ConsumerApi.java b/adaptors/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/ConsumerApi.java new file mode 100755 index 000000000..41deade85 --- /dev/null +++ b/adaptors/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/ConsumerApi.java @@ -0,0 +1,6 @@ +package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api; + +public interface ConsumerApi extends AutoCloseable { + //registers a handler to handle a specific topic, should be called only once per client + public void registerHandler(String topic, RequestHandler requestHandler); +} diff --git a/adaptors/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/PollingConsumer.java b/adaptors/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/PollingConsumer.java new file mode 100755 index 000000000..29fc1c764 --- /dev/null +++ b/adaptors/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/PollingConsumer.java @@ -0,0 +1,7 @@ +package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api; + +public interface PollingConsumer extends ConsumerApi { + + // Starts polling message router for messages, won't stop until close it called + void start(); +} diff --git a/adaptors/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/PullingConsumer.java b/adaptors/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/PullingConsumer.java new file mode 100755 index 000000000..1187aaceb --- /dev/null +++ b/adaptors/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/PullingConsumer.java @@ -0,0 +1,7 @@ +package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api; + +public interface PullingConsumer extends ConsumerApi { + + //Pulls a single batch of messages + void pull(); +} diff --git a/adaptors/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/RequestHandler.java b/adaptors/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/RequestHandler.java new file mode 100755 index 000000000..07a117843 --- /dev/null +++ b/adaptors/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/RequestHandler.java @@ -0,0 +1,5 @@ +package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api; + +public interface RequestHandler { + public void handleRequest(String topic, String requestBody); +} diff --git a/adaptors/message-router/consumer/installer/pom.xml b/adaptors/message-router/consumer/installer/pom.xml new file mode 100755 index 000000000..2b8eb688e --- /dev/null +++ b/adaptors/message-router/consumer/installer/pom.xml @@ -0,0 +1,128 @@ + + + 4.0.0 + + + org.onap.ccsdk.sli.adaptors.messagerouter + consumer.aggregate + 1.1.1-SNAPSHOT + + + org.onap.ccsdk.sli.adaptors.messagerouter + consumer.installer + 1.1.1-SNAPSHOT + pom + + + messagerouter-consumer + messagerouter-consumer + mvn:${project.groupId}/consumer.features/${project.version}/xml/features + false + + + + + ${project.groupId} + consumer.api + ${project.version} + + + ${project.groupId} + consumer.provider + ${project.version} + + + + + + + maven-assembly-plugin + 2.6 + + + maven-repo-zip + + single + + package + + true + stage/${application.name}-${project.version} + + src/assembly/assemble_mvnrepo_zip.xml + + true + + + + installer-zip + + single + + package + + true + ${application.name}-${project.version}-installer + + src/assembly/assemble_installer_zip.xml + + false + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + + copy-dependencies + + prepare-package + + false + ${project.build.directory}/assembly/system + false + true + true + true + false + false + ${project.groupId} + provided + + + + + + maven-resources-plugin + 2.6 + + + copy-version + + copy-resources + + validate + + ${basedir}/target/stage + + + src/main/resources/scripts + + install-feature.sh + + true + + + + + + + + + + + diff --git a/adaptors/message-router/consumer/installer/src/assembly/assemble_installer_zip.xml b/adaptors/message-router/consumer/installer/src/assembly/assemble_installer_zip.xml new file mode 100755 index 000000000..c6169a879 --- /dev/null +++ b/adaptors/message-router/consumer/installer/src/assembly/assemble_installer_zip.xml @@ -0,0 +1,59 @@ + + + + + + installer_zip + + zip + + + + false + + + + target/stage/ + ${application.name} + 755 + + *.sh + + + + target/stage/ + ${application.name} + 644 + + *.sh + + + + + + + \ No newline at end of file diff --git a/adaptors/message-router/consumer/installer/src/assembly/assemble_mvnrepo_zip.xml b/adaptors/message-router/consumer/installer/src/assembly/assemble_mvnrepo_zip.xml new file mode 100755 index 000000000..409c66224 --- /dev/null +++ b/adaptors/message-router/consumer/installer/src/assembly/assemble_mvnrepo_zip.xml @@ -0,0 +1,47 @@ + + + + + + repo + + zip + + + + false + + + + target/assembly/ + . + + + + + + \ No newline at end of file diff --git a/adaptors/message-router/consumer/installer/src/main/resources/scripts/install-feature.sh b/adaptors/message-router/consumer/installer/src/main/resources/scripts/install-feature.sh new file mode 100755 index 000000000..6f2518f64 --- /dev/null +++ b/adaptors/message-router/consumer/installer/src/main/resources/scripts/install-feature.sh @@ -0,0 +1,39 @@ +#!/bin/bash + +### +# ============LICENSE_START======================================================= +# openECOMP : SDN-C +# ================================================================================ +# Copyright (C) 2017 AT&T Intellectual Property. All rights +# reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +### + +ODL_HOME=${ODL_HOME:-/opt/opendaylight/current} +ODL_KARAF_CLIENT=${ODL_KARAF_CLIENT:-${ODL_HOME}/bin/client} +INSTALLERDIR=$(dirname $0) + +REPOZIP=${INSTALLERDIR}/${features.boot}-${project.version}-repo.zip + +if [ -f ${REPOZIP} ] +then + unzip -d ${ODL_HOME} ${REPOZIP} +else + echo "ERROR : repo zip ($REPOZIP) not found" + exit 1 +fi + +${ODL_KARAF_CLIENT} feature:repo-add ${features.repositories} +${ODL_KARAF_CLIENT} feature:install ${features.boot} \ No newline at end of file diff --git a/adaptors/message-router/consumer/pom.xml b/adaptors/message-router/consumer/pom.xml new file mode 100755 index 000000000..cad1bbba0 --- /dev/null +++ b/adaptors/message-router/consumer/pom.xml @@ -0,0 +1,19 @@ + + + 4.0.0 + + + org.onap.ccsdk.sli.adaptors.messagerouter + messagerouter-root + 1.1.1-SNAPSHOT + + + consumer.aggregate + pom + + + api + provider + installer + + diff --git a/adaptors/message-router/consumer/provider/pom.xml b/adaptors/message-router/consumer/provider/pom.xml new file mode 100755 index 000000000..d3e568c40 --- /dev/null +++ b/adaptors/message-router/consumer/provider/pom.xml @@ -0,0 +1,43 @@ + + + 4.0.0 + + + org.onap.ccsdk.sli.adaptors.messagerouter + consumer.aggregate + 1.1.1-SNAPSHOT + + + consumer.provider + bundle + + + + ${project.groupId} + consumer.api + ${project.version} + + + org.slf4j + slf4j-api + + + com.google.code.gson + gson + + + + + + + org.apache.felix + maven-bundle-plugin + + + ${project.groupId}.consumer.provider.impl + + + + + + diff --git a/adaptors/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/AbstractBaseConsumer.java b/adaptors/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/AbstractBaseConsumer.java new file mode 100755 index 000000000..8937f7b91 --- /dev/null +++ b/adaptors/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/AbstractBaseConsumer.java @@ -0,0 +1,207 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : SDN-C + * ================================================================================ + * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.provider.impl; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Base64; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLSession; + +import org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api.ConsumerApi; +import org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api.RequestHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; +import com.google.gson.JsonParseException; + +/* + * java.net based client to build message router consumers + */ +public abstract class AbstractBaseConsumer implements ConsumerApi { + private static final Logger LOG = LoggerFactory.getLogger(AbstractBaseConsumer.class); + private static final String REQUEST_METHOD = "GET"; + + private final String host; + private final Integer connectTimeout; + private final Integer readTimeout; + private final String group; + private final String id; + private final String filter; + private final Integer limit; + private final Integer timeoutQueryParamValue; + private final String authorizationString; + + protected RequestHandler requestHandler; + protected URL url; + protected String topic; + + public AbstractBaseConsumer(String username, String password, String host, String authentication, Integer connectTimeout, Integer readTimeout, String group, String id, String filter, Integer limit, Integer timeoutQueryParamValue) { + this.host = host; + this.connectTimeout = connectTimeout; + this.readTimeout = readTimeout; + this.group = group; + this.id = id; + this.filter = filter; + this.limit = limit; + this.timeoutQueryParamValue = timeoutQueryParamValue; + + if ("basic".equals(authentication)) { + if (username != null && password != null && username.length() > 0 && password.length() > 0) { + authorizationString = buildAuthorizationString(username, password); + } else { + throw new IllegalStateException("Authentication is set to basic but username or password is missing"); + } + } else if ("noauth".equals(authentication)) { + authorizationString = null; + } else { + throw new IllegalStateException("Unknown authentication method: " + authentication); + } + } + + protected void poll() { + String responseBody = performHttpOperation(); + if (responseBody != null && !responseBody.startsWith("[]")) { + LOG.info("New message was fetched from MessageRouter."); + LOG.trace("Fetched message is\n{}", responseBody); + try { + String[] requests = new Gson().fromJson(responseBody, String[].class); + if (requests != null) { + for (String request : requests) { + if (request != null) { + requestHandler.handleRequest(topic,request); + } + } + } else { + LOG.warn("Deserialization of received message results in null array.", responseBody); + } + } catch (JsonParseException e) { + LOG.warn("Received message has bad format. Expected format is JSON."); + } + } else { + LOG.trace("No new message was fetched from MessageRouter."); + } + } + + private String buildlUrlString(String topic) { + StringBuilder sb = new StringBuilder(); + sb.append(host + "/events/" + topic + "/" + group + "/" + id); + sb.append("?timeout=" + timeoutQueryParamValue); + + if (limit != null) { + sb.append("&limit=" + limit); + } + if (filter != null) { + sb.append("&filter=" + filter); + } + return sb.toString(); + } + + private String performHttpOperation() { + HttpURLConnection httpUrlConnection = null; + try { + httpUrlConnection = buildHttpURLConnection(url); + httpUrlConnection.setRequestMethod(REQUEST_METHOD); + httpUrlConnection.connect(); + int status = httpUrlConnection.getResponseCode(); + if (status < 300) { + return readFromStream(httpUrlConnection.getInputStream()); + } else { + String response = readFromStream(httpUrlConnection.getErrorStream()); + LOG.warn("Fetching message from MessageRouter on url {} failed with http status {}. Error message is\n{}.", url, status, response); + } + } catch (Exception e) { + LOG.warn("Exception was thrown during fetching message from MessageRouter on url {}.", url, e); + } finally { + if (httpUrlConnection != null) { + httpUrlConnection.disconnect(); + } + } + return null; + } + + private String buildAuthorizationString(String userName, String password) { + String basicAuthString = userName + ":" + password; + basicAuthString = Base64.getEncoder().encodeToString(basicAuthString.getBytes()); + return "Basic " + basicAuthString; + } + + protected HttpURLConnection buildHttpURLConnection(URL url) throws IOException { + HttpURLConnection httpUrlConnection = (HttpURLConnection) url.openConnection(); + if (authorizationString != null) { + httpUrlConnection.setRequestProperty("Authorization", authorizationString); + } + httpUrlConnection.setRequestProperty("Accept", "application/json"); + httpUrlConnection.setUseCaches(false); + httpUrlConnection.setConnectTimeout(connectTimeout); + httpUrlConnection.setReadTimeout(readTimeout); + + // ignore hostname errors when dealing with HTTPS connections + if (httpUrlConnection instanceof HttpsURLConnection) { + HttpsURLConnection conn = (HttpsURLConnection) httpUrlConnection; + conn.setHostnameVerifier(new HostnameVerifier() { + @Override + public boolean verify(String arg0, SSLSession arg1) { + return true; + } + }); + } + return httpUrlConnection; + } + + protected String readFromStream(InputStream stream) throws IOException { + BufferedReader br = new BufferedReader(new InputStreamReader(stream)); + StringBuilder sb = new StringBuilder(); + String line; + while ((line = br.readLine()) != null) { + sb.append(line); + sb.append("\n"); + } + br.close(); + return sb.toString(); + } + + @Override + public void registerHandler(String topic, RequestHandler requestHandler) { + this.topic = topic; + try { + this.url = new URL(buildlUrlString(topic)); + } catch (MalformedURLException e) { + LOG.error("Topic " + topic + " resulted in MalformedURLException", e); + } + this.requestHandler = requestHandler; + } + + @Override + public void close() throws Exception { + //BaseConsumer doesn't spawn any threads + } + +} diff --git a/adaptors/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/ConsumerFactory.java b/adaptors/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/ConsumerFactory.java new file mode 100755 index 000000000..1aa02c70a --- /dev/null +++ b/adaptors/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/ConsumerFactory.java @@ -0,0 +1,202 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : SDN-C + * ================================================================================ + * Copyright (C) 2017 - 2019 AT&T Intellectual Property. All rights + * reserved. + * + * Modifications Copyright (C) 2019 IBM. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.provider.impl; + +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.Properties; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConsumerFactory { + private static final Logger LOG = LoggerFactory.getLogger(ConsumerFactory.class); + + // Default values to minimize required configuration + private static final int DEFAULT_FETCH_PAUSE = 5000; + private static final int DEFAULT_CONNECT_TIMEOUT = 30000; + private static final int DEFAULT_READ_TIMEOUT = 180000; + private static final int DEFAULT_LIMIT = 5; // Limits the number of messages pulled in a single GET request + private static final int DEFAULT_TIMEOUT_QUERY_PARAM_VALUE = 15000; + private static final String DEFAULT_AUTH_METHOD = "basic"; + + // Required properties + protected final String username; + protected final String password; + protected final String host; + private final String group; + private final String id; + + // Optional properties + protected Integer connectTimeout; + protected Integer readTimeout; + private Integer fetchPause; + private Integer limit; + private Integer timeoutQueryParamValue; + private String filter; + protected String auth; + + public ConsumerFactory(Properties properties) { + // Required properties + username = properties.getProperty("username"); + password = properties.getProperty("password"); + host = properties.getProperty("host"); + auth = properties.getProperty("auth"); + group = properties.getProperty("group"); + id = properties.getProperty("id"); + + // Optional properties + connectTimeout = readOptionalInteger(properties, "connectTimeoutSeconds"); + readTimeout = readOptionalInteger(properties, "readTimeoutMinutes"); + fetchPause = readOptionalInteger(properties, "fetchPause"); + limit = readOptionalInteger(properties, "limit"); + timeoutQueryParamValue = readOptionalInteger(properties, "timeout"); + processFilter(properties.getProperty("filter")); + + setDefaults(); + } + + public ConsumerFactory(String username, String password, String host, String group, String id, Integer connectTimeout, Integer readTimeout) { + this.username = username; + this.password = password; + this.host = host; + this.group = group; + this.id = id; + setDefaults(); + } + + + public String getAuth() { + return auth; + } + + public void setAuth(String auth) { + this.auth = auth; + } + + public Integer getConnectTimeout() { + return connectTimeout; + } + + public void setConnectTimeout(Integer connectTimeout) { + this.connectTimeout = connectTimeout; + } + + public Integer getReadTimeout() { + return readTimeout; + } + + public void setReadTimeout(Integer readTimeout) { + this.readTimeout = readTimeout; + } + + public Integer getFetchPause() { + return fetchPause; + } + + public void setFetchPause(Integer fetchPause) { + this.fetchPause = fetchPause; + } + + public Integer getLimit() { + return limit; + } + + public void setLimit(Integer limit) { + this.limit = limit; + } + + public Integer getTimeoutQueryParamValue() { + return timeoutQueryParamValue; + } + + public void setTimeoutQueryParamValue(Integer timeoutQueryParamValue) { + this.timeoutQueryParamValue = timeoutQueryParamValue; + } + + public String getFilter() { + return filter; + } + + public void setFilter(String filter) { + processFilter(filter); + } + + private Integer readOptionalInteger(Properties properties, String propertyName) { + String stringValue = properties.getProperty(propertyName); + if (stringValue != null && stringValue.length() > 0) { + try { + return Integer.valueOf(stringValue); + } catch (NumberFormatException e) { + LOG.error("property " + propertyName + " had the value " + stringValue + " that could not be converted to an Integer", e); + } + } + return null; + } + + public PollingConsumerImpl createPollingClient() { + return new PollingConsumerImpl(username, password, host, auth, connectTimeout, readTimeout, fetchPause, group, id, filter, limit, timeoutQueryParamValue); + } + + public PullingConsumerImpl createPullingClient() { + return new PullingConsumerImpl(username, password, host, auth, connectTimeout, readTimeout, group, id, filter, limit, timeoutQueryParamValue); + } + + private void processFilter(String filterString) { + if (filterString != null) { + if (filterString.length() > 0) { + try { + filter = URLEncoder.encode(filterString, StandardCharsets.UTF_8.name()); + } catch (UnsupportedEncodingException e) { + LOG.warn("Couldn't encode filter string. Filter will be ignored.", e); + filter = null; + } + } else { + filter = null; + } + } + } + + private void setDefaults() { + if (connectTimeout == null) { + connectTimeout = DEFAULT_CONNECT_TIMEOUT; + } + if (readTimeout == null) { + readTimeout = DEFAULT_READ_TIMEOUT; + } + if (fetchPause == null) { + fetchPause = DEFAULT_FETCH_PAUSE; + } + if (limit == null) { + limit = DEFAULT_LIMIT; + } + if (timeoutQueryParamValue == null) { + timeoutQueryParamValue = DEFAULT_TIMEOUT_QUERY_PARAM_VALUE; + } + if (auth == null) { + auth = DEFAULT_AUTH_METHOD; + } + } + +} diff --git a/adaptors/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/PollingConsumerImpl.java b/adaptors/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/PollingConsumerImpl.java new file mode 100644 index 000000000..263e94ca9 --- /dev/null +++ b/adaptors/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/PollingConsumerImpl.java @@ -0,0 +1,100 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : SDN-C + * ================================================================================ + * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.provider.impl; + +import org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api.PollingConsumer; +import org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api.RequestHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* + * java.net based client to build message router consumers + */ +public class PollingConsumerImpl implements PollingConsumer { + + //RunnableConsumer is a private inner class so run cannot be called from other code + private class RunnableConsumer extends AbstractBaseConsumer implements Runnable, PollingConsumer { + private final Logger LOG = LoggerFactory.getLogger(PollingConsumerImpl.class); + private volatile Thread t; + private final Integer fetchPause; + + public RunnableConsumer(String username, String password, String host, String authentication, Integer connectTimeout, Integer readTimeout, Integer fetchPause, String group, String id, String filter, Integer limit, Integer timeoutQueryParamValue) { + super(username, password, host, authentication, connectTimeout, readTimeout, group, id, filter, limit, timeoutQueryParamValue); + this.fetchPause = fetchPause; + } + + public void start() { + t = new Thread(this); + t.start(); + LOG.info("ConsumerImpl started. Fetch period is {} ms.", fetchPause); + } + + public void stop() { + t = null; + LOG.info("ConsumerImpl stopped."); + } + + @Override + public void run() { + if (this.url != null) { + Thread thisThread = Thread.currentThread(); + while (t == thisThread) { + poll(); + try { + LOG.trace("Next fetch from MessageRouter url {} after {} milliseconds.", url, fetchPause); + Thread.sleep(fetchPause); + } catch (InterruptedException e) { + LOG.warn("Thread sleep was interrupted.", e); + } + } + } else { + LOG.error("URL is null, can't listen for messages"); + } + } + + @Override + public void close() throws Exception { + stop(); + } + } + + private RunnableConsumer c; + + public PollingConsumerImpl(String username, String password, String host, String authentication, Integer connectTimeout, Integer readTimeout, Integer fetchPause, String group, String id, String filter, Integer limit, Integer timeoutQueryParamValue) { + c = new RunnableConsumer(username, password, host, authentication, connectTimeout, readTimeout, fetchPause, group, id, filter, limit, timeoutQueryParamValue); + } + + @Override + public void start() { + c.start(); + } + + @Override + public void registerHandler(String topic, RequestHandler requestHandler) { + c.registerHandler(topic, requestHandler); + } + + @Override + public void close() throws Exception { + c.close(); + } +} diff --git a/adaptors/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/PullingConsumerImpl.java b/adaptors/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/PullingConsumerImpl.java new file mode 100755 index 000000000..e5a5bc4d8 --- /dev/null +++ b/adaptors/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/PullingConsumerImpl.java @@ -0,0 +1,39 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : SDN-C + * ================================================================================ + * Copyright (C) 2017 - 2019 AT&T Intellectual Property. All rights + * reserved. + * + * Modifications Copyright (C) 2019 IBM. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.provider.impl; + +import org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api.PullingConsumer; + + +public class PullingConsumerImpl extends AbstractBaseConsumer implements PullingConsumer { + + public PullingConsumerImpl(String username, String password, String host, String authentication, Integer connectTimeout, Integer readTimeout, String group, String id, String filter, Integer limit, Integer timeoutQueryParamValue) { + super(username, password, host, authentication, connectTimeout, readTimeout, group, id, filter, limit, timeoutQueryParamValue); + } + + @Override + public void pull() { + this.poll(); + } + +} diff --git a/adaptors/message-router/consumer/provider/src/test/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/AbstractBaseConsumerTest.java b/adaptors/message-router/consumer/provider/src/test/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/AbstractBaseConsumerTest.java new file mode 100644 index 000000000..75873385b --- /dev/null +++ b/adaptors/message-router/consumer/provider/src/test/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/AbstractBaseConsumerTest.java @@ -0,0 +1,91 @@ +package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.provider.impl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.net.HttpURLConnection; +import java.net.URL; + +import org.junit.Test; +import org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api.RequestHandler; + +public class AbstractBaseConsumerTest { + private class DummyConsumer extends AbstractBaseConsumer { + + public DummyConsumer(String username, String password, String host, String authentication, Integer connectTimeout, Integer readTimeout, String group, String id, String filter, Integer limit, Integer timeoutQueryParamValue) { + super(username, password, host, authentication, connectTimeout, readTimeout, group, id, filter, limit, timeoutQueryParamValue); + } + + } + + public DummyConsumer getAuthDummy() { + String username = "deadpool"; + String password = "notSECURE"; + String host = "http://localhost:7001"; + String group = "myCluster"; + String id = "node1"; + Integer connectTimeout = 10000; + Integer readTimeout = 20000; + String authentication = "basic"; + String filter = null; + Integer limit = 3; + Integer timeoutQueryParamValue = 5000; + return new DummyConsumer(username, password, host, authentication, connectTimeout, readTimeout, group, id, filter, limit, timeoutQueryParamValue); + } + + @Test + public void createDummyWithAuth() { + assertNotNull(getAuthDummy()); + } + + @Test + public void createDummyNohAuth() { + String username = null; + String password = null; + String host = "http://localhost:7001"; + String group = "myCluster"; + String id = "node1"; + Integer connectTimeout = 10000; + Integer readTimeout = 20000; + String authentication = "noauth"; + String filter = null; + Integer limit = 3; + Integer timeoutQueryParamValue = 5000; + assertNotNull(new DummyConsumer(username, password, host, authentication, connectTimeout, readTimeout, group, id, filter, limit, timeoutQueryParamValue)); + } + + @Test + public void callClose() throws Exception { + DummyConsumer dummy = getAuthDummy(); + dummy.close(); + assertNotNull(dummy); + } + + @Test + public void registerDummyHandler() throws Exception { + DummyConsumer dummy = getAuthDummy(); + String topic = "politics"; + RequestHandler requestHandler = new RequestHandler() { + + @Override + public void handleRequest(String topic, String requestBody) { + // TODO Auto-generated method stub + + }; + + }; + dummy.registerHandler(topic, requestHandler); + assertEquals(new URL("http://localhost:7001/events/politics/myCluster/node1?timeout=5000&limit=3"), dummy.url); + assertEquals(topic, dummy.topic); + + } + + @Test + public void buildURL() throws Exception { + DummyConsumer dummy = getAuthDummy(); + HttpURLConnection connection = dummy.buildHttpURLConnection(new URL("http://localhost:7001/events/politics/myCluster/node1?timeout=5000&limit=3")); + assertNotNull(connection); + assertEquals("application/json", connection.getRequestProperty("Accept")); + } + +} diff --git a/adaptors/message-router/consumer/provider/src/test/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/ConsumerFactoryTest.java b/adaptors/message-router/consumer/provider/src/test/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/ConsumerFactoryTest.java new file mode 100644 index 000000000..d1018a014 --- /dev/null +++ b/adaptors/message-router/consumer/provider/src/test/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/ConsumerFactoryTest.java @@ -0,0 +1,164 @@ +package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.provider.impl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.util.Properties; + +import org.junit.Test; + +public class ConsumerFactoryTest { + + @Test + public void testFactoryClientCreation() throws Exception { + Properties props = new Properties(); + String userName = "deadpool"; + String password = "notSECURE"; + String host = "http://localhost:7001"; + String group = "myCluster"; + String id = "node1"; + Integer connectTimeout = 10000; + Integer readTimeout = 20000; + props.put("username", userName); + props.put("password", password); + props.put("host", host); + props.put("group", group); + + ConsumerFactory factory = new ConsumerFactory(userName, password, host, group, id, connectTimeout, readTimeout); + assertNotNull(factory.createPollingClient()); + assertNotNull(factory.createPullingClient()); + } + + @Test + public void testFactoryDefaults() throws Exception { + Properties props = new Properties(); + String userName = "deadpool"; + String password = "notSECURE"; + String host = "http://localhost:7001"; + String group = "myCluster"; + String id = "node1"; + Integer connectTimeout = 10000; + Integer readTimeout = 20000; + props.put("username", userName); + props.put("password", password); + props.put("host", host); + props.put("group", group); + + ConsumerFactory factory = new ConsumerFactory(userName, password, host, group, id, connectTimeout, readTimeout); + + assertNotNull(factory.getAuth()); + assertNotNull(factory.getConnectTimeout()); + assertNotNull(factory.getReadTimeout()); + assertNotNull(factory.getFetchPause()); + assertNotNull(factory.getLimit()); + assertNotNull(factory.getTimeoutQueryParamValue()); + } + + @Test + public void testFactoryDefaultsWithProps() { + Properties props = new Properties(); + String userName = "deadpool"; + String password = "notSECURE"; + String host = "http://localhost:7001"; + String auth = "basic"; + String group = "myCluster"; + props.put("username", userName); + props.put("password", password); + props.put("host", host); + props.put("group", group); + + ConsumerFactory factory = new ConsumerFactory(props); + + assertNotNull(factory.getAuth()); + assertNotNull(factory.getConnectTimeout()); + assertNotNull(factory.getReadTimeout()); + assertNotNull(factory.getFetchPause()); + assertNotNull(factory.getLimit()); + assertNotNull(factory.getTimeoutQueryParamValue()); + } + + @Test + public void testFactoryOverrides() throws Exception { + Properties props = new Properties(); + String userName = "deadpool"; + String password = "notSECURE"; + String host = "http://localhost:7001"; + String group = "myCluster"; + props.put("username", userName); + props.put("password", password); + props.put("host", host); + props.put("group", group); + + String connectTimeout = "200"; + String readTimeout = "300"; + String fetchPause = "1000"; + String auth = "noauth"; + String timeoutQueryParamValue = "50"; + String limit = "2"; + props.put("connectTimeoutSeconds", connectTimeout); + props.put("readTimeoutMinutes", readTimeout); + props.put("fetchPause", fetchPause); + props.put("auth", auth); + props.put("timeout", timeoutQueryParamValue); + props.put("limit", limit); + + ConsumerFactory factory = new ConsumerFactory(props); + + assertEquals(auth, factory.getAuth()); + assertEquals(Integer.valueOf(connectTimeout), factory.getConnectTimeout()); + assertEquals(Integer.valueOf(readTimeout), factory.getReadTimeout()); + assertEquals(Integer.valueOf(fetchPause), factory.getFetchPause()); + assertEquals(Integer.valueOf(limit), factory.getLimit()); + assertEquals(Integer.valueOf(timeoutQueryParamValue), factory.getTimeoutQueryParamValue()); + } + + @Test + public void testManualOverrides() { + Properties props = new Properties(); + String userName = "deadpool"; + String password = "notSECURE"; + String host = "http://localhost:7001"; + String auth = "basic"; + String group = "myCluster"; + props.put("username", userName); + props.put("password", password); + props.put("host", host); + props.put("group", group); + + ConsumerFactory factory = new ConsumerFactory(props); + + assertNotNull(factory.getAuth()); + assertNotNull(factory.getConnectTimeout()); + assertNotNull(factory.getReadTimeout()); + assertNotNull(factory.getFetchPause()); + assertNotNull(factory.getLimit()); + assertNotNull(factory.getTimeoutQueryParamValue()); + String newAuth = "noauth"; + factory.setAuth(newAuth); + assertEquals(newAuth, factory.getAuth()); + + Integer connectTimeout = 1; + factory.setConnectTimeout(connectTimeout); + assertEquals(connectTimeout, factory.getConnectTimeout()); + + Integer fetchPause = 5; + factory.setFetchPause(fetchPause); + assertEquals(fetchPause, factory.getFetchPause()); + + factory.setFilter("\"filter\":{\n" + "\"class\":\"And\",\n" + "\"filters\":\n" + "[\n" + "{ \"class\":\"Equals\", \"foo\":\"abc\" },\n" + "{ \"class\":\"Assigned\", \"field\":\"bar\" }\n" + "]\n" + "}"); + assertNotNull(factory.getFilter()); + + Integer limit = 3; + factory.setLimit(limit); + assertEquals(limit, factory.getLimit()); + + Integer readTimeout = 2; + factory.setReadTimeout(readTimeout); + assertEquals(readTimeout, factory.getReadTimeout()); + + Integer timeoutQueryParamValue = 47; + factory.setTimeoutQueryParamValue(timeoutQueryParamValue); + assertEquals(timeoutQueryParamValue, factory.getTimeoutQueryParamValue()); + } + +} diff --git a/adaptors/message-router/pom.xml b/adaptors/message-router/pom.xml new file mode 100755 index 000000000..63d267d2a --- /dev/null +++ b/adaptors/message-router/pom.xml @@ -0,0 +1,33 @@ + + + 4.0.0 + + + org.onap.ccsdk.parent + binding-parent + 2.1.0 + + + + org.onap.ccsdk.sli.adaptors.messagerouter + messagerouter-root + 1.1.1-SNAPSHOT + pom + + + publisher + consumer + + + + + + + org.apache.felix + maven-bundle-plugin + true + + + + + diff --git a/adaptors/message-router/publisher/README.md b/adaptors/message-router/publisher/README.md new file mode 100755 index 000000000..58c1bf470 --- /dev/null +++ b/adaptors/message-router/publisher/README.md @@ -0,0 +1,8 @@ +# Publisher + +## Modules +- api - exports the publisher interface for clients and providers to import +- features - used for managing the feature repository for publisher +- installer - provides a simple install script +- provider - provides an implementation of the publisher api, this implementation assumes the controller has a single identity for publishing to DMAAP message router +- sample.client - a dummy client that posts a simple message to a configured topic during its initialization diff --git a/adaptors/message-router/publisher/api/pom.xml b/adaptors/message-router/publisher/api/pom.xml new file mode 100755 index 000000000..7831284cf --- /dev/null +++ b/adaptors/message-router/publisher/api/pom.xml @@ -0,0 +1,27 @@ + + + 4.0.0 + + + org.onap.ccsdk.sli.adaptors.messagerouter + publisher.aggregate + 1.1.1-SNAPSHOT + + + publisher.api + bundle + + + + + org.apache.felix + maven-bundle-plugin + + + org.onap.ccsdk.sli.adaptors.messagerouter.publisher.api + + + + + + diff --git a/adaptors/message-router/publisher/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/publisher/api/PublisherApi.java b/adaptors/message-router/publisher/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/publisher/api/PublisherApi.java new file mode 100755 index 000000000..3551e0441 --- /dev/null +++ b/adaptors/message-router/publisher/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/publisher/api/PublisherApi.java @@ -0,0 +1,5 @@ +package org.onap.ccsdk.sli.adaptors.messagerouter.publisher.api; + +public interface PublisherApi { + public Boolean publish(String topic, String body); +} diff --git a/adaptors/message-router/publisher/installer/pom.xml b/adaptors/message-router/publisher/installer/pom.xml new file mode 100644 index 000000000..bc0565e5b --- /dev/null +++ b/adaptors/message-router/publisher/installer/pom.xml @@ -0,0 +1,133 @@ + + + 4.0.0 + + + org.onap.ccsdk.sli.adaptors.messagerouter + publisher.aggregate + 1.1.1-SNAPSHOT + + + org.onap.ccsdk.sli.adaptors.messagerouter + publisher.installer + 1.1.1-SNAPSHOT + pom + + + messagerouter-publisher + messagerouter-publisher + mvn:${project.groupId}/publisher.features/${project.version}/xml/features + false + + + + + ${project.groupId} + publisher.api + ${project.version} + + + ${project.groupId} + publisher.provider + ${project.version} + + + ${project.groupId} + sample.client + ${project.version} + + + + + + + maven-assembly-plugin + 2.6 + + + maven-repo-zip + + single + + package + + true + stage/${application.name}-${project.version} + + src/assembly/assemble_mvnrepo_zip.xml + + true + + + + installer-zip + + single + + package + + true + ${application.name}-${project.version}-installer + + src/assembly/assemble_installer_zip.xml + + false + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + + copy-dependencies + + prepare-package + + false + ${project.build.directory}/assembly/system + false + true + true + true + false + false + ${project.groupId} + provided + + + + + + maven-resources-plugin + 2.6 + + + copy-version + + copy-resources + + validate + + ${basedir}/target/stage + + + src/main/resources/scripts + + install-feature.sh + + true + + + + + + + + + + + diff --git a/adaptors/message-router/publisher/installer/src/assembly/assemble_installer_zip.xml b/adaptors/message-router/publisher/installer/src/assembly/assemble_installer_zip.xml new file mode 100644 index 000000000..c6169a879 --- /dev/null +++ b/adaptors/message-router/publisher/installer/src/assembly/assemble_installer_zip.xml @@ -0,0 +1,59 @@ + + + + + + installer_zip + + zip + + + + false + + + + target/stage/ + ${application.name} + 755 + + *.sh + + + + target/stage/ + ${application.name} + 644 + + *.sh + + + + + + + \ No newline at end of file diff --git a/adaptors/message-router/publisher/installer/src/assembly/assemble_mvnrepo_zip.xml b/adaptors/message-router/publisher/installer/src/assembly/assemble_mvnrepo_zip.xml new file mode 100644 index 000000000..377b5b153 --- /dev/null +++ b/adaptors/message-router/publisher/installer/src/assembly/assemble_mvnrepo_zip.xml @@ -0,0 +1,49 @@ + + + + + + repo + + zip + + + + false + + + + target/assembly/ + . + + + + + + + + \ No newline at end of file diff --git a/adaptors/message-router/publisher/installer/src/main/resources/scripts/install-feature.sh b/adaptors/message-router/publisher/installer/src/main/resources/scripts/install-feature.sh new file mode 100644 index 000000000..15dc0c27a --- /dev/null +++ b/adaptors/message-router/publisher/installer/src/main/resources/scripts/install-feature.sh @@ -0,0 +1,39 @@ +#!/bin/bash + +### +# ============LICENSE_START======================================================= +# openECOMP : SDN-C +# ================================================================================ +# Copyright (C) 2017 AT&T Intellectual Property. All rights +# reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +### + +ODL_HOME=${ODL_HOME:-/opt/opendaylight/current} +ODL_KARAF_CLIENT=${ODL_KARAF_CLIENT:-${ODL_HOME}/bin/client} +INSTALLERDIR=$(dirname $0) + +REPOZIP=${INSTALLERDIR}/${features.boot}-${project.version}.zip + +if [ -f ${REPOZIP} ] +then + unzip -d ${ODL_HOME} ${REPOZIP} +else + echo "ERROR : repo zip ($REPOZIP) not found" + exit 1 +fi + +${ODL_KARAF_CLIENT} feature:repo-add ${features.repositories} +${ODL_KARAF_CLIENT} feature:install ${features.boot} \ No newline at end of file diff --git a/adaptors/message-router/publisher/pom.xml b/adaptors/message-router/publisher/pom.xml new file mode 100755 index 000000000..6fb9c82d3 --- /dev/null +++ b/adaptors/message-router/publisher/pom.xml @@ -0,0 +1,20 @@ + + + 4.0.0 + + + org.onap.ccsdk.sli.adaptors.messagerouter + messagerouter-root + 1.1.1-SNAPSHOT + + + publisher.aggregate + pom + + + api + provider + sample.client + installer + + diff --git a/adaptors/message-router/publisher/provider/pom.xml b/adaptors/message-router/publisher/provider/pom.xml new file mode 100755 index 000000000..7cd311448 --- /dev/null +++ b/adaptors/message-router/publisher/provider/pom.xml @@ -0,0 +1,39 @@ + + + 4.0.0 + + + org.onap.ccsdk.sli.adaptors.messagerouter + publisher.aggregate + 1.1.1-SNAPSHOT + + + publisher.provider + bundle + + + + ${project.groupId} + publisher.api + ${project.version} + + + org.slf4j + slf4j-api + + + + + + + org.apache.felix + maven-bundle-plugin + + + ${project.groupId}.publisher.provider.impl + + + + + + diff --git a/adaptors/message-router/publisher/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/publisher/provider/impl/PublisherApiImpl.java b/adaptors/message-router/publisher/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/publisher/provider/impl/PublisherApiImpl.java new file mode 100755 index 000000000..4aedc6071 --- /dev/null +++ b/adaptors/message-router/publisher/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/publisher/provider/impl/PublisherApiImpl.java @@ -0,0 +1,178 @@ +/** + * ============LICENSE_START==================================================== + * org.onap.aaf + * =========================================================================== + * Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. + * + * Modifications Copyright (C) 2019 IBM. + * =========================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END==================================================== + * + */ +package org.onap.ccsdk.sli.adaptors.messagerouter.publisher.provider.impl; + +import java.io.BufferedReader; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.SocketException; +import java.net.URL; +import java.util.Base64; + +import org.onap.ccsdk.sli.adaptors.messagerouter.publisher.api.PublisherApi; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PublisherApiImpl implements PublisherApi { + private static final Logger logger = LoggerFactory.getLogger(PublisherApiImpl.class); + protected static final Integer DEFAULT_CONNECT_TIMEOUT = 30000; // will be treated as 30 seconds + protected static final Integer DEFAULT_READ_TIMEOUT = 180000; // will be treated as 3 minutes + private String authorizationString; + protected Integer connectTimeout; + protected Integer readTimeout; + protected String baseUrl; + protected String username; + protected String[] hosts; + private String password; + + public PublisherApiImpl() { + connectTimeout = DEFAULT_CONNECT_TIMEOUT; + readTimeout = DEFAULT_READ_TIMEOUT; + } + + public void setUsername(String username) { + this.username = username; + setAuthorizationString(); + } + + public void setPassword(String password) { + this.password = password; + setAuthorizationString(); + } + + public void setHost(String hostString) { + // a comma separated list of hosts can be passed in or a single host may be used + if (!hostString.contains(",")) { + this.hosts = new String[] { hostString }; + } else { + this.hosts = hostString.split(","); + } + } + + public void init() { + setAuthorizationString(); + } + + protected String buildUrlString(Integer hostIndex, String topic) { + return hosts[hostIndex] + "/events/" + topic; + } + + protected void configureHttpURLConnection(HttpURLConnection httpUrlConnection) { + httpUrlConnection.setRequestProperty("Content-Type", "application/json"); + } + + public Boolean publish(String topic, String body) { + for (int hostIndex = 0; hostIndex < hosts.length; hostIndex++) { + HttpURLConnection httpUrlConnection = null; + URL url = null; + try { + url = new URL(buildUrlString(hostIndex, topic)); + logger.info("Publishing body to topic {} using the url {}", topic, url); + logger.info("Message to publish is\n{}", body); + httpUrlConnection = buildHttpURLConnection(url); + httpUrlConnection.setDoInput(true); + httpUrlConnection.setDoOutput(true); + httpUrlConnection.setUseCaches(false); + httpUrlConnection.setRequestMethod("POST"); + + // Write message + httpUrlConnection.setRequestProperty("Content-Length", Integer.toString(body.length())); + DataOutputStream outStr = new DataOutputStream(httpUrlConnection.getOutputStream()); + outStr.write(body.getBytes()); + outStr.close(); + + int status = httpUrlConnection.getResponseCode(); + logger.info("Publishing body for topic {} using url {} returned status {}.", topic, url, status); + if (status < 300) { + String responseFromDMaaP = readFromStream(httpUrlConnection.getInputStream()); + logger.info("Message router response is\n{}", responseFromDMaaP); + return true; + } else { + if (httpUrlConnection.getErrorStream() != null) { + String responseFromDMaaP = readFromStream(httpUrlConnection.getErrorStream()); + logger.warn("Publishing body for topic {} using url {} failed." + " Error message is\n{}", + topic, url, responseFromDMaaP); + } + return false; + } + + } catch (SocketException socketException) { + logger.error("SocketException was thrown during publishing message to DMaaP on url {}.", url, + socketException); + if (hostIndex < hosts.length) { + logger.info("Message sent to {} failed with a SocketException, but will be tried on {}", + hosts[hostIndex], hosts[hostIndex + 1]); + } + } catch (Exception e) { + logger.warn("Exception was thrown during publishing message to DMaaP on url {}.", url, e); + return false; + } finally { + if (httpUrlConnection != null) { + httpUrlConnection.disconnect(); + } + } + } + return false; + } + + protected void setAuthorizationString() { + String str = buildAuthorizationString(this.username, this.password); + this.authorizationString = str; + //System.out.println(this.authorizationString); + } + + protected String buildAuthorizationString(String username, String password) { + String basicAuthString = username + ":" + password; + basicAuthString = Base64.getEncoder().encodeToString(basicAuthString.getBytes()); + return "Basic " + basicAuthString; + } + + protected HttpURLConnection buildHttpURLConnection(URL url) throws IOException { + HttpURLConnection httpUrlConnection = (HttpURLConnection) url.openConnection(); + if (authorizationString != null) { + System.out.println(authorizationString); + httpUrlConnection.setRequestProperty("Authorization", authorizationString); + } + httpUrlConnection.setRequestProperty("Accept", "application/json"); + httpUrlConnection.setUseCaches(false); + httpUrlConnection.setConnectTimeout(connectTimeout); + httpUrlConnection.setReadTimeout(readTimeout); + configureHttpURLConnection(httpUrlConnection); + return httpUrlConnection; + } + + protected String readFromStream(InputStream stream) throws IOException { + BufferedReader br = new BufferedReader(new InputStreamReader(stream)); + StringBuilder sb = new StringBuilder(); + String line; + while ((line = br.readLine()) != null) { + sb.append(line); + } + br.close(); + return sb.toString(); + } + +} \ No newline at end of file diff --git a/adaptors/message-router/publisher/provider/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/adaptors/message-router/publisher/provider/src/main/resources/OSGI-INF/blueprint/blueprint.xml new file mode 100755 index 000000000..4702a6260 --- /dev/null +++ b/adaptors/message-router/publisher/provider/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -0,0 +1,14 @@ + + + + + + + + + + \ No newline at end of file diff --git a/adaptors/message-router/publisher/provider/src/test/java/org/onap/ccsdk/sli/adaptors/messagerouter/publisher/provider/impl/PublisherApiImplTest.java b/adaptors/message-router/publisher/provider/src/test/java/org/onap/ccsdk/sli/adaptors/messagerouter/publisher/provider/impl/PublisherApiImplTest.java new file mode 100644 index 000000000..53744f73e --- /dev/null +++ b/adaptors/message-router/publisher/provider/src/test/java/org/onap/ccsdk/sli/adaptors/messagerouter/publisher/provider/impl/PublisherApiImplTest.java @@ -0,0 +1,51 @@ +package org.onap.ccsdk.sli.adaptors.messagerouter.publisher.provider.impl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.net.HttpURLConnection; +import java.net.URL; + +import org.junit.Test; + +public class PublisherApiImplTest { + @Test + public void verifyDefaultTimeouts() { + PublisherApiImpl pub = new PublisherApiImpl(); + assertEquals(pub.DEFAULT_CONNECT_TIMEOUT, pub.connectTimeout); + assertEquals(pub.DEFAULT_READ_TIMEOUT, pub.readTimeout); + } + + @Test + public void buildHttpURLConnection() throws Exception { + PublisherApiImpl pub = new PublisherApiImpl(); + pub.init(); + + String myUserName = "Batman"; + pub.setUsername(myUserName); + assertEquals(myUserName, pub.username); + String password = "P@$$"; + pub.setPassword(password); + + HttpURLConnection httpUrlConnection = pub.buildHttpURLConnection(new URL("http://localhost:7001")); + assertNotNull(httpUrlConnection.getReadTimeout()); + assertNotNull(httpUrlConnection.getConnectTimeout()); + assertEquals("application/json", httpUrlConnection.getRequestProperty("Content-Type")); + assertEquals("application/json", httpUrlConnection.getRequestProperty("Accept")); + } + + @Test + public void testMultipleHosts() { + PublisherApiImpl pub = new PublisherApiImpl(); + String myTopic = "worldNews"; + String hostOne = "http://localhost:7001"; + String hostTwo = "http://localhost:7002"; + String hostThree = "http://localhost:7003"; + + pub.setHost(hostOne + "," + hostTwo + "," + hostThree); + + assertEquals("http://localhost:7001/events/worldNews", pub.buildUrlString(0, myTopic)); + assertEquals("http://localhost:7002/events/worldNews", pub.buildUrlString(1, myTopic)); + assertEquals("http://localhost:7003/events/worldNews", pub.buildUrlString(2, myTopic)); + } +} \ No newline at end of file diff --git a/adaptors/message-router/publisher/sample.client/pom.xml b/adaptors/message-router/publisher/sample.client/pom.xml new file mode 100755 index 000000000..694af1f4a --- /dev/null +++ b/adaptors/message-router/publisher/sample.client/pom.xml @@ -0,0 +1,39 @@ + + + 4.0.0 + + + org.onap.ccsdk.sli.adaptors.messagerouter + publisher.aggregate + 1.1.1-SNAPSHOT + + + sample.client + bundle + + + + ${project.groupId} + publisher.api + ${project.version} + + + org.slf4j + slf4j-api + + + + + + + org.apache.felix + maven-bundle-plugin + + + ${project.groupId}.publisher.client.impl + + + + + + diff --git a/adaptors/message-router/publisher/sample.client/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/publisher/client/impl/ClientImpl.java b/adaptors/message-router/publisher/sample.client/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/publisher/client/impl/ClientImpl.java new file mode 100755 index 000000000..d1610305a --- /dev/null +++ b/adaptors/message-router/publisher/sample.client/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/publisher/client/impl/ClientImpl.java @@ -0,0 +1,58 @@ +/** + * ============LICENSE_START==================================================== + * org.onap.aaf + * =========================================================================== + * Copyright (c) 2019 AT&T Intellectual Property. All rights reserved. + * + * Modifications Copyright (C) 2019 IBM. + * =========================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END==================================================== + * + */ + +package org.onap.ccsdk.sli.adaptors.messagerouter.publisher.client.impl; + +import org.onap.ccsdk.sli.adaptors.messagerouter.publisher.api.PublisherApi; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ClientImpl { + private static final Logger logger = LoggerFactory.getLogger(ClientImpl.class); + private String topic; + private PublisherApi publisher; + + public ClientImpl() { + + } + + public void setPublisher(PublisherApi publisherApi) { + this.publisher = publisherApi; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + + + public void init() { + for (int i = 0; i < 5; i++) { + String body = "{\"hello\":\"world " + Math.random() + "\"}"; + logger.error("Loop iteration " + i + " sending body " + body + " to the topic " + topic); + Boolean result = publisher.publish(topic, body); + logger.error("Loop iteration " + i + " returned the boolean value " + result); + } + } + +} \ No newline at end of file diff --git a/adaptors/message-router/publisher/sample.client/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/adaptors/message-router/publisher/sample.client/src/main/resources/OSGI-INF/blueprint/blueprint.xml new file mode 100755 index 000000000..c2e981da2 --- /dev/null +++ b/adaptors/message-router/publisher/sample.client/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -0,0 +1,16 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/adaptors/message-router/publisher/sample.client/src/test/java/org/onap/ccsdk/sli/adaptors/messagerouter/publisher/client/impl/ClientImplTest.java b/adaptors/message-router/publisher/sample.client/src/test/java/org/onap/ccsdk/sli/adaptors/messagerouter/publisher/client/impl/ClientImplTest.java new file mode 100644 index 000000000..82a1566c3 --- /dev/null +++ b/adaptors/message-router/publisher/sample.client/src/test/java/org/onap/ccsdk/sli/adaptors/messagerouter/publisher/client/impl/ClientImplTest.java @@ -0,0 +1,29 @@ +package org.onap.ccsdk.sli.adaptors.messagerouter.publisher.client.impl; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; +import org.onap.ccsdk.sli.adaptors.messagerouter.publisher.api.PublisherApi; + +public class ClientImplTest { + + @Test + public void testSetTopic() { + ClientImpl impl = new ClientImpl(); + String myTopic = "stock updates"; + impl.setTopic(myTopic); + + PublisherApi publisherImpl = new PublisherApi() { + + @Override + public Boolean publish(String topic, String body) { + assertEquals(myTopic,topic); + return true; + } + + }; + impl.setPublisher(publisherImpl); + impl.init(); + } + +} -- cgit 1.2.3-korg