aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/cpp/cambria.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/cpp/cambria.cpp')
-rw-r--r--src/main/cpp/cambria.cpp624
1 files changed, 624 insertions, 0 deletions
diff --git a/src/main/cpp/cambria.cpp b/src/main/cpp/cambria.cpp
new file mode 100644
index 0000000..7aff421
--- /dev/null
+++ b/src/main/cpp/cambria.cpp
@@ -0,0 +1,624 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+
+/*
+ * This is a client library for the AT&T Cambria Event Routing Service.
+ */
+
+#include "cambria.h"
+
+#include <arpa/inet.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdarg.h>
+#include <string.h>
+
+#include <string>
+#include <list>
+#include <sstream>
+#include <iomanip>
+#include <algorithm>
+
+// field used in JSON encoding to signal stream name
+const char* kPartition = "cambria.partition";
+
+// map from opaque handle to object pointer
+#define toOpaque(x) ((CAMBRIA_CLIENT)x)
+#define fromOpaque(x) ((cambriaClient*)x)
+
+// trace support
+extern void traceOutput ( const char* format, ... );
+#ifdef CAMBRIA_TRACING
+ #define TRACE traceOutput
+#else
+ #define TRACE 1 ? (void) 0 : traceOutput
+#endif
+
+/*
+ * internal cambria client class
+ */
+class cambriaClient
+{
+public:
+ cambriaClient ( const std::string& host, int port, const std::string& topic, const std::string& format );
+ ~cambriaClient ();
+
+ cambriaSendResponse* send ( const char* streamName, const char* message );
+ cambriaSendResponse* send ( const char* streamName, const char** message, unsigned int count );
+
+ cambriaGetResponse* get ( int timeoutMs, int limit );
+
+private:
+
+ std::string fHost;
+ int fPort;
+ std::string fTopic;
+ std::string fFormat;
+
+ bool buildMessageBody ( const char* streamName, const char** msgs, unsigned int count, std::string& buffer );
+
+ void write ( int socket, const char* line );
+};
+
+cambriaClient::cambriaClient ( const std::string& host, int port, const std::string& topic, const std::string& format ) :
+ fHost ( host ),
+ fPort ( port ),
+ fTopic ( topic ),
+ fFormat ( format )
+{
+}
+
+cambriaClient::~cambriaClient ()
+{
+}
+
+/*
+ * This isn't quite right -- if the message already has cambria.partition,
+ * it'll wind up with two entries. Also, message MUST start with '{' and
+ * have at least one field.
+ */
+static char* makeJsonMessage ( const char* streamName, const char* message )
+{
+ int len = ::strlen ( message );
+ if ( streamName )
+ {
+ len += ::strlen ( kPartition );
+ len += ::strlen ( streamName );
+ len += 6; // quote each and a colon and comma
+ }
+
+ char* msg = new char [ len + 1 ];
+ ::strcpy ( msg, "{" );
+ if ( streamName )
+ {
+ ::strcat ( msg, "\"" );
+ ::strcat ( msg, kPartition );
+ ::strcat ( msg, "\":\"" );
+ ::strcat ( msg, streamName );
+ ::strcat ( msg, "\"," );
+ }
+ ::strcat ( msg, message + 1 );
+ return msg;
+}
+
+cambriaSendResponse* cambriaClient::send ( const char* streamName, const char* message )
+{
+ return send ( streamName, &message, 1 );
+}
+
+static bool replace ( std::string& str, const std::string& from, const std::string& to )
+{
+ size_t start_pos = str.find ( from );
+ if(start_pos == std::string::npos)
+ return false;
+ str.replace(start_pos, from.length(), to);
+ return true;
+}
+
+static void readResponse ( int s, std::string& response )
+{
+ char buffer [ 4096 ];
+
+ ssize_t n = 0;
+ while ( ( n = ::read ( s, buffer, 4095 ) ) > 0 )
+ {
+ buffer[n] = '\0';
+ response += buffer;
+ }
+}
+
+static int openSocket ( std::string& host, int port, int& ss )
+{
+ TRACE( "connecting to %s\n", host.c_str() );
+
+ struct hostent *he = ::gethostbyname ( host.c_str() );
+ if ( !he )
+ {
+ TRACE("no host entry\n");
+ return CAMBRIA_NO_HOST;
+ }
+
+ if ( he->h_addrtype != AF_INET )
+ {
+ TRACE("not AF_INET\n");
+ return CAMBRIA_NO_HOST;
+ }
+
+ int s = ::socket ( AF_INET, SOCK_STREAM, 0 );
+ if ( s == -1 )
+ {
+ TRACE("no socket available\n");
+ return CAMBRIA_CANT_CONNECT;
+ }
+
+ struct sockaddr_in servaddr;
+ ::memset ( &servaddr, 0, sizeof(servaddr) );
+
+ ::memcpy ( &servaddr.sin_addr, he->h_addr_list[0], he->h_length );
+ servaddr.sin_family = AF_INET;
+ servaddr.sin_port = ::htons ( port );
+
+ if ( ::connect ( s, (struct sockaddr *)&servaddr, sizeof(servaddr) ) )
+ {
+ TRACE("couldn't connect\n");
+ return CAMBRIA_CANT_CONNECT;
+ }
+
+ ss = s;
+ return 0;
+}
+
+cambriaSendResponse* cambriaClient::send ( const char* streamName, const char** msgs, unsigned int count )
+{
+ TRACE ( "Sending %d messages.", count );
+
+ cambriaSendResponse* result = new cambriaSendResponse ();
+ result->statusCode = 0;
+ result->statusMessage = NULL;
+ result->responseBody = NULL;
+
+ TRACE( "building message body\n" );
+
+ std::string body;
+ if ( !buildMessageBody ( streamName, msgs, count, body ) )
+ {
+ result->statusCode = CAMBRIA_UNRECOGNIZED_FORMAT;
+ return result;
+ }
+
+ int s = -1;
+ int err = ::openSocket ( fHost, fPort, s );
+ if ( err > 0 )
+ {
+ result->statusCode = err;
+ return result;
+ }
+
+ // construct path
+ std::string path = "/cambriaApiServer/v1/event/";
+ path += fTopic;
+
+ // send post prefix
+ char line[4096];
+ ::sprintf ( line,
+ "POST %s HTTP/1.0\r\n"
+ "Host: %s\r\n"
+ "Content-Type: %s\r\n"
+ "Content-Length: %d\r\n"
+ "\r\n",
+ path.c_str(), fHost.c_str(), fFormat.c_str(), body.length() );
+ write ( s, line );
+
+ // send the body
+ write ( s, body.c_str() );
+
+ TRACE ( "\n" );
+ TRACE ( "send complete, reading reply\n" );
+
+ // receive the response
+ std::string response;
+ readResponse ( s, response );
+ ::close ( s );
+
+ // parse the header and body: split header and body on first occurrence of \r\n\r\n
+ result->statusCode = CAMBRIA_BAD_RESPONSE;
+
+ size_t headerBreak = response.find ( "\r\n\r\n" );
+ if ( headerBreak != std::string::npos )
+ {
+ std::string responseBody = response.substr ( headerBreak + 4 );
+ result->responseBody = new char [ responseBody.length() + 1 ];
+ ::strcpy ( result->responseBody, responseBody.c_str() );
+
+ // all we need from the header for now is the status line
+ std::string headerPart = response.substr ( 0, headerBreak + 2 );
+
+ size_t newline = headerPart.find ( '\r' );
+ if ( newline != std::string::npos )
+ {
+ std::string statusLine = headerPart.substr ( 0, newline );
+
+ size_t firstSpace = statusLine.find ( ' ' );
+ if ( firstSpace != std::string::npos )
+ {
+ size_t secondSpace = statusLine.find ( ' ', firstSpace + 1 );
+ if ( secondSpace != std::string::npos )
+ {
+ result->statusCode = ::atoi ( statusLine.substr ( firstSpace + 1, secondSpace - firstSpace + 1 ).c_str() );
+ std::string statusMessage = statusLine.substr ( secondSpace + 1 );
+ result->statusMessage = new char [ statusMessage.length() + 1 ];
+ ::strcpy ( result->statusMessage, statusMessage.c_str() );
+ }
+ }
+ }
+ }
+ return result;
+}
+
+void cambriaClient::write ( int socket, const char* str )
+{
+ int len = str ? ::strlen ( str ) : 0;
+ ::write ( socket, str, len );
+
+ // elaborate tracing nonsense...
+ std::string trace ( "> " );
+ trace += str;
+ while ( replace ( trace, "\r\n", "\\r\\n\n> " ) );
+
+ TRACE ( "%s", trace.c_str() );
+}
+
+bool cambriaClient::buildMessageBody ( const char* streamName, const char** msgs, unsigned int count, std::string& buffer )
+{
+ if ( fFormat == CAMBRIA_NATIVE_FORMAT )
+ {
+ int snLen = ::strlen ( streamName );
+ for ( unsigned int i=0; i<count; i++ )
+ {
+ const char* msg = msgs[i];
+
+ std::ostringstream s;
+ s << snLen << '.' << ::strlen(msg) << '.' << streamName << msg;
+ buffer.append ( s.str() );
+ }
+ }
+ else if ( fFormat == CAMBRIA_JSON_FORMAT )
+ {
+ buffer.append ( "[" );
+ for ( unsigned int i=0; i<count; i++ )
+ {
+ if ( i>0 )
+ {
+ buffer.append ( "," );
+ }
+ const char* msg = msgs[i];
+ char* jsonMsg = ::makeJsonMessage ( streamName, msg );
+ buffer.append ( jsonMsg );
+ delete jsonMsg; // FIXME: allocating memory here just to delete it
+ }
+ buffer.append ( "]" );
+ }
+ else
+ {
+ return false;
+ }
+ return true;
+}
+
+// read the next string into value, and return the end pos, or 0 on error
+static int readNextJsonString ( const std::string& body, int startPos, std::string& value )
+{
+ value = "";
+
+ if ( startPos >= body.length () )
+ {
+ return 0;
+ }
+
+ // skip a comma
+ int current = startPos;
+ if ( body[current] == ',' ) current++;
+
+ if ( current >= body.length() || body[current] != '"' )
+ {
+ return 0;
+ }
+ current++;
+
+ // walk the string for the closing quote (FIXME: unicode support)
+ bool esc = false;
+ int hex = 0;
+ while ( ( body[current] != '"' || esc ) && current < body.length() )
+ {
+ if ( hex > 0 )
+ {
+ hex--;
+ if ( hex == 0 )
+ {
+ // presumably read a unicode escape. this code isn't
+ // equipped for multibyte or unicode, so just skip it
+ value += '?';
+ }
+ }
+ else if ( esc )
+ {
+ esc = false;
+ switch ( body[current] )
+ {
+ case '"':
+ case '\\':
+ case '/':
+ value += body[current];
+ break;
+
+ case 'b': value += '\b'; break;
+ case 'f': value += '\f'; break;
+ case 'n': value += '\n'; break;
+ case 'r': value += '\r'; break;
+ case 't': value += '\t'; break;
+
+ case 'u': hex=4; break;
+ }
+ }
+ else
+ {
+ esc = body[current] == '\\';
+ if ( !esc ) value += body[current];
+ }
+ current++;
+ }
+
+ return current + 1;
+}
+
+static void readGetBody ( std::string& body, cambriaGetResponse& response )
+{
+ TRACE("response %s\n", body.c_str() );
+
+ if ( body.length() < 2 || body[0] != '[' || body[body.length()-1] != ']' )
+ {
+ response.statusCode = CAMBRIA_BAD_RESPONSE;
+ }
+
+ std::list<char*> msgs;
+ std::string val;
+ int current = 1;
+ while ( ( current = readNextJsonString ( body, current, val ) ) > 0 )
+ {
+ char* msg = new char [ val.length() + 1 ];
+ ::strcpy ( msg, val.c_str() );
+ msgs.push_back ( msg );
+ }
+
+ // now build a response
+ response.messageCount = msgs.size();
+ response.messageSet = new char* [ msgs.size() ];
+ int index = 0;
+ for ( std::list<char*>::iterator it = msgs.begin(); it != msgs.end(); it++ )
+ {
+ response.messageSet [ index++ ] = *it;
+ }
+}
+
+cambriaGetResponse* cambriaClient::get ( int timeoutMs, int limit )
+{
+ cambriaGetResponse* result = new cambriaGetResponse ();
+ result->statusCode = 0;
+ result->statusMessage = NULL;
+ result->messageCount = 0;
+ result->messageSet = new char* [ 1 ];
+
+ int s = -1;
+ int err = ::openSocket ( fHost, fPort, s );
+ if ( err > 0 )
+ {
+ result->statusCode = err;
+ return result;
+ }
+
+ // construct path
+ std::string path = "/cambriaApiServer/v1/event/";
+ path += fTopic;
+
+ bool haveAdds = false;
+ std::ostringstream adds;
+ if ( timeoutMs > -1 )
+ {
+ adds << "timeout=" << timeoutMs;
+ haveAdds = true;
+ }
+ if ( limit > -1 )
+ {
+ if ( haveAdds )
+ {
+ adds << "&";
+ }
+ adds << "limit=" << limit;
+ haveAdds = true;
+ }
+ if ( haveAdds )
+ {
+ path += "?";
+ path += adds.str();
+ }
+
+ // send post prefix
+ char line[4096];
+ ::sprintf ( line,
+ "GET %s HTTP/1.0\r\n"
+ "Host: %s\r\n"
+ "\r\n",
+ path.c_str(), fHost.c_str() );
+ write ( s, line );
+
+ TRACE ( "\n" );
+ TRACE ( "request sent; reading reply\n" );
+
+ // receive the response (FIXME: would be nice to stream rather than load it all)
+ std::string response;
+ readResponse ( s, response );
+ ::close ( s );
+
+ // parse the header and body: split header and body on first occurrence of \r\n\r\n
+ result->statusCode = CAMBRIA_BAD_RESPONSE;
+
+ size_t headerBreak = response.find ( "\r\n\r\n" );
+ if ( headerBreak != std::string::npos )
+ {
+ // get the header line
+ std::string headerPart = response.substr ( 0, headerBreak + 2 );
+
+ size_t newline = headerPart.find ( '\r' );
+ if ( newline != std::string::npos )
+ {
+ std::string statusLine = headerPart.substr ( 0, newline );
+
+ size_t firstSpace = statusLine.find ( ' ' );
+ if ( firstSpace != std::string::npos )
+ {
+ size_t secondSpace = statusLine.find ( ' ', firstSpace + 1 );
+ if ( secondSpace != std::string::npos )
+ {
+ result->statusCode = ::atoi ( statusLine.substr ( firstSpace + 1, secondSpace - firstSpace + 1 ).c_str() );
+ std::string statusMessage = statusLine.substr ( secondSpace + 1 );
+ result->statusMessage = new char [ statusMessage.length() + 1 ];
+ ::strcpy ( result->statusMessage, statusMessage.c_str() );
+ }
+ }
+ }
+
+ if ( result->statusCode < 300 )
+ {
+ std::string responseBody = response.substr ( headerBreak + 4 );
+ readGetBody ( responseBody, *result );
+ }
+ }
+ return result;
+}
+
+
+///////////////////////////////////////////////////////////////////////////////
+///////////////////////////////////////////////////////////////////////////////
+///////////////////////////////////////////////////////////////////////////////
+
+CAMBRIA_CLIENT cambriaCreateClient ( const char* host, int port, const char* topic, const char* format )
+{
+ cambriaClient* cc = new cambriaClient ( host, port, topic, format );
+ return toOpaque(cc);
+}
+
+void cambriaDestroyClient ( CAMBRIA_CLIENT client )
+{
+ delete fromOpaque ( client );
+}
+
+cambriaSendResponse* cambriaSendMessage ( CAMBRIA_CLIENT client, const char* streamName, const char* message )
+{
+ cambriaClient* c = fromOpaque ( client );
+ return c->send ( streamName, message );
+}
+
+cambriaSendResponse* cambriaSendMessages ( CAMBRIA_CLIENT client, const char* streamName, const char** messages, unsigned int count )
+{
+ cambriaClient* c = fromOpaque ( client );
+ return c->send ( streamName, messages, count );
+}
+
+cambriaGetResponse* cambriaGetMessages ( CAMBRIA_CLIENT client, unsigned long timeoutMs, unsigned int limit )
+{
+ cambriaClient* c = fromOpaque ( client );
+ return c->get ( timeoutMs, limit );
+}
+
+void cambriaDestroySendResponse ( CAMBRIA_CLIENT client, const cambriaSendResponse* response )
+{
+ if ( response )
+ {
+ delete response->statusMessage;
+ delete response->responseBody;
+ delete response;
+ }
+}
+
+void cambriaDestroyGetResponse ( CAMBRIA_CLIENT client, const cambriaGetResponse* response )
+{
+ if ( response )
+ {
+ delete response->statusMessage;
+ for ( int i=0; i<response->messageCount; i++ )
+ {
+ delete response->messageSet[i];
+ }
+ delete response;
+ }
+}
+
+int cambriaSimpleSend ( const char* host, int port, const char* topic, const char* streamName, const char* msg )
+{
+ return cambriaSimpleSendMultiple ( host, port, topic, streamName, &msg, 1 );
+}
+
+int cambriaSimpleSendMultiple ( const char* host, int port, const char* topic, const char* streamName, const char** messages, unsigned int msgCount )
+{
+ int count = 0;
+
+ const CAMBRIA_CLIENT cc = ::cambriaCreateClient ( host, port, topic, CAMBRIA_NATIVE_FORMAT );
+ if ( cc )
+ {
+ const cambriaSendResponse* response = ::cambriaSendMessages ( cc, streamName, messages, msgCount );
+ if ( response && response->statusCode < 300 )
+ {
+ count = msgCount;
+ }
+ ::cambriaDestroySendResponse ( cc, response );
+ ::cambriaDestroyClient ( cc );
+ }
+
+ return count;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+////////////////////////////////////////////////////////////////////////////////
+////////////////////////////////////////////////////////////////////////////////
+
+const unsigned int kMaxTraceBuffer = 2048;
+
+static void writeTraceString ( const char* msg )
+{
+ ::fprintf ( stdout, "%s", msg );
+ ::fflush ( stdout ); // because we want output before core dumping :-)
+}
+
+void traceOutput ( const char* format, ... )
+{
+ char buffer [ kMaxTraceBuffer ];
+ ::memset ( buffer, '\0', kMaxTraceBuffer * sizeof ( char ) );
+
+ va_list list;
+ va_start ( list, format );
+ ::vsprintf ( buffer, format, list );
+ writeTraceString ( buffer );
+ va_end ( list );
+}