From cc9de9bc6803212f0233e0e1bf06aa63fe8b7a6a Mon Sep 17 00:00:00 2001 From: Varun Gudisena Date: Wed, 30 Aug 2017 20:49:32 -0500 Subject: Add Initial Code Import Intial Code import for dmaapClient Issue-id: DMAAP-82 Change-Id: Ib627672d37e233b796619f93dd91f5caaf1592e4 Signed-off-by: Varun Gudisena --- src/main/cpp/cambria.cpp | 624 +++++++++++++++++++++++++++++++++++++ src/main/cpp/cambria.h | 114 +++++++ src/main/cpp/loopingPostClient.cpp | 92 ++++++ src/main/cpp/make.sh | 34 ++ src/main/cpp/sampleGetClient.cpp | 61 ++++ src/main/cpp/samplePostClient.cpp | 112 +++++++ 6 files changed, 1037 insertions(+) create mode 100644 src/main/cpp/cambria.cpp create mode 100644 src/main/cpp/cambria.h create mode 100644 src/main/cpp/loopingPostClient.cpp create mode 100644 src/main/cpp/make.sh create mode 100644 src/main/cpp/sampleGetClient.cpp create mode 100644 src/main/cpp/samplePostClient.cpp (limited to 'src/main/cpp') diff --git a/src/main/cpp/cambria.cpp b/src/main/cpp/cambria.cpp new file mode 100644 index 0000000..7aff421 --- /dev/null +++ b/src/main/cpp/cambria.cpp @@ -0,0 +1,624 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ + +/* + * This is a client library for the AT&T Cambria Event Routing Service. + */ + +#include "cambria.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +// field used in JSON encoding to signal stream name +const char* kPartition = "cambria.partition"; + +// map from opaque handle to object pointer +#define toOpaque(x) ((CAMBRIA_CLIENT)x) +#define fromOpaque(x) ((cambriaClient*)x) + +// trace support +extern void traceOutput ( const char* format, ... ); +#ifdef CAMBRIA_TRACING + #define TRACE traceOutput +#else + #define TRACE 1 ? (void) 0 : traceOutput +#endif + +/* + * internal cambria client class + */ +class cambriaClient +{ +public: + cambriaClient ( const std::string& host, int port, const std::string& topic, const std::string& format ); + ~cambriaClient (); + + cambriaSendResponse* send ( const char* streamName, const char* message ); + cambriaSendResponse* send ( const char* streamName, const char** message, unsigned int count ); + + cambriaGetResponse* get ( int timeoutMs, int limit ); + +private: + + std::string fHost; + int fPort; + std::string fTopic; + std::string fFormat; + + bool buildMessageBody ( const char* streamName, const char** msgs, unsigned int count, std::string& buffer ); + + void write ( int socket, const char* line ); +}; + +cambriaClient::cambriaClient ( const std::string& host, int port, const std::string& topic, const std::string& format ) : + fHost ( host ), + fPort ( port ), + fTopic ( topic ), + fFormat ( format ) +{ +} + +cambriaClient::~cambriaClient () +{ +} + +/* + * This isn't quite right -- if the message already has cambria.partition, + * it'll wind up with two entries. Also, message MUST start with '{' and + * have at least one field. + */ +static char* makeJsonMessage ( const char* streamName, const char* message ) +{ + int len = ::strlen ( message ); + if ( streamName ) + { + len += ::strlen ( kPartition ); + len += ::strlen ( streamName ); + len += 6; // quote each and a colon and comma + } + + char* msg = new char [ len + 1 ]; + ::strcpy ( msg, "{" ); + if ( streamName ) + { + ::strcat ( msg, "\"" ); + ::strcat ( msg, kPartition ); + ::strcat ( msg, "\":\"" ); + ::strcat ( msg, streamName ); + ::strcat ( msg, "\"," ); + } + ::strcat ( msg, message + 1 ); + return msg; +} + +cambriaSendResponse* cambriaClient::send ( const char* streamName, const char* message ) +{ + return send ( streamName, &message, 1 ); +} + +static bool replace ( std::string& str, const std::string& from, const std::string& to ) +{ + size_t start_pos = str.find ( from ); + if(start_pos == std::string::npos) + return false; + str.replace(start_pos, from.length(), to); + return true; +} + +static void readResponse ( int s, std::string& response ) +{ + char buffer [ 4096 ]; + + ssize_t n = 0; + while ( ( n = ::read ( s, buffer, 4095 ) ) > 0 ) + { + buffer[n] = '\0'; + response += buffer; + } +} + +static int openSocket ( std::string& host, int port, int& ss ) +{ + TRACE( "connecting to %s\n", host.c_str() ); + + struct hostent *he = ::gethostbyname ( host.c_str() ); + if ( !he ) + { + TRACE("no host entry\n"); + return CAMBRIA_NO_HOST; + } + + if ( he->h_addrtype != AF_INET ) + { + TRACE("not AF_INET\n"); + return CAMBRIA_NO_HOST; + } + + int s = ::socket ( AF_INET, SOCK_STREAM, 0 ); + if ( s == -1 ) + { + TRACE("no socket available\n"); + return CAMBRIA_CANT_CONNECT; + } + + struct sockaddr_in servaddr; + ::memset ( &servaddr, 0, sizeof(servaddr) ); + + ::memcpy ( &servaddr.sin_addr, he->h_addr_list[0], he->h_length ); + servaddr.sin_family = AF_INET; + servaddr.sin_port = ::htons ( port ); + + if ( ::connect ( s, (struct sockaddr *)&servaddr, sizeof(servaddr) ) ) + { + TRACE("couldn't connect\n"); + return CAMBRIA_CANT_CONNECT; + } + + ss = s; + return 0; +} + +cambriaSendResponse* cambriaClient::send ( const char* streamName, const char** msgs, unsigned int count ) +{ + TRACE ( "Sending %d messages.", count ); + + cambriaSendResponse* result = new cambriaSendResponse (); + result->statusCode = 0; + result->statusMessage = NULL; + result->responseBody = NULL; + + TRACE( "building message body\n" ); + + std::string body; + if ( !buildMessageBody ( streamName, msgs, count, body ) ) + { + result->statusCode = CAMBRIA_UNRECOGNIZED_FORMAT; + return result; + } + + int s = -1; + int err = ::openSocket ( fHost, fPort, s ); + if ( err > 0 ) + { + result->statusCode = err; + return result; + } + + // construct path + std::string path = "/cambriaApiServer/v1/event/"; + path += fTopic; + + // send post prefix + char line[4096]; + ::sprintf ( line, + "POST %s HTTP/1.0\r\n" + "Host: %s\r\n" + "Content-Type: %s\r\n" + "Content-Length: %d\r\n" + "\r\n", + path.c_str(), fHost.c_str(), fFormat.c_str(), body.length() ); + write ( s, line ); + + // send the body + write ( s, body.c_str() ); + + TRACE ( "\n" ); + TRACE ( "send complete, reading reply\n" ); + + // receive the response + std::string response; + readResponse ( s, response ); + ::close ( s ); + + // parse the header and body: split header and body on first occurrence of \r\n\r\n + result->statusCode = CAMBRIA_BAD_RESPONSE; + + size_t headerBreak = response.find ( "\r\n\r\n" ); + if ( headerBreak != std::string::npos ) + { + std::string responseBody = response.substr ( headerBreak + 4 ); + result->responseBody = new char [ responseBody.length() + 1 ]; + ::strcpy ( result->responseBody, responseBody.c_str() ); + + // all we need from the header for now is the status line + std::string headerPart = response.substr ( 0, headerBreak + 2 ); + + size_t newline = headerPart.find ( '\r' ); + if ( newline != std::string::npos ) + { + std::string statusLine = headerPart.substr ( 0, newline ); + + size_t firstSpace = statusLine.find ( ' ' ); + if ( firstSpace != std::string::npos ) + { + size_t secondSpace = statusLine.find ( ' ', firstSpace + 1 ); + if ( secondSpace != std::string::npos ) + { + result->statusCode = ::atoi ( statusLine.substr ( firstSpace + 1, secondSpace - firstSpace + 1 ).c_str() ); + std::string statusMessage = statusLine.substr ( secondSpace + 1 ); + result->statusMessage = new char [ statusMessage.length() + 1 ]; + ::strcpy ( result->statusMessage, statusMessage.c_str() ); + } + } + } + } + return result; +} + +void cambriaClient::write ( int socket, const char* str ) +{ + int len = str ? ::strlen ( str ) : 0; + ::write ( socket, str, len ); + + // elaborate tracing nonsense... + std::string trace ( "> " ); + trace += str; + while ( replace ( trace, "\r\n", "\\r\\n\n> " ) ); + + TRACE ( "%s", trace.c_str() ); +} + +bool cambriaClient::buildMessageBody ( const char* streamName, const char** msgs, unsigned int count, std::string& buffer ) +{ + if ( fFormat == CAMBRIA_NATIVE_FORMAT ) + { + int snLen = ::strlen ( streamName ); + for ( unsigned int i=0; i0 ) + { + buffer.append ( "," ); + } + const char* msg = msgs[i]; + char* jsonMsg = ::makeJsonMessage ( streamName, msg ); + buffer.append ( jsonMsg ); + delete jsonMsg; // FIXME: allocating memory here just to delete it + } + buffer.append ( "]" ); + } + else + { + return false; + } + return true; +} + +// read the next string into value, and return the end pos, or 0 on error +static int readNextJsonString ( const std::string& body, int startPos, std::string& value ) +{ + value = ""; + + if ( startPos >= body.length () ) + { + return 0; + } + + // skip a comma + int current = startPos; + if ( body[current] == ',' ) current++; + + if ( current >= body.length() || body[current] != '"' ) + { + return 0; + } + current++; + + // walk the string for the closing quote (FIXME: unicode support) + bool esc = false; + int hex = 0; + while ( ( body[current] != '"' || esc ) && current < body.length() ) + { + if ( hex > 0 ) + { + hex--; + if ( hex == 0 ) + { + // presumably read a unicode escape. this code isn't + // equipped for multibyte or unicode, so just skip it + value += '?'; + } + } + else if ( esc ) + { + esc = false; + switch ( body[current] ) + { + case '"': + case '\\': + case '/': + value += body[current]; + break; + + case 'b': value += '\b'; break; + case 'f': value += '\f'; break; + case 'n': value += '\n'; break; + case 'r': value += '\r'; break; + case 't': value += '\t'; break; + + case 'u': hex=4; break; + } + } + else + { + esc = body[current] == '\\'; + if ( !esc ) value += body[current]; + } + current++; + } + + return current + 1; +} + +static void readGetBody ( std::string& body, cambriaGetResponse& response ) +{ + TRACE("response %s\n", body.c_str() ); + + if ( body.length() < 2 || body[0] != '[' || body[body.length()-1] != ']' ) + { + response.statusCode = CAMBRIA_BAD_RESPONSE; + } + + std::list msgs; + std::string val; + int current = 1; + while ( ( current = readNextJsonString ( body, current, val ) ) > 0 ) + { + char* msg = new char [ val.length() + 1 ]; + ::strcpy ( msg, val.c_str() ); + msgs.push_back ( msg ); + } + + // now build a response + response.messageCount = msgs.size(); + response.messageSet = new char* [ msgs.size() ]; + int index = 0; + for ( std::list::iterator it = msgs.begin(); it != msgs.end(); it++ ) + { + response.messageSet [ index++ ] = *it; + } +} + +cambriaGetResponse* cambriaClient::get ( int timeoutMs, int limit ) +{ + cambriaGetResponse* result = new cambriaGetResponse (); + result->statusCode = 0; + result->statusMessage = NULL; + result->messageCount = 0; + result->messageSet = new char* [ 1 ]; + + int s = -1; + int err = ::openSocket ( fHost, fPort, s ); + if ( err > 0 ) + { + result->statusCode = err; + return result; + } + + // construct path + std::string path = "/cambriaApiServer/v1/event/"; + path += fTopic; + + bool haveAdds = false; + std::ostringstream adds; + if ( timeoutMs > -1 ) + { + adds << "timeout=" << timeoutMs; + haveAdds = true; + } + if ( limit > -1 ) + { + if ( haveAdds ) + { + adds << "&"; + } + adds << "limit=" << limit; + haveAdds = true; + } + if ( haveAdds ) + { + path += "?"; + path += adds.str(); + } + + // send post prefix + char line[4096]; + ::sprintf ( line, + "GET %s HTTP/1.0\r\n" + "Host: %s\r\n" + "\r\n", + path.c_str(), fHost.c_str() ); + write ( s, line ); + + TRACE ( "\n" ); + TRACE ( "request sent; reading reply\n" ); + + // receive the response (FIXME: would be nice to stream rather than load it all) + std::string response; + readResponse ( s, response ); + ::close ( s ); + + // parse the header and body: split header and body on first occurrence of \r\n\r\n + result->statusCode = CAMBRIA_BAD_RESPONSE; + + size_t headerBreak = response.find ( "\r\n\r\n" ); + if ( headerBreak != std::string::npos ) + { + // get the header line + std::string headerPart = response.substr ( 0, headerBreak + 2 ); + + size_t newline = headerPart.find ( '\r' ); + if ( newline != std::string::npos ) + { + std::string statusLine = headerPart.substr ( 0, newline ); + + size_t firstSpace = statusLine.find ( ' ' ); + if ( firstSpace != std::string::npos ) + { + size_t secondSpace = statusLine.find ( ' ', firstSpace + 1 ); + if ( secondSpace != std::string::npos ) + { + result->statusCode = ::atoi ( statusLine.substr ( firstSpace + 1, secondSpace - firstSpace + 1 ).c_str() ); + std::string statusMessage = statusLine.substr ( secondSpace + 1 ); + result->statusMessage = new char [ statusMessage.length() + 1 ]; + ::strcpy ( result->statusMessage, statusMessage.c_str() ); + } + } + } + + if ( result->statusCode < 300 ) + { + std::string responseBody = response.substr ( headerBreak + 4 ); + readGetBody ( responseBody, *result ); + } + } + return result; +} + + +/////////////////////////////////////////////////////////////////////////////// +/////////////////////////////////////////////////////////////////////////////// +/////////////////////////////////////////////////////////////////////////////// + +CAMBRIA_CLIENT cambriaCreateClient ( const char* host, int port, const char* topic, const char* format ) +{ + cambriaClient* cc = new cambriaClient ( host, port, topic, format ); + return toOpaque(cc); +} + +void cambriaDestroyClient ( CAMBRIA_CLIENT client ) +{ + delete fromOpaque ( client ); +} + +cambriaSendResponse* cambriaSendMessage ( CAMBRIA_CLIENT client, const char* streamName, const char* message ) +{ + cambriaClient* c = fromOpaque ( client ); + return c->send ( streamName, message ); +} + +cambriaSendResponse* cambriaSendMessages ( CAMBRIA_CLIENT client, const char* streamName, const char** messages, unsigned int count ) +{ + cambriaClient* c = fromOpaque ( client ); + return c->send ( streamName, messages, count ); +} + +cambriaGetResponse* cambriaGetMessages ( CAMBRIA_CLIENT client, unsigned long timeoutMs, unsigned int limit ) +{ + cambriaClient* c = fromOpaque ( client ); + return c->get ( timeoutMs, limit ); +} + +void cambriaDestroySendResponse ( CAMBRIA_CLIENT client, const cambriaSendResponse* response ) +{ + if ( response ) + { + delete response->statusMessage; + delete response->responseBody; + delete response; + } +} + +void cambriaDestroyGetResponse ( CAMBRIA_CLIENT client, const cambriaGetResponse* response ) +{ + if ( response ) + { + delete response->statusMessage; + for ( int i=0; imessageCount; i++ ) + { + delete response->messageSet[i]; + } + delete response; + } +} + +int cambriaSimpleSend ( const char* host, int port, const char* topic, const char* streamName, const char* msg ) +{ + return cambriaSimpleSendMultiple ( host, port, topic, streamName, &msg, 1 ); +} + +int cambriaSimpleSendMultiple ( const char* host, int port, const char* topic, const char* streamName, const char** messages, unsigned int msgCount ) +{ + int count = 0; + + const CAMBRIA_CLIENT cc = ::cambriaCreateClient ( host, port, topic, CAMBRIA_NATIVE_FORMAT ); + if ( cc ) + { + const cambriaSendResponse* response = ::cambriaSendMessages ( cc, streamName, messages, msgCount ); + if ( response && response->statusCode < 300 ) + { + count = msgCount; + } + ::cambriaDestroySendResponse ( cc, response ); + ::cambriaDestroyClient ( cc ); + } + + return count; +} + +//////////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////////// +//////////////////////////////////////////////////////////////////////////////// + +const unsigned int kMaxTraceBuffer = 2048; + +static void writeTraceString ( const char* msg ) +{ + ::fprintf ( stdout, "%s", msg ); + ::fflush ( stdout ); // because we want output before core dumping :-) +} + +void traceOutput ( const char* format, ... ) +{ + char buffer [ kMaxTraceBuffer ]; + ::memset ( buffer, '\0', kMaxTraceBuffer * sizeof ( char ) ); + + va_list list; + va_start ( list, format ); + ::vsprintf ( buffer, format, list ); + writeTraceString ( buffer ); + va_end ( list ); +} diff --git a/src/main/cpp/cambria.h b/src/main/cpp/cambria.h new file mode 100644 index 0000000..68e9ca9 --- /dev/null +++ b/src/main/cpp/cambria.h @@ -0,0 +1,114 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ + +#ifndef _CABMRIA_H_ +#define _CABMRIA_H_ + +/* + * This is a client library for the AT&T Cambria Event Routing Service. + * + * Cambria clients post string messages to the broker on a topic, optionally + * with a partition name. + */ + +/* An opaque type for the client instance. */ +typedef void* CAMBRIA_CLIENT; + +/* Cambria has two formats. CAMBRIA_NATIVE_FORMAT is preferred. */ +#define CAMBRIA_NATIVE_FORMAT "application/cambria" +#define CAMBRIA_JSON_FORMAT "application/json" + +/* pseudo-HTTP client-side status codes */ +#define CAMBRIA_NO_HOST 470 +#define CAMBRIA_CANT_CONNECT 471 +#define CAMBRIA_UNRECOGNIZED_FORMAT 472 +#define CAMBRIA_BAD_RESPONSE 570 + +/* + * Send response structure. Be sure to call cambriaDestroySendResponse() after receiving this. + */ +struct cambriaSendResponse +{ + int statusCode; + char* statusMessage; + char* responseBody; +}; + +/* + * Get response structure. Be sure to call cambriaDestroyGetResponse() after receiving this. + */ +struct cambriaGetResponse +{ + int statusCode; + char* statusMessage; + + int messageCount; + char** messageSet; +}; + +/* + * Send a message in a single call. Returns the number sent (1 or 0). + */ +extern "C" int cambriaSimpleSend ( const char* host, int port, const char* topic, const char* streamName, const char* msg ); + +/* + * Send multiple messages in a single call. Returns the number sent. + */ +extern "C" int cambriaSimpleSendMultiple ( const char* host, int port, const char* topic, const char* streamName, const char** messages, unsigned int msgCount ); + +/* + * Create a client instance to post messages to the given host:port, topic, and + * either the CAMBRIA_NATIVE_FORMAT or CAMBRIA_JSON_FORMAT. + */ +extern "C" CAMBRIA_CLIENT cambriaCreateClient ( const char* host, int port, const char* topic, const char* format ); + +/* + * Cleanup a client instance. + */ +extern "C" void cambriaDestroyClient ( CAMBRIA_CLIENT client ); + +/* + * Send a single message to the broker using the stream name provided. (If null, no stream name is used.) + */ +extern "C" cambriaSendResponse* cambriaSendMessage ( CAMBRIA_CLIENT client, const char* streamName, const char* message ); + +/* + * Send a batch of messages to the broker using the stream name provided. (If null, no stream name is used.) + */ +extern "C" cambriaSendResponse* cambriaSendMessages ( CAMBRIA_CLIENT client, const char* streamName, const char** messages, unsigned int count ); + +/* + * Retrieve messages from the broker. If a timeout value is 0 (or lower), the broker returns a response + * immediately. Otherwise, the server holds the connection open up to the given timeout. Likewise, if limit + * is 0 (or lower), the server sends as many messages as it cares to. Otherwise, at most 'limit' messages are + * returned. + */ +extern "C" cambriaGetResponse* cambriaGetMessages ( CAMBRIA_CLIENT client, unsigned long timeoutMs, unsigned int limit ); + +/* + * After processing a response, pass it back to the library for cleanup. + */ +extern "C" void cambriaDestroySendResponse ( CAMBRIA_CLIENT client, const cambriaSendResponse* response ); + +extern "C" void cambriaDestroyGetResponse ( CAMBRIA_CLIENT client, const cambriaGetResponse* response ); + +#endif diff --git a/src/main/cpp/loopingPostClient.cpp b/src/main/cpp/loopingPostClient.cpp new file mode 100644 index 0000000..1396eea --- /dev/null +++ b/src/main/cpp/loopingPostClient.cpp @@ -0,0 +1,92 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ + +#include +#include +#include +#include "cambria.h" + +const char* kAlarm = + "" + "12.123.70.213" + "ptdor306me1.els-an.att.net" + "1364716208" + " V1" + " 9" + " .1.3.6.1.4.1.9.9.187" + " 6" + " 2" + " 167" + " 0" + " 0" + " 0" + " 1554393204" + " 10" + " nidVeskaf0" + " " + " .1.3.6.1.2.1.15.3.1.14.32.4.52.58" + " OCTET_STRING_HEX" + " 02 02 " + " " + " " + " .1.3.6.1.2.1.15.3.1.2.32.4.52.58" + " INTEGER" + " 1" + " " + " " + " .1.3.6.1.4.1.9.9.187.1.2.1.1.7.32.4.52.58" + " OCTET_STRING_ASCII" + " peer in wrong AS" + " " + " " + " .1.3.6.1.4.1.9.9.187.1.2.1.1.8.32.4.52.58" + " INTEGER" + " 4" + " " + ""; + +int main ( int argc, const char* argv[] ) +{ + char** msgs = new char* [ 100 ]; + for ( int i=0; i<100; i++ ) + { + msgs[i] = new char [ ::strlen ( kAlarm + 1 ) ]; + ::strcpy ( msgs[i], kAlarm ); + } + + std::time_t start = std::time ( NULL ); + for ( int i=0; i<5000; i++ ) + { + ::cambriaSimpleSendMultiple ( "localhost", 8080, "topic", "streamName", (const char**)msgs, 100 ); + if ( i % 50 == 0 ) + { + std::time_t end = std::time ( NULL ); + double seconds = difftime ( end, start ); + ::printf ( "%.f seconds for %u posts.\n", seconds, i*100 ); + } + } + std::time_t end = std::time ( NULL ); + double seconds = difftime ( end, start ); + ::printf ( "%.f seconds for 1,000,000 posts.\n", seconds ); + + return 0; +} diff --git a/src/main/cpp/make.sh b/src/main/cpp/make.sh new file mode 100644 index 0000000..a9b2726 --- /dev/null +++ b/src/main/cpp/make.sh @@ -0,0 +1,34 @@ +#******************************************************************************* +# ============LICENSE_START======================================================= +# org.onap.dmaap +# ================================================================================ +# Copyright © 2017 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +# +# ECOMP is a trademark and service mark of AT&T Intellectual Property. +# +#******************************************************************************* + +rm -rf *.o +rm -rf cambriaSamplePost +rm -rf cambriaSampleFetch +rm -rf loopPost + +#-DCAMBRIA_TRACING + +g++ cambria.cpp samplePostClient.cpp -o cambriaSamplePost +g++ cambria.cpp sampleGetClient.cpp -o cambriaSampleFetch + +g++ cambria.cpp loopingPostClient.cpp -o loopPost + diff --git a/src/main/cpp/sampleGetClient.cpp b/src/main/cpp/sampleGetClient.cpp new file mode 100644 index 0000000..610988c --- /dev/null +++ b/src/main/cpp/sampleGetClient.cpp @@ -0,0 +1,61 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ + +#include +#include "cambria.h" + +int main ( int argc, const char* argv[] ) +{ + const CAMBRIA_CLIENT cc = ::cambriaCreateClient ( "localhost", 8080, "topic", CAMBRIA_NATIVE_FORMAT ); + if ( !cc ) + { + ::printf ( "Couldn't create client.\n" ); + return 1; + } + + int count = 0; + while ( 1 ) + { + cambriaGetResponse* response = ::cambriaGetMessages ( cc, 5000, 1024*1024 ); + if ( response && response->statusCode < 300 ) + { + for ( int i=0; imessageCount; i++ ) + { + const char* msg = response->messageSet [ i ]; + ::printf ( "%d: %s\n", count++, msg ); + } + ::cambriaDestroyGetResponse ( cc, response ); + } + else if ( response ) + { + ::fprintf ( stderr, "%d %s", response->statusCode, response->statusMessage ); + } + else + { + ::fprintf ( stderr, "No response object.\n" ); + } + } + + ::cambriaDestroyClient ( cc ); + + return 0; +} diff --git a/src/main/cpp/samplePostClient.cpp b/src/main/cpp/samplePostClient.cpp new file mode 100644 index 0000000..a4b2207 --- /dev/null +++ b/src/main/cpp/samplePostClient.cpp @@ -0,0 +1,112 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * org.onap.dmaap + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * + *******************************************************************************/ + +#include +#include "cambria.h" + +void handleResponse ( const CAMBRIA_CLIENT cc, const cambriaSendResponse* response ) +{ + if ( response ) + { + ::printf ( "\t%d %s\n", response->statusCode, ( response->statusMessage ? response->statusMessage : "" ) ); + ::printf ( "\t%s\n", response->responseBody ? response->responseBody : "" ); + + // destroy the response (or it'll leak) + ::cambriaDestroySendResponse ( cc, response ); + } + else + { + ::fprintf ( stderr, "No response object.\n" ); + } +} + +int main ( int argc, const char* argv[] ) +{ + //////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////// + + // you can send single message in one call... + ::printf ( "Sending single message...\n" ); + int sent = ::cambriaSimpleSend ( "localhost", 8080, "topic", "streamName", + "{ \"field\":\"this is a JSON formatted alarm\" }" ); + ::printf ( "\t%d sent\n\n", sent ); + + // you can also send multiple messages in one call with cambriaSimpleSendMultiple. + // the message argument becomes an array of strings, and you pass an array + // count too. + const char* msgs[] = + { + "{\"format\":\"json\"}", + "xml", + "or whatever. they're just strings." + }; + sent = ::cambriaSimpleSendMultiple ( "localhost", 8080, "topic", "streamName", msgs, 3 ); + ::printf ( "\t%d sent\n\n", sent ); + + //////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////// + + // you can also create a client instance to keep around and make multiple + // send requests to. Chunked sending isn't supported right now, so each + // call to cambriaSendMessage results in a full socket open / post / close + // cycle, but hopefully we can improve this with chunking so that subsequent + // sends just push the message into the socket. + + // create a client + const CAMBRIA_CLIENT cc = ::cambriaCreateClient ( "localhost", 8080, "topic", CAMBRIA_NATIVE_FORMAT ); + if ( !cc ) + { + ::printf ( "Couldn't create client.\n" ); + return 1; + } + + //////////////////////////////////////////////////////////////////////////// + // send a single message + ::printf ( "Sending single message...\n" ); + const cambriaSendResponse* response = ::cambriaSendMessage ( cc, "streamName", "{\"foo\":\"bar\"}" ); + handleResponse ( cc, response ); + + //////////////////////////////////////////////////////////////////////////// + // send a few messages at once + const char* msgs2[] = + { + "{\"foo\":\"bar\"}", + "{\"bar\":\"baz\"}", + "{\"zoo\":\"zee\"}", + "{\"foo\":\"bar\"}", + "{\"foo\":\"bar\"}", + "{\"foo\":\"bar\"}", + }; + unsigned int count = sizeof(msgs2)/sizeof(const char*); + + ::printf ( "Sending %d messages...\n", count ); + response = ::cambriaSendMessages ( cc, "streamName", msgs2, count ); + handleResponse ( cc, response ); + + //////////////////////////////////////////////////////////////////////////// + // destroy the client (or it'll leak) + ::cambriaDestroyClient ( cc ); + + return 0; +} -- cgit 1.2.3-korg