summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorYatian XU <yatian.xu@nokia-sbell.com>2019-09-10 13:51:48 +0800
committerYatian XU <yatian.xu@nokia-sbell.com>2019-09-10 06:03:34 +0000
commit7966a4012c0eb263936e1b284801a79c5cb607a7 (patch)
tree87b9308ccfb01e19ddf2f3dd4cbb16523901c6d7
parentbe6db99ca9b99a3b72b71a8fb1f5fcd9fc8aaf8f (diff)
Contribute C++ implement of VES spec 7.0.1 to ONAP/vnfsdk:
Part4: transport library. Issue-ID: VNFSDK-466 Signed-off-by: Yatian XU <yatian.xu@nokia-sbell.com> Change-Id: Icde7b8dad91002a8208ff0d058a44c78aa910cd7
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/CMakeLists.txt30
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/XBufferedTransport.cpp170
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/XBufferedTransport.h39
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/XDiskQueue.cpp192
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/XDiskQueue.h34
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/XLibcurlTransport.cpp376
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/XLibcurlTransport.h41
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/XMemQueue.cpp46
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/XMemQueue.h24
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/XQueue.cpp15
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/XRetryTransport.cpp156
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/XRetryTransport.h43
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/XRpcClientTransport.cpp183
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/XRpcClientTransport.h38
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/XRpcServertTransport.cpp222
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/XRpcServertTransport.h39
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/XSwitchableTransport.cpp172
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/XSwitchableTransport.h37
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/XSynchronizedTransport.cpp115
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/XSynchronizedTransport.h41
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/XTransport.cpp64
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/XRpcTransport.cpp421
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/XRpcTransport.h296
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_constants.cpp17
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_constants.h24
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_types.cpp16
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_types.h25
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/include/XQueue.h40
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/include/XTransport.h151
-rwxr-xr-xveslibrary/ves_cpplibrary/src/lib/transport/rpc.idl6
30 files changed, 3073 insertions, 0 deletions
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/CMakeLists.txt b/veslibrary/ves_cpplibrary/src/lib/transport/CMakeLists.txt
new file mode 100755
index 0000000..59e42f4
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/CMakeLists.txt
@@ -0,0 +1,30 @@
+aux_source_directory(../common LOG_SRCS)
+aux_source_directory(. TRANSPORT_SRCS)
+aux_source_directory(gen-cpp RPC_SRCS)
+
+find_package(nlohmann_json)
+find_package(spdlog)
+find_package(leveldb)
+find_package(thrift)
+find_package(CURL)
+
+include_directories("${THRIFT_INCLUDE_DIR}")
+include_directories("${LEVELDB_INCLUDE_DIR}")
+include_directories(include)
+
+SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fpermissive")
+
+add_library(xtransport SHARED ${LOG_SRCS} ${RPC_SRCS} ${TRANSPORT_SRCS})
+target_link_libraries(xtransport ${CURL_LIBRARY} ${THRIFT_LIBRARY} ${LEVELDB_LIBRARY})
+
+file(GLOB HDRS "include/*.h")
+
+install(FILES ${HDRS}
+ DESTINATION "include/xvesagent/xtransport"
+)
+install(FILES ../../cmake/xtransportConfig.cmake
+ DESTINATION "lib/cmake/xtransport"
+)
+INSTALL(TARGETS xtransport
+ LIBRARY DESTINATION lib
+)
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XBufferedTransport.cpp b/veslibrary/ves_cpplibrary/src/lib/transport/XBufferedTransport.cpp
new file mode 100755
index 0000000..6abce48
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/XBufferedTransport.cpp
@@ -0,0 +1,170 @@
+#include "XBufferedTransport.h"
+#ifndef SPDLOG_ACTIVE_LEVEL
+#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE
+#endif
+#include "spdlog/spdlog.h"
+
+using namespace std;
+using namespace vagt::transport;
+using namespace spdlog;
+
+vagt::transport::XBufferedTransport::XBufferedTransport(std::shared_ptr<XTransport> transport, std::shared_ptr<queue::XQueue> queue):
+ transport_(transport), queue_(queue)
+{
+}
+
+vagt::transport::XBufferedTransport::~XBufferedTransport()
+{
+}
+
+XErrorCode vagt::transport::XBufferedTransport::start()
+{
+ if (!transport_)
+ {
+ SPDLOG_ERROR("Empty transport.");
+ return XErrorNok;
+ }
+
+ if (!queue_)
+ {
+ SPDLOG_ERROR("Empty queue.");
+ return XErrorNok;
+ }
+
+ auto rc = transport_->start();
+
+ future_ = async(launch::async, [this]() {return this->worker(); });
+
+ allowPost();
+
+ return rc;
+}
+
+XErrorCode vagt::transport::XBufferedTransport::stop()
+{
+ cancelPost();
+ if (!transport_)
+ {
+ SPDLOG_ERROR("Empty transport.");
+ return XErrorNok;
+ }
+
+ if (!queue_)
+ {
+ SPDLOG_ERROR("Empty queue.");
+ return XErrorNok;
+ }
+
+ future_.get();
+
+ return transport_->stop();
+}
+
+XErrorCode vagt::transport::XBufferedTransport::post(const std::string& event)
+{
+ if (shouldCancelPost())
+ {
+ return XErrorCanceled;
+ }
+
+ if (event.empty())
+ {
+ SPDLOG_WARN("Trying post empty event.");
+ return XErrorClientError;
+ }
+
+ if (!queue_)
+ {
+ return XErrorNok;
+ }
+
+ auto rc = vagt::queue::XErrorOk;
+ {
+ unique_lock<mutex> _(lock_);
+ rc = queue_->push(event);
+ cond_.notify_one();
+ }
+
+ if (rc == vagt::queue::XErrorOk)
+ {
+ return XErrorOk;
+ }
+ else
+ {
+ return XErrorNok;
+ }
+}
+
+void vagt::transport::XBufferedTransport::cancelPost()
+{
+ XTransport::cancelPost();
+
+ {
+ unique_lock<mutex> _(lock_);
+ cancel_.store(true, std::memory_order_release);
+ cond_.notify_one();
+ }
+
+ if (transport_)
+ {
+ transport_->cancelPost();
+ }
+}
+
+void vagt::transport::XBufferedTransport::allowPost()
+{
+ if (transport_)
+ {
+ transport_->allowPost();
+ }
+ XTransport::allowPost();
+}
+
+bool vagt::transport::XBufferedTransport::shouldCancelPost()
+{
+ return XTransport::shouldCancelPost();
+}
+
+void vagt::transport::XBufferedTransport::worker()
+{
+ SPDLOG_INFO("Start transport send thread.");
+ while (true)
+ {
+ string event;
+ {
+ unique_lock<mutex> _(lock_);
+ cond_.wait(_, [this]() {return shouldCancelPost() || !queue_->empty(); });
+
+ if (shouldCancelPost())
+ {
+ SPDLOG_INFO("Quit transport send thread.");
+ return;
+ }
+
+ if (!queue_->empty())
+ {
+ event = queue_->front();
+ }
+ else
+ {
+ continue;
+ }
+ }
+
+ if (transport_)
+ {
+ auto rc = transport_->post(event);
+ if (vagt::transport::XErrorOk == rc)
+ {
+ unique_lock<mutex> _(lock_);
+ queue_->pop();
+ }
+ else if (vagt::transport::XErrorClientError == rc)
+ {
+ unique_lock<mutex> _(lock_);
+ queue_->pop();
+ SPDLOG_WARN("Drop event:({}) ({}).", event, rc);
+ }
+ }
+ }
+}
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XBufferedTransport.h b/veslibrary/ves_cpplibrary/src/lib/transport/XBufferedTransport.h
new file mode 100755
index 0000000..af51f64
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/XBufferedTransport.h
@@ -0,0 +1,39 @@
+#pragma once
+
+#include "XTransport.h"
+#include "XQueue.h"
+#include <future>
+#include <mutex>
+#include <condition_variable>
+
+namespace vagt
+{
+ namespace transport
+ {
+ class XBufferedTransport : public XTransport
+ {
+ public:
+ XBufferedTransport(std::shared_ptr<XTransport> transport, std::shared_ptr<vagt::queue::XQueue> queue);
+ virtual ~XBufferedTransport();
+
+ virtual XErrorCode start() override;
+ virtual XErrorCode stop() override;
+ virtual XErrorCode post(const std::string& event) override;
+
+ virtual void cancelPost() override;
+ virtual void allowPost() override;
+ virtual bool shouldCancelPost() override;
+
+ private:
+ void worker();
+
+ std::shared_ptr<XTransport> transport_;
+ std::shared_ptr<queue::XQueue> queue_;
+ std::future<void> future_;
+
+ std::condition_variable cond_;
+ std::mutex lock_;
+ };
+ }
+}
+
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XDiskQueue.cpp b/veslibrary/ves_cpplibrary/src/lib/transport/XDiskQueue.cpp
new file mode 100755
index 0000000..c63d19e
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/XDiskQueue.cpp
@@ -0,0 +1,192 @@
+#include "XDiskQueue.h"
+#ifndef SPDLOG_ACTIVE_LEVEL
+#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE
+#endif
+#include "spdlog/spdlog.h"
+#include "leveldb/env.h"
+
+using namespace std;
+using namespace vagt::queue;
+using namespace leveldb;
+
+constexpr auto XDiskQueueCompactInterval = 1800;
+
+class XDummyLogger : public leveldb::Logger
+{
+ void Logv(const char* format, va_list ap) {}
+};
+
+vagt::queue::XDiskQueue::XDiskQueue(const std::string & path):db_(nullptr),it_(nullptr),keyId_(0)
+{
+ compactTime_ = chrono::system_clock::now();
+
+ Options opt;
+ opt.create_if_missing = true;
+ opt.write_buffer_size = 4 * 1024 * 1024;
+ opt.max_open_files = 10;
+ opt.info_log = new XDummyLogger;
+
+ opt_.fill_cache = true;
+
+ Status status = DB::Open(opt, path, &db_);
+ if (status.ok() && db_ != nullptr)
+ {
+ SPDLOG_INFO("Disk queue is ready.");
+ return;
+ }
+
+ SPDLOG_INFO("Repairing disk queue.");
+ status = RepairDB(path, opt);
+ if (status.ok())
+ {
+ status = DB::Open(opt, path.c_str(), &db_);
+ if (status.ok() && db_ != nullptr)
+ {
+ SPDLOG_INFO("Disk queue is ready.");
+ return;
+ }
+ }
+
+ SPDLOG_ERROR("Fail to initialize disk queue:{}.", status.ToString());
+}
+
+vagt::queue::XDiskQueue::~XDiskQueue()
+{
+ if (it_)
+ {
+ delete it_;
+ it_ = nullptr;
+ }
+
+ if (db_)
+ {
+ delete db_;
+ db_ = nullptr;
+ }
+}
+
+bool vagt::queue::XDiskQueue::empty()
+{
+ if (!it_)
+ {
+ it_ = db_->NewIterator(opt_);
+ it_->SeekToFirst();
+ if (it_->Valid())
+ {
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ if (it_->Valid())
+ {
+ return false;
+ }
+
+ delete it_;
+ it_ = db_->NewIterator(opt_);
+ it_->SeekToFirst();
+ if (it_->Valid())
+ {
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+}
+
+XErrorCode vagt::queue::XDiskQueue::push(const std::string & val)
+{
+ auto key = createKey();
+ SPDLOG_DEBUG("Push {} to disk queue:{}.", val, key);
+
+ WriteOptions opt;
+ opt.sync = false;
+ auto status = db_->Put(opt, key, val);
+ if (!status.ok())
+ {
+ SPDLOG_ERROR("Fail to push {} to disk queue:{}.", key, status.ToString());
+ return XErrorNok;
+ }
+
+ return XErrorOk;
+}
+
+void vagt::queue::XDiskQueue::pop()
+{
+ if (!it_ || !it_->Valid())
+ {
+ SPDLOG_ERROR("Iterator is not valid.");
+ return;
+ }
+
+ auto delKey = it_->key().ToString();
+ it_->Next();
+
+ WriteOptions wo;
+ wo.sync = false;
+
+ auto status = db_->Delete(wo, delKey);
+ if (status.ok())
+ {
+ SPDLOG_DEBUG("Pop {} from disk queue.", delKey);
+
+ tryCompact(delKey);
+
+ return;
+ }
+ else
+ {
+ SPDLOG_ERROR("Fail to pop {} from disk queue:{}.", delKey, status.ToString());
+ return;
+ }
+}
+
+std::string vagt::queue::XDiskQueue::front()
+{
+ if (it_ && it_->Valid())
+ {
+ return it_->value().ToString();
+ }
+
+ SPDLOG_ERROR("Iterator is not valid.");
+ return "";
+}
+
+std::string vagt::queue::XDiskQueue::createKey()
+{
+ auto now = chrono::system_clock::now().time_since_epoch().count();
+ snprintf(key, sizeof(key), "%020ld_%010u", now, keyId_.fetch_add(1));
+ return key;
+}
+
+void vagt::queue::XDiskQueue::tryCompact(const std::string & key)
+{
+ if (it_->Valid())
+ {
+ return;
+ }
+
+ delete it_;
+ it_ = nullptr;
+
+ auto now = chrono::system_clock::now();
+ auto duration = chrono::duration_cast<std::chrono::seconds>(now - compactTime_).count();
+
+ if (duration > XDiskQueueCompactInterval &&
+ db_ &&
+ !key.empty())
+ {
+ SPDLOG_INFO("Disk queue compaction starting...");
+ Slice end(key);
+ SPDLOG_INFO("Compact key older than {}.", end.ToString());
+ db_->CompactRange(NULL, &end);
+ SPDLOG_INFO("Disk queue compaction complete...");
+
+ compactTime_ = now;
+ }
+}
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XDiskQueue.h b/veslibrary/ves_cpplibrary/src/lib/transport/XDiskQueue.h
new file mode 100755
index 0000000..2168a58
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/XDiskQueue.h
@@ -0,0 +1,34 @@
+#pragma once
+
+#include "XQueue.h"
+#include "leveldb/db.h"
+#include <atomic>
+#include <chrono>
+
+namespace vagt
+{
+ namespace queue
+ {
+ class XDiskQueue : public XQueue
+ {
+ public:
+ XDiskQueue(const std::string& path);
+ ~XDiskQueue();
+ virtual bool empty() override;
+ virtual XErrorCode push(const std::string & val) override;
+ virtual void pop() override;
+ virtual std::string front() override;
+ private:
+ std::string createKey();
+ void tryCompact(const std::string& key);
+
+ std::atomic<long> keyId_;
+ char key[32];
+ std::chrono::system_clock::time_point compactTime_;
+
+ leveldb::DB* db_;
+ leveldb::ReadOptions opt_;
+ leveldb::Iterator* it_;
+ };
+ }
+}
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XLibcurlTransport.cpp b/veslibrary/ves_cpplibrary/src/lib/transport/XLibcurlTransport.cpp
new file mode 100755
index 0000000..362e8d2
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/XLibcurlTransport.cpp
@@ -0,0 +1,376 @@
+#include "XLibcurlTransport.h"
+#ifndef SPDLOG_ACTIVE_LEVEL
+#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE
+#endif
+#include "spdlog/spdlog.h"
+#include <functional>
+
+using namespace std;
+using namespace vagt;
+using namespace vagt::transport;
+using namespace spdlog;
+
+class XAutoCleanUp
+{
+public:
+ explicit XAutoCleanUp(function<void(void)> cleanup) :_cleanup(cleanup)
+ {
+
+ }
+ ~XAutoCleanUp()
+ {
+ if (_cleanup) _cleanup();
+ }
+private:
+ XAutoCleanUp(const XAutoCleanUp&) = delete;
+ XAutoCleanUp& operator=(const XAutoCleanUp&) = delete;
+ function<void(void)> _cleanup;
+};
+
+char XLibcurlTransport::curlErrorBuf_[CURL_ERROR_SIZE] = { 0 };
+
+struct XMemoryStuct
+{
+ char * memory;
+ size_t size;
+};
+
+XLibcurlTransport::XLibcurlTransport(const XTransportOption& option):
+ option_(option),
+ curl_(nullptr),
+ hdr_(nullptr)
+{
+}
+
+XErrorCode XLibcurlTransport::start()
+{
+ auto rc = initLibcurl();
+
+ allowPost();
+
+ return rc;
+}
+
+XErrorCode XLibcurlTransport::stop()
+{
+ cancelPost();
+ return cleanupLibcurl();
+}
+
+XErrorCode XLibcurlTransport::post(const std::string& event)
+{
+ if (shouldCancelPost())
+ {
+ return XErrorCanceled;
+ }
+
+ if (event.empty())
+ {
+ SPDLOG_WARN("Trying post empty event.");
+ return XErrorClientError;
+ }
+ return curlPost(event);
+}
+
+XErrorCode XLibcurlTransport::initLibcurl()
+{
+ auto rc = curl_global_init(CURL_GLOBAL_SSL);
+ if (rc != CURLE_OK)
+ {
+ SPDLOG_ERROR("Fail to init libcurl:({}).", rc);
+ return XErrorNok;
+ }
+
+ curl_ = curl_easy_init();
+ if (!curl_)
+ {
+ SPDLOG_ERROR("Fail to get libcurl handle.");
+ return XErrorNok;
+ }
+
+ rc = curl_easy_setopt(curl_, CURLOPT_ERRORBUFFER, curlErrorBuf_);
+ if (rc != CURLE_OK)
+ {
+ SPDLOG_ERROR("Fail to set libcurl error message buffer:({}).", rc);
+ return XErrorNok;
+ }
+
+ rc = curl_easy_setopt(curl_, CURLOPT_URL, option_.url_.c_str());
+ if (rc != CURLE_OK)
+ {
+ SPDLOG_ERROR("Fail to set libcurl url {}:({}) {}.", option_.url_, rc, curlErrorBuf_);
+ return XErrorNok;
+ }
+
+ if (!option_.sourceIp_.empty())
+ {
+ rc = curl_easy_setopt(curl_, CURLOPT_INTERFACE, option_.sourceIp_.c_str());
+ if (rc != CURLE_OK)
+ {
+ SPDLOG_ERROR("Fail to bind interface {} to libcurl:({}) {}.", option_.sourceIp_, rc, curlErrorBuf_);
+ return XErrorNok;
+ }
+ }
+
+ if (option_.secure_)
+ {
+ if (!option_.certFilePath_.empty())
+ {
+ rc = curl_easy_setopt(curl_, CURLOPT_SSLCERT, option_.certFilePath_.c_str());
+ if (rc != CURLE_OK)
+ {
+ SPDLOG_ERROR("Fail to init libcurl with client cert:({}) {}.", rc, curlErrorBuf_);
+ return XErrorNok;
+ }
+ }
+
+ if (!option_.keyFilePath_.empty())
+ {
+ rc = curl_easy_setopt(curl_, CURLOPT_SSLKEY, option_.keyFilePath_.c_str());
+ if (rc != CURLE_OK)
+ {
+ SPDLOG_ERROR("Fail to init libcurl with client key:({}) {}.", rc, curlErrorBuf_);
+ return XErrorNok;
+ }
+ }
+
+ if (!option_.caInfo_.empty())
+ {
+ rc = curl_easy_setopt(curl_, CURLOPT_CAINFO, option_.caInfo_.c_str());
+ if (rc != CURLE_OK)
+ {
+ SPDLOG_ERROR("Fail to init libcurl with CA cert:({}) {}.", rc, curlErrorBuf_);
+ return XErrorNok;
+ }
+ }
+
+ if (!option_.caFilePath_.empty())
+ {
+ rc = curl_easy_setopt(curl_, CURLOPT_CAPATH, option_.caFilePath_.c_str());
+ if (rc != CURLE_OK)
+ {
+ SPDLOG_ERROR("Fail to init libcurl with CA cert path:({}) {}.", rc, curlErrorBuf_);
+ return XErrorNok;
+ }
+ }
+
+ rc = curl_easy_setopt(curl_, CURLOPT_SSL_VERIFYPEER, option_.verifyPeer_);
+ if (rc != CURLE_OK)
+ {
+ SPDLOG_ERROR("Fail to init libcurl with SSL server verification:({}){}.", rc, curlErrorBuf_);
+ return XErrorNok;
+ }
+
+ rc = curl_easy_setopt(curl_, CURLOPT_SSL_VERIFYHOST, option_.verifyHost_);
+ if (rc != CURLE_OK)
+ {
+ SPDLOG_ERROR("Fail to init libcurl with client host verification:({}) {}.", rc, curlErrorBuf_);
+ return XErrorNok;
+ }
+
+ rc = curl_easy_setopt(curl_, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_2);
+ if (rc != CURLE_OK)
+ {
+ SPDLOG_ERROR("Fail to init libcurl with SSL version:({}) {}.", rc, curlErrorBuf_);
+ return XErrorNok;
+ }
+ }
+
+ rc = curl_easy_setopt(curl_, CURLOPT_USERAGENT, "libcurl-agent/1.0");
+ if (rc != CURLE_OK)
+ {
+ SPDLOG_ERROR("Fail to init libcurl with user agent:({}) {}.", rc, curlErrorBuf_);
+ return XErrorNok;
+ }
+
+ rc = curl_easy_setopt(curl_, CURLOPT_POST, 1L);
+ if (rc != CURLE_OK)
+ {
+ SPDLOG_ERROR("Fail to init libcurl with regular post:({}) {}.", rc, curlErrorBuf_);
+ return XErrorNok;
+ }
+
+ rc = curl_easy_setopt(curl_, CURLOPT_WRITEFUNCTION, writeCallback);
+ if (rc != CURLE_OK)
+ {
+ SPDLOG_ERROR("Fail to init libcurl with write callback:({}) {}.", rc, curlErrorBuf_);
+ return XErrorNok;
+ }
+
+ rc = curl_easy_setopt(curl_, CURLOPT_READFUNCTION, readCallback);
+ if (rc != CURLE_OK)
+ {
+ SPDLOG_ERROR("Fail to init libcurl with read callback:({}) {}.", rc, curlErrorBuf_);
+ return XErrorNok;
+ }
+
+ hdr_ = curl_slist_append(hdr_, "Content-type: application/json");
+ hdr_ = curl_slist_append(hdr_, "Expect:");
+
+ rc = curl_easy_setopt(curl_, CURLOPT_HTTPHEADER, hdr_);
+ if (rc != CURLE_OK)
+ {
+ SPDLOG_ERROR("Fail to init libcurl to use custom headers:({}) {}.", rc, curlErrorBuf_);
+ return XErrorNok;
+ }
+
+ rc = curl_easy_setopt(curl_, CURLOPT_TIMEOUT, option_.timeOut_.count());
+ if (rc != CURLE_OK)
+ {
+ SPDLOG_ERROR("Fail to init libcurl with timeout:({}) {}.", rc, curlErrorBuf_);
+ return XErrorNok;
+ }
+
+ rc = curl_easy_setopt(curl_, CURLOPT_HTTPAUTH, CURLAUTH_BASIC);
+ if (rc != CURLE_OK)
+ {
+ SPDLOG_ERROR("Fail to init libcurl with Baisc Authentication:({}) {}.", rc, curlErrorBuf_);
+ return XErrorNok;
+ }
+
+ rc = curl_easy_setopt(curl_, CURLOPT_USERNAME, option_.userName_.c_str());
+ if (rc != CURLE_OK)
+ {
+ SPDLOG_ERROR("Fail to init libcurl with user name:({}) {}.", rc, curlErrorBuf_);
+ return XErrorNok;
+ }
+
+ rc = curl_easy_setopt(curl_, CURLOPT_PASSWORD, option_.userPasswd_.c_str());
+ if (rc != CURLE_OK)
+ {
+ SPDLOG_ERROR("Fail to init libcurl with user password:({}) {}.", rc, curlErrorBuf_);
+ return XErrorNok;
+ }
+
+ return XErrorOk;
+}
+
+XErrorCode XLibcurlTransport::cleanupLibcurl()
+{
+ if (curl_)
+ {
+ curl_easy_cleanup(curl_);
+ curl_ = nullptr;
+ }
+
+ if (hdr_)
+ {
+ curl_slist_free_all(hdr_);
+ hdr_ = nullptr;
+ }
+
+ curl_global_cleanup();
+
+ return XErrorOk;
+}
+
+XErrorCode XLibcurlTransport::curlPost(const string & body)
+{
+ XMemoryStuct tMem;
+ XMemoryStuct rMem;
+
+ tMem.memory = (char*)body.c_str();
+ tMem.size = body.size();
+
+ rMem.size = 0;
+ rMem.memory = (char*)malloc(1);
+
+ XAutoCleanUp cleanup([&rMem]() { if (rMem.memory)
+ {
+ free(rMem.memory);
+ rMem.memory = nullptr;
+ }
+ });
+
+ auto rc = curl_easy_setopt(curl_, CURLOPT_WRITEDATA, &rMem);
+ if (rc != CURLE_OK)
+ {
+ SPDLOG_ERROR("Fail to init libcurl with write data:({}) {}.", rc, curlErrorBuf_);
+ return XErrorNok;
+ }
+
+ rc = curl_easy_setopt(curl_, CURLOPT_READDATA, &tMem);
+ if (rc != CURLE_OK)
+ {
+ SPDLOG_ERROR("Fail to init libcurl with read data:({}) {}.", rc, curlErrorBuf_);
+ return XErrorNok;
+ }
+
+ rc = curl_easy_setopt(curl_, CURLOPT_POSTFIELDSIZE, tMem.size);
+ if (rc != CURLE_OK)
+ {
+ SPDLOG_ERROR("Fail to set length of post data for libcurl:({}) {}.", rc, curlErrorBuf_);
+ return XErrorNok;
+ }
+
+ SPDLOG_DEBUG("Libcurl posting event:{}.", body);
+ rc = curl_easy_perform(curl_);
+
+ if (rc != CURLE_OK)
+ {
+ SPDLOG_ERROR("Libcurl fails to post event:({}) {}.", rc, curlErrorBuf_);
+ if (rc == CURLE_OPERATION_TIMEDOUT)
+ {
+ return XErrorTimeout;
+ }
+ return XErrorNok;
+ }
+
+ int httpResponseCode = 0;
+ curl_easy_getinfo(curl_, CURLINFO_RESPONSE_CODE, &httpResponseCode);
+ SPDLOG_DEBUG("Post event:{}, response code: ({}), response body:{}.", body, httpResponseCode, rMem.size>0 ? rMem.memory : "None");
+
+ if ((httpResponseCode / 100) == 2)
+ {
+ return XErrorOk;
+ }
+
+ if ((httpResponseCode / 100) == 4)
+ {
+ return XErrorClientError;
+ }
+
+ if ((httpResponseCode / 100) == 5)
+ {
+ return XErrorServerError;
+ }
+
+ return XErrorNok;
+}
+
+size_t XLibcurlTransport::readCallback(void * ptr, size_t size, size_t nmemb, void * userp)
+{
+ size_t rtn = 0;
+ size_t bytesToWrite = 0;
+ XMemoryStuct* tMem = (XMemoryStuct*)userp;
+
+ bytesToWrite = std::min(size*nmemb, tMem->size);
+
+ if (bytesToWrite > 0)
+ {
+ strncpy((char *)ptr, tMem->memory, bytesToWrite);
+ tMem->memory += bytesToWrite;
+ tMem->size -= bytesToWrite;
+ rtn = bytesToWrite;
+ }
+ return rtn;
+}
+
+size_t XLibcurlTransport::writeCallback(void * ptr, size_t size, size_t nmemb, void * userp)
+{
+ size_t realSize = size * nmemb;
+ XMemoryStuct* rMem = (XMemoryStuct*)userp;
+ rMem->memory = (char*)realloc(rMem->memory, rMem->size + realSize + 1);
+ if (rMem->memory == nullptr)
+ {
+ SPDLOG_ERROR("No enough memory.");
+ return 0;
+ }
+
+ memcpy(&(rMem->memory[rMem->size]), ptr, realSize);
+ rMem->size += realSize;
+ rMem->memory[rMem->size] = 0;
+
+ return realSize;
+}
+
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XLibcurlTransport.h b/veslibrary/ves_cpplibrary/src/lib/transport/XLibcurlTransport.h
new file mode 100755
index 0000000..544a5c4
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/XLibcurlTransport.h
@@ -0,0 +1,41 @@
+#pragma once
+
+#include <atomic>
+#include <chrono>
+#include <mutex>
+#include <string>
+#include <curl/curl.h>
+#include "XTransport.h"
+
+namespace vagt
+{
+ namespace transport
+ {
+ class XLibcurlTransport: public XTransport
+ {
+ public:
+ XLibcurlTransport(const XTransportOption& option);
+
+ virtual ~XLibcurlTransport() {}
+
+ virtual XErrorCode start() override;
+
+ virtual XErrorCode stop() override;
+
+ virtual XErrorCode post(const std::string& event) override;
+
+ private:
+ XErrorCode initLibcurl();
+ XErrorCode cleanupLibcurl();
+ XErrorCode curlPost(const std::string& body);
+
+ static char curlErrorBuf_[CURL_ERROR_SIZE];
+ static size_t readCallback(void *ptr, size_t size, size_t nmemb, void *userp);
+ static size_t writeCallback(void *ptr, size_t size, size_t nmemb, void *userp);
+
+ XTransportOption option_;
+ CURL* curl_;
+ struct curl_slist* hdr_;
+ };
+ }
+}
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XMemQueue.cpp b/veslibrary/ves_cpplibrary/src/lib/transport/XMemQueue.cpp
new file mode 100755
index 0000000..443ab40
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/XMemQueue.cpp
@@ -0,0 +1,46 @@
+#include "XMemQueue.h"
+#ifndef SPDLOG_ACTIVE_LEVEL
+#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE
+#endif
+#include "spdlog/spdlog.h"
+
+using namespace std;
+using namespace vagt::queue;
+
+vagt::queue::XMemQueue::XMemQueue(int capacity):capacity_(capacity),size_(0)
+{
+
+}
+
+bool vagt::queue::XMemQueue::empty()
+{
+ return 0 == size_;
+}
+
+XErrorCode vagt::queue::XMemQueue::push(const std::string & val)
+{
+ if (size_ >= capacity_)
+ {
+ queue_.pop();
+ SPDLOG_WARN("Drop from queue.");
+ size_--;
+ }
+
+ queue_.push(val);
+ size_++;
+
+ return XErrorOk;
+}
+
+void vagt::queue::XMemQueue::pop()
+{
+ //cal empty() before cal pop()
+ queue_.pop();
+ size_--;
+}
+
+std::string vagt::queue::XMemQueue::front()
+{
+ //cal empty() before cal pop()
+ return queue_.front();
+}
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XMemQueue.h b/veslibrary/ves_cpplibrary/src/lib/transport/XMemQueue.h
new file mode 100755
index 0000000..0a1a651
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/XMemQueue.h
@@ -0,0 +1,24 @@
+#pragma once
+
+#include "XQueue.h"
+#include <queue>
+
+namespace vagt
+{
+ namespace queue
+ {
+ class XMemQueue: public XQueue
+ {
+ public:
+ XMemQueue(int capacity);
+ virtual bool empty() override;
+ virtual XErrorCode push(const std::string & val) override;
+ virtual void pop() override;
+ virtual std::string front() override;
+ private:
+ int capacity_;
+ int size_;
+ std::queue<std::string> queue_;
+ };
+ }
+}
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XQueue.cpp b/veslibrary/ves_cpplibrary/src/lib/transport/XQueue.cpp
new file mode 100755
index 0000000..a91de1e
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/XQueue.cpp
@@ -0,0 +1,15 @@
+#include "XQueue.h"
+#include "XMemQueue.h"
+#include "XDiskQueue.h"
+
+using namespace vagt::queue;
+
+std::shared_ptr<XQueue> vagt::queue::XQueue::create(int capacity)
+{
+ return std::make_shared<XMemQueue>(capacity);
+}
+
+std::shared_ptr<XQueue> vagt::queue::XQueue::create(const std::string & path)
+{
+ return std::make_shared<XDiskQueue>(path);
+}
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XRetryTransport.cpp b/veslibrary/ves_cpplibrary/src/lib/transport/XRetryTransport.cpp
new file mode 100755
index 0000000..90db0a9
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/XRetryTransport.cpp
@@ -0,0 +1,156 @@
+#include "XRetryTransport.h"
+#ifndef SPDLOG_ACTIVE_LEVEL
+#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE
+#endif
+#include "spdlog/spdlog.h"
+
+using namespace std;
+using namespace vagt;
+using namespace vagt::transport;
+using namespace spdlog;
+
+XRetryTransport::XRetryTransport(std::shared_ptr<XTransport> transport, std::chrono::milliseconds retryInterval, int tryTimes):
+ transport_(transport),
+ retryInterval_(retryInterval),
+ tryTimes_(tryTimes),
+ isRetryRunning_(false)
+{
+}
+
+XErrorCode XRetryTransport::start()
+{
+ if (!transport_)
+ {
+ SPDLOG_ERROR("Empty transport.");
+ return XErrorNok;
+ }
+
+ auto rc = transport_->start();
+
+ allowPost();
+
+ return rc;
+}
+
+XErrorCode XRetryTransport::stop()
+{
+ cancelPost();
+ if (!transport_)
+ {
+ SPDLOG_ERROR("Empty transport.");
+ return XErrorNok;
+ }
+
+ while (isRetryRunning_.load(std::memory_order_acquire))
+ {
+ SPDLOG_DEBUG("Waiting for retry exit.");
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ }
+
+ return transport_->stop();
+}
+
+XErrorCode XRetryTransport::post(const std::string& event)
+{
+ if (shouldCancelPost())
+ {
+ return XErrorCanceled;
+ }
+
+ if (event.empty())
+ {
+ SPDLOG_WARN("Trying post empty event.");
+ return XErrorClientError;
+ }
+
+ if (!transport_)
+ {
+ SPDLOG_ERROR("Empty transport.");
+ return XErrorNok;
+ }
+
+ XErrorCode rc = XErrorNok;
+
+ isRetryRunning_.store(true, std::memory_order_release);
+
+ int idx = 0;
+ do
+ {
+ SPDLOG_DEBUG("Posting event:{}.", event);
+ rc = transport_->post(event);
+ SPDLOG_DEBUG("Post event:{} ({}).", event, rc);
+ if (rc == XErrorOk || rc == XErrorCanceled)
+ {
+ isRetryRunning_.store(false, std::memory_order_release);
+ return rc;
+ }
+ else if (rc == XErrorClientError)
+ {
+ isRetryRunning_.store(false, std::memory_order_release);
+ SPDLOG_ERROR("Post event:({}).", rc);
+ return rc;
+ }
+ //else if (rc == XErrorTimeout || rc == XErrorServerError || rc == XErrorNetworkError)
+ else
+ {
+ ++idx;
+ if (shouldCancelPost() || idx>=tryTimes_)
+ {
+ break;
+ }
+
+ SPDLOG_INFO("Stopping transport for retry:({}), time:({}).", rc, idx);
+ transport_->stop();
+ {
+ unique_lock<mutex> lk(lock_);
+ bool cancel = cond_.wait_for(lk, retryInterval_, [this]() { return this->cancel_.load(); });
+ if (cancel)
+ {
+ isRetryRunning_.store(false, std::memory_order_release);
+ SPDLOG_INFO("Cancel post event:({}).", rc);
+ return rc;
+ }
+ }
+
+ SPDLOG_INFO("Starting transport for retry:({}), time:({}).", rc, idx);
+ transport_->start();
+ }
+ }
+ while(true);
+
+
+ isRetryRunning_.store(false, std::memory_order_release);
+ SPDLOG_ERROR("Post event:({}).", rc);
+ return rc;
+}
+
+void vagt::transport::XRetryTransport::cancelPost()
+{
+ XTransport::cancelPost();
+
+ {
+ unique_lock<mutex> lk(lock_);
+ cancel_.store(true, std::memory_order_release);
+ cond_.notify_one();
+ }
+
+ if (transport_)
+ {
+ transport_->cancelPost();
+ }
+}
+
+void vagt::transport::XRetryTransport::allowPost()
+{
+ if (transport_)
+ {
+ transport_->allowPost();
+ }
+ XTransport::allowPost();
+}
+
+bool vagt::transport::XRetryTransport::shouldCancelPost()
+{
+ return XTransport::shouldCancelPost();
+}
+
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XRetryTransport.h b/veslibrary/ves_cpplibrary/src/lib/transport/XRetryTransport.h
new file mode 100755
index 0000000..c6d8521
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/XRetryTransport.h
@@ -0,0 +1,43 @@
+#pragma once
+
+#include <chrono>
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
+#include "XTransport.h"
+
+namespace vagt
+{
+ namespace transport
+ {
+ class XRetryTransport : public XTransport
+ {
+ public:
+ XRetryTransport(std::shared_ptr<XTransport> transport, std::chrono::milliseconds retryInterval, int retryTimes);
+
+ virtual ~XRetryTransport() {}
+
+ virtual XErrorCode start() override;
+
+ virtual XErrorCode stop() override;
+
+ virtual XErrorCode post(const std::string& event) override;
+
+ virtual void cancelPost() override;
+
+ virtual void allowPost() override;
+
+ virtual bool shouldCancelPost() override;
+
+ private:
+ std::shared_ptr<XTransport> transport_;
+ std::chrono::milliseconds retryInterval_;
+ int tryTimes_;
+
+ std::atomic<bool> isRetryRunning_;
+
+ std::mutex lock_;
+ std::condition_variable cond_;
+ };
+ }
+} \ No newline at end of file
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XRpcClientTransport.cpp b/veslibrary/ves_cpplibrary/src/lib/transport/XRpcClientTransport.cpp
new file mode 100755
index 0000000..a63fc00
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/XRpcClientTransport.cpp
@@ -0,0 +1,183 @@
+#include "XRpcClientTransport.h"
+#ifndef SPDLOG_ACTIVE_LEVEL
+#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE
+#endif
+#include "spdlog/spdlog.h"
+#include "thrift/TOutput.h"
+
+using namespace std;
+using namespace vagt;
+using namespace vagt::transport;
+using namespace vagt::transport::rpc;
+using namespace ::apache::thrift;
+using namespace ::apache::thrift::protocol;
+using namespace ::apache::thrift::transport;
+
+void DummyOutput(const char* msg)
+{
+ SPDLOG_INFO("{}", msg);
+}
+
+vagt::transport::XRpcClientTransport::XRpcClientTransport(const XTransportOption & option):
+ option_(option)
+{
+}
+
+vagt::transport::XRpcClientTransport::~XRpcClientTransport()
+{
+}
+
+XErrorCode vagt::transport::XRpcClientTransport::start()
+{
+ auto rc = startRPC();
+
+ allowPost();
+
+ return rc;
+}
+
+XErrorCode vagt::transport::XRpcClientTransport::stop()
+{
+ cancelPost();
+ return stopRPC();
+}
+
+XErrorCode vagt::transport::XRpcClientTransport::post(const std::string & event)
+{
+ if (shouldCancelPost())
+ {
+ return XErrorCanceled;
+ }
+
+ if (event.empty())
+ {
+ SPDLOG_WARN("Trying post empty event.");
+ return XErrorClientError;
+ }
+
+ if (!rpcClient_)
+ {
+ SPDLOG_ERROR("RPC client is not started.");
+ return XErrorNok;
+ }
+
+ try
+ {
+ return (XErrorCode)(rpcClient_->post(event));
+ }
+ catch (TTransportException ex)
+ {
+ SPDLOG_ERROR("Fail to post event:({}).", ex.what());
+ return XErrorNetworkError;
+ }
+ catch (TException ex)
+ {
+ SPDLOG_ERROR("Fail to post event:({}).", ex.what());
+ return XErrorNetworkError;
+ }
+ catch (...)
+ {
+ SPDLOG_ERROR("Fail to post event.");
+ return XErrorNetworkError;
+ }
+}
+
+void vagt::transport::XRpcClientTransport::cancelPost()
+{
+ XTransport::cancelPost();
+}
+
+void vagt::transport::XRpcClientTransport::allowPost()
+{
+ XTransport::allowPost();
+}
+
+bool vagt::transport::XRpcClientTransport::shouldCancelPost()
+{
+ return XTransport::shouldCancelPost();
+}
+
+XErrorCode vagt::transport::XRpcClientTransport::startRPC()
+{
+ GlobalOutput.setOutputFunction(DummyOutput);
+
+ string host("127.0.0.1");
+ if (!option_.host_.empty())
+ {
+ host = option_.host_;
+ }
+
+ int port = 5678;
+ if (option_.port_ > 1024)
+ {
+ port = option_.port_;
+ }
+
+ SPDLOG_INFO("Connecting to RPC server:({}:{}).", host, port);
+
+ int timeout = 1000;
+ if (option_.timeOut_.count() >0)
+ {
+ timeout = option_.timeOut_.count();
+ }
+
+ auto socket = std::make_shared<TSocket>(host, port);
+ socket->setConnTimeout(timeout);
+ socket->setRecvTimeout(timeout);
+ socket->setSendTimeout(timeout);
+ rpcTransport_ = std::make_shared<TBufferedTransport>(socket);
+ auto protocol = std::make_shared<TCompactProtocol>(rpcTransport_);
+ rpcClient_ = std::make_shared<vagt::transport::rpc::XRpcTransportClient>(protocol);
+
+ try
+ {
+ rpcTransport_->open();
+ }
+ catch (TTransportException ex)
+ {
+ SPDLOG_ERROR("Fail to start rpc client:({}).", ex.what());
+ return XErrorNok;
+ }
+ catch (TException ex)
+ {
+ SPDLOG_ERROR("Fail to start rpc client:({}).", ex.what());
+ return XErrorNok;
+ }
+ catch (...)
+ {
+ SPDLOG_ERROR("Fail to start rpc client.");
+ return XErrorNok;
+ }
+
+ return XErrorOk;
+}
+
+XErrorCode vagt::transport::XRpcClientTransport::stopRPC()
+{
+ if (!rpcTransport_)
+ {
+ return XErrorNok;
+ }
+
+ try
+ {
+ rpcTransport_->close();
+ rpcTransport_.reset();
+ }
+ catch (TTransportException ex)
+ {
+ SPDLOG_ERROR("Fail to stop rpc client:({}).", ex.what());
+ return XErrorNok;
+ }
+ catch (TException ex)
+ {
+ SPDLOG_ERROR("Fail to stop rpc client:({}).", ex.what());
+ return XErrorNok;
+ }
+ catch (...)
+ {
+ SPDLOG_ERROR("Fail to stop rpc client.");
+ return XErrorNok;
+ }
+ return XErrorOk;
+}
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XRpcClientTransport.h b/veslibrary/ves_cpplibrary/src/lib/transport/XRpcClientTransport.h
new file mode 100755
index 0000000..8284d7b
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/XRpcClientTransport.h
@@ -0,0 +1,38 @@
+#pragma once
+
+#include "XTransport.h"
+#include "thrift/transport/TSocket.h"
+#include "thrift/protocol/TCompactProtocol.h"
+#include "thrift/transport/TBufferTransports.h"
+#include "gen-cpp/rpc_constants.h"
+#include "gen-cpp/rpc_types.h"
+#include "gen-cpp/XRpcTransport.h"
+
+namespace vagt
+{
+ namespace transport
+ {
+ class XRpcClientTransport : public XTransport
+ {
+ public:
+ XRpcClientTransport(const XTransportOption& option);
+ virtual ~XRpcClientTransport();
+
+ virtual XErrorCode start() override;
+ virtual XErrorCode stop() override;
+ virtual XErrorCode post(const std::string& event) override;
+
+ virtual void cancelPost() override;
+ virtual void allowPost() override;
+ virtual bool shouldCancelPost() override;
+ private:
+ XErrorCode startRPC();
+ XErrorCode stopRPC();
+
+ XTransportOption option_;
+
+ std::shared_ptr<apache::thrift::transport::TTransport> rpcTransport_;
+ std::shared_ptr<vagt::transport::rpc::XRpcTransportClient> rpcClient_;
+ };
+ }
+}
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XRpcServertTransport.cpp b/veslibrary/ves_cpplibrary/src/lib/transport/XRpcServertTransport.cpp
new file mode 100755
index 0000000..c18db8b
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/XRpcServertTransport.cpp
@@ -0,0 +1,222 @@
+#include "XRpcServertTransport.h"
+#include "thrift/transport/TServerSocket.h"
+#include "thrift/server/TThreadedServer.h"
+#include "thrift/protocol/TCompactProtocol.h"
+#include "thrift/transport/TBufferTransports.h"
+#include "thrift/transport/TSocket.h"
+#ifndef SPDLOG_ACTIVE_LEVEL
+#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE
+#endif
+#include "spdlog/spdlog.h"
+
+using namespace std;
+using namespace vagt::transport;
+using namespace vagt::transport::rpc;
+using namespace apache::thrift;
+using namespace apache::thrift::protocol;
+using namespace apache::thrift::transport;
+using namespace apache::thrift::server;
+
+constexpr auto XRpcServerIdleTime = 60 * 60 * 1000;
+
+class XRpcTransportImp:public XRpcTransportIf
+{
+public:
+ XRpcTransportImp(XTransport* transport):transport_(transport)
+ {
+ }
+
+ virtual int16_t post(const std::string & data) override
+ {
+ if (transport_)
+ {
+ return transport_->post(data);
+ }
+ }
+private:
+ XTransport* transport_;
+};
+
+class XRpcTransportFactory : public XRpcTransportIfFactory
+{
+public:
+ XRpcTransportFactory(XTransport* transport):transport_(transport)
+ {
+ }
+
+ XRpcTransportIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo)
+ {
+ shared_ptr<TSocket> sock = dynamic_pointer_cast<TSocket>(connInfo.transport);
+ if (sock)
+ {
+ auto pt = new XRpcTransportImp(transport_);
+ SPDLOG_INFO("New RPC client connection from:({}:{}), handler:({:x}).", sock->getPeerAddress(), sock->getPeerPort(), (unsigned long)pt);
+ return pt;
+ }
+ return nullptr;
+ }
+
+ void releaseHandler(XRpcTransportIf* handler)
+ {
+ SPDLOG_INFO("Clean RPC client handler:({:x}).", (unsigned long)handler);
+
+ if (handler)
+ {
+ delete handler;
+ handler = nullptr;
+ }
+ }
+private:
+ XTransport* transport_;
+};
+
+vagt::transport::XRpcServerTransport::XRpcServerTransport(std::shared_ptr<XTransport> transport, const XTransportOption & option):
+ transport_(transport),
+ option_(option)
+{
+}
+
+vagt::transport::XRpcServerTransport::~XRpcServerTransport()
+{
+}
+
+XErrorCode vagt::transport::XRpcServerTransport::start()
+{
+ if (!transport_)
+ {
+ SPDLOG_ERROR("Empty transport.");
+ return XErrorNok;
+ }
+
+ transport_->start();
+
+ startRPC();
+
+ allowPost();
+
+ return XErrorOk;
+}
+
+XErrorCode vagt::transport::XRpcServerTransport::stop()
+{
+ cancelPost();
+ if (!transport_)
+ {
+ SPDLOG_ERROR("Empty transport.");
+ return XErrorNok;
+ }
+
+ stopRPC();
+
+ future_.get();
+
+ return transport_->stop();
+}
+
+XErrorCode vagt::transport::XRpcServerTransport::post(const std::string & event)
+{
+ if (shouldCancelPost())
+ {
+ return XErrorCanceled;
+ }
+
+ if (event.empty())
+ {
+ SPDLOG_WARN("Trying post empty event.");
+ return XErrorClientError;
+ }
+
+ if (!transport_)
+ {
+ SPDLOG_ERROR("Empty transport.");
+ return XErrorNok;
+ }
+
+ return transport_->post(event);
+}
+
+void vagt::transport::XRpcServerTransport::cancelPost()
+{
+ XTransport::cancelPost();
+
+ if (transport_)
+ {
+ transport_->cancelPost();
+ }
+}
+
+void vagt::transport::XRpcServerTransport::allowPost()
+{
+ if (transport_)
+ {
+ transport_->allowPost();
+ }
+ XTransport::allowPost();
+}
+
+bool vagt::transport::XRpcServerTransport::shouldCancelPost()
+{
+ return XTransport::shouldCancelPost();
+}
+
+XErrorCode vagt::transport::XRpcServerTransport::startRPC()
+{
+ future_ = std::async(launch::async, [this]() {return this->worker(); });
+ return XErrorOk;
+}
+
+XErrorCode vagt::transport::XRpcServerTransport::stopRPC()
+{
+ if (rpcServer_)
+ {
+ rpcServer_->stop();
+ }
+ return XErrorOk;
+}
+
+void vagt::transport::XRpcServerTransport::worker()
+{
+ string source("0.0.0.0");
+ if (!option_.sourceIp_.empty())
+ {
+ source = option_.sourceIp_;
+ }
+
+ int port = 5678;
+ if (option_.port_ > 1024)
+ {
+ port = option_.port_;
+ }
+
+ SPDLOG_INFO("Start RPC transport thread:({}:{}).", source, port);
+
+ auto serverSocket = make_shared<TServerSocket>(source, port);
+ serverSocket->setRecvTimeout(XRpcServerIdleTime);
+ rpcServer_ = make_shared<TThreadedServer>(
+ make_shared<XRpcTransportProcessorFactory>(make_shared<XRpcTransportFactory>(this)),
+ serverSocket,
+ make_shared<TBufferedTransportFactory>(),
+ make_shared<TCompactProtocolFactory>());
+
+ while (!shouldCancelPost())
+ {
+ try
+ {
+ rpcServer_->serve();
+ }
+ catch (apache::thrift::TException ex)
+ {
+ SPDLOG_ERROR("{}", ex.what());
+ this_thread::sleep_for(chrono::seconds(1));
+ continue;
+ }
+ catch (...)
+ {
+ SPDLOG_ERROR("Unknown Exception in thrift.");
+ this_thread::sleep_for(chrono::seconds(1));
+ continue;
+ }
+ }
+
+ SPDLOG_INFO("Quit RPC transport thread.");
+}
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XRpcServertTransport.h b/veslibrary/ves_cpplibrary/src/lib/transport/XRpcServertTransport.h
new file mode 100755
index 0000000..14eab07
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/XRpcServertTransport.h
@@ -0,0 +1,39 @@
+#pragma once
+
+#include "XTransport.h"
+#include "thrift/server/TServer.h"
+#include "gen-cpp/rpc_constants.h"
+#include "gen-cpp/rpc_types.h"
+#include "gen-cpp/XRpcTransport.h"
+#include <future>
+
+namespace vagt
+{
+ namespace transport
+ {
+ class XRpcServerTransport : public XTransport
+ {
+ public:
+ XRpcServerTransport(std::shared_ptr<XTransport> transport, const XTransportOption& option);
+ virtual ~XRpcServerTransport();
+
+ virtual XErrorCode start() override;
+ virtual XErrorCode stop() override;
+ virtual XErrorCode post(const std::string& event) override;
+
+ virtual void cancelPost() override;
+ virtual void allowPost() override;
+ virtual bool shouldCancelPost() override;
+ private:
+ XErrorCode startRPC();
+ XErrorCode stopRPC();
+ void worker();
+ std::future<void> future_;
+
+ std::shared_ptr<XTransport> transport_;
+ XTransportOption option_;
+
+ std::shared_ptr<apache::thrift::server::TServer> rpcServer_;
+ };
+ }
+}
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XSwitchableTransport.cpp b/veslibrary/ves_cpplibrary/src/lib/transport/XSwitchableTransport.cpp
new file mode 100755
index 0000000..26e44ac
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/XSwitchableTransport.cpp
@@ -0,0 +1,172 @@
+#include "XSwitchableTransport.h"
+#ifndef SPDLOG_ACTIVE_LEVEL
+#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE
+#endif
+#include "spdlog/spdlog.h"
+
+using namespace std;
+using namespace vagt;
+using namespace vagt::transport;
+
+XSwitchableTransport::XSwitchableTransport(std::vector<std::shared_ptr<XTransport>>& processors):
+ transports_(processors),
+ transportIndex_(0)
+{
+}
+
+void XSwitchableTransport::cancelPost()
+{
+ XTransport::cancelPost();
+ for (auto transport : transports_)
+ {
+ if (transport)
+ {
+ transport->cancelPost();
+ }
+ }
+}
+
+void XSwitchableTransport::allowPost()
+{
+ for (auto transport : transports_)
+ {
+ if (transport)
+ {
+ transport->allowPost();
+ }
+ }
+ XTransport::allowPost();
+}
+
+bool XSwitchableTransport::shouldCancelPost()
+{
+ return XTransport::shouldCancelPost();
+}
+
+XErrorCode XSwitchableTransport::tryTransport(const std::string& event)
+{
+ auto transport = transports_[transportIndex_];
+ if (!transport)
+ {
+ SPDLOG_ERROR("Empty transport.");
+ return XErrorNok;
+ }
+
+ SPDLOG_DEBUG("Transport({}) posting event:{}.", transportIndex_, event);
+ return transport->post(event);
+}
+
+void XSwitchableTransport::switchTransport(XErrorCode reason)
+{
+ SPDLOG_INFO("Switching transport:({}).", reason);
+
+ auto transport = transports_[transportIndex_];
+ if (!transport)
+ {
+ SPDLOG_ERROR("Empty transport.");
+ return;
+ }
+
+ transport->stop();
+
+ ++transportIndex_;
+ if (transportIndex_ >= transports_.size())
+ {
+ transportIndex_ = 0;
+ }
+
+ SPDLOG_INFO("Switch to transport, index:({}).", transportIndex_);
+ transport = transports_[transportIndex_];
+ if (!transport)
+ {
+ SPDLOG_ERROR("Empty transport({}).", transportIndex_);
+ return;
+ }
+
+ transport->start();
+}
+
+XErrorCode XSwitchableTransport::start()
+{
+ if (transportIndex_ >= transports_.size())
+ {
+ SPDLOG_ERROR("Transport index ({}) >= transport number ({}).", transportIndex_, transports_.size());
+ return XErrorNok;
+ }
+
+ auto transport = transports_[transportIndex_];
+ XErrorCode rc = XErrorNok;
+ if (transport)
+ {
+ rc = transport->start();
+ }
+
+ allowPost();
+
+ return rc;
+}
+
+XErrorCode XSwitchableTransport::stop()
+{
+ cancelPost();
+
+ if (transportIndex_ >= transports_.size())
+ {
+ SPDLOG_ERROR("Transport index ({}) >= transport number ({}).", transportIndex_, transports_.size());
+ return XErrorNok;
+ }
+
+ auto transport = transports_[transportIndex_];
+ XErrorCode rc = XErrorNok;
+ if (transport)
+ {
+ rc = transport->stop();
+ }
+
+ return rc;
+}
+
+XErrorCode XSwitchableTransport::post(const std::string& event)
+{
+ if (shouldCancelPost())
+ {
+ return XErrorCanceled;
+ }
+
+ if (event.empty())
+ {
+ SPDLOG_WARN("Trying post empty event.");
+ return XErrorClientError;
+ }
+
+ if (transportIndex_ >= transports_.size())
+ {
+ SPDLOG_ERROR("Transport index ({}) >= transport number ({}).", transportIndex_, transports_.size());
+ return XErrorNok;
+ }
+
+ auto rc = XErrorNok;
+ for (auto i = 0;
+ !shouldCancelPost() && i < transports_.size();
+ ++i)
+ {
+ rc = tryTransport(event);
+ if (rc == XErrorOk ||
+ rc == XErrorClientError ||
+ rc == XErrorCanceled)
+ {
+ return rc;
+ }
+ else
+ {
+ if (shouldCancelPost())
+ {
+ break;
+ }
+ switchTransport(rc);
+ }
+ }
+
+ return rc;
+}
+
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XSwitchableTransport.h b/veslibrary/ves_cpplibrary/src/lib/transport/XSwitchableTransport.h
new file mode 100755
index 0000000..e4bb82b
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/XSwitchableTransport.h
@@ -0,0 +1,37 @@
+#pragma once
+
+#include <vector>
+#include "XTransport.h"
+
+namespace vagt
+{
+ namespace transport
+ {
+ class XSwitchableTransport : public XTransport
+ {
+ public:
+ XSwitchableTransport(std::vector<std::shared_ptr<XTransport>>& processors);
+
+ virtual ~XSwitchableTransport() {}
+
+ virtual XErrorCode start() override;
+
+ virtual XErrorCode stop() override;
+
+ virtual XErrorCode post(const std::string& event) override;
+
+ virtual void cancelPost() override;
+
+ virtual void allowPost() override;
+
+ virtual bool shouldCancelPost() override;
+
+ private:
+ XErrorCode tryTransport(const std::string& event);
+ void switchTransport(vagt::transport::XErrorCode reason);
+
+ std::vector<std::shared_ptr<XTransport>> transports_;
+ unsigned int transportIndex_;
+ };
+ }
+}
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XSynchronizedTransport.cpp b/veslibrary/ves_cpplibrary/src/lib/transport/XSynchronizedTransport.cpp
new file mode 100755
index 0000000..88f4dfa
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/XSynchronizedTransport.cpp
@@ -0,0 +1,115 @@
+#include "XSynchronizedTransport.h"
+#ifndef SPDLOG_ACTIVE_LEVEL
+#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE
+#endif
+#include "spdlog/spdlog.h"
+
+using namespace std;
+using namespace vagt;
+using namespace vagt::transport;
+
+XSynchronizedTransport::XSynchronizedTransport(std::shared_ptr<XTransport> transport):
+ transport_(transport),
+ status_(XStopped)
+{
+}
+
+XErrorCode XSynchronizedTransport::start()
+{
+ unique_lock<mutex> lk(controlLock_);
+
+ if (status_ == XStarted)
+ {
+ return XErrorOk;
+ }
+
+ XErrorCode rc = XErrorNok;
+ if (transport_)
+ {
+ rc = transport_->start();
+ }
+
+ if (rc == XErrorOk)
+ {
+ status_ = XStarted;
+ }
+
+ allowPost();
+
+ return rc;
+}
+
+XErrorCode XSynchronizedTransport::stop()
+{
+ unique_lock<mutex> lk(controlLock_);
+
+ if (status_ == XStopped)
+ {
+ return XErrorOk;
+ }
+
+ cancelPost();
+
+ if (transport_)
+ {
+ transport_->cancelPost();
+ }
+
+ unique_lock<mutex> lkPost(postLock_);
+ XErrorCode rc = XErrorNok;
+ if (transport_)
+ {
+ rc = transport_->stop();
+ }
+
+ if (rc == XErrorOk)
+ {
+ status_ = XStopped;
+ }
+ return rc;
+}
+
+XErrorCode XSynchronizedTransport::post(const std::string& event)
+{
+ if (shouldCancelPost())
+ {
+ return XErrorCanceled;
+ }
+
+ if (event.empty())
+ {
+ SPDLOG_WARN("Trying post empty event.");
+ return XErrorClientError;
+ }
+
+ unique_lock<mutex> lk(postLock_);
+ if (transport_)
+ {
+ return transport_->post(event);
+ }
+ return XErrorNok;
+}
+
+void XSynchronizedTransport::cancelPost()
+{
+ XTransport::cancelPost();
+
+ if (transport_)
+ {
+ transport_->cancelPost();
+ }
+}
+
+void XSynchronizedTransport::allowPost()
+{
+ if (transport_)
+ {
+ transport_->allowPost();
+ }
+ XTransport::allowPost();
+}
+
+bool XSynchronizedTransport::shouldCancelPost()
+{
+ return XTransport::shouldCancelPost();
+}
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XSynchronizedTransport.h b/veslibrary/ves_cpplibrary/src/lib/transport/XSynchronizedTransport.h
new file mode 100755
index 0000000..85b16d9
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/XSynchronizedTransport.h
@@ -0,0 +1,41 @@
+#pragma once
+#include <mutex>
+#include "XTransport.h"
+
+namespace vagt
+{
+ namespace transport
+ {
+ class XSynchronizedTransport : public XTransport
+ {
+ public:
+ enum XStatus : unsigned short
+ {
+ XStarted,
+ XStopped,
+ };
+
+ XSynchronizedTransport(std::shared_ptr<XTransport> transport);
+
+ virtual ~XSynchronizedTransport() {}
+
+ virtual XErrorCode start() override;
+
+ virtual XErrorCode stop() override;
+
+ virtual XErrorCode post(const std::string& event) override;
+
+ virtual void cancelPost() override;
+
+ virtual void allowPost() override;
+
+ virtual bool shouldCancelPost() override;
+
+ private:
+ std::mutex postLock_;
+ std::mutex controlLock_;
+ std::shared_ptr<XTransport> transport_;
+ XStatus status_;
+ };
+ }
+}
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XTransport.cpp b/veslibrary/ves_cpplibrary/src/lib/transport/XTransport.cpp
new file mode 100755
index 0000000..ac61acc
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/XTransport.cpp
@@ -0,0 +1,64 @@
+#include "XTransport.h"
+#include "XLibcurlTransport.h"
+#include "XRetryTransport.h"
+#include "XSwitchableTransport.h"
+#include "XSynchronizedTransport.h"
+#include "XBufferedTransport.h"
+#include "XRpcClientTransport.h"
+#include "XRpcServertTransport.h"
+
+using namespace vagt::transport;
+
+XTransport::XTransport():cancel_(true)
+{
+}
+
+void XTransport::cancelPost()
+{
+ cancel_.store(true, std::memory_order_release);
+}
+
+void XTransport::allowPost()
+{
+ cancel_.store(false, std::memory_order_release);
+}
+
+bool XTransport::shouldCancelPost()
+{
+ return cancel_.load(std::memory_order_acquire);
+}
+
+std::shared_ptr<XTransport> vagt::transport::XTransport::LibCurlTransport(const XTransportOption & option)
+{
+ return std::make_shared<XLibcurlTransport>(option);
+}
+
+std::shared_ptr<XTransport> vagt::transport::XTransport::RetryTransport(std::shared_ptr<XTransport> transport, std::chrono::milliseconds retryInterval, int retryTimes)
+{
+ return std::make_shared<XRetryTransport>(transport, retryInterval, retryTimes);
+}
+
+std::shared_ptr<XTransport> vagt::transport::XTransport::SwitchableTransport(std::vector<std::shared_ptr<XTransport>>& transports)
+{
+ return std::make_shared<XSwitchableTransport>(transports);
+}
+
+std::shared_ptr<XTransport> vagt::transport::XTransport::SynchronizedTransport(std::shared_ptr<XTransport> transport)
+{
+ return std::make_shared<XSynchronizedTransport>(transport);
+}
+
+std::shared_ptr<XTransport> vagt::transport::XTransport::BufferedTransport(std::shared_ptr<XTransport> transport, std::shared_ptr<vagt::queue::XQueue> queue)
+{
+ return std::make_shared<XBufferedTransport>(transport, queue);
+}
+
+std::shared_ptr<XTransport> vagt::transport::XTransport::RpcClientTransport(const XTransportOption& option)
+{
+ return std::make_shared<XRpcClientTransport>(option);
+}
+
+std::shared_ptr<XTransport> vagt::transport::XTransport::RpcServerTransport(std::shared_ptr<XTransport> transport, const XTransportOption& option)
+{
+ return std::make_shared<XRpcServerTransport>(transport, option);
+}
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/XRpcTransport.cpp b/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/XRpcTransport.cpp
new file mode 100755
index 0000000..0cdd75b
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/XRpcTransport.cpp
@@ -0,0 +1,421 @@
+/**
+ * Autogenerated by Thrift Compiler (0.12.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+#include "XRpcTransport.h"
+
+namespace vagt { namespace transport { namespace rpc {
+
+
+XRpcTransport_post_args::~XRpcTransport_post_args() throw() {
+}
+
+
+uint32_t XRpcTransport_post_args::read(::apache::thrift::protocol::TProtocol* iprot) {
+
+ ::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
+ uint32_t xfer = 0;
+ std::string fname;
+ ::apache::thrift::protocol::TType ftype;
+ int16_t fid;
+
+ xfer += iprot->readStructBegin(fname);
+
+ using ::apache::thrift::protocol::TProtocolException;
+
+
+ while (true)
+ {
+ xfer += iprot->readFieldBegin(fname, ftype, fid);
+ if (ftype == ::apache::thrift::protocol::T_STOP) {
+ break;
+ }
+ switch (fid)
+ {
+ case 1:
+ if (ftype == ::apache::thrift::protocol::T_STRING) {
+ xfer += iprot->readString(this->data);
+ this->__isset.data = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ default:
+ xfer += iprot->skip(ftype);
+ break;
+ }
+ xfer += iprot->readFieldEnd();
+ }
+
+ xfer += iprot->readStructEnd();
+
+ return xfer;
+}
+
+uint32_t XRpcTransport_post_args::write(::apache::thrift::protocol::TProtocol* oprot) const {
+ uint32_t xfer = 0;
+ ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
+ xfer += oprot->writeStructBegin("XRpcTransport_post_args");
+
+ xfer += oprot->writeFieldBegin("data", ::apache::thrift::protocol::T_STRING, 1);
+ xfer += oprot->writeString(this->data);
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldStop();
+ xfer += oprot->writeStructEnd();
+ return xfer;
+}
+
+
+XRpcTransport_post_pargs::~XRpcTransport_post_pargs() throw() {
+}
+
+
+uint32_t XRpcTransport_post_pargs::write(::apache::thrift::protocol::TProtocol* oprot) const {
+ uint32_t xfer = 0;
+ ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
+ xfer += oprot->writeStructBegin("XRpcTransport_post_pargs");
+
+ xfer += oprot->writeFieldBegin("data", ::apache::thrift::protocol::T_STRING, 1);
+ xfer += oprot->writeString((*(this->data)));
+ xfer += oprot->writeFieldEnd();
+
+ xfer += oprot->writeFieldStop();
+ xfer += oprot->writeStructEnd();
+ return xfer;
+}
+
+
+XRpcTransport_post_result::~XRpcTransport_post_result() throw() {
+}
+
+
+uint32_t XRpcTransport_post_result::read(::apache::thrift::protocol::TProtocol* iprot) {
+
+ ::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
+ uint32_t xfer = 0;
+ std::string fname;
+ ::apache::thrift::protocol::TType ftype;
+ int16_t fid;
+
+ xfer += iprot->readStructBegin(fname);
+
+ using ::apache::thrift::protocol::TProtocolException;
+
+
+ while (true)
+ {
+ xfer += iprot->readFieldBegin(fname, ftype, fid);
+ if (ftype == ::apache::thrift::protocol::T_STOP) {
+ break;
+ }
+ switch (fid)
+ {
+ case 0:
+ if (ftype == ::apache::thrift::protocol::T_I16) {
+ xfer += iprot->readI16(this->success);
+ this->__isset.success = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ default:
+ xfer += iprot->skip(ftype);
+ break;
+ }
+ xfer += iprot->readFieldEnd();
+ }
+
+ xfer += iprot->readStructEnd();
+
+ return xfer;
+}
+
+uint32_t XRpcTransport_post_result::write(::apache::thrift::protocol::TProtocol* oprot) const {
+
+ uint32_t xfer = 0;
+
+ xfer += oprot->writeStructBegin("XRpcTransport_post_result");
+
+ if (this->__isset.success) {
+ xfer += oprot->writeFieldBegin("success", ::apache::thrift::protocol::T_I16, 0);
+ xfer += oprot->writeI16(this->success);
+ xfer += oprot->writeFieldEnd();
+ }
+ xfer += oprot->writeFieldStop();
+ xfer += oprot->writeStructEnd();
+ return xfer;
+}
+
+
+XRpcTransport_post_presult::~XRpcTransport_post_presult() throw() {
+}
+
+
+uint32_t XRpcTransport_post_presult::read(::apache::thrift::protocol::TProtocol* iprot) {
+
+ ::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot);
+ uint32_t xfer = 0;
+ std::string fname;
+ ::apache::thrift::protocol::TType ftype;
+ int16_t fid;
+
+ xfer += iprot->readStructBegin(fname);
+
+ using ::apache::thrift::protocol::TProtocolException;
+
+
+ while (true)
+ {
+ xfer += iprot->readFieldBegin(fname, ftype, fid);
+ if (ftype == ::apache::thrift::protocol::T_STOP) {
+ break;
+ }
+ switch (fid)
+ {
+ case 0:
+ if (ftype == ::apache::thrift::protocol::T_I16) {
+ xfer += iprot->readI16((*(this->success)));
+ this->__isset.success = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
+ default:
+ xfer += iprot->skip(ftype);
+ break;
+ }
+ xfer += iprot->readFieldEnd();
+ }
+
+ xfer += iprot->readStructEnd();
+
+ return xfer;
+}
+
+int16_t XRpcTransportClient::post(const std::string& data)
+{
+ send_post(data);
+ return recv_post();
+}
+
+void XRpcTransportClient::send_post(const std::string& data)
+{
+ int32_t cseqid = 0;
+ oprot_->writeMessageBegin("post", ::apache::thrift::protocol::T_CALL, cseqid);
+
+ XRpcTransport_post_pargs args;
+ args.data = &data;
+ args.write(oprot_);
+
+ oprot_->writeMessageEnd();
+ oprot_->getTransport()->writeEnd();
+ oprot_->getTransport()->flush();
+}
+
+int16_t XRpcTransportClient::recv_post()
+{
+
+ int32_t rseqid = 0;
+ std::string fname;
+ ::apache::thrift::protocol::TMessageType mtype;
+
+ iprot_->readMessageBegin(fname, mtype, rseqid);
+ if (mtype == ::apache::thrift::protocol::T_EXCEPTION) {
+ ::apache::thrift::TApplicationException x;
+ x.read(iprot_);
+ iprot_->readMessageEnd();
+ iprot_->getTransport()->readEnd();
+ throw x;
+ }
+ if (mtype != ::apache::thrift::protocol::T_REPLY) {
+ iprot_->skip(::apache::thrift::protocol::T_STRUCT);
+ iprot_->readMessageEnd();
+ iprot_->getTransport()->readEnd();
+ }
+ if (fname.compare("post") != 0) {
+ iprot_->skip(::apache::thrift::protocol::T_STRUCT);
+ iprot_->readMessageEnd();
+ iprot_->getTransport()->readEnd();
+ }
+ int16_t _return;
+ XRpcTransport_post_presult result;
+ result.success = &_return;
+ result.read(iprot_);
+ iprot_->readMessageEnd();
+ iprot_->getTransport()->readEnd();
+
+ if (result.__isset.success) {
+ return _return;
+ }
+ throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "post failed: unknown result");
+}
+
+bool XRpcTransportProcessor::dispatchCall(::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, const std::string& fname, int32_t seqid, void* callContext) {
+ ProcessMap::iterator pfn;
+ pfn = processMap_.find(fname);
+ if (pfn == processMap_.end()) {
+ iprot->skip(::apache::thrift::protocol::T_STRUCT);
+ iprot->readMessageEnd();
+ iprot->getTransport()->readEnd();
+ ::apache::thrift::TApplicationException x(::apache::thrift::TApplicationException::UNKNOWN_METHOD, "Invalid method name: '"+fname+"'");
+ oprot->writeMessageBegin(fname, ::apache::thrift::protocol::T_EXCEPTION, seqid);
+ x.write(oprot);
+ oprot->writeMessageEnd();
+ oprot->getTransport()->writeEnd();
+ oprot->getTransport()->flush();
+ return true;
+ }
+ (this->*(pfn->second))(seqid, iprot, oprot, callContext);
+ return true;
+}
+
+void XRpcTransportProcessor::process_post(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext)
+{
+ void* ctx = NULL;
+ if (this->eventHandler_.get() != NULL) {
+ ctx = this->eventHandler_->getContext("XRpcTransport.post", callContext);
+ }
+ ::apache::thrift::TProcessorContextFreer freer(this->eventHandler_.get(), ctx, "XRpcTransport.post");
+
+ if (this->eventHandler_.get() != NULL) {
+ this->eventHandler_->preRead(ctx, "XRpcTransport.post");
+ }
+
+ XRpcTransport_post_args args;
+ args.read(iprot);
+ iprot->readMessageEnd();
+ uint32_t bytes = iprot->getTransport()->readEnd();
+
+ if (this->eventHandler_.get() != NULL) {
+ this->eventHandler_->postRead(ctx, "XRpcTransport.post", bytes);
+ }
+
+ XRpcTransport_post_result result;
+ try {
+ result.success = iface_->post(args.data);
+ result.__isset.success = true;
+ } catch (const std::exception& e) {
+ if (this->eventHandler_.get() != NULL) {
+ this->eventHandler_->handlerError(ctx, "XRpcTransport.post");
+ }
+
+ ::apache::thrift::TApplicationException x(e.what());
+ oprot->writeMessageBegin("post", ::apache::thrift::protocol::T_EXCEPTION, seqid);
+ x.write(oprot);
+ oprot->writeMessageEnd();
+ oprot->getTransport()->writeEnd();
+ oprot->getTransport()->flush();
+ return;
+ }
+
+ if (this->eventHandler_.get() != NULL) {
+ this->eventHandler_->preWrite(ctx, "XRpcTransport.post");
+ }
+
+ oprot->writeMessageBegin("post", ::apache::thrift::protocol::T_REPLY, seqid);
+ result.write(oprot);
+ oprot->writeMessageEnd();
+ bytes = oprot->getTransport()->writeEnd();
+ oprot->getTransport()->flush();
+
+ if (this->eventHandler_.get() != NULL) {
+ this->eventHandler_->postWrite(ctx, "XRpcTransport.post", bytes);
+ }
+}
+
+::apache::thrift::stdcxx::shared_ptr< ::apache::thrift::TProcessor > XRpcTransportProcessorFactory::getProcessor(const ::apache::thrift::TConnectionInfo& connInfo) {
+ ::apache::thrift::ReleaseHandler< XRpcTransportIfFactory > cleanup(handlerFactory_);
+ ::apache::thrift::stdcxx::shared_ptr< XRpcTransportIf > handler(handlerFactory_->getHandler(connInfo), cleanup);
+ ::apache::thrift::stdcxx::shared_ptr< ::apache::thrift::TProcessor > processor(new XRpcTransportProcessor(handler));
+ return processor;
+}
+
+int16_t XRpcTransportConcurrentClient::post(const std::string& data)
+{
+ int32_t seqid = send_post(data);
+ return recv_post(seqid);
+}
+
+int32_t XRpcTransportConcurrentClient::send_post(const std::string& data)
+{
+ int32_t cseqid = this->sync_.generateSeqId();
+ ::apache::thrift::async::TConcurrentSendSentry sentry(&this->sync_);
+ oprot_->writeMessageBegin("post", ::apache::thrift::protocol::T_CALL, cseqid);
+
+ XRpcTransport_post_pargs args;
+ args.data = &data;
+ args.write(oprot_);
+
+ oprot_->writeMessageEnd();
+ oprot_->getTransport()->writeEnd();
+ oprot_->getTransport()->flush();
+
+ sentry.commit();
+ return cseqid;
+}
+
+int16_t XRpcTransportConcurrentClient::recv_post(const int32_t seqid)
+{
+
+ int32_t rseqid = 0;
+ std::string fname;
+ ::apache::thrift::protocol::TMessageType mtype;
+
+ // the read mutex gets dropped and reacquired as part of waitForWork()
+ // The destructor of this sentry wakes up other clients
+ ::apache::thrift::async::TConcurrentRecvSentry sentry(&this->sync_, seqid);
+
+ while(true) {
+ if(!this->sync_.getPending(fname, mtype, rseqid)) {
+ iprot_->readMessageBegin(fname, mtype, rseqid);
+ }
+ if(seqid == rseqid) {
+ if (mtype == ::apache::thrift::protocol::T_EXCEPTION) {
+ ::apache::thrift::TApplicationException x;
+ x.read(iprot_);
+ iprot_->readMessageEnd();
+ iprot_->getTransport()->readEnd();
+ sentry.commit();
+ throw x;
+ }
+ if (mtype != ::apache::thrift::protocol::T_REPLY) {
+ iprot_->skip(::apache::thrift::protocol::T_STRUCT);
+ iprot_->readMessageEnd();
+ iprot_->getTransport()->readEnd();
+ }
+ if (fname.compare("post") != 0) {
+ iprot_->skip(::apache::thrift::protocol::T_STRUCT);
+ iprot_->readMessageEnd();
+ iprot_->getTransport()->readEnd();
+
+ // in a bad state, don't commit
+ using ::apache::thrift::protocol::TProtocolException;
+ throw TProtocolException(TProtocolException::INVALID_DATA);
+ }
+ int16_t _return;
+ XRpcTransport_post_presult result;
+ result.success = &_return;
+ result.read(iprot_);
+ iprot_->readMessageEnd();
+ iprot_->getTransport()->readEnd();
+
+ if (result.__isset.success) {
+ sentry.commit();
+ return _return;
+ }
+ // in a bad state, don't commit
+ throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "post failed: unknown result");
+ }
+ // seqid != rseqid
+ this->sync_.updatePending(fname, mtype, rseqid);
+
+ // this will temporarily unlock the readMutex, and let other clients get work done
+ this->sync_.waitForWork(seqid);
+ } // end while(true)
+}
+
+}}} // namespace
+
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/XRpcTransport.h b/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/XRpcTransport.h
new file mode 100755
index 0000000..124702e
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/XRpcTransport.h
@@ -0,0 +1,296 @@
+/**
+ * Autogenerated by Thrift Compiler (0.12.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+#ifndef XRpcTransport_H
+#define XRpcTransport_H
+
+#include <thrift/TDispatchProcessor.h>
+#include <thrift/async/TConcurrentClientSyncInfo.h>
+#include "rpc_types.h"
+
+namespace vagt { namespace transport { namespace rpc {
+
+#ifdef _MSC_VER
+ #pragma warning( push )
+ #pragma warning (disable : 4250 ) //inheriting methods via dominance
+#endif
+
+class XRpcTransportIf {
+ public:
+ virtual ~XRpcTransportIf() {}
+ virtual int16_t post(const std::string& data) = 0;
+};
+
+class XRpcTransportIfFactory {
+ public:
+ typedef XRpcTransportIf Handler;
+
+ virtual ~XRpcTransportIfFactory() {}
+
+ virtual XRpcTransportIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) = 0;
+ virtual void releaseHandler(XRpcTransportIf* /* handler */) = 0;
+};
+
+class XRpcTransportIfSingletonFactory : virtual public XRpcTransportIfFactory {
+ public:
+ XRpcTransportIfSingletonFactory(const ::apache::thrift::stdcxx::shared_ptr<XRpcTransportIf>& iface) : iface_(iface) {}
+ virtual ~XRpcTransportIfSingletonFactory() {}
+
+ virtual XRpcTransportIf* getHandler(const ::apache::thrift::TConnectionInfo&) {
+ return iface_.get();
+ }
+ virtual void releaseHandler(XRpcTransportIf* /* handler */) {}
+
+ protected:
+ ::apache::thrift::stdcxx::shared_ptr<XRpcTransportIf> iface_;
+};
+
+class XRpcTransportNull : virtual public XRpcTransportIf {
+ public:
+ virtual ~XRpcTransportNull() {}
+ int16_t post(const std::string& /* data */) {
+ int16_t _return = 0;
+ return _return;
+ }
+};
+
+typedef struct _XRpcTransport_post_args__isset {
+ _XRpcTransport_post_args__isset() : data(false) {}
+ bool data :1;
+} _XRpcTransport_post_args__isset;
+
+class XRpcTransport_post_args {
+ public:
+
+ XRpcTransport_post_args(const XRpcTransport_post_args&);
+ XRpcTransport_post_args& operator=(const XRpcTransport_post_args&);
+ XRpcTransport_post_args() : data() {
+ }
+
+ virtual ~XRpcTransport_post_args() throw();
+ std::string data;
+
+ _XRpcTransport_post_args__isset __isset;
+
+ void __set_data(const std::string& val);
+
+ bool operator == (const XRpcTransport_post_args & rhs) const
+ {
+ if (!(data == rhs.data))
+ return false;
+ return true;
+ }
+ bool operator != (const XRpcTransport_post_args &rhs) const {
+ return !(*this == rhs);
+ }
+
+ bool operator < (const XRpcTransport_post_args & ) const;
+
+ uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
+ uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
+
+};
+
+
+class XRpcTransport_post_pargs {
+ public:
+
+
+ virtual ~XRpcTransport_post_pargs() throw();
+ const std::string* data;
+
+ uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
+
+};
+
+typedef struct _XRpcTransport_post_result__isset {
+ _XRpcTransport_post_result__isset() : success(false) {}
+ bool success :1;
+} _XRpcTransport_post_result__isset;
+
+class XRpcTransport_post_result {
+ public:
+
+ XRpcTransport_post_result(const XRpcTransport_post_result&);
+ XRpcTransport_post_result& operator=(const XRpcTransport_post_result&);
+ XRpcTransport_post_result() : success(0) {
+ }
+
+ virtual ~XRpcTransport_post_result() throw();
+ int16_t success;
+
+ _XRpcTransport_post_result__isset __isset;
+
+ void __set_success(const int16_t val);
+
+ bool operator == (const XRpcTransport_post_result & rhs) const
+ {
+ if (!(success == rhs.success))
+ return false;
+ return true;
+ }
+ bool operator != (const XRpcTransport_post_result &rhs) const {
+ return !(*this == rhs);
+ }
+
+ bool operator < (const XRpcTransport_post_result & ) const;
+
+ uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
+ uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const;
+
+};
+
+typedef struct _XRpcTransport_post_presult__isset {
+ _XRpcTransport_post_presult__isset() : success(false) {}
+ bool success :1;
+} _XRpcTransport_post_presult__isset;
+
+class XRpcTransport_post_presult {
+ public:
+
+
+ virtual ~XRpcTransport_post_presult() throw();
+ int16_t* success;
+
+ _XRpcTransport_post_presult__isset __isset;
+
+ uint32_t read(::apache::thrift::protocol::TProtocol* iprot);
+
+};
+
+class XRpcTransportClient : virtual public XRpcTransportIf {
+ public:
+ XRpcTransportClient(apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> prot) {
+ setProtocol(prot);
+ }
+ XRpcTransportClient(apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> iprot, apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> oprot) {
+ setProtocol(iprot,oprot);
+ }
+ private:
+ void setProtocol(apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> prot) {
+ setProtocol(prot,prot);
+ }
+ void setProtocol(apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> iprot, apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> oprot) {
+ piprot_=iprot;
+ poprot_=oprot;
+ iprot_ = iprot.get();
+ oprot_ = oprot.get();
+ }
+ public:
+ apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> getInputProtocol() {
+ return piprot_;
+ }
+ apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> getOutputProtocol() {
+ return poprot_;
+ }
+ int16_t post(const std::string& data);
+ void send_post(const std::string& data);
+ int16_t recv_post();
+ protected:
+ apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot_;
+ apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot_;
+ ::apache::thrift::protocol::TProtocol* iprot_;
+ ::apache::thrift::protocol::TProtocol* oprot_;
+};
+
+class XRpcTransportProcessor : public ::apache::thrift::TDispatchProcessor {
+ protected:
+ ::apache::thrift::stdcxx::shared_ptr<XRpcTransportIf> iface_;
+ virtual bool dispatchCall(::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, const std::string& fname, int32_t seqid, void* callContext);
+ private:
+ typedef void (XRpcTransportProcessor::*ProcessFunction)(int32_t, ::apache::thrift::protocol::TProtocol*, ::apache::thrift::protocol::TProtocol*, void*);
+ typedef std::map<std::string, ProcessFunction> ProcessMap;
+ ProcessMap processMap_;
+ void process_post(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext);
+ public:
+ XRpcTransportProcessor(::apache::thrift::stdcxx::shared_ptr<XRpcTransportIf> iface) :
+ iface_(iface) {
+ processMap_["post"] = &XRpcTransportProcessor::process_post;
+ }
+
+ virtual ~XRpcTransportProcessor() {}
+};
+
+class XRpcTransportProcessorFactory : public ::apache::thrift::TProcessorFactory {
+ public:
+ XRpcTransportProcessorFactory(const ::apache::thrift::stdcxx::shared_ptr< XRpcTransportIfFactory >& handlerFactory) :
+ handlerFactory_(handlerFactory) {}
+
+ ::apache::thrift::stdcxx::shared_ptr< ::apache::thrift::TProcessor > getProcessor(const ::apache::thrift::TConnectionInfo& connInfo);
+
+ protected:
+ ::apache::thrift::stdcxx::shared_ptr< XRpcTransportIfFactory > handlerFactory_;
+};
+
+class XRpcTransportMultiface : virtual public XRpcTransportIf {
+ public:
+ XRpcTransportMultiface(std::vector<apache::thrift::stdcxx::shared_ptr<XRpcTransportIf> >& ifaces) : ifaces_(ifaces) {
+ }
+ virtual ~XRpcTransportMultiface() {}
+ protected:
+ std::vector<apache::thrift::stdcxx::shared_ptr<XRpcTransportIf> > ifaces_;
+ XRpcTransportMultiface() {}
+ void add(::apache::thrift::stdcxx::shared_ptr<XRpcTransportIf> iface) {
+ ifaces_.push_back(iface);
+ }
+ public:
+ int16_t post(const std::string& data) {
+ size_t sz = ifaces_.size();
+ size_t i = 0;
+ for (; i < (sz - 1); ++i) {
+ ifaces_[i]->post(data);
+ }
+ return ifaces_[i]->post(data);
+ }
+
+};
+
+// The 'concurrent' client is a thread safe client that correctly handles
+// out of order responses. It is slower than the regular client, so should
+// only be used when you need to share a connection among multiple threads
+class XRpcTransportConcurrentClient : virtual public XRpcTransportIf {
+ public:
+ XRpcTransportConcurrentClient(apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> prot) {
+ setProtocol(prot);
+ }
+ XRpcTransportConcurrentClient(apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> iprot, apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> oprot) {
+ setProtocol(iprot,oprot);
+ }
+ private:
+ void setProtocol(apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> prot) {
+ setProtocol(prot,prot);
+ }
+ void setProtocol(apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> iprot, apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> oprot) {
+ piprot_=iprot;
+ poprot_=oprot;
+ iprot_ = iprot.get();
+ oprot_ = oprot.get();
+ }
+ public:
+ apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> getInputProtocol() {
+ return piprot_;
+ }
+ apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> getOutputProtocol() {
+ return poprot_;
+ }
+ int16_t post(const std::string& data);
+ int32_t send_post(const std::string& data);
+ int16_t recv_post(const int32_t seqid);
+ protected:
+ apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot_;
+ apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot_;
+ ::apache::thrift::protocol::TProtocol* iprot_;
+ ::apache::thrift::protocol::TProtocol* oprot_;
+ ::apache::thrift::async::TConcurrentClientSyncInfo sync_;
+};
+
+#ifdef _MSC_VER
+ #pragma warning( pop )
+#endif
+
+}}} // namespace
+
+#endif
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_constants.cpp b/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_constants.cpp
new file mode 100755
index 0000000..8fa4367
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_constants.cpp
@@ -0,0 +1,17 @@
+/**
+ * Autogenerated by Thrift Compiler (0.12.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+#include "rpc_constants.h"
+
+namespace vagt { namespace transport { namespace rpc {
+
+const rpcConstants g_rpc_constants;
+
+rpcConstants::rpcConstants() {
+}
+
+}}} // namespace
+
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_constants.h b/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_constants.h
new file mode 100755
index 0000000..56fde2a
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_constants.h
@@ -0,0 +1,24 @@
+/**
+ * Autogenerated by Thrift Compiler (0.12.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+#ifndef rpc_CONSTANTS_H
+#define rpc_CONSTANTS_H
+
+#include "rpc_types.h"
+
+namespace vagt { namespace transport { namespace rpc {
+
+class rpcConstants {
+ public:
+ rpcConstants();
+
+};
+
+extern const rpcConstants g_rpc_constants;
+
+}}} // namespace
+
+#endif
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_types.cpp b/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_types.cpp
new file mode 100755
index 0000000..02a866e
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_types.cpp
@@ -0,0 +1,16 @@
+/**
+ * Autogenerated by Thrift Compiler (0.12.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+#include "rpc_types.h"
+
+#include <algorithm>
+#include <ostream>
+
+#include <thrift/TToString.h>
+
+namespace vagt { namespace transport { namespace rpc {
+
+}}} // namespace
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_types.h b/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_types.h
new file mode 100755
index 0000000..7ace455
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_types.h
@@ -0,0 +1,25 @@
+/**
+ * Autogenerated by Thrift Compiler (0.12.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+#ifndef rpc_TYPES_H
+#define rpc_TYPES_H
+
+#include <iosfwd>
+
+#include <thrift/Thrift.h>
+#include <thrift/TApplicationException.h>
+#include <thrift/TBase.h>
+#include <thrift/protocol/TProtocol.h>
+#include <thrift/transport/TTransport.h>
+
+#include <thrift/stdcxx.h>
+
+
+namespace vagt { namespace transport { namespace rpc {
+
+}}} // namespace
+
+#endif
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/include/XQueue.h b/veslibrary/ves_cpplibrary/src/lib/transport/include/XQueue.h
new file mode 100755
index 0000000..c2a08b6
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/include/XQueue.h
@@ -0,0 +1,40 @@
+#pragma once
+
+#include <string>
+#include <memory>
+
+namespace vagt
+{
+ namespace queue
+ {
+ /*************************************************************************************************//**
+ * @brief Error codes
+ *****************************************************************************************************/
+ enum XErrorCode : unsigned short
+ {
+ XErrorOk, /**< The operation is successful */
+ XErrorNok, /**< General failure */
+ XErrorFull, /**< The buffer is full */
+ XErrorEmpty, /**< The buffer is empty */
+ };
+
+ class XQueue
+ {
+ public:
+ virtual bool empty() = 0;
+ virtual XErrorCode push(const std::string& val) = 0;
+ virtual void pop() = 0;
+ virtual std::string front() = 0;
+
+ /*************************************************************************************************//**
+ * Create a queue in memory with specified capacity.
+ *****************************************************************************************************/
+ static std::shared_ptr<XQueue> create(int capacity);
+
+ /*************************************************************************************************//**
+ * Create a queue on disk.
+ *****************************************************************************************************/
+ static std::shared_ptr<XQueue> create(const std::string& path);
+ };
+ }
+} \ No newline at end of file
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/include/XTransport.h b/veslibrary/ves_cpplibrary/src/lib/transport/include/XTransport.h
new file mode 100755
index 0000000..9416cf4
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/include/XTransport.h
@@ -0,0 +1,151 @@
+#pragma once
+/*************************************************************************************************//**
+@file XTransport.h
+@brief Transport API, used to send string to collector
+*****************************************************************************************************/
+#include <memory>
+#include <atomic>
+#include <string>
+#include <vector>
+#include <chrono>
+#include "XQueue.h"
+
+namespace vagt
+{
+ namespace transport
+ {
+ /*************************************************************************************************//**
+ * @brief Error codes
+ *****************************************************************************************************/
+ enum XErrorCode: unsigned short
+ {
+ XErrorOk, /**< The operation is successful */
+ XErrorNok, /**< General failure */
+ XErrorTimeout, /**< Timeout */
+ XErrorUnauthorized, /**< Unauthorized */
+ XErrorCanceled, /**< The operation is canceled */
+ XErrorClientError, /**< Client error, e.g. HTTP 404 */
+ XErrorServerError, /**< Server error, e.g. HTTP 500 */
+ XErrorNetworkError, /**< Network error */
+ };
+
+ /*************************************************************************************************//**
+ * @brief Transport option
+ *****************************************************************************************************/
+ class XTransportOption
+ {
+ public:
+ std::string host_; /**< Remote RPC server hostname or IP address*/
+ int port_; /**< Remote RPC server port */
+ std::chrono::milliseconds timeOut_; /**< Timeout */
+ std::string url_; /**< URL of Collector */
+ std::string sourceIp_; /**< Source IP Address */
+ bool secure_; /**< Enable TLS or not */
+ std::string caInfo_; /**< CA INFO for TLS */
+ std::string caFilePath_; /**< CA pATH for TLS */
+ std::string certFilePath_; /**< CERT for TLS */
+ std::string keyFilePath_; /**< KEY for TLS */
+ long verifyPeer_; /**< Refer to CURLOPT_SSL_VERIFYPEER */
+ long verifyHost_; /**< Refer to CURLOPT_SSL_VERIFYHOST */
+ std::string userName_; /**< User name */
+ std::string userPasswd_; /**< Password */
+
+ XTransportOption():secure_(false),verifyPeer_(1),verifyHost_(2)
+ {}
+ };
+
+ /*************************************************************************************************//**
+ * @brief Transport interface
+ *****************************************************************************************************/
+ class XTransport
+ {
+ public:
+ XTransport();
+
+ /*************************************************************************************************//**
+ * Start Transport and the decorated transports.
+ *
+ * @note This method will call allowPost().
+ *****************************************************************************************************/
+ virtual XErrorCode start() = 0;
+
+ /*************************************************************************************************//**
+ * Stop Transport and the decorated transports.
+ *
+ * @note This method will call cancelPost().
+ *****************************************************************************************************/
+ virtual XErrorCode stop() = 0;
+
+ /*************************************************************************************************//**
+ * Post event string.
+ *****************************************************************************************************/
+ virtual XErrorCode post(const std::string& event) = 0;
+
+ /*************************************************************************************************//**
+ * Cancel the running and subsequent post() until allowPost() is called.
+ * It is useful especially when transport is wrapped by SynchronizedTransport.
+ *****************************************************************************************************/
+ virtual void cancelPost();
+
+ /*************************************************************************************************//**
+ * Allow the post() method.
+ *****************************************************************************************************/
+ virtual void allowPost();
+
+ /*************************************************************************************************//**
+ * Check if the post() method will be canceled.
+ *****************************************************************************************************/
+ virtual bool shouldCancelPost();
+
+ /*************************************************************************************************//**
+ * Create a libcurl based transport.
+ *****************************************************************************************************/
+ static std::shared_ptr<XTransport> LibCurlTransport(const XTransportOption& option);
+
+ /*************************************************************************************************//**
+ * Create a retry transport decorator.
+ * The retry will be performed if the decorated post() does NOT return
+ * XErrorOk, XErrorCanceled or XErrorClientError.
+ *****************************************************************************************************/
+ static std::shared_ptr<XTransport> RetryTransport(std::shared_ptr<XTransport> transport,
+ std::chrono::milliseconds retryInterval,
+ int retryTimes);
+
+ /*************************************************************************************************//**
+ * Create a switchable transport decorator.
+ * The transport will be switched if the decorated post() does NOT return
+ * XErrorOk, XErrorCanceled or XErrorClientError.
+ *****************************************************************************************************/
+ static std::shared_ptr<XTransport> SwitchableTransport(std::vector<std::shared_ptr<XTransport>>& transports);
+
+ /*************************************************************************************************//**
+ * Create a synchronized transport decorator.
+ * The decorated transport will be thread safe.
+ *****************************************************************************************************/
+ static std::shared_ptr<XTransport> SynchronizedTransport(std::shared_ptr<XTransport> transport);
+
+ /*************************************************************************************************//**
+ * Create a bufferd(FIFO) transport decorator.
+ * The event string will be buffered first and then post via decorated transport.
+ *****************************************************************************************************/
+ static std::shared_ptr<XTransport> BufferedTransport(std::shared_ptr<XTransport> transport, std::shared_ptr<vagt::queue::XQueue> queue);
+
+ /*************************************************************************************************//**
+ * Create a RPC client transport.
+ *****************************************************************************************************/
+ static std::shared_ptr<XTransport> RpcClientTransport(const XTransportOption& option);
+
+ /*************************************************************************************************//**
+ * Create a RPC server transport.
+ *****************************************************************************************************/
+ static std::shared_ptr<XTransport> RpcServerTransport(std::shared_ptr<XTransport> transport, const XTransportOption& option);
+
+ protected:
+ std::atomic<bool> cancel_;
+
+ private:
+ XTransport(const XTransport&) = delete;
+ XTransport& operator=(const XTransport&) = delete;
+ };
+ }
+}
diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/rpc.idl b/veslibrary/ves_cpplibrary/src/lib/transport/rpc.idl
new file mode 100755
index 0000000..de4979b
--- /dev/null
+++ b/veslibrary/ves_cpplibrary/src/lib/transport/rpc.idl
@@ -0,0 +1,6 @@
+namespace cpp vagt.transport.rpc
+
+service XRpcTransport
+{
+ i16 post(1: string data)
+} \ No newline at end of file