diff options
Diffstat (limited to 'message-router/consumer')
18 files changed, 1197 insertions, 0 deletions
diff --git a/message-router/consumer/README.md b/message-router/consumer/README.md new file mode 100755 index 000000000..9d01256a7 --- /dev/null +++ b/message-router/consumer/README.md @@ -0,0 +1,7 @@ +# Consumer
+
+## Modules
+- api - exports the consumer interface for clients and providers to import
+- features - used for managing the feature repository for consumer
+- installer - provides a simple install script
+- provider - provides an implementation of the consumer api
\ No newline at end of file diff --git a/message-router/consumer/api/pom.xml b/message-router/consumer/api/pom.xml new file mode 100755 index 000000000..e77eb9793 --- /dev/null +++ b/message-router/consumer/api/pom.xml @@ -0,0 +1,27 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.onap.ccsdk.sli.adaptors.messagerouter</groupId> + <artifactId>consumer.aggregate</artifactId> + <version>1.1.1-SNAPSHOT</version> + </parent> + + <artifactId>consumer.api</artifactId> + <packaging>bundle</packaging> + + <build> + <plugins> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <configuration> + <instructions> + <Export-Package>org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api</Export-Package> + </instructions> + </configuration> + </plugin> + </plugins> + </build> +</project> diff --git a/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/ConsumerApi.java b/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/ConsumerApi.java new file mode 100755 index 000000000..41deade85 --- /dev/null +++ b/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/ConsumerApi.java @@ -0,0 +1,6 @@ +package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api;
+
+public interface ConsumerApi extends AutoCloseable {
+ //registers a handler to handle a specific topic, should be called only once per client
+ public void registerHandler(String topic, RequestHandler requestHandler);
+}
diff --git a/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/PollingConsumer.java b/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/PollingConsumer.java new file mode 100755 index 000000000..29fc1c764 --- /dev/null +++ b/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/PollingConsumer.java @@ -0,0 +1,7 @@ +package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api;
+
+public interface PollingConsumer extends ConsumerApi {
+
+ // Starts polling message router for messages, won't stop until close it called
+ void start();
+}
diff --git a/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/PullingConsumer.java b/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/PullingConsumer.java new file mode 100755 index 000000000..1187aaceb --- /dev/null +++ b/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/PullingConsumer.java @@ -0,0 +1,7 @@ +package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api;
+
+public interface PullingConsumer extends ConsumerApi {
+
+ //Pulls a single batch of messages
+ void pull();
+}
diff --git a/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/RequestHandler.java b/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/RequestHandler.java new file mode 100755 index 000000000..07a117843 --- /dev/null +++ b/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/RequestHandler.java @@ -0,0 +1,5 @@ +package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api; + +public interface RequestHandler { + public void handleRequest(String topic, String requestBody); +} diff --git a/message-router/consumer/installer/pom.xml b/message-router/consumer/installer/pom.xml new file mode 100755 index 000000000..2b8eb688e --- /dev/null +++ b/message-router/consumer/installer/pom.xml @@ -0,0 +1,128 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.onap.ccsdk.sli.adaptors.messagerouter</groupId> + <artifactId>consumer.aggregate</artifactId> + <version>1.1.1-SNAPSHOT</version> + </parent> + + <groupId>org.onap.ccsdk.sli.adaptors.messagerouter</groupId> + <artifactId>consumer.installer</artifactId> + <version>1.1.1-SNAPSHOT</version> + <packaging>pom</packaging> + + <properties> + <application.name>messagerouter-consumer</application.name> + <features.boot>messagerouter-consumer</features.boot> + <features.repositories>mvn:${project.groupId}/consumer.features/${project.version}/xml/features</features.repositories> + <include.transitive.dependencies>false</include.transitive.dependencies> + </properties> + + <dependencies> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>consumer.api</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>consumer.provider</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <version>2.6</version> + <executions> + <execution> + <id>maven-repo-zip</id> + <goals> + <goal>single</goal> + </goals> + <phase>package</phase> + <configuration> + <attach>true</attach> + <finalName>stage/${application.name}-${project.version}</finalName> + <descriptors> + <descriptor>src/assembly/assemble_mvnrepo_zip.xml</descriptor> + </descriptors> + <appendAssemblyId>true</appendAssemblyId> + </configuration> + </execution> + <execution> + <id>installer-zip</id> + <goals> + <goal>single</goal> + </goals> + <phase>package</phase> + <configuration> + <attach>true</attach> + <finalName>${application.name}-${project.version}-installer</finalName> + <descriptors> + <descriptor>src/assembly/assemble_installer_zip.xml</descriptor> + </descriptors> + <appendAssemblyId>false</appendAssemblyId> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <id>copy-dependencies</id> + <goals> + <goal>copy-dependencies</goal> + </goals> + <phase>prepare-package</phase> + <configuration> + <transitive>false</transitive> + <outputDirectory>${project.build.directory}/assembly/system</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>true</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + <useRepositoryLayout>true</useRepositoryLayout> + <addParentPoms>false</addParentPoms> + <copyPom>false</copyPom> + <includeGroupIds>${project.groupId}</includeGroupIds> + <scope>provided</scope> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <version>2.6</version> + <executions> + <execution> + <id>copy-version</id> + <goals> + <goal>copy-resources</goal> + </goals><!-- here the phase you need --> + <phase>validate</phase> + <configuration> + <outputDirectory>${basedir}/target/stage</outputDirectory> + <resources> + <resource> + <directory>src/main/resources/scripts</directory> + <includes> + <include>install-feature.sh</include> + </includes> + <filtering>true</filtering> + </resource> + </resources> + </configuration> + </execution> + + </executions> + </plugin> + + </plugins> + </build> +</project> diff --git a/message-router/consumer/installer/src/assembly/assemble_installer_zip.xml b/message-router/consumer/installer/src/assembly/assemble_installer_zip.xml new file mode 100755 index 000000000..c6169a879 --- /dev/null +++ b/message-router/consumer/installer/src/assembly/assemble_installer_zip.xml @@ -0,0 +1,59 @@ +<!-- + ============LICENSE_START======================================================= + openECOMP : SDN-C + ================================================================================ + Copyright (C) 2017 AT&T Intellectual Property. All rights + reserved. + ================================================================================ + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + ============LICENSE_END========================================================= + --> + +<!-- Defines how we build the .zip file which is our distribution. --> + +<assembly + xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"> + <id>installer_zip</id> + <formats> + <format>zip</format> + </formats> + + <!-- we want "system" and related files right at the root level + as this file is suppose to be unzip on top of a karaf + distro. --> + <includeBaseDirectory>false</includeBaseDirectory> + + <fileSets> + <fileSet> + <directory>target/stage/</directory> + <outputDirectory>${application.name}</outputDirectory> + <fileMode>755</fileMode> + <includes> + <include>*.sh</include> + </includes> + </fileSet> + <fileSet> + <directory>target/stage/</directory> + <outputDirectory>${application.name}</outputDirectory> + <fileMode>644</fileMode> + <excludes> + <exclude>*.sh</exclude> + </excludes> + </fileSet> + </fileSets> + + + +</assembly>
\ No newline at end of file diff --git a/message-router/consumer/installer/src/assembly/assemble_mvnrepo_zip.xml b/message-router/consumer/installer/src/assembly/assemble_mvnrepo_zip.xml new file mode 100755 index 000000000..409c66224 --- /dev/null +++ b/message-router/consumer/installer/src/assembly/assemble_mvnrepo_zip.xml @@ -0,0 +1,47 @@ +<!-- + ============LICENSE_START======================================================= + openECOMP : SDN-C + ================================================================================ + Copyright (C) 2017 AT&T Intellectual Property. All rights + reserved. + ================================================================================ + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + ============LICENSE_END========================================================= + --> + +<!-- Defines how we build the .zip file which is our distribution. --> + +<assembly + xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"> + <id>repo</id> + <formats> + <format>zip</format> + </formats> + + <!-- we want "system" and related files right at the root level + as this file is suppose to be unzip on top of a karaf + distro. --> + <includeBaseDirectory>false</includeBaseDirectory> + + <fileSets> + <fileSet> + <directory>target/assembly/</directory> + <outputDirectory>.</outputDirectory> + <excludes> + </excludes> + </fileSet> + </fileSets> + +</assembly>
\ No newline at end of file diff --git a/message-router/consumer/installer/src/main/resources/scripts/install-feature.sh b/message-router/consumer/installer/src/main/resources/scripts/install-feature.sh new file mode 100755 index 000000000..6f2518f64 --- /dev/null +++ b/message-router/consumer/installer/src/main/resources/scripts/install-feature.sh @@ -0,0 +1,39 @@ +#!/bin/bash + +### +# ============LICENSE_START======================================================= +# openECOMP : SDN-C +# ================================================================================ +# Copyright (C) 2017 AT&T Intellectual Property. All rights +# reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +### + +ODL_HOME=${ODL_HOME:-/opt/opendaylight/current} +ODL_KARAF_CLIENT=${ODL_KARAF_CLIENT:-${ODL_HOME}/bin/client} +INSTALLERDIR=$(dirname $0) + +REPOZIP=${INSTALLERDIR}/${features.boot}-${project.version}-repo.zip + +if [ -f ${REPOZIP} ] +then + unzip -d ${ODL_HOME} ${REPOZIP} +else + echo "ERROR : repo zip ($REPOZIP) not found" + exit 1 +fi + +${ODL_KARAF_CLIENT} feature:repo-add ${features.repositories} +${ODL_KARAF_CLIENT} feature:install ${features.boot}
\ No newline at end of file diff --git a/message-router/consumer/pom.xml b/message-router/consumer/pom.xml new file mode 100755 index 000000000..cad1bbba0 --- /dev/null +++ b/message-router/consumer/pom.xml @@ -0,0 +1,19 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.onap.ccsdk.sli.adaptors.messagerouter</groupId> + <artifactId>messagerouter-root</artifactId> + <version>1.1.1-SNAPSHOT</version> + </parent> + + <artifactId>consumer.aggregate</artifactId> + <packaging>pom</packaging> + + <modules> + <module>api</module> + <module>provider</module> + <module>installer</module> + </modules> +</project> diff --git a/message-router/consumer/provider/pom.xml b/message-router/consumer/provider/pom.xml new file mode 100755 index 000000000..d3e568c40 --- /dev/null +++ b/message-router/consumer/provider/pom.xml @@ -0,0 +1,43 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.onap.ccsdk.sli.adaptors.messagerouter</groupId> + <artifactId>consumer.aggregate</artifactId> + <version>1.1.1-SNAPSHOT</version> + </parent> + + <artifactId>consumer.provider</artifactId> + <packaging>bundle</packaging> + + <dependencies> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>consumer.api</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <configuration> + <instructions> + <Export-Package>${project.groupId}.consumer.provider.impl</Export-Package> + </instructions> + </configuration> + </plugin> + </plugins> + </build> +</project> diff --git a/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/AbstractBaseConsumer.java b/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/AbstractBaseConsumer.java new file mode 100755 index 000000000..8937f7b91 --- /dev/null +++ b/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/AbstractBaseConsumer.java @@ -0,0 +1,207 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : SDN-C + * ================================================================================ + * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.provider.impl; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.Base64; + +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLSession; + +import org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api.ConsumerApi; +import org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api.RequestHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; +import com.google.gson.JsonParseException; + +/* + * java.net based client to build message router consumers + */ +public abstract class AbstractBaseConsumer implements ConsumerApi { + private static final Logger LOG = LoggerFactory.getLogger(AbstractBaseConsumer.class); + private static final String REQUEST_METHOD = "GET"; + + private final String host; + private final Integer connectTimeout; + private final Integer readTimeout; + private final String group; + private final String id; + private final String filter; + private final Integer limit; + private final Integer timeoutQueryParamValue; + private final String authorizationString; + + protected RequestHandler requestHandler; + protected URL url; + protected String topic; + + public AbstractBaseConsumer(String username, String password, String host, String authentication, Integer connectTimeout, Integer readTimeout, String group, String id, String filter, Integer limit, Integer timeoutQueryParamValue) { + this.host = host; + this.connectTimeout = connectTimeout; + this.readTimeout = readTimeout; + this.group = group; + this.id = id; + this.filter = filter; + this.limit = limit; + this.timeoutQueryParamValue = timeoutQueryParamValue; + + if ("basic".equals(authentication)) { + if (username != null && password != null && username.length() > 0 && password.length() > 0) { + authorizationString = buildAuthorizationString(username, password); + } else { + throw new IllegalStateException("Authentication is set to basic but username or password is missing"); + } + } else if ("noauth".equals(authentication)) { + authorizationString = null; + } else { + throw new IllegalStateException("Unknown authentication method: " + authentication); + } + } + + protected void poll() { + String responseBody = performHttpOperation(); + if (responseBody != null && !responseBody.startsWith("[]")) { + LOG.info("New message was fetched from MessageRouter."); + LOG.trace("Fetched message is\n{}", responseBody); + try { + String[] requests = new Gson().fromJson(responseBody, String[].class); + if (requests != null) { + for (String request : requests) { + if (request != null) { + requestHandler.handleRequest(topic,request); + } + } + } else { + LOG.warn("Deserialization of received message results in null array.", responseBody); + } + } catch (JsonParseException e) { + LOG.warn("Received message has bad format. Expected format is JSON."); + } + } else { + LOG.trace("No new message was fetched from MessageRouter."); + } + } + + private String buildlUrlString(String topic) { + StringBuilder sb = new StringBuilder(); + sb.append(host + "/events/" + topic + "/" + group + "/" + id); + sb.append("?timeout=" + timeoutQueryParamValue); + + if (limit != null) { + sb.append("&limit=" + limit); + } + if (filter != null) { + sb.append("&filter=" + filter); + } + return sb.toString(); + } + + private String performHttpOperation() { + HttpURLConnection httpUrlConnection = null; + try { + httpUrlConnection = buildHttpURLConnection(url); + httpUrlConnection.setRequestMethod(REQUEST_METHOD); + httpUrlConnection.connect(); + int status = httpUrlConnection.getResponseCode(); + if (status < 300) { + return readFromStream(httpUrlConnection.getInputStream()); + } else { + String response = readFromStream(httpUrlConnection.getErrorStream()); + LOG.warn("Fetching message from MessageRouter on url {} failed with http status {}. Error message is\n{}.", url, status, response); + } + } catch (Exception e) { + LOG.warn("Exception was thrown during fetching message from MessageRouter on url {}.", url, e); + } finally { + if (httpUrlConnection != null) { + httpUrlConnection.disconnect(); + } + } + return null; + } + + private String buildAuthorizationString(String userName, String password) { + String basicAuthString = userName + ":" + password; + basicAuthString = Base64.getEncoder().encodeToString(basicAuthString.getBytes()); + return "Basic " + basicAuthString; + } + + protected HttpURLConnection buildHttpURLConnection(URL url) throws IOException { + HttpURLConnection httpUrlConnection = (HttpURLConnection) url.openConnection(); + if (authorizationString != null) { + httpUrlConnection.setRequestProperty("Authorization", authorizationString); + } + httpUrlConnection.setRequestProperty("Accept", "application/json"); + httpUrlConnection.setUseCaches(false); + httpUrlConnection.setConnectTimeout(connectTimeout); + httpUrlConnection.setReadTimeout(readTimeout); + + // ignore hostname errors when dealing with HTTPS connections + if (httpUrlConnection instanceof HttpsURLConnection) { + HttpsURLConnection conn = (HttpsURLConnection) httpUrlConnection; + conn.setHostnameVerifier(new HostnameVerifier() { + @Override + public boolean verify(String arg0, SSLSession arg1) { + return true; + } + }); + } + return httpUrlConnection; + } + + protected String readFromStream(InputStream stream) throws IOException { + BufferedReader br = new BufferedReader(new InputStreamReader(stream)); + StringBuilder sb = new StringBuilder(); + String line; + while ((line = br.readLine()) != null) { + sb.append(line); + sb.append("\n"); + } + br.close(); + return sb.toString(); + } + + @Override + public void registerHandler(String topic, RequestHandler requestHandler) { + this.topic = topic; + try { + this.url = new URL(buildlUrlString(topic)); + } catch (MalformedURLException e) { + LOG.error("Topic " + topic + " resulted in MalformedURLException", e); + } + this.requestHandler = requestHandler; + } + + @Override + public void close() throws Exception { + //BaseConsumer doesn't spawn any threads + } + +} diff --git a/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/ConsumerFactory.java b/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/ConsumerFactory.java new file mode 100755 index 000000000..1aa02c70a --- /dev/null +++ b/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/ConsumerFactory.java @@ -0,0 +1,202 @@ +/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : SDN-C
+ * ================================================================================
+ * Copyright (C) 2017 - 2019 AT&T Intellectual Property. All rights
+ * reserved.
+ *
+ * Modifications Copyright (C) 2019 IBM.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.provider.impl;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConsumerFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(ConsumerFactory.class);
+
+ // Default values to minimize required configuration
+ private static final int DEFAULT_FETCH_PAUSE = 5000;
+ private static final int DEFAULT_CONNECT_TIMEOUT = 30000;
+ private static final int DEFAULT_READ_TIMEOUT = 180000;
+ private static final int DEFAULT_LIMIT = 5; // Limits the number of messages pulled in a single GET request
+ private static final int DEFAULT_TIMEOUT_QUERY_PARAM_VALUE = 15000;
+ private static final String DEFAULT_AUTH_METHOD = "basic";
+
+ // Required properties
+ protected final String username;
+ protected final String password;
+ protected final String host;
+ private final String group;
+ private final String id;
+
+ // Optional properties
+ protected Integer connectTimeout;
+ protected Integer readTimeout;
+ private Integer fetchPause;
+ private Integer limit;
+ private Integer timeoutQueryParamValue;
+ private String filter;
+ protected String auth;
+
+ public ConsumerFactory(Properties properties) {
+ // Required properties
+ username = properties.getProperty("username");
+ password = properties.getProperty("password");
+ host = properties.getProperty("host");
+ auth = properties.getProperty("auth");
+ group = properties.getProperty("group");
+ id = properties.getProperty("id");
+
+ // Optional properties
+ connectTimeout = readOptionalInteger(properties, "connectTimeoutSeconds");
+ readTimeout = readOptionalInteger(properties, "readTimeoutMinutes");
+ fetchPause = readOptionalInteger(properties, "fetchPause");
+ limit = readOptionalInteger(properties, "limit");
+ timeoutQueryParamValue = readOptionalInteger(properties, "timeout");
+ processFilter(properties.getProperty("filter"));
+
+ setDefaults();
+ }
+
+ public ConsumerFactory(String username, String password, String host, String group, String id, Integer connectTimeout, Integer readTimeout) {
+ this.username = username;
+ this.password = password;
+ this.host = host;
+ this.group = group;
+ this.id = id;
+ setDefaults();
+ }
+
+
+ public String getAuth() {
+ return auth;
+ }
+
+ public void setAuth(String auth) {
+ this.auth = auth;
+ }
+
+ public Integer getConnectTimeout() {
+ return connectTimeout;
+ }
+
+ public void setConnectTimeout(Integer connectTimeout) {
+ this.connectTimeout = connectTimeout;
+ }
+
+ public Integer getReadTimeout() {
+ return readTimeout;
+ }
+
+ public void setReadTimeout(Integer readTimeout) {
+ this.readTimeout = readTimeout;
+ }
+
+ public Integer getFetchPause() {
+ return fetchPause;
+ }
+
+ public void setFetchPause(Integer fetchPause) {
+ this.fetchPause = fetchPause;
+ }
+
+ public Integer getLimit() {
+ return limit;
+ }
+
+ public void setLimit(Integer limit) {
+ this.limit = limit;
+ }
+
+ public Integer getTimeoutQueryParamValue() {
+ return timeoutQueryParamValue;
+ }
+
+ public void setTimeoutQueryParamValue(Integer timeoutQueryParamValue) {
+ this.timeoutQueryParamValue = timeoutQueryParamValue;
+ }
+
+ public String getFilter() {
+ return filter;
+ }
+
+ public void setFilter(String filter) {
+ processFilter(filter);
+ }
+
+ private Integer readOptionalInteger(Properties properties, String propertyName) {
+ String stringValue = properties.getProperty(propertyName);
+ if (stringValue != null && stringValue.length() > 0) {
+ try {
+ return Integer.valueOf(stringValue);
+ } catch (NumberFormatException e) {
+ LOG.error("property " + propertyName + " had the value " + stringValue + " that could not be converted to an Integer", e);
+ }
+ }
+ return null;
+ }
+
+ public PollingConsumerImpl createPollingClient() {
+ return new PollingConsumerImpl(username, password, host, auth, connectTimeout, readTimeout, fetchPause, group, id, filter, limit, timeoutQueryParamValue);
+ }
+
+ public PullingConsumerImpl createPullingClient() {
+ return new PullingConsumerImpl(username, password, host, auth, connectTimeout, readTimeout, group, id, filter, limit, timeoutQueryParamValue);
+ }
+
+ private void processFilter(String filterString) {
+ if (filterString != null) {
+ if (filterString.length() > 0) {
+ try {
+ filter = URLEncoder.encode(filterString, StandardCharsets.UTF_8.name());
+ } catch (UnsupportedEncodingException e) {
+ LOG.warn("Couldn't encode filter string. Filter will be ignored.", e);
+ filter = null;
+ }
+ } else {
+ filter = null;
+ }
+ }
+ }
+
+ private void setDefaults() {
+ if (connectTimeout == null) {
+ connectTimeout = DEFAULT_CONNECT_TIMEOUT;
+ }
+ if (readTimeout == null) {
+ readTimeout = DEFAULT_READ_TIMEOUT;
+ }
+ if (fetchPause == null) {
+ fetchPause = DEFAULT_FETCH_PAUSE;
+ }
+ if (limit == null) {
+ limit = DEFAULT_LIMIT;
+ }
+ if (timeoutQueryParamValue == null) {
+ timeoutQueryParamValue = DEFAULT_TIMEOUT_QUERY_PARAM_VALUE;
+ }
+ if (auth == null) {
+ auth = DEFAULT_AUTH_METHOD;
+ }
+ }
+
+}
diff --git a/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/PollingConsumerImpl.java b/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/PollingConsumerImpl.java new file mode 100644 index 000000000..263e94ca9 --- /dev/null +++ b/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/PollingConsumerImpl.java @@ -0,0 +1,100 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : SDN-C + * ================================================================================ + * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights + * reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.provider.impl; + +import org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api.PollingConsumer; +import org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api.RequestHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/* + * java.net based client to build message router consumers + */ +public class PollingConsumerImpl implements PollingConsumer { + + //RunnableConsumer is a private inner class so run cannot be called from other code + private class RunnableConsumer extends AbstractBaseConsumer implements Runnable, PollingConsumer { + private final Logger LOG = LoggerFactory.getLogger(PollingConsumerImpl.class); + private volatile Thread t; + private final Integer fetchPause; + + public RunnableConsumer(String username, String password, String host, String authentication, Integer connectTimeout, Integer readTimeout, Integer fetchPause, String group, String id, String filter, Integer limit, Integer timeoutQueryParamValue) { + super(username, password, host, authentication, connectTimeout, readTimeout, group, id, filter, limit, timeoutQueryParamValue); + this.fetchPause = fetchPause; + } + + public void start() { + t = new Thread(this); + t.start(); + LOG.info("ConsumerImpl started. Fetch period is {} ms.", fetchPause); + } + + public void stop() { + t = null; + LOG.info("ConsumerImpl stopped."); + } + + @Override + public void run() { + if (this.url != null) { + Thread thisThread = Thread.currentThread(); + while (t == thisThread) { + poll(); + try { + LOG.trace("Next fetch from MessageRouter url {} after {} milliseconds.", url, fetchPause); + Thread.sleep(fetchPause); + } catch (InterruptedException e) { + LOG.warn("Thread sleep was interrupted.", e); + } + } + } else { + LOG.error("URL is null, can't listen for messages"); + } + } + + @Override + public void close() throws Exception { + stop(); + } + } + + private RunnableConsumer c; + + public PollingConsumerImpl(String username, String password, String host, String authentication, Integer connectTimeout, Integer readTimeout, Integer fetchPause, String group, String id, String filter, Integer limit, Integer timeoutQueryParamValue) { + c = new RunnableConsumer(username, password, host, authentication, connectTimeout, readTimeout, fetchPause, group, id, filter, limit, timeoutQueryParamValue); + } + + @Override + public void start() { + c.start(); + } + + @Override + public void registerHandler(String topic, RequestHandler requestHandler) { + c.registerHandler(topic, requestHandler); + } + + @Override + public void close() throws Exception { + c.close(); + } +} diff --git a/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/PullingConsumerImpl.java b/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/PullingConsumerImpl.java new file mode 100755 index 000000000..e5a5bc4d8 --- /dev/null +++ b/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/PullingConsumerImpl.java @@ -0,0 +1,39 @@ +/*- + * ============LICENSE_START======================================================= + * openECOMP : SDN-C + * ================================================================================ + * Copyright (C) 2017 - 2019 AT&T Intellectual Property. All rights + * reserved. + * + * Modifications Copyright (C) 2019 IBM. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.provider.impl; + +import org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api.PullingConsumer; + + +public class PullingConsumerImpl extends AbstractBaseConsumer implements PullingConsumer { + + public PullingConsumerImpl(String username, String password, String host, String authentication, Integer connectTimeout, Integer readTimeout, String group, String id, String filter, Integer limit, Integer timeoutQueryParamValue) { + super(username, password, host, authentication, connectTimeout, readTimeout, group, id, filter, limit, timeoutQueryParamValue); + } + + @Override + public void pull() { + this.poll(); + } + +} diff --git a/message-router/consumer/provider/src/test/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/AbstractBaseConsumerTest.java b/message-router/consumer/provider/src/test/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/AbstractBaseConsumerTest.java new file mode 100644 index 000000000..75873385b --- /dev/null +++ b/message-router/consumer/provider/src/test/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/AbstractBaseConsumerTest.java @@ -0,0 +1,91 @@ +package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.provider.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+import org.junit.Test;
+import org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api.RequestHandler;
+
+public class AbstractBaseConsumerTest {
+ private class DummyConsumer extends AbstractBaseConsumer {
+
+ public DummyConsumer(String username, String password, String host, String authentication, Integer connectTimeout, Integer readTimeout, String group, String id, String filter, Integer limit, Integer timeoutQueryParamValue) {
+ super(username, password, host, authentication, connectTimeout, readTimeout, group, id, filter, limit, timeoutQueryParamValue);
+ }
+
+ }
+
+ public DummyConsumer getAuthDummy() {
+ String username = "deadpool";
+ String password = "notSECURE";
+ String host = "http://localhost:7001";
+ String group = "myCluster";
+ String id = "node1";
+ Integer connectTimeout = 10000;
+ Integer readTimeout = 20000;
+ String authentication = "basic";
+ String filter = null;
+ Integer limit = 3;
+ Integer timeoutQueryParamValue = 5000;
+ return new DummyConsumer(username, password, host, authentication, connectTimeout, readTimeout, group, id, filter, limit, timeoutQueryParamValue);
+ }
+
+ @Test
+ public void createDummyWithAuth() {
+ assertNotNull(getAuthDummy());
+ }
+
+ @Test
+ public void createDummyNohAuth() {
+ String username = null;
+ String password = null;
+ String host = "http://localhost:7001";
+ String group = "myCluster";
+ String id = "node1";
+ Integer connectTimeout = 10000;
+ Integer readTimeout = 20000;
+ String authentication = "noauth";
+ String filter = null;
+ Integer limit = 3;
+ Integer timeoutQueryParamValue = 5000;
+ assertNotNull(new DummyConsumer(username, password, host, authentication, connectTimeout, readTimeout, group, id, filter, limit, timeoutQueryParamValue));
+ }
+
+ @Test
+ public void callClose() throws Exception {
+ DummyConsumer dummy = getAuthDummy();
+ dummy.close();
+ assertNotNull(dummy);
+ }
+
+ @Test
+ public void registerDummyHandler() throws Exception {
+ DummyConsumer dummy = getAuthDummy();
+ String topic = "politics";
+ RequestHandler requestHandler = new RequestHandler() {
+
+ @Override
+ public void handleRequest(String topic, String requestBody) {
+ // TODO Auto-generated method stub
+
+ };
+
+ };
+ dummy.registerHandler(topic, requestHandler);
+ assertEquals(new URL("http://localhost:7001/events/politics/myCluster/node1?timeout=5000&limit=3"), dummy.url);
+ assertEquals(topic, dummy.topic);
+
+ }
+
+ @Test
+ public void buildURL() throws Exception {
+ DummyConsumer dummy = getAuthDummy();
+ HttpURLConnection connection = dummy.buildHttpURLConnection(new URL("http://localhost:7001/events/politics/myCluster/node1?timeout=5000&limit=3"));
+ assertNotNull(connection);
+ assertEquals("application/json", connection.getRequestProperty("Accept"));
+ }
+
+}
diff --git a/message-router/consumer/provider/src/test/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/ConsumerFactoryTest.java b/message-router/consumer/provider/src/test/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/ConsumerFactoryTest.java new file mode 100644 index 000000000..d1018a014 --- /dev/null +++ b/message-router/consumer/provider/src/test/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/ConsumerFactoryTest.java @@ -0,0 +1,164 @@ +package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.provider.impl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.util.Properties; + +import org.junit.Test; + +public class ConsumerFactoryTest { + + @Test + public void testFactoryClientCreation() throws Exception { + Properties props = new Properties(); + String userName = "deadpool"; + String password = "notSECURE"; + String host = "http://localhost:7001"; + String group = "myCluster"; + String id = "node1"; + Integer connectTimeout = 10000; + Integer readTimeout = 20000; + props.put("username", userName); + props.put("password", password); + props.put("host", host); + props.put("group", group); + + ConsumerFactory factory = new ConsumerFactory(userName, password, host, group, id, connectTimeout, readTimeout); + assertNotNull(factory.createPollingClient()); + assertNotNull(factory.createPullingClient()); + } + + @Test + public void testFactoryDefaults() throws Exception { + Properties props = new Properties(); + String userName = "deadpool"; + String password = "notSECURE"; + String host = "http://localhost:7001"; + String group = "myCluster"; + String id = "node1"; + Integer connectTimeout = 10000; + Integer readTimeout = 20000; + props.put("username", userName); + props.put("password", password); + props.put("host", host); + props.put("group", group); + + ConsumerFactory factory = new ConsumerFactory(userName, password, host, group, id, connectTimeout, readTimeout); + + assertNotNull(factory.getAuth()); + assertNotNull(factory.getConnectTimeout()); + assertNotNull(factory.getReadTimeout()); + assertNotNull(factory.getFetchPause()); + assertNotNull(factory.getLimit()); + assertNotNull(factory.getTimeoutQueryParamValue()); + } + + @Test + public void testFactoryDefaultsWithProps() { + Properties props = new Properties(); + String userName = "deadpool"; + String password = "notSECURE"; + String host = "http://localhost:7001"; + String auth = "basic"; + String group = "myCluster"; + props.put("username", userName); + props.put("password", password); + props.put("host", host); + props.put("group", group); + + ConsumerFactory factory = new ConsumerFactory(props); + + assertNotNull(factory.getAuth()); + assertNotNull(factory.getConnectTimeout()); + assertNotNull(factory.getReadTimeout()); + assertNotNull(factory.getFetchPause()); + assertNotNull(factory.getLimit()); + assertNotNull(factory.getTimeoutQueryParamValue()); + } + + @Test + public void testFactoryOverrides() throws Exception { + Properties props = new Properties(); + String userName = "deadpool"; + String password = "notSECURE"; + String host = "http://localhost:7001"; + String group = "myCluster"; + props.put("username", userName); + props.put("password", password); + props.put("host", host); + props.put("group", group); + + String connectTimeout = "200"; + String readTimeout = "300"; + String fetchPause = "1000"; + String auth = "noauth"; + String timeoutQueryParamValue = "50"; + String limit = "2"; + props.put("connectTimeoutSeconds", connectTimeout); + props.put("readTimeoutMinutes", readTimeout); + props.put("fetchPause", fetchPause); + props.put("auth", auth); + props.put("timeout", timeoutQueryParamValue); + props.put("limit", limit); + + ConsumerFactory factory = new ConsumerFactory(props); + + assertEquals(auth, factory.getAuth()); + assertEquals(Integer.valueOf(connectTimeout), factory.getConnectTimeout()); + assertEquals(Integer.valueOf(readTimeout), factory.getReadTimeout()); + assertEquals(Integer.valueOf(fetchPause), factory.getFetchPause()); + assertEquals(Integer.valueOf(limit), factory.getLimit()); + assertEquals(Integer.valueOf(timeoutQueryParamValue), factory.getTimeoutQueryParamValue()); + } + + @Test + public void testManualOverrides() { + Properties props = new Properties(); + String userName = "deadpool"; + String password = "notSECURE"; + String host = "http://localhost:7001"; + String auth = "basic"; + String group = "myCluster"; + props.put("username", userName); + props.put("password", password); + props.put("host", host); + props.put("group", group); + + ConsumerFactory factory = new ConsumerFactory(props); + + assertNotNull(factory.getAuth()); + assertNotNull(factory.getConnectTimeout()); + assertNotNull(factory.getReadTimeout()); + assertNotNull(factory.getFetchPause()); + assertNotNull(factory.getLimit()); + assertNotNull(factory.getTimeoutQueryParamValue()); + String newAuth = "noauth"; + factory.setAuth(newAuth); + assertEquals(newAuth, factory.getAuth()); + + Integer connectTimeout = 1; + factory.setConnectTimeout(connectTimeout); + assertEquals(connectTimeout, factory.getConnectTimeout()); + + Integer fetchPause = 5; + factory.setFetchPause(fetchPause); + assertEquals(fetchPause, factory.getFetchPause()); + + factory.setFilter("\"filter\":{\n" + "\"class\":\"And\",\n" + "\"filters\":\n" + "[\n" + "{ \"class\":\"Equals\", \"foo\":\"abc\" },\n" + "{ \"class\":\"Assigned\", \"field\":\"bar\" }\n" + "]\n" + "}"); + assertNotNull(factory.getFilter()); + + Integer limit = 3; + factory.setLimit(limit); + assertEquals(limit, factory.getLimit()); + + Integer readTimeout = 2; + factory.setReadTimeout(readTimeout); + assertEquals(readTimeout, factory.getReadTimeout()); + + Integer timeoutQueryParamValue = 47; + factory.setTimeoutQueryParamValue(timeoutQueryParamValue); + assertEquals(timeoutQueryParamValue, factory.getTimeoutQueryParamValue()); + } + +} |