diff options
Diffstat (limited to 'message-router/consumer')
18 files changed, 0 insertions, 1197 deletions
diff --git a/message-router/consumer/README.md b/message-router/consumer/README.md deleted file mode 100755 index 9d01256a7..000000000 --- a/message-router/consumer/README.md +++ /dev/null @@ -1,7 +0,0 @@ -# Consumer
-
-## Modules
-- api - exports the consumer interface for clients and providers to import
-- features - used for managing the feature repository for consumer
-- installer - provides a simple install script
-- provider - provides an implementation of the consumer api
\ No newline at end of file diff --git a/message-router/consumer/api/pom.xml b/message-router/consumer/api/pom.xml deleted file mode 100755 index e77eb9793..000000000 --- a/message-router/consumer/api/pom.xml +++ /dev/null @@ -1,27 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.onap.ccsdk.sli.adaptors.messagerouter</groupId> - <artifactId>consumer.aggregate</artifactId> - <version>1.1.1-SNAPSHOT</version> - </parent> - - <artifactId>consumer.api</artifactId> - <packaging>bundle</packaging> - - <build> - <plugins> - <plugin> - <groupId>org.apache.felix</groupId> - <artifactId>maven-bundle-plugin</artifactId> - <configuration> - <instructions> - <Export-Package>org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api</Export-Package> - </instructions> - </configuration> - </plugin> - </plugins> - </build> -</project> diff --git a/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/ConsumerApi.java b/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/ConsumerApi.java deleted file mode 100755 index 41deade85..000000000 --- a/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/ConsumerApi.java +++ /dev/null @@ -1,6 +0,0 @@ -package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api;
-
-public interface ConsumerApi extends AutoCloseable {
- //registers a handler to handle a specific topic, should be called only once per client
- public void registerHandler(String topic, RequestHandler requestHandler);
-}
diff --git a/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/PollingConsumer.java b/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/PollingConsumer.java deleted file mode 100755 index 29fc1c764..000000000 --- a/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/PollingConsumer.java +++ /dev/null @@ -1,7 +0,0 @@ -package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api;
-
-public interface PollingConsumer extends ConsumerApi {
-
- // Starts polling message router for messages, won't stop until close it called
- void start();
-}
diff --git a/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/PullingConsumer.java b/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/PullingConsumer.java deleted file mode 100755 index 1187aaceb..000000000 --- a/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/PullingConsumer.java +++ /dev/null @@ -1,7 +0,0 @@ -package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api;
-
-public interface PullingConsumer extends ConsumerApi {
-
- //Pulls a single batch of messages
- void pull();
-}
diff --git a/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/RequestHandler.java b/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/RequestHandler.java deleted file mode 100755 index 07a117843..000000000 --- a/message-router/consumer/api/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/api/RequestHandler.java +++ /dev/null @@ -1,5 +0,0 @@ -package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api; - -public interface RequestHandler { - public void handleRequest(String topic, String requestBody); -} diff --git a/message-router/consumer/installer/pom.xml b/message-router/consumer/installer/pom.xml deleted file mode 100755 index 2b8eb688e..000000000 --- a/message-router/consumer/installer/pom.xml +++ /dev/null @@ -1,128 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.onap.ccsdk.sli.adaptors.messagerouter</groupId> - <artifactId>consumer.aggregate</artifactId> - <version>1.1.1-SNAPSHOT</version> - </parent> - - <groupId>org.onap.ccsdk.sli.adaptors.messagerouter</groupId> - <artifactId>consumer.installer</artifactId> - <version>1.1.1-SNAPSHOT</version> - <packaging>pom</packaging> - - <properties> - <application.name>messagerouter-consumer</application.name> - <features.boot>messagerouter-consumer</features.boot> - <features.repositories>mvn:${project.groupId}/consumer.features/${project.version}/xml/features</features.repositories> - <include.transitive.dependencies>false</include.transitive.dependencies> - </properties> - - <dependencies> - <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>consumer.api</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>consumer.provider</artifactId> - <version>${project.version}</version> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <version>2.6</version> - <executions> - <execution> - <id>maven-repo-zip</id> - <goals> - <goal>single</goal> - </goals> - <phase>package</phase> - <configuration> - <attach>true</attach> - <finalName>stage/${application.name}-${project.version}</finalName> - <descriptors> - <descriptor>src/assembly/assemble_mvnrepo_zip.xml</descriptor> - </descriptors> - <appendAssemblyId>true</appendAssemblyId> - </configuration> - </execution> - <execution> - <id>installer-zip</id> - <goals> - <goal>single</goal> - </goals> - <phase>package</phase> - <configuration> - <attach>true</attach> - <finalName>${application.name}-${project.version}-installer</finalName> - <descriptors> - <descriptor>src/assembly/assemble_installer_zip.xml</descriptor> - </descriptors> - <appendAssemblyId>false</appendAssemblyId> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <executions> - <execution> - <id>copy-dependencies</id> - <goals> - <goal>copy-dependencies</goal> - </goals> - <phase>prepare-package</phase> - <configuration> - <transitive>false</transitive> - <outputDirectory>${project.build.directory}/assembly/system</outputDirectory> - <overWriteReleases>false</overWriteReleases> - <overWriteSnapshots>true</overWriteSnapshots> - <overWriteIfNewer>true</overWriteIfNewer> - <useRepositoryLayout>true</useRepositoryLayout> - <addParentPoms>false</addParentPoms> - <copyPom>false</copyPom> - <includeGroupIds>${project.groupId}</includeGroupIds> - <scope>provided</scope> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <artifactId>maven-resources-plugin</artifactId> - <version>2.6</version> - <executions> - <execution> - <id>copy-version</id> - <goals> - <goal>copy-resources</goal> - </goals><!-- here the phase you need --> - <phase>validate</phase> - <configuration> - <outputDirectory>${basedir}/target/stage</outputDirectory> - <resources> - <resource> - <directory>src/main/resources/scripts</directory> - <includes> - <include>install-feature.sh</include> - </includes> - <filtering>true</filtering> - </resource> - </resources> - </configuration> - </execution> - - </executions> - </plugin> - - </plugins> - </build> -</project> diff --git a/message-router/consumer/installer/src/assembly/assemble_installer_zip.xml b/message-router/consumer/installer/src/assembly/assemble_installer_zip.xml deleted file mode 100755 index c6169a879..000000000 --- a/message-router/consumer/installer/src/assembly/assemble_installer_zip.xml +++ /dev/null @@ -1,59 +0,0 @@ -<!-- - ============LICENSE_START======================================================= - openECOMP : SDN-C - ================================================================================ - Copyright (C) 2017 AT&T Intellectual Property. All rights - reserved. - ================================================================================ - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - ============LICENSE_END========================================================= - --> - -<!-- Defines how we build the .zip file which is our distribution. --> - -<assembly - xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"> - <id>installer_zip</id> - <formats> - <format>zip</format> - </formats> - - <!-- we want "system" and related files right at the root level - as this file is suppose to be unzip on top of a karaf - distro. --> - <includeBaseDirectory>false</includeBaseDirectory> - - <fileSets> - <fileSet> - <directory>target/stage/</directory> - <outputDirectory>${application.name}</outputDirectory> - <fileMode>755</fileMode> - <includes> - <include>*.sh</include> - </includes> - </fileSet> - <fileSet> - <directory>target/stage/</directory> - <outputDirectory>${application.name}</outputDirectory> - <fileMode>644</fileMode> - <excludes> - <exclude>*.sh</exclude> - </excludes> - </fileSet> - </fileSets> - - - -</assembly>
\ No newline at end of file diff --git a/message-router/consumer/installer/src/assembly/assemble_mvnrepo_zip.xml b/message-router/consumer/installer/src/assembly/assemble_mvnrepo_zip.xml deleted file mode 100755 index 409c66224..000000000 --- a/message-router/consumer/installer/src/assembly/assemble_mvnrepo_zip.xml +++ /dev/null @@ -1,47 +0,0 @@ -<!-- - ============LICENSE_START======================================================= - openECOMP : SDN-C - ================================================================================ - Copyright (C) 2017 AT&T Intellectual Property. All rights - reserved. - ================================================================================ - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - ============LICENSE_END========================================================= - --> - -<!-- Defines how we build the .zip file which is our distribution. --> - -<assembly - xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"> - <id>repo</id> - <formats> - <format>zip</format> - </formats> - - <!-- we want "system" and related files right at the root level - as this file is suppose to be unzip on top of a karaf - distro. --> - <includeBaseDirectory>false</includeBaseDirectory> - - <fileSets> - <fileSet> - <directory>target/assembly/</directory> - <outputDirectory>.</outputDirectory> - <excludes> - </excludes> - </fileSet> - </fileSets> - -</assembly>
\ No newline at end of file diff --git a/message-router/consumer/installer/src/main/resources/scripts/install-feature.sh b/message-router/consumer/installer/src/main/resources/scripts/install-feature.sh deleted file mode 100755 index 6f2518f64..000000000 --- a/message-router/consumer/installer/src/main/resources/scripts/install-feature.sh +++ /dev/null @@ -1,39 +0,0 @@ -#!/bin/bash - -### -# ============LICENSE_START======================================================= -# openECOMP : SDN-C -# ================================================================================ -# Copyright (C) 2017 AT&T Intellectual Property. All rights -# reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= -### - -ODL_HOME=${ODL_HOME:-/opt/opendaylight/current} -ODL_KARAF_CLIENT=${ODL_KARAF_CLIENT:-${ODL_HOME}/bin/client} -INSTALLERDIR=$(dirname $0) - -REPOZIP=${INSTALLERDIR}/${features.boot}-${project.version}-repo.zip - -if [ -f ${REPOZIP} ] -then - unzip -d ${ODL_HOME} ${REPOZIP} -else - echo "ERROR : repo zip ($REPOZIP) not found" - exit 1 -fi - -${ODL_KARAF_CLIENT} feature:repo-add ${features.repositories} -${ODL_KARAF_CLIENT} feature:install ${features.boot}
\ No newline at end of file diff --git a/message-router/consumer/pom.xml b/message-router/consumer/pom.xml deleted file mode 100755 index cad1bbba0..000000000 --- a/message-router/consumer/pom.xml +++ /dev/null @@ -1,19 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.onap.ccsdk.sli.adaptors.messagerouter</groupId> - <artifactId>messagerouter-root</artifactId> - <version>1.1.1-SNAPSHOT</version> - </parent> - - <artifactId>consumer.aggregate</artifactId> - <packaging>pom</packaging> - - <modules> - <module>api</module> - <module>provider</module> - <module>installer</module> - </modules> -</project> diff --git a/message-router/consumer/provider/pom.xml b/message-router/consumer/provider/pom.xml deleted file mode 100755 index d3e568c40..000000000 --- a/message-router/consumer/provider/pom.xml +++ /dev/null @@ -1,43 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.onap.ccsdk.sli.adaptors.messagerouter</groupId> - <artifactId>consumer.aggregate</artifactId> - <version>1.1.1-SNAPSHOT</version> - </parent> - - <artifactId>consumer.provider</artifactId> - <packaging>bundle</packaging> - - <dependencies> - <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>consumer.api</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </dependency> - <dependency> - <groupId>com.google.code.gson</groupId> - <artifactId>gson</artifactId> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.felix</groupId> - <artifactId>maven-bundle-plugin</artifactId> - <configuration> - <instructions> - <Export-Package>${project.groupId}.consumer.provider.impl</Export-Package> - </instructions> - </configuration> - </plugin> - </plugins> - </build> -</project> diff --git a/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/AbstractBaseConsumer.java b/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/AbstractBaseConsumer.java deleted file mode 100755 index 8937f7b91..000000000 --- a/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/AbstractBaseConsumer.java +++ /dev/null @@ -1,207 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * openECOMP : SDN-C - * ================================================================================ - * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights - * reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.provider.impl; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.net.HttpURLConnection; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.Base64; - -import javax.net.ssl.HostnameVerifier; -import javax.net.ssl.HttpsURLConnection; -import javax.net.ssl.SSLSession; - -import org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api.ConsumerApi; -import org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api.RequestHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.gson.Gson; -import com.google.gson.JsonParseException; - -/* - * java.net based client to build message router consumers - */ -public abstract class AbstractBaseConsumer implements ConsumerApi { - private static final Logger LOG = LoggerFactory.getLogger(AbstractBaseConsumer.class); - private static final String REQUEST_METHOD = "GET"; - - private final String host; - private final Integer connectTimeout; - private final Integer readTimeout; - private final String group; - private final String id; - private final String filter; - private final Integer limit; - private final Integer timeoutQueryParamValue; - private final String authorizationString; - - protected RequestHandler requestHandler; - protected URL url; - protected String topic; - - public AbstractBaseConsumer(String username, String password, String host, String authentication, Integer connectTimeout, Integer readTimeout, String group, String id, String filter, Integer limit, Integer timeoutQueryParamValue) { - this.host = host; - this.connectTimeout = connectTimeout; - this.readTimeout = readTimeout; - this.group = group; - this.id = id; - this.filter = filter; - this.limit = limit; - this.timeoutQueryParamValue = timeoutQueryParamValue; - - if ("basic".equals(authentication)) { - if (username != null && password != null && username.length() > 0 && password.length() > 0) { - authorizationString = buildAuthorizationString(username, password); - } else { - throw new IllegalStateException("Authentication is set to basic but username or password is missing"); - } - } else if ("noauth".equals(authentication)) { - authorizationString = null; - } else { - throw new IllegalStateException("Unknown authentication method: " + authentication); - } - } - - protected void poll() { - String responseBody = performHttpOperation(); - if (responseBody != null && !responseBody.startsWith("[]")) { - LOG.info("New message was fetched from MessageRouter."); - LOG.trace("Fetched message is\n{}", responseBody); - try { - String[] requests = new Gson().fromJson(responseBody, String[].class); - if (requests != null) { - for (String request : requests) { - if (request != null) { - requestHandler.handleRequest(topic,request); - } - } - } else { - LOG.warn("Deserialization of received message results in null array.", responseBody); - } - } catch (JsonParseException e) { - LOG.warn("Received message has bad format. Expected format is JSON."); - } - } else { - LOG.trace("No new message was fetched from MessageRouter."); - } - } - - private String buildlUrlString(String topic) { - StringBuilder sb = new StringBuilder(); - sb.append(host + "/events/" + topic + "/" + group + "/" + id); - sb.append("?timeout=" + timeoutQueryParamValue); - - if (limit != null) { - sb.append("&limit=" + limit); - } - if (filter != null) { - sb.append("&filter=" + filter); - } - return sb.toString(); - } - - private String performHttpOperation() { - HttpURLConnection httpUrlConnection = null; - try { - httpUrlConnection = buildHttpURLConnection(url); - httpUrlConnection.setRequestMethod(REQUEST_METHOD); - httpUrlConnection.connect(); - int status = httpUrlConnection.getResponseCode(); - if (status < 300) { - return readFromStream(httpUrlConnection.getInputStream()); - } else { - String response = readFromStream(httpUrlConnection.getErrorStream()); - LOG.warn("Fetching message from MessageRouter on url {} failed with http status {}. Error message is\n{}.", url, status, response); - } - } catch (Exception e) { - LOG.warn("Exception was thrown during fetching message from MessageRouter on url {}.", url, e); - } finally { - if (httpUrlConnection != null) { - httpUrlConnection.disconnect(); - } - } - return null; - } - - private String buildAuthorizationString(String userName, String password) { - String basicAuthString = userName + ":" + password; - basicAuthString = Base64.getEncoder().encodeToString(basicAuthString.getBytes()); - return "Basic " + basicAuthString; - } - - protected HttpURLConnection buildHttpURLConnection(URL url) throws IOException { - HttpURLConnection httpUrlConnection = (HttpURLConnection) url.openConnection(); - if (authorizationString != null) { - httpUrlConnection.setRequestProperty("Authorization", authorizationString); - } - httpUrlConnection.setRequestProperty("Accept", "application/json"); - httpUrlConnection.setUseCaches(false); - httpUrlConnection.setConnectTimeout(connectTimeout); - httpUrlConnection.setReadTimeout(readTimeout); - - // ignore hostname errors when dealing with HTTPS connections - if (httpUrlConnection instanceof HttpsURLConnection) { - HttpsURLConnection conn = (HttpsURLConnection) httpUrlConnection; - conn.setHostnameVerifier(new HostnameVerifier() { - @Override - public boolean verify(String arg0, SSLSession arg1) { - return true; - } - }); - } - return httpUrlConnection; - } - - protected String readFromStream(InputStream stream) throws IOException { - BufferedReader br = new BufferedReader(new InputStreamReader(stream)); - StringBuilder sb = new StringBuilder(); - String line; - while ((line = br.readLine()) != null) { - sb.append(line); - sb.append("\n"); - } - br.close(); - return sb.toString(); - } - - @Override - public void registerHandler(String topic, RequestHandler requestHandler) { - this.topic = topic; - try { - this.url = new URL(buildlUrlString(topic)); - } catch (MalformedURLException e) { - LOG.error("Topic " + topic + " resulted in MalformedURLException", e); - } - this.requestHandler = requestHandler; - } - - @Override - public void close() throws Exception { - //BaseConsumer doesn't spawn any threads - } - -} diff --git a/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/ConsumerFactory.java b/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/ConsumerFactory.java deleted file mode 100755 index 1aa02c70a..000000000 --- a/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/ConsumerFactory.java +++ /dev/null @@ -1,202 +0,0 @@ -/*-
- * ============LICENSE_START=======================================================
- * openECOMP : SDN-C
- * ================================================================================
- * Copyright (C) 2017 - 2019 AT&T Intellectual Property. All rights
- * reserved.
- *
- * Modifications Copyright (C) 2019 IBM.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.provider.impl;
-
-import java.io.UnsupportedEncodingException;
-import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
-import java.util.Properties;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ConsumerFactory {
- private static final Logger LOG = LoggerFactory.getLogger(ConsumerFactory.class);
-
- // Default values to minimize required configuration
- private static final int DEFAULT_FETCH_PAUSE = 5000;
- private static final int DEFAULT_CONNECT_TIMEOUT = 30000;
- private static final int DEFAULT_READ_TIMEOUT = 180000;
- private static final int DEFAULT_LIMIT = 5; // Limits the number of messages pulled in a single GET request
- private static final int DEFAULT_TIMEOUT_QUERY_PARAM_VALUE = 15000;
- private static final String DEFAULT_AUTH_METHOD = "basic";
-
- // Required properties
- protected final String username;
- protected final String password;
- protected final String host;
- private final String group;
- private final String id;
-
- // Optional properties
- protected Integer connectTimeout;
- protected Integer readTimeout;
- private Integer fetchPause;
- private Integer limit;
- private Integer timeoutQueryParamValue;
- private String filter;
- protected String auth;
-
- public ConsumerFactory(Properties properties) {
- // Required properties
- username = properties.getProperty("username");
- password = properties.getProperty("password");
- host = properties.getProperty("host");
- auth = properties.getProperty("auth");
- group = properties.getProperty("group");
- id = properties.getProperty("id");
-
- // Optional properties
- connectTimeout = readOptionalInteger(properties, "connectTimeoutSeconds");
- readTimeout = readOptionalInteger(properties, "readTimeoutMinutes");
- fetchPause = readOptionalInteger(properties, "fetchPause");
- limit = readOptionalInteger(properties, "limit");
- timeoutQueryParamValue = readOptionalInteger(properties, "timeout");
- processFilter(properties.getProperty("filter"));
-
- setDefaults();
- }
-
- public ConsumerFactory(String username, String password, String host, String group, String id, Integer connectTimeout, Integer readTimeout) {
- this.username = username;
- this.password = password;
- this.host = host;
- this.group = group;
- this.id = id;
- setDefaults();
- }
-
-
- public String getAuth() {
- return auth;
- }
-
- public void setAuth(String auth) {
- this.auth = auth;
- }
-
- public Integer getConnectTimeout() {
- return connectTimeout;
- }
-
- public void setConnectTimeout(Integer connectTimeout) {
- this.connectTimeout = connectTimeout;
- }
-
- public Integer getReadTimeout() {
- return readTimeout;
- }
-
- public void setReadTimeout(Integer readTimeout) {
- this.readTimeout = readTimeout;
- }
-
- public Integer getFetchPause() {
- return fetchPause;
- }
-
- public void setFetchPause(Integer fetchPause) {
- this.fetchPause = fetchPause;
- }
-
- public Integer getLimit() {
- return limit;
- }
-
- public void setLimit(Integer limit) {
- this.limit = limit;
- }
-
- public Integer getTimeoutQueryParamValue() {
- return timeoutQueryParamValue;
- }
-
- public void setTimeoutQueryParamValue(Integer timeoutQueryParamValue) {
- this.timeoutQueryParamValue = timeoutQueryParamValue;
- }
-
- public String getFilter() {
- return filter;
- }
-
- public void setFilter(String filter) {
- processFilter(filter);
- }
-
- private Integer readOptionalInteger(Properties properties, String propertyName) {
- String stringValue = properties.getProperty(propertyName);
- if (stringValue != null && stringValue.length() > 0) {
- try {
- return Integer.valueOf(stringValue);
- } catch (NumberFormatException e) {
- LOG.error("property " + propertyName + " had the value " + stringValue + " that could not be converted to an Integer", e);
- }
- }
- return null;
- }
-
- public PollingConsumerImpl createPollingClient() {
- return new PollingConsumerImpl(username, password, host, auth, connectTimeout, readTimeout, fetchPause, group, id, filter, limit, timeoutQueryParamValue);
- }
-
- public PullingConsumerImpl createPullingClient() {
- return new PullingConsumerImpl(username, password, host, auth, connectTimeout, readTimeout, group, id, filter, limit, timeoutQueryParamValue);
- }
-
- private void processFilter(String filterString) {
- if (filterString != null) {
- if (filterString.length() > 0) {
- try {
- filter = URLEncoder.encode(filterString, StandardCharsets.UTF_8.name());
- } catch (UnsupportedEncodingException e) {
- LOG.warn("Couldn't encode filter string. Filter will be ignored.", e);
- filter = null;
- }
- } else {
- filter = null;
- }
- }
- }
-
- private void setDefaults() {
- if (connectTimeout == null) {
- connectTimeout = DEFAULT_CONNECT_TIMEOUT;
- }
- if (readTimeout == null) {
- readTimeout = DEFAULT_READ_TIMEOUT;
- }
- if (fetchPause == null) {
- fetchPause = DEFAULT_FETCH_PAUSE;
- }
- if (limit == null) {
- limit = DEFAULT_LIMIT;
- }
- if (timeoutQueryParamValue == null) {
- timeoutQueryParamValue = DEFAULT_TIMEOUT_QUERY_PARAM_VALUE;
- }
- if (auth == null) {
- auth = DEFAULT_AUTH_METHOD;
- }
- }
-
-}
diff --git a/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/PollingConsumerImpl.java b/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/PollingConsumerImpl.java deleted file mode 100644 index 263e94ca9..000000000 --- a/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/PollingConsumerImpl.java +++ /dev/null @@ -1,100 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * openECOMP : SDN-C - * ================================================================================ - * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights - * reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ - -package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.provider.impl; - -import org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api.PollingConsumer; -import org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api.RequestHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/* - * java.net based client to build message router consumers - */ -public class PollingConsumerImpl implements PollingConsumer { - - //RunnableConsumer is a private inner class so run cannot be called from other code - private class RunnableConsumer extends AbstractBaseConsumer implements Runnable, PollingConsumer { - private final Logger LOG = LoggerFactory.getLogger(PollingConsumerImpl.class); - private volatile Thread t; - private final Integer fetchPause; - - public RunnableConsumer(String username, String password, String host, String authentication, Integer connectTimeout, Integer readTimeout, Integer fetchPause, String group, String id, String filter, Integer limit, Integer timeoutQueryParamValue) { - super(username, password, host, authentication, connectTimeout, readTimeout, group, id, filter, limit, timeoutQueryParamValue); - this.fetchPause = fetchPause; - } - - public void start() { - t = new Thread(this); - t.start(); - LOG.info("ConsumerImpl started. Fetch period is {} ms.", fetchPause); - } - - public void stop() { - t = null; - LOG.info("ConsumerImpl stopped."); - } - - @Override - public void run() { - if (this.url != null) { - Thread thisThread = Thread.currentThread(); - while (t == thisThread) { - poll(); - try { - LOG.trace("Next fetch from MessageRouter url {} after {} milliseconds.", url, fetchPause); - Thread.sleep(fetchPause); - } catch (InterruptedException e) { - LOG.warn("Thread sleep was interrupted.", e); - } - } - } else { - LOG.error("URL is null, can't listen for messages"); - } - } - - @Override - public void close() throws Exception { - stop(); - } - } - - private RunnableConsumer c; - - public PollingConsumerImpl(String username, String password, String host, String authentication, Integer connectTimeout, Integer readTimeout, Integer fetchPause, String group, String id, String filter, Integer limit, Integer timeoutQueryParamValue) { - c = new RunnableConsumer(username, password, host, authentication, connectTimeout, readTimeout, fetchPause, group, id, filter, limit, timeoutQueryParamValue); - } - - @Override - public void start() { - c.start(); - } - - @Override - public void registerHandler(String topic, RequestHandler requestHandler) { - c.registerHandler(topic, requestHandler); - } - - @Override - public void close() throws Exception { - c.close(); - } -} diff --git a/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/PullingConsumerImpl.java b/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/PullingConsumerImpl.java deleted file mode 100755 index e5a5bc4d8..000000000 --- a/message-router/consumer/provider/src/main/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/PullingConsumerImpl.java +++ /dev/null @@ -1,39 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * openECOMP : SDN-C - * ================================================================================ - * Copyright (C) 2017 - 2019 AT&T Intellectual Property. All rights - * reserved. - * - * Modifications Copyright (C) 2019 IBM. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.provider.impl; - -import org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api.PullingConsumer; - - -public class PullingConsumerImpl extends AbstractBaseConsumer implements PullingConsumer { - - public PullingConsumerImpl(String username, String password, String host, String authentication, Integer connectTimeout, Integer readTimeout, String group, String id, String filter, Integer limit, Integer timeoutQueryParamValue) { - super(username, password, host, authentication, connectTimeout, readTimeout, group, id, filter, limit, timeoutQueryParamValue); - } - - @Override - public void pull() { - this.poll(); - } - -} diff --git a/message-router/consumer/provider/src/test/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/AbstractBaseConsumerTest.java b/message-router/consumer/provider/src/test/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/AbstractBaseConsumerTest.java deleted file mode 100644 index 75873385b..000000000 --- a/message-router/consumer/provider/src/test/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/AbstractBaseConsumerTest.java +++ /dev/null @@ -1,91 +0,0 @@ -package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.provider.impl;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import java.net.HttpURLConnection;
-import java.net.URL;
-
-import org.junit.Test;
-import org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api.RequestHandler;
-
-public class AbstractBaseConsumerTest {
- private class DummyConsumer extends AbstractBaseConsumer {
-
- public DummyConsumer(String username, String password, String host, String authentication, Integer connectTimeout, Integer readTimeout, String group, String id, String filter, Integer limit, Integer timeoutQueryParamValue) {
- super(username, password, host, authentication, connectTimeout, readTimeout, group, id, filter, limit, timeoutQueryParamValue);
- }
-
- }
-
- public DummyConsumer getAuthDummy() {
- String username = "deadpool";
- String password = "notSECURE";
- String host = "http://localhost:7001";
- String group = "myCluster";
- String id = "node1";
- Integer connectTimeout = 10000;
- Integer readTimeout = 20000;
- String authentication = "basic";
- String filter = null;
- Integer limit = 3;
- Integer timeoutQueryParamValue = 5000;
- return new DummyConsumer(username, password, host, authentication, connectTimeout, readTimeout, group, id, filter, limit, timeoutQueryParamValue);
- }
-
- @Test
- public void createDummyWithAuth() {
- assertNotNull(getAuthDummy());
- }
-
- @Test
- public void createDummyNohAuth() {
- String username = null;
- String password = null;
- String host = "http://localhost:7001";
- String group = "myCluster";
- String id = "node1";
- Integer connectTimeout = 10000;
- Integer readTimeout = 20000;
- String authentication = "noauth";
- String filter = null;
- Integer limit = 3;
- Integer timeoutQueryParamValue = 5000;
- assertNotNull(new DummyConsumer(username, password, host, authentication, connectTimeout, readTimeout, group, id, filter, limit, timeoutQueryParamValue));
- }
-
- @Test
- public void callClose() throws Exception {
- DummyConsumer dummy = getAuthDummy();
- dummy.close();
- assertNotNull(dummy);
- }
-
- @Test
- public void registerDummyHandler() throws Exception {
- DummyConsumer dummy = getAuthDummy();
- String topic = "politics";
- RequestHandler requestHandler = new RequestHandler() {
-
- @Override
- public void handleRequest(String topic, String requestBody) {
- // TODO Auto-generated method stub
-
- };
-
- };
- dummy.registerHandler(topic, requestHandler);
- assertEquals(new URL("http://localhost:7001/events/politics/myCluster/node1?timeout=5000&limit=3"), dummy.url);
- assertEquals(topic, dummy.topic);
-
- }
-
- @Test
- public void buildURL() throws Exception {
- DummyConsumer dummy = getAuthDummy();
- HttpURLConnection connection = dummy.buildHttpURLConnection(new URL("http://localhost:7001/events/politics/myCluster/node1?timeout=5000&limit=3"));
- assertNotNull(connection);
- assertEquals("application/json", connection.getRequestProperty("Accept"));
- }
-
-}
diff --git a/message-router/consumer/provider/src/test/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/ConsumerFactoryTest.java b/message-router/consumer/provider/src/test/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/ConsumerFactoryTest.java deleted file mode 100644 index d1018a014..000000000 --- a/message-router/consumer/provider/src/test/java/org/onap/ccsdk/sli/adaptors/messagerouter/consumer/provider/impl/ConsumerFactoryTest.java +++ /dev/null @@ -1,164 +0,0 @@ -package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.provider.impl; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -import java.util.Properties; - -import org.junit.Test; - -public class ConsumerFactoryTest { - - @Test - public void testFactoryClientCreation() throws Exception { - Properties props = new Properties(); - String userName = "deadpool"; - String password = "notSECURE"; - String host = "http://localhost:7001"; - String group = "myCluster"; - String id = "node1"; - Integer connectTimeout = 10000; - Integer readTimeout = 20000; - props.put("username", userName); - props.put("password", password); - props.put("host", host); - props.put("group", group); - - ConsumerFactory factory = new ConsumerFactory(userName, password, host, group, id, connectTimeout, readTimeout); - assertNotNull(factory.createPollingClient()); - assertNotNull(factory.createPullingClient()); - } - - @Test - public void testFactoryDefaults() throws Exception { - Properties props = new Properties(); - String userName = "deadpool"; - String password = "notSECURE"; - String host = "http://localhost:7001"; - String group = "myCluster"; - String id = "node1"; - Integer connectTimeout = 10000; - Integer readTimeout = 20000; - props.put("username", userName); - props.put("password", password); - props.put("host", host); - props.put("group", group); - - ConsumerFactory factory = new ConsumerFactory(userName, password, host, group, id, connectTimeout, readTimeout); - - assertNotNull(factory.getAuth()); - assertNotNull(factory.getConnectTimeout()); - assertNotNull(factory.getReadTimeout()); - assertNotNull(factory.getFetchPause()); - assertNotNull(factory.getLimit()); - assertNotNull(factory.getTimeoutQueryParamValue()); - } - - @Test - public void testFactoryDefaultsWithProps() { - Properties props = new Properties(); - String userName = "deadpool"; - String password = "notSECURE"; - String host = "http://localhost:7001"; - String auth = "basic"; - String group = "myCluster"; - props.put("username", userName); - props.put("password", password); - props.put("host", host); - props.put("group", group); - - ConsumerFactory factory = new ConsumerFactory(props); - - assertNotNull(factory.getAuth()); - assertNotNull(factory.getConnectTimeout()); - assertNotNull(factory.getReadTimeout()); - assertNotNull(factory.getFetchPause()); - assertNotNull(factory.getLimit()); - assertNotNull(factory.getTimeoutQueryParamValue()); - } - - @Test - public void testFactoryOverrides() throws Exception { - Properties props = new Properties(); - String userName = "deadpool"; - String password = "notSECURE"; - String host = "http://localhost:7001"; - String group = "myCluster"; - props.put("username", userName); - props.put("password", password); - props.put("host", host); - props.put("group", group); - - String connectTimeout = "200"; - String readTimeout = "300"; - String fetchPause = "1000"; - String auth = "noauth"; - String timeoutQueryParamValue = "50"; - String limit = "2"; - props.put("connectTimeoutSeconds", connectTimeout); - props.put("readTimeoutMinutes", readTimeout); - props.put("fetchPause", fetchPause); - props.put("auth", auth); - props.put("timeout", timeoutQueryParamValue); - props.put("limit", limit); - - ConsumerFactory factory = new ConsumerFactory(props); - - assertEquals(auth, factory.getAuth()); - assertEquals(Integer.valueOf(connectTimeout), factory.getConnectTimeout()); - assertEquals(Integer.valueOf(readTimeout), factory.getReadTimeout()); - assertEquals(Integer.valueOf(fetchPause), factory.getFetchPause()); - assertEquals(Integer.valueOf(limit), factory.getLimit()); - assertEquals(Integer.valueOf(timeoutQueryParamValue), factory.getTimeoutQueryParamValue()); - } - - @Test - public void testManualOverrides() { - Properties props = new Properties(); - String userName = "deadpool"; - String password = "notSECURE"; - String host = "http://localhost:7001"; - String auth = "basic"; - String group = "myCluster"; - props.put("username", userName); - props.put("password", password); - props.put("host", host); - props.put("group", group); - - ConsumerFactory factory = new ConsumerFactory(props); - - assertNotNull(factory.getAuth()); - assertNotNull(factory.getConnectTimeout()); - assertNotNull(factory.getReadTimeout()); - assertNotNull(factory.getFetchPause()); - assertNotNull(factory.getLimit()); - assertNotNull(factory.getTimeoutQueryParamValue()); - String newAuth = "noauth"; - factory.setAuth(newAuth); - assertEquals(newAuth, factory.getAuth()); - - Integer connectTimeout = 1; - factory.setConnectTimeout(connectTimeout); - assertEquals(connectTimeout, factory.getConnectTimeout()); - - Integer fetchPause = 5; - factory.setFetchPause(fetchPause); - assertEquals(fetchPause, factory.getFetchPause()); - - factory.setFilter("\"filter\":{\n" + "\"class\":\"And\",\n" + "\"filters\":\n" + "[\n" + "{ \"class\":\"Equals\", \"foo\":\"abc\" },\n" + "{ \"class\":\"Assigned\", \"field\":\"bar\" }\n" + "]\n" + "}"); - assertNotNull(factory.getFilter()); - - Integer limit = 3; - factory.setLimit(limit); - assertEquals(limit, factory.getLimit()); - - Integer readTimeout = 2; - factory.setReadTimeout(readTimeout); - assertEquals(readTimeout, factory.getReadTimeout()); - - Integer timeoutQueryParamValue = 47; - factory.setTimeoutQueryParamValue(timeoutQueryParamValue); - assertEquals(timeoutQueryParamValue, factory.getTimeoutQueryParamValue()); - } - -} |