diff options
author | sunil.unnava <sunil.unnava@att.com> | 2019-05-10 11:21:44 -0400 |
---|---|---|
committer | sunil.unnava <sunil.unnava@att.com> | 2019-05-10 11:22:13 -0400 |
commit | dcb1479b85dc4fa43d7e417a05d01ac153b7fd1f (patch) | |
tree | 6068acd908b981fa1055b77660211a23821c94a3 /src/main | |
parent | ce07fa373a7f5bb876d5f31d3910baa66fd56fe1 (diff) |
clean MR codebase
Issue-ID: DMAAP-1192
Change-Id: I5cc729c9a475cac859510b57542da546e1570c96
Signed-off-by: sunil.unnava <sunil.unnava@att.com>
Diffstat (limited to 'src/main')
-rw-r--r-- | src/main/bash/cambriaDel.sh | 71 | ||||
-rw-r--r-- | src/main/bash/cambriaGet.sh | 71 | ||||
-rw-r--r-- | src/main/bash/cambriaPost.sh | 75 | ||||
-rw-r--r-- | src/main/bash/cambriaPut.sh | 75 | ||||
-rw-r--r-- | src/main/cpp/cambria.cpp | 624 | ||||
-rw-r--r-- | src/main/cpp/cambria.h | 114 | ||||
-rw-r--r-- | src/main/cpp/loopingPostClient.cpp | 92 | ||||
-rw-r--r-- | src/main/cpp/make.sh | 34 | ||||
-rw-r--r-- | src/main/cpp/sampleGetClient.cpp | 61 | ||||
-rw-r--r-- | src/main/cpp/samplePostClient.cpp | 112 | ||||
-rw-r--r-- | src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java | 3 | ||||
-rw-r--r-- | src/main/java/org/onap/dmaap/mr/test/clients/ConsolePublisher.java | 2 | ||||
-rw-r--r-- | src/main/java/org/onap/dmaap/mr/test/clients/SampleConsumer.java | 2 | ||||
-rw-r--r-- | src/main/java/org/onap/dmaap/mr/test/clients/SamplePublisher.java | 2 | ||||
-rw-r--r-- | src/main/resources/dme2/consumer.properties | 11 | ||||
-rw-r--r-- | src/main/resources/dme2/producer.properties | 12 |
16 files changed, 14 insertions, 1347 deletions
diff --git a/src/main/bash/cambriaDel.sh b/src/main/bash/cambriaDel.sh deleted file mode 100644 index b5e71b4..0000000 --- a/src/main/bash/cambriaDel.sh +++ /dev/null @@ -1,71 +0,0 @@ -#!/bin/bash -#******************************************************************************* -# ============LICENSE_START======================================================= -# org.onap.dmaap -# ================================================================================ -# Copyright © 2017 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. -# -#******************************************************************************* - -# format:// -# cambriaDel.sh <apiPath> - -if [ $# -gt 2 ]; then - echo "usage: cambriaDel.sh <apiPath>" - exit -fi -if [ $# -lt 1 ]; then - echo "usage: cambriaDel.sh <apiPath>" - exit -fi - -API=$1 - -# the date needs to be in one of the formats cambria accepts -case "$(uname -s)" in - - Darwin) - # "EEE MMM dd HH:mm:ss z yyyy" - DATE=`date` - ;; - - Linux) - # "EEE MMM dd HH:mm:ss z yyyy" - DATE=`date` - ;; - - CYGWIN*|MINGW32*|MSYS*) - DATE=`date --rfc-2822` - ;; - - *) - DATE=`date` - ;; -esac - - -URI="http://$CAMBRIA_SERVER/$API" - -if [ -z "$CAMBRIA_APIKEY" ]; then - echo "no auth" - curl -i -X GET $AUTHPART $URI -else - echo "auth in use" - SIGNATURE=`echo -n "$DATE" | openssl sha1 -hmac $CAMBRIA_APISECRET -binary | openssl base64` - curl -i -X DELETE -H "X-CambriaAuth: $CAMBRIA_APIKEY:$SIGNATURE" -H "X-CambriaDate: $DATE" $URI -fi - diff --git a/src/main/bash/cambriaGet.sh b/src/main/bash/cambriaGet.sh deleted file mode 100644 index 2e9c789..0000000 --- a/src/main/bash/cambriaGet.sh +++ /dev/null @@ -1,71 +0,0 @@ -#!/bin/bash -#******************************************************************************* -# ============LICENSE_START======================================================= -# org.onap.dmaap -# ================================================================================ -# Copyright © 2017 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. -# -#******************************************************************************* - -# format:// -# cambriaGet.sh <apiPath> - -if [ $# -gt 2 ]; then - echo "usage: cambriaGet.sh <apiPath>" - exit -fi -if [ $# -lt 1 ]; then - echo "usage: cambriaGet.sh <apiPath>" - exit -fi - -API=$1 - -# the date needs to be in one of the formats cambria accepts -case "$(uname -s)" in - - Darwin) - # "EEE MMM dd HH:mm:ss z yyyy" - DATE=`date` - ;; - - Linux) - # "EEE MMM dd HH:mm:ss z yyyy" - DATE=`date` - ;; - - CYGWIN*|MINGW32*|MSYS*) - DATE=`date --rfc-2822` - ;; - - *) - DATE=`date` - ;; -esac - - -URI="http://$CAMBRIA_SERVER/$API" - -if [ -z "$CAMBRIA_APIKEY" ]; then - echo "no auth" - curl -i -X GET $AUTHPART $URI -else - echo "auth in use" - SIGNATURE=`echo -n "$DATE" | openssl sha1 -hmac $CAMBRIA_APISECRET -binary | openssl base64` - curl -i -X GET -H "X-CambriaAuth: $CAMBRIA_APIKEY:$SIGNATURE" -H "X-CambriaDate: $DATE" $URI -fi - diff --git a/src/main/bash/cambriaPost.sh b/src/main/bash/cambriaPost.sh deleted file mode 100644 index 7bd2911..0000000 --- a/src/main/bash/cambriaPost.sh +++ /dev/null @@ -1,75 +0,0 @@ -#!/bin/bash -#******************************************************************************* -# ============LICENSE_START======================================================= -# org.onap.dmaap -# ================================================================================ -# Copyright © 2017 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. -# -#******************************************************************************* - -# format: -# cambriaPut.sh <apiPath> [<file>] - -if [ $# -gt 2 ]; then - echo "usage: cambriaPost.sh <apiPath> [<file>]" - exit -fi -if [ $# -lt 1 ]; then - echo "usage: cambriaPost.sh <apiPath> [<file>]" - exit -fi - -API=$1 -FILE= -if [ $# -gt 1 ]; then - FILE="-d @$2" -fi - -# the date needs to be in one of the formats cambria accepts -case "$(uname -s)" in - - Darwin) - # "EEE MMM dd HH:mm:ss z yyyy" - DATE=`date` - ;; - - Linux) - # "EEE MMM dd HH:mm:ss z yyyy" - DATE=`date` - ;; - - CYGWIN*|MINGW32*|MSYS*) - DATE=`date --rfc-2822` - ;; - - *) - DATE=`date` - ;; -esac - - -URI="http://$CAMBRIA_SERVER/$API" - -if [ -z "$CAMBRIA_APIKEY" ]; then - echo "no auth" - curl -i -X POST $FILE -H "Content-Type: application/json" $AUTHPART $URI -else - echo "auth in use" - SIGNATURE=`echo -n "$DATE" | openssl sha1 -hmac $CAMBRIA_APISECRET -binary | openssl base64` - curl -i -X POST $FILE -H "Content-Type: application/json" -H "X-CambriaAuth: $CAMBRIA_APIKEY:$SIGNATURE" -H "X-CambriaDate: $DATE" $URI -fi - diff --git a/src/main/bash/cambriaPut.sh b/src/main/bash/cambriaPut.sh deleted file mode 100644 index 36ee63f..0000000 --- a/src/main/bash/cambriaPut.sh +++ /dev/null @@ -1,75 +0,0 @@ -#!/bin/bash -#******************************************************************************* -# ============LICENSE_START======================================================= -# org.onap.dmaap -# ================================================================================ -# Copyright © 2017 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. -# -#******************************************************************************* - -# format: -# cambriaPut.sh <apiPath> [<file>] - -if [ $# -gt 2 ]; then - echo "usage: cambriaPut.sh <apiPath> [<file>]" - exit -fi -if [ $# -lt 1 ]; then - echo "usage: cambriaPut.sh <apiPath> [<file>]" - exit -fi - -API=$1 -FILE= -if [ $# -gt 1 ]; then - FILE="-d @$2" -fi - -# the date needs to be in one of the formats cambria accepts -case "$(uname -s)" in - - Darwin) - # "EEE MMM dd HH:mm:ss z yyyy" - DATE=`date` - ;; - - Linux) - # "EEE MMM dd HH:mm:ss z yyyy" - DATE=`date` - ;; - - CYGWIN*|MINGW32*|MSYS*) - DATE=`date --rfc-2822` - ;; - - *) - DATE=`date` - ;; -esac - - -URI="http://$CAMBRIA_SERVER/$API" - -if [ -z "$CAMBRIA_APIKEY" ]; then - echo "no auth" - curl -i -X PUT $FILE -H "Content-Type: application/json" $AUTHPART $URI -else - echo "auth in use" - SIGNATURE=`echo -n "$DATE" | openssl sha1 -hmac $CAMBRIA_APISECRET -binary | openssl base64` - curl -i -X PUT $FILE -H "Content-Type: application/json" -H "X-CambriaAuth: $CAMBRIA_APIKEY:$SIGNATURE" -H "X-CambriaDate: $DATE" $URI -fi - diff --git a/src/main/cpp/cambria.cpp b/src/main/cpp/cambria.cpp deleted file mode 100644 index 7aff421..0000000 --- a/src/main/cpp/cambria.cpp +++ /dev/null @@ -1,624 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ - -/* - * This is a client library for the AT&T Cambria Event Routing Service. - */ - -#include "cambria.h" - -#include <arpa/inet.h> -#include <sys/socket.h> -#include <netdb.h> -#include <unistd.h> -#include <stdio.h> -#include <stdlib.h> -#include <stdarg.h> -#include <string.h> - -#include <string> -#include <list> -#include <sstream> -#include <iomanip> -#include <algorithm> - -// field used in JSON encoding to signal stream name -const char* kPartition = "cambria.partition"; - -// map from opaque handle to object pointer -#define toOpaque(x) ((CAMBRIA_CLIENT)x) -#define fromOpaque(x) ((cambriaClient*)x) - -// trace support -extern void traceOutput ( const char* format, ... ); -#ifdef CAMBRIA_TRACING - #define TRACE traceOutput -#else - #define TRACE 1 ? (void) 0 : traceOutput -#endif - -/* - * internal cambria client class - */ -class cambriaClient -{ -public: - cambriaClient ( const std::string& host, int port, const std::string& topic, const std::string& format ); - ~cambriaClient (); - - cambriaSendResponse* send ( const char* streamName, const char* message ); - cambriaSendResponse* send ( const char* streamName, const char** message, unsigned int count ); - - cambriaGetResponse* get ( int timeoutMs, int limit ); - -private: - - std::string fHost; - int fPort; - std::string fTopic; - std::string fFormat; - - bool buildMessageBody ( const char* streamName, const char** msgs, unsigned int count, std::string& buffer ); - - void write ( int socket, const char* line ); -}; - -cambriaClient::cambriaClient ( const std::string& host, int port, const std::string& topic, const std::string& format ) : - fHost ( host ), - fPort ( port ), - fTopic ( topic ), - fFormat ( format ) -{ -} - -cambriaClient::~cambriaClient () -{ -} - -/* - * This isn't quite right -- if the message already has cambria.partition, - * it'll wind up with two entries. Also, message MUST start with '{' and - * have at least one field. - */ -static char* makeJsonMessage ( const char* streamName, const char* message ) -{ - int len = ::strlen ( message ); - if ( streamName ) - { - len += ::strlen ( kPartition ); - len += ::strlen ( streamName ); - len += 6; // quote each and a colon and comma - } - - char* msg = new char [ len + 1 ]; - ::strcpy ( msg, "{" ); - if ( streamName ) - { - ::strcat ( msg, "\"" ); - ::strcat ( msg, kPartition ); - ::strcat ( msg, "\":\"" ); - ::strcat ( msg, streamName ); - ::strcat ( msg, "\"," ); - } - ::strcat ( msg, message + 1 ); - return msg; -} - -cambriaSendResponse* cambriaClient::send ( const char* streamName, const char* message ) -{ - return send ( streamName, &message, 1 ); -} - -static bool replace ( std::string& str, const std::string& from, const std::string& to ) -{ - size_t start_pos = str.find ( from ); - if(start_pos == std::string::npos) - return false; - str.replace(start_pos, from.length(), to); - return true; -} - -static void readResponse ( int s, std::string& response ) -{ - char buffer [ 4096 ]; - - ssize_t n = 0; - while ( ( n = ::read ( s, buffer, 4095 ) ) > 0 ) - { - buffer[n] = '\0'; - response += buffer; - } -} - -static int openSocket ( std::string& host, int port, int& ss ) -{ - TRACE( "connecting to %s\n", host.c_str() ); - - struct hostent *he = ::gethostbyname ( host.c_str() ); - if ( !he ) - { - TRACE("no host entry\n"); - return CAMBRIA_NO_HOST; - } - - if ( he->h_addrtype != AF_INET ) - { - TRACE("not AF_INET\n"); - return CAMBRIA_NO_HOST; - } - - int s = ::socket ( AF_INET, SOCK_STREAM, 0 ); - if ( s == -1 ) - { - TRACE("no socket available\n"); - return CAMBRIA_CANT_CONNECT; - } - - struct sockaddr_in servaddr; - ::memset ( &servaddr, 0, sizeof(servaddr) ); - - ::memcpy ( &servaddr.sin_addr, he->h_addr_list[0], he->h_length ); - servaddr.sin_family = AF_INET; - servaddr.sin_port = ::htons ( port ); - - if ( ::connect ( s, (struct sockaddr *)&servaddr, sizeof(servaddr) ) ) - { - TRACE("couldn't connect\n"); - return CAMBRIA_CANT_CONNECT; - } - - ss = s; - return 0; -} - -cambriaSendResponse* cambriaClient::send ( const char* streamName, const char** msgs, unsigned int count ) -{ - TRACE ( "Sending %d messages.", count ); - - cambriaSendResponse* result = new cambriaSendResponse (); - result->statusCode = 0; - result->statusMessage = NULL; - result->responseBody = NULL; - - TRACE( "building message body\n" ); - - std::string body; - if ( !buildMessageBody ( streamName, msgs, count, body ) ) - { - result->statusCode = CAMBRIA_UNRECOGNIZED_FORMAT; - return result; - } - - int s = -1; - int err = ::openSocket ( fHost, fPort, s ); - if ( err > 0 ) - { - result->statusCode = err; - return result; - } - - // construct path - std::string path = "/cambriaApiServer/v1/event/"; - path += fTopic; - - // send post prefix - char line[4096]; - ::sprintf ( line, - "POST %s HTTP/1.0\r\n" - "Host: %s\r\n" - "Content-Type: %s\r\n" - "Content-Length: %d\r\n" - "\r\n", - path.c_str(), fHost.c_str(), fFormat.c_str(), body.length() ); - write ( s, line ); - - // send the body - write ( s, body.c_str() ); - - TRACE ( "\n" ); - TRACE ( "send complete, reading reply\n" ); - - // receive the response - std::string response; - readResponse ( s, response ); - ::close ( s ); - - // parse the header and body: split header and body on first occurrence of \r\n\r\n - result->statusCode = CAMBRIA_BAD_RESPONSE; - - size_t headerBreak = response.find ( "\r\n\r\n" ); - if ( headerBreak != std::string::npos ) - { - std::string responseBody = response.substr ( headerBreak + 4 ); - result->responseBody = new char [ responseBody.length() + 1 ]; - ::strcpy ( result->responseBody, responseBody.c_str() ); - - // all we need from the header for now is the status line - std::string headerPart = response.substr ( 0, headerBreak + 2 ); - - size_t newline = headerPart.find ( '\r' ); - if ( newline != std::string::npos ) - { - std::string statusLine = headerPart.substr ( 0, newline ); - - size_t firstSpace = statusLine.find ( ' ' ); - if ( firstSpace != std::string::npos ) - { - size_t secondSpace = statusLine.find ( ' ', firstSpace + 1 ); - if ( secondSpace != std::string::npos ) - { - result->statusCode = ::atoi ( statusLine.substr ( firstSpace + 1, secondSpace - firstSpace + 1 ).c_str() ); - std::string statusMessage = statusLine.substr ( secondSpace + 1 ); - result->statusMessage = new char [ statusMessage.length() + 1 ]; - ::strcpy ( result->statusMessage, statusMessage.c_str() ); - } - } - } - } - return result; -} - -void cambriaClient::write ( int socket, const char* str ) -{ - int len = str ? ::strlen ( str ) : 0; - ::write ( socket, str, len ); - - // elaborate tracing nonsense... - std::string trace ( "> " ); - trace += str; - while ( replace ( trace, "\r\n", "\\r\\n\n> " ) ); - - TRACE ( "%s", trace.c_str() ); -} - -bool cambriaClient::buildMessageBody ( const char* streamName, const char** msgs, unsigned int count, std::string& buffer ) -{ - if ( fFormat == CAMBRIA_NATIVE_FORMAT ) - { - int snLen = ::strlen ( streamName ); - for ( unsigned int i=0; i<count; i++ ) - { - const char* msg = msgs[i]; - - std::ostringstream s; - s << snLen << '.' << ::strlen(msg) << '.' << streamName << msg; - buffer.append ( s.str() ); - } - } - else if ( fFormat == CAMBRIA_JSON_FORMAT ) - { - buffer.append ( "[" ); - for ( unsigned int i=0; i<count; i++ ) - { - if ( i>0 ) - { - buffer.append ( "," ); - } - const char* msg = msgs[i]; - char* jsonMsg = ::makeJsonMessage ( streamName, msg ); - buffer.append ( jsonMsg ); - delete jsonMsg; // FIXME: allocating memory here just to delete it - } - buffer.append ( "]" ); - } - else - { - return false; - } - return true; -} - -// read the next string into value, and return the end pos, or 0 on error -static int readNextJsonString ( const std::string& body, int startPos, std::string& value ) -{ - value = ""; - - if ( startPos >= body.length () ) - { - return 0; - } - - // skip a comma - int current = startPos; - if ( body[current] == ',' ) current++; - - if ( current >= body.length() || body[current] != '"' ) - { - return 0; - } - current++; - - // walk the string for the closing quote (FIXME: unicode support) - bool esc = false; - int hex = 0; - while ( ( body[current] != '"' || esc ) && current < body.length() ) - { - if ( hex > 0 ) - { - hex--; - if ( hex == 0 ) - { - // presumably read a unicode escape. this code isn't - // equipped for multibyte or unicode, so just skip it - value += '?'; - } - } - else if ( esc ) - { - esc = false; - switch ( body[current] ) - { - case '"': - case '\\': - case '/': - value += body[current]; - break; - - case 'b': value += '\b'; break; - case 'f': value += '\f'; break; - case 'n': value += '\n'; break; - case 'r': value += '\r'; break; - case 't': value += '\t'; break; - - case 'u': hex=4; break; - } - } - else - { - esc = body[current] == '\\'; - if ( !esc ) value += body[current]; - } - current++; - } - - return current + 1; -} - -static void readGetBody ( std::string& body, cambriaGetResponse& response ) -{ - TRACE("response %s\n", body.c_str() ); - - if ( body.length() < 2 || body[0] != '[' || body[body.length()-1] != ']' ) - { - response.statusCode = CAMBRIA_BAD_RESPONSE; - } - - std::list<char*> msgs; - std::string val; - int current = 1; - while ( ( current = readNextJsonString ( body, current, val ) ) > 0 ) - { - char* msg = new char [ val.length() + 1 ]; - ::strcpy ( msg, val.c_str() ); - msgs.push_back ( msg ); - } - - // now build a response - response.messageCount = msgs.size(); - response.messageSet = new char* [ msgs.size() ]; - int index = 0; - for ( std::list<char*>::iterator it = msgs.begin(); it != msgs.end(); it++ ) - { - response.messageSet [ index++ ] = *it; - } -} - -cambriaGetResponse* cambriaClient::get ( int timeoutMs, int limit ) -{ - cambriaGetResponse* result = new cambriaGetResponse (); - result->statusCode = 0; - result->statusMessage = NULL; - result->messageCount = 0; - result->messageSet = new char* [ 1 ]; - - int s = -1; - int err = ::openSocket ( fHost, fPort, s ); - if ( err > 0 ) - { - result->statusCode = err; - return result; - } - - // construct path - std::string path = "/cambriaApiServer/v1/event/"; - path += fTopic; - - bool haveAdds = false; - std::ostringstream adds; - if ( timeoutMs > -1 ) - { - adds << "timeout=" << timeoutMs; - haveAdds = true; - } - if ( limit > -1 ) - { - if ( haveAdds ) - { - adds << "&"; - } - adds << "limit=" << limit; - haveAdds = true; - } - if ( haveAdds ) - { - path += "?"; - path += adds.str(); - } - - // send post prefix - char line[4096]; - ::sprintf ( line, - "GET %s HTTP/1.0\r\n" - "Host: %s\r\n" - "\r\n", - path.c_str(), fHost.c_str() ); - write ( s, line ); - - TRACE ( "\n" ); - TRACE ( "request sent; reading reply\n" ); - - // receive the response (FIXME: would be nice to stream rather than load it all) - std::string response; - readResponse ( s, response ); - ::close ( s ); - - // parse the header and body: split header and body on first occurrence of \r\n\r\n - result->statusCode = CAMBRIA_BAD_RESPONSE; - - size_t headerBreak = response.find ( "\r\n\r\n" ); - if ( headerBreak != std::string::npos ) - { - // get the header line - std::string headerPart = response.substr ( 0, headerBreak + 2 ); - - size_t newline = headerPart.find ( '\r' ); - if ( newline != std::string::npos ) - { - std::string statusLine = headerPart.substr ( 0, newline ); - - size_t firstSpace = statusLine.find ( ' ' ); - if ( firstSpace != std::string::npos ) - { - size_t secondSpace = statusLine.find ( ' ', firstSpace + 1 ); - if ( secondSpace != std::string::npos ) - { - result->statusCode = ::atoi ( statusLine.substr ( firstSpace + 1, secondSpace - firstSpace + 1 ).c_str() ); - std::string statusMessage = statusLine.substr ( secondSpace + 1 ); - result->statusMessage = new char [ statusMessage.length() + 1 ]; - ::strcpy ( result->statusMessage, statusMessage.c_str() ); - } - } - } - - if ( result->statusCode < 300 ) - { - std::string responseBody = response.substr ( headerBreak + 4 ); - readGetBody ( responseBody, *result ); - } - } - return result; -} - - -/////////////////////////////////////////////////////////////////////////////// -/////////////////////////////////////////////////////////////////////////////// -/////////////////////////////////////////////////////////////////////////////// - -CAMBRIA_CLIENT cambriaCreateClient ( const char* host, int port, const char* topic, const char* format ) -{ - cambriaClient* cc = new cambriaClient ( host, port, topic, format ); - return toOpaque(cc); -} - -void cambriaDestroyClient ( CAMBRIA_CLIENT client ) -{ - delete fromOpaque ( client ); -} - -cambriaSendResponse* cambriaSendMessage ( CAMBRIA_CLIENT client, const char* streamName, const char* message ) -{ - cambriaClient* c = fromOpaque ( client ); - return c->send ( streamName, message ); -} - -cambriaSendResponse* cambriaSendMessages ( CAMBRIA_CLIENT client, const char* streamName, const char** messages, unsigned int count ) -{ - cambriaClient* c = fromOpaque ( client ); - return c->send ( streamName, messages, count ); -} - -cambriaGetResponse* cambriaGetMessages ( CAMBRIA_CLIENT client, unsigned long timeoutMs, unsigned int limit ) -{ - cambriaClient* c = fromOpaque ( client ); - return c->get ( timeoutMs, limit ); -} - -void cambriaDestroySendResponse ( CAMBRIA_CLIENT client, const cambriaSendResponse* response ) -{ - if ( response ) - { - delete response->statusMessage; - delete response->responseBody; - delete response; - } -} - -void cambriaDestroyGetResponse ( CAMBRIA_CLIENT client, const cambriaGetResponse* response ) -{ - if ( response ) - { - delete response->statusMessage; - for ( int i=0; i<response->messageCount; i++ ) - { - delete response->messageSet[i]; - } - delete response; - } -} - -int cambriaSimpleSend ( const char* host, int port, const char* topic, const char* streamName, const char* msg ) -{ - return cambriaSimpleSendMultiple ( host, port, topic, streamName, &msg, 1 ); -} - -int cambriaSimpleSendMultiple ( const char* host, int port, const char* topic, const char* streamName, const char** messages, unsigned int msgCount ) -{ - int count = 0; - - const CAMBRIA_CLIENT cc = ::cambriaCreateClient ( host, port, topic, CAMBRIA_NATIVE_FORMAT ); - if ( cc ) - { - const cambriaSendResponse* response = ::cambriaSendMessages ( cc, streamName, messages, msgCount ); - if ( response && response->statusCode < 300 ) - { - count = msgCount; - } - ::cambriaDestroySendResponse ( cc, response ); - ::cambriaDestroyClient ( cc ); - } - - return count; -} - -//////////////////////////////////////////////////////////////////////////////// -//////////////////////////////////////////////////////////////////////////////// -//////////////////////////////////////////////////////////////////////////////// - -const unsigned int kMaxTraceBuffer = 2048; - -static void writeTraceString ( const char* msg ) -{ - ::fprintf ( stdout, "%s", msg ); - ::fflush ( stdout ); // because we want output before core dumping :-) -} - -void traceOutput ( const char* format, ... ) -{ - char buffer [ kMaxTraceBuffer ]; - ::memset ( buffer, '\0', kMaxTraceBuffer * sizeof ( char ) ); - - va_list list; - va_start ( list, format ); - ::vsprintf ( buffer, format, list ); - writeTraceString ( buffer ); - va_end ( list ); -} diff --git a/src/main/cpp/cambria.h b/src/main/cpp/cambria.h deleted file mode 100644 index 68e9ca9..0000000 --- a/src/main/cpp/cambria.h +++ /dev/null @@ -1,114 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ - -#ifndef _CABMRIA_H_ -#define _CABMRIA_H_ - -/* - * This is a client library for the AT&T Cambria Event Routing Service. - * - * Cambria clients post string messages to the broker on a topic, optionally - * with a partition name. - */ - -/* An opaque type for the client instance. */ -typedef void* CAMBRIA_CLIENT; - -/* Cambria has two formats. CAMBRIA_NATIVE_FORMAT is preferred. */ -#define CAMBRIA_NATIVE_FORMAT "application/cambria" -#define CAMBRIA_JSON_FORMAT "application/json" - -/* pseudo-HTTP client-side status codes */ -#define CAMBRIA_NO_HOST 470 -#define CAMBRIA_CANT_CONNECT 471 -#define CAMBRIA_UNRECOGNIZED_FORMAT 472 -#define CAMBRIA_BAD_RESPONSE 570 - -/* - * Send response structure. Be sure to call cambriaDestroySendResponse() after receiving this. - */ -struct cambriaSendResponse -{ - int statusCode; - char* statusMessage; - char* responseBody; -}; - -/* - * Get response structure. Be sure to call cambriaDestroyGetResponse() after receiving this. - */ -struct cambriaGetResponse -{ - int statusCode; - char* statusMessage; - - int messageCount; - char** messageSet; -}; - -/* - * Send a message in a single call. Returns the number sent (1 or 0). - */ -extern "C" int cambriaSimpleSend ( const char* host, int port, const char* topic, const char* streamName, const char* msg ); - -/* - * Send multiple messages in a single call. Returns the number sent. - */ -extern "C" int cambriaSimpleSendMultiple ( const char* host, int port, const char* topic, const char* streamName, const char** messages, unsigned int msgCount ); - -/* - * Create a client instance to post messages to the given host:port, topic, and - * either the CAMBRIA_NATIVE_FORMAT or CAMBRIA_JSON_FORMAT. - */ -extern "C" CAMBRIA_CLIENT cambriaCreateClient ( const char* host, int port, const char* topic, const char* format ); - -/* - * Cleanup a client instance. - */ -extern "C" void cambriaDestroyClient ( CAMBRIA_CLIENT client ); - -/* - * Send a single message to the broker using the stream name provided. (If null, no stream name is used.) - */ -extern "C" cambriaSendResponse* cambriaSendMessage ( CAMBRIA_CLIENT client, const char* streamName, const char* message ); - -/* - * Send a batch of messages to the broker using the stream name provided. (If null, no stream name is used.) - */ -extern "C" cambriaSendResponse* cambriaSendMessages ( CAMBRIA_CLIENT client, const char* streamName, const char** messages, unsigned int count ); - -/* - * Retrieve messages from the broker. If a timeout value is 0 (or lower), the broker returns a response - * immediately. Otherwise, the server holds the connection open up to the given timeout. Likewise, if limit - * is 0 (or lower), the server sends as many messages as it cares to. Otherwise, at most 'limit' messages are - * returned. - */ -extern "C" cambriaGetResponse* cambriaGetMessages ( CAMBRIA_CLIENT client, unsigned long timeoutMs, unsigned int limit ); - -/* - * After processing a response, pass it back to the library for cleanup. - */ -extern "C" void cambriaDestroySendResponse ( CAMBRIA_CLIENT client, const cambriaSendResponse* response ); - -extern "C" void cambriaDestroyGetResponse ( CAMBRIA_CLIENT client, const cambriaGetResponse* response ); - -#endif diff --git a/src/main/cpp/loopingPostClient.cpp b/src/main/cpp/loopingPostClient.cpp deleted file mode 100644 index 1396eea..0000000 --- a/src/main/cpp/loopingPostClient.cpp +++ /dev/null @@ -1,92 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ - -#include <stdio.h> -#include <ctime> -#include <string.h> -#include "cambria.h" - -const char* kAlarm = - "<EVENT>" - "<AGENT_ADDR>12.123.70.213</AGENT_ADDR>" - "<AGENT_RESOLVED>ptdor306me1.els-an.att.net</AGENT_RESOLVED>" - "<TIME_RECEIVED>1364716208</TIME_RECEIVED>" - " <PROTOCOL_VERSION>V1</PROTOCOL_VERSION>" - " <ENTERPRISE_LEN>9</ENTERPRISE_LEN>" - " <ENTERPRISE>.1.3.6.1.4.1.9.9.187</ENTERPRISE>" - " <GENERIC>6</GENERIC>" - " <SPECIFIC>2</SPECIFIC>" - " <COMMAND>167</COMMAND>" - " <REQUEST_ID>0</REQUEST_ID>" - " <ERROR_STATUS>0</ERROR_STATUS>" - " <ERROR_INDEX>0</ERROR_INDEX>" - " <AGENT_TIME_UP>1554393204</AGENT_TIME_UP>" - " <COMMUNITY_LEN>10</COMMUNITY_LEN>" - " <COMMUNITY>nidVeskaf0</COMMUNITY>" - " <VARBIND>" - " <VARBIND_OID>.1.3.6.1.2.1.15.3.1.14.32.4.52.58</VARBIND_OID>" - " <VARBIND_TYPE>OCTET_STRING_HEX</VARBIND_TYPE>" - " <VARBIND_VALUE>02 02 </VARBIND_VALUE>" - " </VARBIND>" - " <VARBIND>" - " <VARBIND_OID>.1.3.6.1.2.1.15.3.1.2.32.4.52.58</VARBIND_OID>" - " <VARBIND_TYPE>INTEGER</VARBIND_TYPE>" - " <VARBIND_VALUE>1</VARBIND_VALUE>" - " </VARBIND>" - " <VARBIND>" - " <VARBIND_OID>.1.3.6.1.4.1.9.9.187.1.2.1.1.7.32.4.52.58</VARBIND_OID>" - " <VARBIND_TYPE>OCTET_STRING_ASCII</VARBIND_TYPE>" - " <VARBIND_VALUE>peer in wrong AS</VARBIND_VALUE>" - " </VARBIND>" - " <VARBIND>" - " <VARBIND_OID>.1.3.6.1.4.1.9.9.187.1.2.1.1.8.32.4.52.58</VARBIND_OID>" - " <VARBIND_TYPE>INTEGER</VARBIND_TYPE>" - " <VARBIND_VALUE>4</VARBIND_VALUE>" - " </VARBIND>" - "</EVENT>"; - -int main ( int argc, const char* argv[] ) -{ - char** msgs = new char* [ 100 ]; - for ( int i=0; i<100; i++ ) - { - msgs[i] = new char [ ::strlen ( kAlarm + 1 ) ]; - ::strcpy ( msgs[i], kAlarm ); - } - - std::time_t start = std::time ( NULL ); - for ( int i=0; i<5000; i++ ) - { - ::cambriaSimpleSendMultiple ( "localhost", 8080, "topic", "streamName", (const char**)msgs, 100 ); - if ( i % 50 == 0 ) - { - std::time_t end = std::time ( NULL ); - double seconds = difftime ( end, start ); - ::printf ( "%.f seconds for %u posts.\n", seconds, i*100 ); - } - } - std::time_t end = std::time ( NULL ); - double seconds = difftime ( end, start ); - ::printf ( "%.f seconds for 1,000,000 posts.\n", seconds ); - - return 0; -} diff --git a/src/main/cpp/make.sh b/src/main/cpp/make.sh deleted file mode 100644 index a9b2726..0000000 --- a/src/main/cpp/make.sh +++ /dev/null @@ -1,34 +0,0 @@ -#******************************************************************************* -# ============LICENSE_START======================================================= -# org.onap.dmaap -# ================================================================================ -# Copyright © 2017 AT&T Intellectual Property. All rights reserved. -# ================================================================================ -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============LICENSE_END========================================================= -# -# ECOMP is a trademark and service mark of AT&T Intellectual Property. -# -#******************************************************************************* - -rm -rf *.o -rm -rf cambriaSamplePost -rm -rf cambriaSampleFetch -rm -rf loopPost - -#-DCAMBRIA_TRACING - -g++ cambria.cpp samplePostClient.cpp -o cambriaSamplePost -g++ cambria.cpp sampleGetClient.cpp -o cambriaSampleFetch - -g++ cambria.cpp loopingPostClient.cpp -o loopPost - diff --git a/src/main/cpp/sampleGetClient.cpp b/src/main/cpp/sampleGetClient.cpp deleted file mode 100644 index 610988c..0000000 --- a/src/main/cpp/sampleGetClient.cpp +++ /dev/null @@ -1,61 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ - -#include <stdio.h> -#include "cambria.h" - -int main ( int argc, const char* argv[] ) -{ - const CAMBRIA_CLIENT cc = ::cambriaCreateClient ( "localhost", 8080, "topic", CAMBRIA_NATIVE_FORMAT ); - if ( !cc ) - { - ::printf ( "Couldn't create client.\n" ); - return 1; - } - - int count = 0; - while ( 1 ) - { - cambriaGetResponse* response = ::cambriaGetMessages ( cc, 5000, 1024*1024 ); - if ( response && response->statusCode < 300 ) - { - for ( int i=0; i<response->messageCount; i++ ) - { - const char* msg = response->messageSet [ i ]; - ::printf ( "%d: %s\n", count++, msg ); - } - ::cambriaDestroyGetResponse ( cc, response ); - } - else if ( response ) - { - ::fprintf ( stderr, "%d %s", response->statusCode, response->statusMessage ); - } - else - { - ::fprintf ( stderr, "No response object.\n" ); - } - } - - ::cambriaDestroyClient ( cc ); - - return 0; -} diff --git a/src/main/cpp/samplePostClient.cpp b/src/main/cpp/samplePostClient.cpp deleted file mode 100644 index a4b2207..0000000 --- a/src/main/cpp/samplePostClient.cpp +++ /dev/null @@ -1,112 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ - -#include <stdio.h> -#include "cambria.h" - -void handleResponse ( const CAMBRIA_CLIENT cc, const cambriaSendResponse* response ) -{ - if ( response ) - { - ::printf ( "\t%d %s\n", response->statusCode, ( response->statusMessage ? response->statusMessage : "" ) ); - ::printf ( "\t%s\n", response->responseBody ? response->responseBody : "" ); - - // destroy the response (or it'll leak) - ::cambriaDestroySendResponse ( cc, response ); - } - else - { - ::fprintf ( stderr, "No response object.\n" ); - } -} - -int main ( int argc, const char* argv[] ) -{ - //////////////////////////////////////////////////////////////////////////// - //////////////////////////////////////////////////////////////////////////// - //////////////////////////////////////////////////////////////////////////// - - // you can send single message in one call... - ::printf ( "Sending single message...\n" ); - int sent = ::cambriaSimpleSend ( "localhost", 8080, "topic", "streamName", - "{ \"field\":\"this is a JSON formatted alarm\" }" ); - ::printf ( "\t%d sent\n\n", sent ); - - // you can also send multiple messages in one call with cambriaSimpleSendMultiple. - // the message argument becomes an array of strings, and you pass an array - // count too. - const char* msgs[] = - { - "{\"format\":\"json\"}", - "<format>xml</format>", - "or whatever. they're just strings." - }; - sent = ::cambriaSimpleSendMultiple ( "localhost", 8080, "topic", "streamName", msgs, 3 ); - ::printf ( "\t%d sent\n\n", sent ); - - //////////////////////////////////////////////////////////////////////////// - //////////////////////////////////////////////////////////////////////////// - //////////////////////////////////////////////////////////////////////////// - - // you can also create a client instance to keep around and make multiple - // send requests to. Chunked sending isn't supported right now, so each - // call to cambriaSendMessage results in a full socket open / post / close - // cycle, but hopefully we can improve this with chunking so that subsequent - // sends just push the message into the socket. - - // create a client - const CAMBRIA_CLIENT cc = ::cambriaCreateClient ( "localhost", 8080, "topic", CAMBRIA_NATIVE_FORMAT ); - if ( !cc ) - { - ::printf ( "Couldn't create client.\n" ); - return 1; - } - - //////////////////////////////////////////////////////////////////////////// - // send a single message - ::printf ( "Sending single message...\n" ); - const cambriaSendResponse* response = ::cambriaSendMessage ( cc, "streamName", "{\"foo\":\"bar\"}" ); - handleResponse ( cc, response ); - - //////////////////////////////////////////////////////////////////////////// - // send a few messages at once - const char* msgs2[] = - { - "{\"foo\":\"bar\"}", - "{\"bar\":\"baz\"}", - "{\"zoo\":\"zee\"}", - "{\"foo\":\"bar\"}", - "{\"foo\":\"bar\"}", - "{\"foo\":\"bar\"}", - }; - unsigned int count = sizeof(msgs2)/sizeof(const char*); - - ::printf ( "Sending %d messages...\n", count ); - response = ::cambriaSendMessages ( cc, "streamName", msgs2, count ); - handleResponse ( cc, response ); - - //////////////////////////////////////////////////////////////////////////// - // destroy the client (or it'll leak) - ::cambriaDestroyClient ( cc ); - - return 0; -} diff --git a/src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java b/src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java index 5c06383..84885d3 100644 --- a/src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java +++ b/src/main/java/org/onap/dmaap/mr/client/MRClientFactory.java @@ -196,8 +196,7 @@ public class MRClientFactory { * * @param hostList * A comma separated list of hosts to use to connect to MR. You - * can include port numbers (3904 is the default). For example, - * "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com" + * can include port numbers (3904 is the default)" * @param topic * The topic to consume * @param consumerGroup diff --git a/src/main/java/org/onap/dmaap/mr/test/clients/ConsolePublisher.java b/src/main/java/org/onap/dmaap/mr/test/clients/ConsolePublisher.java index 62e4cda..d8b1979 100644 --- a/src/main/java/org/onap/dmaap/mr/test/clients/ConsolePublisher.java +++ b/src/main/java/org/onap/dmaap/mr/test/clients/ConsolePublisher.java @@ -49,7 +49,7 @@ public class ConsolePublisher public static void main ( String[] args ) throws IOException //throws IOException, InterruptedException { // read the hosts(s) from the command line - final String hosts = args.length > 0 ? args[0] : "aaa.it.att.com,bbb.it.att.com,ccc.it.att.com"; + final String hosts = args.length > 0 ? args[0] : "mr1.onap.com,mr2.onap.com,mr3.onap.com"; // read the topic name from the command line final String topic = args.length > 1 ? args[1] : "TEST-TOPIC"; diff --git a/src/main/java/org/onap/dmaap/mr/test/clients/SampleConsumer.java b/src/main/java/org/onap/dmaap/mr/test/clients/SampleConsumer.java index 1f78be1..eb96780 100644 --- a/src/main/java/org/onap/dmaap/mr/test/clients/SampleConsumer.java +++ b/src/main/java/org/onap/dmaap/mr/test/clients/SampleConsumer.java @@ -38,7 +38,7 @@ public class SampleConsumer { log.info("Sample Consumer Class executing"); - final String topic = "com.att.app.dmaap.mr.testingTopic"; + final String topic = "org.onap.dmaap.mr.testingTopic"; final String url = ( args.length > 1 ? args[1] : "localhost:8181" ); final String group = ( args.length > 2 ? args[2] :"grp" ); diff --git a/src/main/java/org/onap/dmaap/mr/test/clients/SamplePublisher.java b/src/main/java/org/onap/dmaap/mr/test/clients/SamplePublisher.java index 5f8768c..f857afd 100644 --- a/src/main/java/org/onap/dmaap/mr/test/clients/SamplePublisher.java +++ b/src/main/java/org/onap/dmaap/mr/test/clients/SamplePublisher.java @@ -42,7 +42,7 @@ public class SamplePublisher { // read the topic name from the command line - final String topic = ( args.length > 1 ? args[1] : "com.att.app.dmaap.mr.testingTopic" ); + final String topic = ( args.length > 1 ? args[1] : "org.onap.dmaap.mr.testingTopic" ); // set up some batch limits and the compression flag final int maxBatchSize = 100; diff --git a/src/main/resources/dme2/consumer.properties b/src/main/resources/dme2/consumer.properties index 8350288..178b391 100644 --- a/src/main/resources/dme2/consumer.properties +++ b/src/main/resources/dme2/consumer.properties @@ -23,28 +23,27 @@ TransportType=DME2 Latitude =47.778998 Longitude =-122.182883 Version =1.0 -ServiceName =dmaap-v1.dev.dmaap.dt.saat.acsi.att.com/events +ServiceName =mr/events Environment =TEST Partner=BOT_R routeOffer=MR1 SubContextPath =/ Protocol =http MethodType =GET -username =<att uid> +username =<uid> password =<password> contenttype =application/json authKey=<auth key> authDate=2016-02-18T13:57:37-0800 -#host=uebsb91bodc.it.att.com:3904 host=<host>:<port> -topic=com.att.ecomp_test.crm.preDemo1 +topic=org.onap.dmaap.mr.preDemo1 group=con id=5 timeout=15000 limit=1000 filter= -AFT_DME2_EXCHANGE_REQUEST_HANDLERS=com.att.nsa.test.PreferredRouteRequestHandler -AFT_DME2_EXCHANGE_REPLY_HANDLERS=com.att.nsa.test.PreferredRouteReplyHandler +AFT_DME2_EXCHANGE_REQUEST_HANDLERS=org.onap.dmaap.mr.dme.client.PreferredRouteRequestHandler +AFT_DME2_EXCHANGE_REPLY_HANDLERS=org.onap.dmaap.mr.dme.client.PreferredRouteReplyHandler AFT_DME2_REQ_TRACE_ON=true AFT_ENVIRONMENT=AFTUAT AFT_DME2_EP_CONN_TIMEOUT=15000 diff --git a/src/main/resources/dme2/producer.properties b/src/main/resources/dme2/producer.properties index b3c3630..4818a3b 100644 --- a/src/main/resources/dme2/producer.properties +++ b/src/main/resources/dme2/producer.properties @@ -23,27 +23,25 @@ TransportType=DME2 Latitude =47.778998 Longitude =-122.182883 Version =1.0 -ServiceName =dmaap-v1.dev.dmaap.dt.saat.acsi.att.com/events -#com.att.acsi.saat.dt.dmaap.dev.mrclientnew1 +ServiceName =mr/events Environment =TEST Partner=BOT_R routeOffer=MR1 SubContextPath =/ Protocol =http MethodType =POST -username =<att uid> +username =<uid> password =<global logon password> contenttype = application/json authKey=<auth key> authDate=2016-07-20T11:30:56-0700 host=<host>:<port> -topic=com.att.ecomp_test.crm.preDemo1 -#host=uebsb91bodc.it.att.com:3904 +topic=org.onap.dmaap.mr.preDemo1 partition=1 maxBatchSize=100 maxAgeMs=250 -AFT_DME2_EXCHANGE_REQUEST_HANDLERS=com.att.nsa.test.PreferredRouteRequestHandler -AFT_DME2_EXCHANGE_REPLY_HANDLERS=com.att.nsa.test.PreferredRouteReplyHandler +AFT_DME2_EXCHANGE_REQUEST_HANDLERS=org.onap.dmaap.mr.dme.client.PreferredRouteRequestHandler +AFT_DME2_EXCHANGE_REPLY_HANDLERS=org.onap.dmaap.mr.dme.client.PreferredRouteReplyHandler AFT_DME2_REQ_TRACE_ON=true AFT_ENVIRONMENT=AFTUAT AFT_DME2_EP_CONN_TIMEOUT=15000 |