From dcb1479b85dc4fa43d7e417a05d01ac153b7fd1f Mon Sep 17 00:00:00 2001 From: "sunil.unnava" Date: Fri, 10 May 2019 11:21:44 -0400 Subject: clean MR codebase Issue-ID: DMAAP-1192 Change-Id: I5cc729c9a475cac859510b57542da546e1570c96 Signed-off-by: sunil.unnava --- src/main/cpp/cambria.cpp | 624 ----------------------------------------------- 1 file changed, 624 deletions(-) delete mode 100644 src/main/cpp/cambria.cpp (limited to 'src/main/cpp/cambria.cpp') diff --git a/src/main/cpp/cambria.cpp b/src/main/cpp/cambria.cpp deleted file mode 100644 index 7aff421..0000000 --- a/src/main/cpp/cambria.cpp +++ /dev/null @@ -1,624 +0,0 @@ -/******************************************************************************* - * ============LICENSE_START======================================================= - * org.onap.dmaap - * ================================================================================ - * Copyright © 2017 AT&T Intellectual Property. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - * - * ECOMP is a trademark and service mark of AT&T Intellectual Property. - * - *******************************************************************************/ - -/* - * This is a client library for the AT&T Cambria Event Routing Service. - */ - -#include "cambria.h" - -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include - -// field used in JSON encoding to signal stream name -const char* kPartition = "cambria.partition"; - -// map from opaque handle to object pointer -#define toOpaque(x) ((CAMBRIA_CLIENT)x) -#define fromOpaque(x) ((cambriaClient*)x) - -// trace support -extern void traceOutput ( const char* format, ... ); -#ifdef CAMBRIA_TRACING - #define TRACE traceOutput -#else - #define TRACE 1 ? (void) 0 : traceOutput -#endif - -/* - * internal cambria client class - */ -class cambriaClient -{ -public: - cambriaClient ( const std::string& host, int port, const std::string& topic, const std::string& format ); - ~cambriaClient (); - - cambriaSendResponse* send ( const char* streamName, const char* message ); - cambriaSendResponse* send ( const char* streamName, const char** message, unsigned int count ); - - cambriaGetResponse* get ( int timeoutMs, int limit ); - -private: - - std::string fHost; - int fPort; - std::string fTopic; - std::string fFormat; - - bool buildMessageBody ( const char* streamName, const char** msgs, unsigned int count, std::string& buffer ); - - void write ( int socket, const char* line ); -}; - -cambriaClient::cambriaClient ( const std::string& host, int port, const std::string& topic, const std::string& format ) : - fHost ( host ), - fPort ( port ), - fTopic ( topic ), - fFormat ( format ) -{ -} - -cambriaClient::~cambriaClient () -{ -} - -/* - * This isn't quite right -- if the message already has cambria.partition, - * it'll wind up with two entries. Also, message MUST start with '{' and - * have at least one field. - */ -static char* makeJsonMessage ( const char* streamName, const char* message ) -{ - int len = ::strlen ( message ); - if ( streamName ) - { - len += ::strlen ( kPartition ); - len += ::strlen ( streamName ); - len += 6; // quote each and a colon and comma - } - - char* msg = new char [ len + 1 ]; - ::strcpy ( msg, "{" ); - if ( streamName ) - { - ::strcat ( msg, "\"" ); - ::strcat ( msg, kPartition ); - ::strcat ( msg, "\":\"" ); - ::strcat ( msg, streamName ); - ::strcat ( msg, "\"," ); - } - ::strcat ( msg, message + 1 ); - return msg; -} - -cambriaSendResponse* cambriaClient::send ( const char* streamName, const char* message ) -{ - return send ( streamName, &message, 1 ); -} - -static bool replace ( std::string& str, const std::string& from, const std::string& to ) -{ - size_t start_pos = str.find ( from ); - if(start_pos == std::string::npos) - return false; - str.replace(start_pos, from.length(), to); - return true; -} - -static void readResponse ( int s, std::string& response ) -{ - char buffer [ 4096 ]; - - ssize_t n = 0; - while ( ( n = ::read ( s, buffer, 4095 ) ) > 0 ) - { - buffer[n] = '\0'; - response += buffer; - } -} - -static int openSocket ( std::string& host, int port, int& ss ) -{ - TRACE( "connecting to %s\n", host.c_str() ); - - struct hostent *he = ::gethostbyname ( host.c_str() ); - if ( !he ) - { - TRACE("no host entry\n"); - return CAMBRIA_NO_HOST; - } - - if ( he->h_addrtype != AF_INET ) - { - TRACE("not AF_INET\n"); - return CAMBRIA_NO_HOST; - } - - int s = ::socket ( AF_INET, SOCK_STREAM, 0 ); - if ( s == -1 ) - { - TRACE("no socket available\n"); - return CAMBRIA_CANT_CONNECT; - } - - struct sockaddr_in servaddr; - ::memset ( &servaddr, 0, sizeof(servaddr) ); - - ::memcpy ( &servaddr.sin_addr, he->h_addr_list[0], he->h_length ); - servaddr.sin_family = AF_INET; - servaddr.sin_port = ::htons ( port ); - - if ( ::connect ( s, (struct sockaddr *)&servaddr, sizeof(servaddr) ) ) - { - TRACE("couldn't connect\n"); - return CAMBRIA_CANT_CONNECT; - } - - ss = s; - return 0; -} - -cambriaSendResponse* cambriaClient::send ( const char* streamName, const char** msgs, unsigned int count ) -{ - TRACE ( "Sending %d messages.", count ); - - cambriaSendResponse* result = new cambriaSendResponse (); - result->statusCode = 0; - result->statusMessage = NULL; - result->responseBody = NULL; - - TRACE( "building message body\n" ); - - std::string body; - if ( !buildMessageBody ( streamName, msgs, count, body ) ) - { - result->statusCode = CAMBRIA_UNRECOGNIZED_FORMAT; - return result; - } - - int s = -1; - int err = ::openSocket ( fHost, fPort, s ); - if ( err > 0 ) - { - result->statusCode = err; - return result; - } - - // construct path - std::string path = "/cambriaApiServer/v1/event/"; - path += fTopic; - - // send post prefix - char line[4096]; - ::sprintf ( line, - "POST %s HTTP/1.0\r\n" - "Host: %s\r\n" - "Content-Type: %s\r\n" - "Content-Length: %d\r\n" - "\r\n", - path.c_str(), fHost.c_str(), fFormat.c_str(), body.length() ); - write ( s, line ); - - // send the body - write ( s, body.c_str() ); - - TRACE ( "\n" ); - TRACE ( "send complete, reading reply\n" ); - - // receive the response - std::string response; - readResponse ( s, response ); - ::close ( s ); - - // parse the header and body: split header and body on first occurrence of \r\n\r\n - result->statusCode = CAMBRIA_BAD_RESPONSE; - - size_t headerBreak = response.find ( "\r\n\r\n" ); - if ( headerBreak != std::string::npos ) - { - std::string responseBody = response.substr ( headerBreak + 4 ); - result->responseBody = new char [ responseBody.length() + 1 ]; - ::strcpy ( result->responseBody, responseBody.c_str() ); - - // all we need from the header for now is the status line - std::string headerPart = response.substr ( 0, headerBreak + 2 ); - - size_t newline = headerPart.find ( '\r' ); - if ( newline != std::string::npos ) - { - std::string statusLine = headerPart.substr ( 0, newline ); - - size_t firstSpace = statusLine.find ( ' ' ); - if ( firstSpace != std::string::npos ) - { - size_t secondSpace = statusLine.find ( ' ', firstSpace + 1 ); - if ( secondSpace != std::string::npos ) - { - result->statusCode = ::atoi ( statusLine.substr ( firstSpace + 1, secondSpace - firstSpace + 1 ).c_str() ); - std::string statusMessage = statusLine.substr ( secondSpace + 1 ); - result->statusMessage = new char [ statusMessage.length() + 1 ]; - ::strcpy ( result->statusMessage, statusMessage.c_str() ); - } - } - } - } - return result; -} - -void cambriaClient::write ( int socket, const char* str ) -{ - int len = str ? ::strlen ( str ) : 0; - ::write ( socket, str, len ); - - // elaborate tracing nonsense... - std::string trace ( "> " ); - trace += str; - while ( replace ( trace, "\r\n", "\\r\\n\n> " ) ); - - TRACE ( "%s", trace.c_str() ); -} - -bool cambriaClient::buildMessageBody ( const char* streamName, const char** msgs, unsigned int count, std::string& buffer ) -{ - if ( fFormat == CAMBRIA_NATIVE_FORMAT ) - { - int snLen = ::strlen ( streamName ); - for ( unsigned int i=0; i0 ) - { - buffer.append ( "," ); - } - const char* msg = msgs[i]; - char* jsonMsg = ::makeJsonMessage ( streamName, msg ); - buffer.append ( jsonMsg ); - delete jsonMsg; // FIXME: allocating memory here just to delete it - } - buffer.append ( "]" ); - } - else - { - return false; - } - return true; -} - -// read the next string into value, and return the end pos, or 0 on error -static int readNextJsonString ( const std::string& body, int startPos, std::string& value ) -{ - value = ""; - - if ( startPos >= body.length () ) - { - return 0; - } - - // skip a comma - int current = startPos; - if ( body[current] == ',' ) current++; - - if ( current >= body.length() || body[current] != '"' ) - { - return 0; - } - current++; - - // walk the string for the closing quote (FIXME: unicode support) - bool esc = false; - int hex = 0; - while ( ( body[current] != '"' || esc ) && current < body.length() ) - { - if ( hex > 0 ) - { - hex--; - if ( hex == 0 ) - { - // presumably read a unicode escape. this code isn't - // equipped for multibyte or unicode, so just skip it - value += '?'; - } - } - else if ( esc ) - { - esc = false; - switch ( body[current] ) - { - case '"': - case '\\': - case '/': - value += body[current]; - break; - - case 'b': value += '\b'; break; - case 'f': value += '\f'; break; - case 'n': value += '\n'; break; - case 'r': value += '\r'; break; - case 't': value += '\t'; break; - - case 'u': hex=4; break; - } - } - else - { - esc = body[current] == '\\'; - if ( !esc ) value += body[current]; - } - current++; - } - - return current + 1; -} - -static void readGetBody ( std::string& body, cambriaGetResponse& response ) -{ - TRACE("response %s\n", body.c_str() ); - - if ( body.length() < 2 || body[0] != '[' || body[body.length()-1] != ']' ) - { - response.statusCode = CAMBRIA_BAD_RESPONSE; - } - - std::list msgs; - std::string val; - int current = 1; - while ( ( current = readNextJsonString ( body, current, val ) ) > 0 ) - { - char* msg = new char [ val.length() + 1 ]; - ::strcpy ( msg, val.c_str() ); - msgs.push_back ( msg ); - } - - // now build a response - response.messageCount = msgs.size(); - response.messageSet = new char* [ msgs.size() ]; - int index = 0; - for ( std::list::iterator it = msgs.begin(); it != msgs.end(); it++ ) - { - response.messageSet [ index++ ] = *it; - } -} - -cambriaGetResponse* cambriaClient::get ( int timeoutMs, int limit ) -{ - cambriaGetResponse* result = new cambriaGetResponse (); - result->statusCode = 0; - result->statusMessage = NULL; - result->messageCount = 0; - result->messageSet = new char* [ 1 ]; - - int s = -1; - int err = ::openSocket ( fHost, fPort, s ); - if ( err > 0 ) - { - result->statusCode = err; - return result; - } - - // construct path - std::string path = "/cambriaApiServer/v1/event/"; - path += fTopic; - - bool haveAdds = false; - std::ostringstream adds; - if ( timeoutMs > -1 ) - { - adds << "timeout=" << timeoutMs; - haveAdds = true; - } - if ( limit > -1 ) - { - if ( haveAdds ) - { - adds << "&"; - } - adds << "limit=" << limit; - haveAdds = true; - } - if ( haveAdds ) - { - path += "?"; - path += adds.str(); - } - - // send post prefix - char line[4096]; - ::sprintf ( line, - "GET %s HTTP/1.0\r\n" - "Host: %s\r\n" - "\r\n", - path.c_str(), fHost.c_str() ); - write ( s, line ); - - TRACE ( "\n" ); - TRACE ( "request sent; reading reply\n" ); - - // receive the response (FIXME: would be nice to stream rather than load it all) - std::string response; - readResponse ( s, response ); - ::close ( s ); - - // parse the header and body: split header and body on first occurrence of \r\n\r\n - result->statusCode = CAMBRIA_BAD_RESPONSE; - - size_t headerBreak = response.find ( "\r\n\r\n" ); - if ( headerBreak != std::string::npos ) - { - // get the header line - std::string headerPart = response.substr ( 0, headerBreak + 2 ); - - size_t newline = headerPart.find ( '\r' ); - if ( newline != std::string::npos ) - { - std::string statusLine = headerPart.substr ( 0, newline ); - - size_t firstSpace = statusLine.find ( ' ' ); - if ( firstSpace != std::string::npos ) - { - size_t secondSpace = statusLine.find ( ' ', firstSpace + 1 ); - if ( secondSpace != std::string::npos ) - { - result->statusCode = ::atoi ( statusLine.substr ( firstSpace + 1, secondSpace - firstSpace + 1 ).c_str() ); - std::string statusMessage = statusLine.substr ( secondSpace + 1 ); - result->statusMessage = new char [ statusMessage.length() + 1 ]; - ::strcpy ( result->statusMessage, statusMessage.c_str() ); - } - } - } - - if ( result->statusCode < 300 ) - { - std::string responseBody = response.substr ( headerBreak + 4 ); - readGetBody ( responseBody, *result ); - } - } - return result; -} - - -/////////////////////////////////////////////////////////////////////////////// -/////////////////////////////////////////////////////////////////////////////// -/////////////////////////////////////////////////////////////////////////////// - -CAMBRIA_CLIENT cambriaCreateClient ( const char* host, int port, const char* topic, const char* format ) -{ - cambriaClient* cc = new cambriaClient ( host, port, topic, format ); - return toOpaque(cc); -} - -void cambriaDestroyClient ( CAMBRIA_CLIENT client ) -{ - delete fromOpaque ( client ); -} - -cambriaSendResponse* cambriaSendMessage ( CAMBRIA_CLIENT client, const char* streamName, const char* message ) -{ - cambriaClient* c = fromOpaque ( client ); - return c->send ( streamName, message ); -} - -cambriaSendResponse* cambriaSendMessages ( CAMBRIA_CLIENT client, const char* streamName, const char** messages, unsigned int count ) -{ - cambriaClient* c = fromOpaque ( client ); - return c->send ( streamName, messages, count ); -} - -cambriaGetResponse* cambriaGetMessages ( CAMBRIA_CLIENT client, unsigned long timeoutMs, unsigned int limit ) -{ - cambriaClient* c = fromOpaque ( client ); - return c->get ( timeoutMs, limit ); -} - -void cambriaDestroySendResponse ( CAMBRIA_CLIENT client, const cambriaSendResponse* response ) -{ - if ( response ) - { - delete response->statusMessage; - delete response->responseBody; - delete response; - } -} - -void cambriaDestroyGetResponse ( CAMBRIA_CLIENT client, const cambriaGetResponse* response ) -{ - if ( response ) - { - delete response->statusMessage; - for ( int i=0; imessageCount; i++ ) - { - delete response->messageSet[i]; - } - delete response; - } -} - -int cambriaSimpleSend ( const char* host, int port, const char* topic, const char* streamName, const char* msg ) -{ - return cambriaSimpleSendMultiple ( host, port, topic, streamName, &msg, 1 ); -} - -int cambriaSimpleSendMultiple ( const char* host, int port, const char* topic, const char* streamName, const char** messages, unsigned int msgCount ) -{ - int count = 0; - - const CAMBRIA_CLIENT cc = ::cambriaCreateClient ( host, port, topic, CAMBRIA_NATIVE_FORMAT ); - if ( cc ) - { - const cambriaSendResponse* response = ::cambriaSendMessages ( cc, streamName, messages, msgCount ); - if ( response && response->statusCode < 300 ) - { - count = msgCount; - } - ::cambriaDestroySendResponse ( cc, response ); - ::cambriaDestroyClient ( cc ); - } - - return count; -} - -//////////////////////////////////////////////////////////////////////////////// -//////////////////////////////////////////////////////////////////////////////// -//////////////////////////////////////////////////////////////////////////////// - -const unsigned int kMaxTraceBuffer = 2048; - -static void writeTraceString ( const char* msg ) -{ - ::fprintf ( stdout, "%s", msg ); - ::fflush ( stdout ); // because we want output before core dumping :-) -} - -void traceOutput ( const char* format, ... ) -{ - char buffer [ kMaxTraceBuffer ]; - ::memset ( buffer, '\0', kMaxTraceBuffer * sizeof ( char ) ); - - va_list list; - va_start ( list, format ); - ::vsprintf ( buffer, format, list ); - writeTraceString ( buffer ); - va_end ( list ); -} -- cgit 1.2.3-korg