/******************************************************************************* * ============LICENSE_START======================================================= * org.onap.dmaap * ================================================================================ * Copyright © 2017 AT&T Intellectual Property. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END========================================================= * * ECOMP is a trademark and service mark of AT&T Intellectual Property. * *******************************************************************************/ /* * This is a client library for the AT&T Cambria Event Routing Service. */ #include "cambria.h" #include #include #include #include #include #include #include #include #include #include #include #include #include // field used in JSON encoding to signal stream name const char* kPartition = "cambria.partition"; // map from opaque handle to object pointer #define toOpaque(x) ((CAMBRIA_CLIENT)x) #define fromOpaque(x) ((cambriaClient*)x) // trace support extern void traceOutput ( const char* format, ... ); #ifdef CAMBRIA_TRACING #define TRACE traceOutput #else #define TRACE 1 ? (void) 0 : traceOutput #endif /* * internal cambria client class */ class cambriaClient { public: cambriaClient ( const std::string& host, int port, const std::string& topic, const std::string& format ); ~cambriaClient (); cambriaSendResponse* send ( const char* streamName, const char* message ); cambriaSendResponse* send ( const char* streamName, const char** message, unsigned int count ); cambriaGetResponse* get ( int timeoutMs, int limit ); private: std::string fHost; int fPort; std::string fTopic; std::string fFormat; bool buildMessageBody ( const char* streamName, const char** msgs, unsigned int count, std::string& buffer ); void write ( int socket, const char* line ); }; cambriaClient::cambriaClient ( const std::string& host, int port, const std::string& topic, const std::string& format ) : fHost ( host ), fPort ( port ), fTopic ( topic ), fFormat ( format ) { } cambriaClient::~cambriaClient () { } /* * This isn't quite right -- if the message already has cambria.partition, * it'll wind up with two entries. Also, message MUST start with '{' and * have at least one field. */ static char* makeJsonMessage ( const char* streamName, const char* message ) { int len = ::strlen ( message ); if ( streamName ) { len += ::strlen ( kPartition ); len += ::strlen ( streamName ); len += 6; // quote each and a colon and comma } char* msg = new char [ len + 1 ]; ::strcpy ( msg, "{" ); if ( streamName ) { ::strcat ( msg, "\"" ); ::strcat ( msg, kPartition ); ::strcat ( msg, "\":\"" ); ::strcat ( msg, streamName ); ::strcat ( msg, "\"," ); } ::strcat ( msg, message + 1 ); return msg; } cambriaSendResponse* cambriaClient::send ( const char* streamName, const char* message ) { return send ( streamName, &message, 1 ); } static bool replace ( std::string& str, const std::string& from, const std::string& to ) { size_t start_pos = str.find ( from ); if(start_pos == std::string::npos) return false; str.replace(start_pos, from.length(), to); return true; } static void readResponse ( int s, std::string& response ) { char buffer [ 4096 ]; ssize_t n = 0; while ( ( n = ::read ( s, buffer, 4095 ) ) > 0 ) { buffer[n] = '\0'; response += buffer; } } static int openSocket ( std::string& host, int port, int& ss ) { TRACE( "connecting to %s\n", host.c_str() ); struct hostent *he = ::gethostbyname ( host.c_str() ); if ( !he ) { TRACE("no host entry\n"); return CAMBRIA_NO_HOST; } if ( he->h_addrtype != AF_INET ) { TRACE("not AF_INET\n"); return CAMBRIA_NO_HOST; } int s = ::socket ( AF_INET, SOCK_STREAM, 0 ); if ( s == -1 ) { TRACE("no socket available\n"); return CAMBRIA_CANT_CONNECT; } struct sockaddr_in servaddr; ::memset ( &servaddr, 0, sizeof(servaddr) ); ::memcpy ( &servaddr.sin_addr, he->h_addr_list[0], he->h_length ); servaddr.sin_family = AF_INET; servaddr.sin_port = ::htons ( port ); if ( ::connect ( s, (struct sockaddr *)&servaddr, sizeof(servaddr) ) ) { TRACE("couldn't connect\n"); return CAMBRIA_CANT_CONNECT; } ss = s; return 0; } cambriaSendResponse* cambriaClient::send ( const char* streamName, const char** msgs, unsigned int count ) { TRACE ( "Sending %d messages.", count ); cambriaSendResponse* result = new cambriaSendResponse (); result->statusCode = 0; result->statusMessage = NULL; result->responseBody = NULL; TRACE( "building message body\n" ); std::string body; if ( !buildMessageBody ( streamName, msgs, count, body ) ) { result->statusCode = CAMBRIA_UNRECOGNIZED_FORMAT; return result; } int s = -1; int err = ::openSocket ( fHost, fPort, s ); if ( err > 0 ) { result->statusCode = err; return result; } // construct path std::string path = "/cambriaApiServer/v1/event/"; path += fTopic; // send post prefix char line[4096]; ::sprintf ( line, "POST %s HTTP/1.0\r\n" "Host: %s\r\n" "Content-Type: %s\r\n" "Content-Length: %d\r\n" "\r\n", path.c_str(), fHost.c_str(), fFormat.c_str(), body.length() ); write ( s, line ); // send the body write ( s, body.c_str() ); TRACE ( "\n" ); TRACE ( "send complete, reading reply\n" ); // receive the response std::string response; readResponse ( s, response ); ::close ( s ); // parse the header and body: split header and body on first occurrence of \r\n\r\n result->statusCode = CAMBRIA_BAD_RESPONSE; size_t headerBreak = response.find ( "\r\n\r\n" ); if ( headerBreak != std::string::npos ) { std::string responseBody = response.substr ( headerBreak + 4 ); result->responseBody = new char [ responseBody.length() + 1 ]; ::strcpy ( result->responseBody, responseBody.c_str() ); // all we need from the header for now is the status line std::string headerPart = response.substr ( 0, headerBreak + 2 ); size_t newline = headerPart.find ( '\r' ); if ( newline != std::string::npos ) { std::string statusLine = headerPart.substr ( 0, newline ); size_t firstSpace = statusLine.find ( ' ' ); if ( firstSpace != std::string::npos ) { size_t secondSpace = statusLine.find ( ' ', firstSpace + 1 ); if ( secondSpace != std::string::npos ) { result->statusCode = ::atoi ( statusLine.substr ( firstSpace + 1, secondSpace - firstSpace + 1 ).c_str() ); std::string statusMessage = statusLine.substr ( secondSpace + 1 ); result->statusMessage = new char [ statusMessage.length() + 1 ]; ::strcpy ( result->statusMessage, statusMessage.c_str() ); } } } } return result; } void cambriaClient::write ( int socket, const char* str ) { int len = str ? ::strlen ( str ) : 0; ::write ( socket, str, len ); // elaborate tracing nonsense... std::string trace ( "> " ); trace += str; while ( replace ( trace, "\r\n", "\\r\\n\n> " ) ); TRACE ( "%s", trace.c_str() ); } bool cambriaClient::buildMessageBody ( const char* streamName, const char** msgs, unsigned int count, std::string& buffer ) { if ( fFormat == CAMBRIA_NATIVE_FORMAT ) { int snLen = ::strlen ( streamName ); for ( unsigned int i=0; i0 ) { buffer.append ( "," ); } const char* msg = msgs[i]; char* jsonMsg = ::makeJsonMessage ( streamName, msg ); buffer.append ( jsonMsg ); delete jsonMsg; // FIXME: allocating memory here just to delete it } buffer.append ( "]" ); } else { return false; } return true; } // read the next string into value, and return the end pos, or 0 on error static int readNextJsonString ( const std::string& body, int startPos, std::string& value ) { value = ""; if ( startPos >= body.length () ) { return 0; } // skip a comma int current = startPos; if ( body[current] == ',' ) current++; if ( current >= body.length() || body[current] != '"' ) { return 0; } current++; // walk the string for the closing quote (FIXME: unicode support) bool esc = false; int hex = 0; while ( ( body[current] != '"' || esc ) && current < body.length() ) { if ( hex > 0 ) { hex--; if ( hex == 0 ) { // presumably read a unicode escape. this code isn't // equipped for multibyte or unicode, so just skip it value += '?'; } } else if ( esc ) { esc = false; switch ( body[current] ) { case '"': case '\\': case '/': value += body[current]; break; case 'b': value += '\b'; break; case 'f': value += '\f'; break; case 'n': value += '\n'; break; case 'r': value += '\r'; break; case 't': value += '\t'; break; case 'u': hex=4; break; } } else { esc = body[current] == '\\'; if ( !esc ) value += body[current]; } current++; } return current + 1; } static void readGetBody ( std::string& body, cambriaGetResponse& response ) { TRACE("response %s\n", body.c_str() ); if ( body.length() < 2 || body[0] != '[' || body[body.length()-1] != ']' ) { response.statusCode = CAMBRIA_BAD_RESPONSE; } std::list msgs; std::string val; int current = 1; while ( ( current = readNextJsonString ( body, current, val ) ) > 0 ) { char* msg = new char [ val.length() + 1 ]; ::strcpy ( msg, val.c_str() ); msgs.push_back ( msg ); } // now build a response response.messageCount = msgs.size(); response.messageSet = new char* [ msgs.size() ]; int index = 0; for ( std::list::iterator it = msgs.begin(); it != msgs.end(); it++ ) { response.messageSet [ index++ ] = *it; } } cambriaGetResponse* cambriaClient::get ( int timeoutMs, int limit ) { cambriaGetResponse* result = new cambriaGetResponse (); result->statusCode = 0; result->statusMessage = NULL; result->messageCount = 0; result->messageSet = new char* [ 1 ]; int s = -1; int err = ::openSocket ( fHost, fPort, s ); if ( err > 0 ) { result->statusCode = err; return result; } // construct path std::string path = "/cambriaApiServer/v1/event/"; path += fTopic; bool haveAdds = false; std::ostringstream adds; if ( timeoutMs > -1 ) { adds << "timeout=" << timeoutMs; haveAdds = true; } if ( limit > -1 ) { if ( haveAdds ) { adds << "&"; } adds << "limit=" << limit; haveAdds = true; } if ( haveAdds ) { path += "?"; path += adds.str(); } // send post prefix char line[4096]; ::sprintf ( line, "GET %s HTTP/1.0\r\n" "Host: %s\r\n" "\r\n", path.c_str(), fHost.c_str() ); write ( s, line ); TRACE ( "\n" ); TRACE ( "request sent; reading reply\n" ); // receive the response (FIXME: would be nice to stream rather than load it all) std::string response; readResponse ( s, response ); ::close ( s ); // parse the header and body: split header and body on first occurrence of \r\n\r\n result->statusCode = CAMBRIA_BAD_RESPONSE; size_t headerBreak = response.find ( "\r\n\r\n" ); if ( headerBreak != std::string::npos ) { // get the header line std::string headerPart = response.substr ( 0, headerBreak + 2 ); size_t newline = headerPart.find ( '\r' ); if ( newline != std::string::npos ) { std::string statusLine = headerPart.substr ( 0, newline ); size_t firstSpace = statusLine.find ( ' ' ); if ( firstSpace != std::string::npos ) { size_t secondSpace = statusLine.find ( ' ', firstSpace + 1 ); if ( secondSpace != std::string::npos ) { result->statusCode = ::atoi ( statusLine.substr ( firstSpace + 1, secondSpace - firstSpace + 1 ).c_str() ); std::string statusMessage = statusLine.substr ( secondSpace + 1 ); result->statusMessage = new char [ statusMessage.length() + 1 ]; ::strcpy ( result->statusMessage, statusMessage.c_str() ); } } } if ( result->statusCode < 300 ) { std::string responseBody = response.substr ( headerBreak + 4 ); readGetBody ( responseBody, *result ); } } return result; } /////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////// CAMBRIA_CLIENT cambriaCreateClient ( const char* host, int port, const char* topic, const char* format ) { cambriaClient* cc = new cambriaClient ( host, port, topic, format ); return toOpaque(cc); } void cambriaDestroyClient ( CAMBRIA_CLIENT client ) { delete fromOpaque ( client ); } cambriaSendResponse* cambriaSendMessage ( CAMBRIA_CLIENT client, const char* streamName, const char* message ) { cambriaClient* c = fromOpaque ( client ); return c->send ( streamName, message ); } cambriaSendResponse* cambriaSendMessages ( CAMBRIA_CLIENT client, const char* streamName, const char** messages, unsigned int count ) { cambriaClient* c = fromOpaque ( client ); return c->send ( streamName, messages, count ); } cambriaGetResponse* cambriaGetMessages ( CAMBRIA_CLIENT client, unsigned long timeoutMs, unsigned int limit ) { cambriaClient* c = fromOpaque ( client ); return c->get ( timeoutMs, limit ); } void cambriaDestroySendResponse ( CAMBRIA_CLIENT client, const cambriaSendResponse* response ) { if ( response ) { delete response->statusMessage; delete response->responseBody; delete response; } } void cambriaDestroyGetResponse ( CAMBRIA_CLIENT client, const cambriaGetResponse* response ) { if ( response ) { delete response->statusMessage; for ( int i=0; imessageCount; i++ ) { delete response->messageSet[i]; } delete response; } } int cambriaSimpleSend ( const char* host, int port, const char* topic, const char* streamName, const char* msg ) { return cambriaSimpleSendMultiple ( host, port, topic, streamName, &msg, 1 ); } int cambriaSimpleSendMultiple ( const char* host, int port, const char* topic, const char* streamName, const char** messages, unsigned int msgCount ) { int count = 0; const CAMBRIA_CLIENT cc = ::cambriaCreateClient ( host, port, topic, CAMBRIA_NATIVE_FORMAT ); if ( cc ) { const cambriaSendResponse* response = ::cambriaSendMessages ( cc, streamName, messages, msgCount ); if ( response && response->statusCode < 300 ) { count = msgCount; } ::cambriaDestroySendResponse ( cc, response ); ::cambriaDestroyClient ( cc ); } return count; } //////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////// const unsigned int kMaxTraceBuffer = 2048; static void writeTraceString ( const char* msg ) { ::fprintf ( stdout, "%s", msg ); ::fflush ( stdout ); // because we want output before core dumping :-) } void traceOutput ( const char* format, ... ) { char buffer [ kMaxTraceBuffer ]; ::memset ( buffer, '\0', kMaxTraceBuffer * sizeof ( char ) ); va_list list; va_start ( list, format ); ::vsprintf ( buffer, format, list ); writeTraceString ( buffer ); va_end ( list ); }