From 7966a4012c0eb263936e1b284801a79c5cb607a7 Mon Sep 17 00:00:00 2001 From: Yatian XU Date: Tue, 10 Sep 2019 13:51:48 +0800 Subject: Contribute C++ implement of VES spec 7.0.1 to ONAP/vnfsdk: Part4: transport library. Issue-ID: VNFSDK-466 Signed-off-by: Yatian XU Change-Id: Icde7b8dad91002a8208ff0d058a44c78aa910cd7 --- .../src/lib/transport/CMakeLists.txt | 30 ++ .../src/lib/transport/XBufferedTransport.cpp | 170 +++++++++ .../src/lib/transport/XBufferedTransport.h | 39 ++ .../src/lib/transport/XDiskQueue.cpp | 192 ++++++++++ .../ves_cpplibrary/src/lib/transport/XDiskQueue.h | 34 ++ .../src/lib/transport/XLibcurlTransport.cpp | 376 ++++++++++++++++++ .../src/lib/transport/XLibcurlTransport.h | 41 ++ .../ves_cpplibrary/src/lib/transport/XMemQueue.cpp | 46 +++ .../ves_cpplibrary/src/lib/transport/XMemQueue.h | 24 ++ .../ves_cpplibrary/src/lib/transport/XQueue.cpp | 15 + .../src/lib/transport/XRetryTransport.cpp | 156 ++++++++ .../src/lib/transport/XRetryTransport.h | 43 +++ .../src/lib/transport/XRpcClientTransport.cpp | 183 +++++++++ .../src/lib/transport/XRpcClientTransport.h | 38 ++ .../src/lib/transport/XRpcServertTransport.cpp | 222 +++++++++++ .../src/lib/transport/XRpcServertTransport.h | 39 ++ .../src/lib/transport/XSwitchableTransport.cpp | 172 +++++++++ .../src/lib/transport/XSwitchableTransport.h | 37 ++ .../src/lib/transport/XSynchronizedTransport.cpp | 115 ++++++ .../src/lib/transport/XSynchronizedTransport.h | 41 ++ .../src/lib/transport/XTransport.cpp | 64 ++++ .../src/lib/transport/gen-cpp/XRpcTransport.cpp | 421 +++++++++++++++++++++ .../src/lib/transport/gen-cpp/XRpcTransport.h | 296 +++++++++++++++ .../src/lib/transport/gen-cpp/rpc_constants.cpp | 17 + .../src/lib/transport/gen-cpp/rpc_constants.h | 24 ++ .../src/lib/transport/gen-cpp/rpc_types.cpp | 16 + .../src/lib/transport/gen-cpp/rpc_types.h | 25 ++ .../src/lib/transport/include/XQueue.h | 40 ++ .../src/lib/transport/include/XTransport.h | 151 ++++++++ .../ves_cpplibrary/src/lib/transport/rpc.idl | 6 + 30 files changed, 3073 insertions(+) create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/CMakeLists.txt create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/XBufferedTransport.cpp create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/XBufferedTransport.h create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/XDiskQueue.cpp create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/XDiskQueue.h create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/XLibcurlTransport.cpp create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/XLibcurlTransport.h create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/XMemQueue.cpp create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/XMemQueue.h create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/XQueue.cpp create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/XRetryTransport.cpp create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/XRetryTransport.h create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/XRpcClientTransport.cpp create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/XRpcClientTransport.h create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/XRpcServertTransport.cpp create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/XRpcServertTransport.h create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/XSwitchableTransport.cpp create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/XSwitchableTransport.h create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/XSynchronizedTransport.cpp create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/XSynchronizedTransport.h create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/XTransport.cpp create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/XRpcTransport.cpp create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/XRpcTransport.h create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_constants.cpp create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_constants.h create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_types.cpp create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_types.h create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/include/XQueue.h create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/include/XTransport.h create mode 100755 veslibrary/ves_cpplibrary/src/lib/transport/rpc.idl diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/CMakeLists.txt b/veslibrary/ves_cpplibrary/src/lib/transport/CMakeLists.txt new file mode 100755 index 0000000..59e42f4 --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/CMakeLists.txt @@ -0,0 +1,30 @@ +aux_source_directory(../common LOG_SRCS) +aux_source_directory(. TRANSPORT_SRCS) +aux_source_directory(gen-cpp RPC_SRCS) + +find_package(nlohmann_json) +find_package(spdlog) +find_package(leveldb) +find_package(thrift) +find_package(CURL) + +include_directories("${THRIFT_INCLUDE_DIR}") +include_directories("${LEVELDB_INCLUDE_DIR}") +include_directories(include) + +SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fpermissive") + +add_library(xtransport SHARED ${LOG_SRCS} ${RPC_SRCS} ${TRANSPORT_SRCS}) +target_link_libraries(xtransport ${CURL_LIBRARY} ${THRIFT_LIBRARY} ${LEVELDB_LIBRARY}) + +file(GLOB HDRS "include/*.h") + +install(FILES ${HDRS} + DESTINATION "include/xvesagent/xtransport" +) +install(FILES ../../cmake/xtransportConfig.cmake + DESTINATION "lib/cmake/xtransport" +) +INSTALL(TARGETS xtransport + LIBRARY DESTINATION lib +) diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XBufferedTransport.cpp b/veslibrary/ves_cpplibrary/src/lib/transport/XBufferedTransport.cpp new file mode 100755 index 0000000..6abce48 --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/XBufferedTransport.cpp @@ -0,0 +1,170 @@ +#include "XBufferedTransport.h" +#ifndef SPDLOG_ACTIVE_LEVEL +#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE +#endif +#include "spdlog/spdlog.h" + +using namespace std; +using namespace vagt::transport; +using namespace spdlog; + +vagt::transport::XBufferedTransport::XBufferedTransport(std::shared_ptr transport, std::shared_ptr queue): + transport_(transport), queue_(queue) +{ +} + +vagt::transport::XBufferedTransport::~XBufferedTransport() +{ +} + +XErrorCode vagt::transport::XBufferedTransport::start() +{ + if (!transport_) + { + SPDLOG_ERROR("Empty transport."); + return XErrorNok; + } + + if (!queue_) + { + SPDLOG_ERROR("Empty queue."); + return XErrorNok; + } + + auto rc = transport_->start(); + + future_ = async(launch::async, [this]() {return this->worker(); }); + + allowPost(); + + return rc; +} + +XErrorCode vagt::transport::XBufferedTransport::stop() +{ + cancelPost(); + if (!transport_) + { + SPDLOG_ERROR("Empty transport."); + return XErrorNok; + } + + if (!queue_) + { + SPDLOG_ERROR("Empty queue."); + return XErrorNok; + } + + future_.get(); + + return transport_->stop(); +} + +XErrorCode vagt::transport::XBufferedTransport::post(const std::string& event) +{ + if (shouldCancelPost()) + { + return XErrorCanceled; + } + + if (event.empty()) + { + SPDLOG_WARN("Trying post empty event."); + return XErrorClientError; + } + + if (!queue_) + { + return XErrorNok; + } + + auto rc = vagt::queue::XErrorOk; + { + unique_lock _(lock_); + rc = queue_->push(event); + cond_.notify_one(); + } + + if (rc == vagt::queue::XErrorOk) + { + return XErrorOk; + } + else + { + return XErrorNok; + } +} + +void vagt::transport::XBufferedTransport::cancelPost() +{ + XTransport::cancelPost(); + + { + unique_lock _(lock_); + cancel_.store(true, std::memory_order_release); + cond_.notify_one(); + } + + if (transport_) + { + transport_->cancelPost(); + } +} + +void vagt::transport::XBufferedTransport::allowPost() +{ + if (transport_) + { + transport_->allowPost(); + } + XTransport::allowPost(); +} + +bool vagt::transport::XBufferedTransport::shouldCancelPost() +{ + return XTransport::shouldCancelPost(); +} + +void vagt::transport::XBufferedTransport::worker() +{ + SPDLOG_INFO("Start transport send thread."); + while (true) + { + string event; + { + unique_lock _(lock_); + cond_.wait(_, [this]() {return shouldCancelPost() || !queue_->empty(); }); + + if (shouldCancelPost()) + { + SPDLOG_INFO("Quit transport send thread."); + return; + } + + if (!queue_->empty()) + { + event = queue_->front(); + } + else + { + continue; + } + } + + if (transport_) + { + auto rc = transport_->post(event); + if (vagt::transport::XErrorOk == rc) + { + unique_lock _(lock_); + queue_->pop(); + } + else if (vagt::transport::XErrorClientError == rc) + { + unique_lock _(lock_); + queue_->pop(); + SPDLOG_WARN("Drop event:({}) ({}).", event, rc); + } + } + } +} diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XBufferedTransport.h b/veslibrary/ves_cpplibrary/src/lib/transport/XBufferedTransport.h new file mode 100755 index 0000000..af51f64 --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/XBufferedTransport.h @@ -0,0 +1,39 @@ +#pragma once + +#include "XTransport.h" +#include "XQueue.h" +#include +#include +#include + +namespace vagt +{ + namespace transport + { + class XBufferedTransport : public XTransport + { + public: + XBufferedTransport(std::shared_ptr transport, std::shared_ptr queue); + virtual ~XBufferedTransport(); + + virtual XErrorCode start() override; + virtual XErrorCode stop() override; + virtual XErrorCode post(const std::string& event) override; + + virtual void cancelPost() override; + virtual void allowPost() override; + virtual bool shouldCancelPost() override; + + private: + void worker(); + + std::shared_ptr transport_; + std::shared_ptr queue_; + std::future future_; + + std::condition_variable cond_; + std::mutex lock_; + }; + } +} + diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XDiskQueue.cpp b/veslibrary/ves_cpplibrary/src/lib/transport/XDiskQueue.cpp new file mode 100755 index 0000000..c63d19e --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/XDiskQueue.cpp @@ -0,0 +1,192 @@ +#include "XDiskQueue.h" +#ifndef SPDLOG_ACTIVE_LEVEL +#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE +#endif +#include "spdlog/spdlog.h" +#include "leveldb/env.h" + +using namespace std; +using namespace vagt::queue; +using namespace leveldb; + +constexpr auto XDiskQueueCompactInterval = 1800; + +class XDummyLogger : public leveldb::Logger +{ + void Logv(const char* format, va_list ap) {} +}; + +vagt::queue::XDiskQueue::XDiskQueue(const std::string & path):db_(nullptr),it_(nullptr),keyId_(0) +{ + compactTime_ = chrono::system_clock::now(); + + Options opt; + opt.create_if_missing = true; + opt.write_buffer_size = 4 * 1024 * 1024; + opt.max_open_files = 10; + opt.info_log = new XDummyLogger; + + opt_.fill_cache = true; + + Status status = DB::Open(opt, path, &db_); + if (status.ok() && db_ != nullptr) + { + SPDLOG_INFO("Disk queue is ready."); + return; + } + + SPDLOG_INFO("Repairing disk queue."); + status = RepairDB(path, opt); + if (status.ok()) + { + status = DB::Open(opt, path.c_str(), &db_); + if (status.ok() && db_ != nullptr) + { + SPDLOG_INFO("Disk queue is ready."); + return; + } + } + + SPDLOG_ERROR("Fail to initialize disk queue:{}.", status.ToString()); +} + +vagt::queue::XDiskQueue::~XDiskQueue() +{ + if (it_) + { + delete it_; + it_ = nullptr; + } + + if (db_) + { + delete db_; + db_ = nullptr; + } +} + +bool vagt::queue::XDiskQueue::empty() +{ + if (!it_) + { + it_ = db_->NewIterator(opt_); + it_->SeekToFirst(); + if (it_->Valid()) + { + return false; + } + else + { + return true; + } + } + + if (it_->Valid()) + { + return false; + } + + delete it_; + it_ = db_->NewIterator(opt_); + it_->SeekToFirst(); + if (it_->Valid()) + { + return false; + } + else + { + return true; + } +} + +XErrorCode vagt::queue::XDiskQueue::push(const std::string & val) +{ + auto key = createKey(); + SPDLOG_DEBUG("Push {} to disk queue:{}.", val, key); + + WriteOptions opt; + opt.sync = false; + auto status = db_->Put(opt, key, val); + if (!status.ok()) + { + SPDLOG_ERROR("Fail to push {} to disk queue:{}.", key, status.ToString()); + return XErrorNok; + } + + return XErrorOk; +} + +void vagt::queue::XDiskQueue::pop() +{ + if (!it_ || !it_->Valid()) + { + SPDLOG_ERROR("Iterator is not valid."); + return; + } + + auto delKey = it_->key().ToString(); + it_->Next(); + + WriteOptions wo; + wo.sync = false; + + auto status = db_->Delete(wo, delKey); + if (status.ok()) + { + SPDLOG_DEBUG("Pop {} from disk queue.", delKey); + + tryCompact(delKey); + + return; + } + else + { + SPDLOG_ERROR("Fail to pop {} from disk queue:{}.", delKey, status.ToString()); + return; + } +} + +std::string vagt::queue::XDiskQueue::front() +{ + if (it_ && it_->Valid()) + { + return it_->value().ToString(); + } + + SPDLOG_ERROR("Iterator is not valid."); + return ""; +} + +std::string vagt::queue::XDiskQueue::createKey() +{ + auto now = chrono::system_clock::now().time_since_epoch().count(); + snprintf(key, sizeof(key), "%020ld_%010u", now, keyId_.fetch_add(1)); + return key; +} + +void vagt::queue::XDiskQueue::tryCompact(const std::string & key) +{ + if (it_->Valid()) + { + return; + } + + delete it_; + it_ = nullptr; + + auto now = chrono::system_clock::now(); + auto duration = chrono::duration_cast(now - compactTime_).count(); + + if (duration > XDiskQueueCompactInterval && + db_ && + !key.empty()) + { + SPDLOG_INFO("Disk queue compaction starting..."); + Slice end(key); + SPDLOG_INFO("Compact key older than {}.", end.ToString()); + db_->CompactRange(NULL, &end); + SPDLOG_INFO("Disk queue compaction complete..."); + + compactTime_ = now; + } +} diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XDiskQueue.h b/veslibrary/ves_cpplibrary/src/lib/transport/XDiskQueue.h new file mode 100755 index 0000000..2168a58 --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/XDiskQueue.h @@ -0,0 +1,34 @@ +#pragma once + +#include "XQueue.h" +#include "leveldb/db.h" +#include +#include + +namespace vagt +{ + namespace queue + { + class XDiskQueue : public XQueue + { + public: + XDiskQueue(const std::string& path); + ~XDiskQueue(); + virtual bool empty() override; + virtual XErrorCode push(const std::string & val) override; + virtual void pop() override; + virtual std::string front() override; + private: + std::string createKey(); + void tryCompact(const std::string& key); + + std::atomic keyId_; + char key[32]; + std::chrono::system_clock::time_point compactTime_; + + leveldb::DB* db_; + leveldb::ReadOptions opt_; + leveldb::Iterator* it_; + }; + } +} diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XLibcurlTransport.cpp b/veslibrary/ves_cpplibrary/src/lib/transport/XLibcurlTransport.cpp new file mode 100755 index 0000000..362e8d2 --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/XLibcurlTransport.cpp @@ -0,0 +1,376 @@ +#include "XLibcurlTransport.h" +#ifndef SPDLOG_ACTIVE_LEVEL +#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE +#endif +#include "spdlog/spdlog.h" +#include + +using namespace std; +using namespace vagt; +using namespace vagt::transport; +using namespace spdlog; + +class XAutoCleanUp +{ +public: + explicit XAutoCleanUp(function cleanup) :_cleanup(cleanup) + { + + } + ~XAutoCleanUp() + { + if (_cleanup) _cleanup(); + } +private: + XAutoCleanUp(const XAutoCleanUp&) = delete; + XAutoCleanUp& operator=(const XAutoCleanUp&) = delete; + function _cleanup; +}; + +char XLibcurlTransport::curlErrorBuf_[CURL_ERROR_SIZE] = { 0 }; + +struct XMemoryStuct +{ + char * memory; + size_t size; +}; + +XLibcurlTransport::XLibcurlTransport(const XTransportOption& option): + option_(option), + curl_(nullptr), + hdr_(nullptr) +{ +} + +XErrorCode XLibcurlTransport::start() +{ + auto rc = initLibcurl(); + + allowPost(); + + return rc; +} + +XErrorCode XLibcurlTransport::stop() +{ + cancelPost(); + return cleanupLibcurl(); +} + +XErrorCode XLibcurlTransport::post(const std::string& event) +{ + if (shouldCancelPost()) + { + return XErrorCanceled; + } + + if (event.empty()) + { + SPDLOG_WARN("Trying post empty event."); + return XErrorClientError; + } + return curlPost(event); +} + +XErrorCode XLibcurlTransport::initLibcurl() +{ + auto rc = curl_global_init(CURL_GLOBAL_SSL); + if (rc != CURLE_OK) + { + SPDLOG_ERROR("Fail to init libcurl:({}).", rc); + return XErrorNok; + } + + curl_ = curl_easy_init(); + if (!curl_) + { + SPDLOG_ERROR("Fail to get libcurl handle."); + return XErrorNok; + } + + rc = curl_easy_setopt(curl_, CURLOPT_ERRORBUFFER, curlErrorBuf_); + if (rc != CURLE_OK) + { + SPDLOG_ERROR("Fail to set libcurl error message buffer:({}).", rc); + return XErrorNok; + } + + rc = curl_easy_setopt(curl_, CURLOPT_URL, option_.url_.c_str()); + if (rc != CURLE_OK) + { + SPDLOG_ERROR("Fail to set libcurl url {}:({}) {}.", option_.url_, rc, curlErrorBuf_); + return XErrorNok; + } + + if (!option_.sourceIp_.empty()) + { + rc = curl_easy_setopt(curl_, CURLOPT_INTERFACE, option_.sourceIp_.c_str()); + if (rc != CURLE_OK) + { + SPDLOG_ERROR("Fail to bind interface {} to libcurl:({}) {}.", option_.sourceIp_, rc, curlErrorBuf_); + return XErrorNok; + } + } + + if (option_.secure_) + { + if (!option_.certFilePath_.empty()) + { + rc = curl_easy_setopt(curl_, CURLOPT_SSLCERT, option_.certFilePath_.c_str()); + if (rc != CURLE_OK) + { + SPDLOG_ERROR("Fail to init libcurl with client cert:({}) {}.", rc, curlErrorBuf_); + return XErrorNok; + } + } + + if (!option_.keyFilePath_.empty()) + { + rc = curl_easy_setopt(curl_, CURLOPT_SSLKEY, option_.keyFilePath_.c_str()); + if (rc != CURLE_OK) + { + SPDLOG_ERROR("Fail to init libcurl with client key:({}) {}.", rc, curlErrorBuf_); + return XErrorNok; + } + } + + if (!option_.caInfo_.empty()) + { + rc = curl_easy_setopt(curl_, CURLOPT_CAINFO, option_.caInfo_.c_str()); + if (rc != CURLE_OK) + { + SPDLOG_ERROR("Fail to init libcurl with CA cert:({}) {}.", rc, curlErrorBuf_); + return XErrorNok; + } + } + + if (!option_.caFilePath_.empty()) + { + rc = curl_easy_setopt(curl_, CURLOPT_CAPATH, option_.caFilePath_.c_str()); + if (rc != CURLE_OK) + { + SPDLOG_ERROR("Fail to init libcurl with CA cert path:({}) {}.", rc, curlErrorBuf_); + return XErrorNok; + } + } + + rc = curl_easy_setopt(curl_, CURLOPT_SSL_VERIFYPEER, option_.verifyPeer_); + if (rc != CURLE_OK) + { + SPDLOG_ERROR("Fail to init libcurl with SSL server verification:({}){}.", rc, curlErrorBuf_); + return XErrorNok; + } + + rc = curl_easy_setopt(curl_, CURLOPT_SSL_VERIFYHOST, option_.verifyHost_); + if (rc != CURLE_OK) + { + SPDLOG_ERROR("Fail to init libcurl with client host verification:({}) {}.", rc, curlErrorBuf_); + return XErrorNok; + } + + rc = curl_easy_setopt(curl_, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_2); + if (rc != CURLE_OK) + { + SPDLOG_ERROR("Fail to init libcurl with SSL version:({}) {}.", rc, curlErrorBuf_); + return XErrorNok; + } + } + + rc = curl_easy_setopt(curl_, CURLOPT_USERAGENT, "libcurl-agent/1.0"); + if (rc != CURLE_OK) + { + SPDLOG_ERROR("Fail to init libcurl with user agent:({}) {}.", rc, curlErrorBuf_); + return XErrorNok; + } + + rc = curl_easy_setopt(curl_, CURLOPT_POST, 1L); + if (rc != CURLE_OK) + { + SPDLOG_ERROR("Fail to init libcurl with regular post:({}) {}.", rc, curlErrorBuf_); + return XErrorNok; + } + + rc = curl_easy_setopt(curl_, CURLOPT_WRITEFUNCTION, writeCallback); + if (rc != CURLE_OK) + { + SPDLOG_ERROR("Fail to init libcurl with write callback:({}) {}.", rc, curlErrorBuf_); + return XErrorNok; + } + + rc = curl_easy_setopt(curl_, CURLOPT_READFUNCTION, readCallback); + if (rc != CURLE_OK) + { + SPDLOG_ERROR("Fail to init libcurl with read callback:({}) {}.", rc, curlErrorBuf_); + return XErrorNok; + } + + hdr_ = curl_slist_append(hdr_, "Content-type: application/json"); + hdr_ = curl_slist_append(hdr_, "Expect:"); + + rc = curl_easy_setopt(curl_, CURLOPT_HTTPHEADER, hdr_); + if (rc != CURLE_OK) + { + SPDLOG_ERROR("Fail to init libcurl to use custom headers:({}) {}.", rc, curlErrorBuf_); + return XErrorNok; + } + + rc = curl_easy_setopt(curl_, CURLOPT_TIMEOUT, option_.timeOut_.count()); + if (rc != CURLE_OK) + { + SPDLOG_ERROR("Fail to init libcurl with timeout:({}) {}.", rc, curlErrorBuf_); + return XErrorNok; + } + + rc = curl_easy_setopt(curl_, CURLOPT_HTTPAUTH, CURLAUTH_BASIC); + if (rc != CURLE_OK) + { + SPDLOG_ERROR("Fail to init libcurl with Baisc Authentication:({}) {}.", rc, curlErrorBuf_); + return XErrorNok; + } + + rc = curl_easy_setopt(curl_, CURLOPT_USERNAME, option_.userName_.c_str()); + if (rc != CURLE_OK) + { + SPDLOG_ERROR("Fail to init libcurl with user name:({}) {}.", rc, curlErrorBuf_); + return XErrorNok; + } + + rc = curl_easy_setopt(curl_, CURLOPT_PASSWORD, option_.userPasswd_.c_str()); + if (rc != CURLE_OK) + { + SPDLOG_ERROR("Fail to init libcurl with user password:({}) {}.", rc, curlErrorBuf_); + return XErrorNok; + } + + return XErrorOk; +} + +XErrorCode XLibcurlTransport::cleanupLibcurl() +{ + if (curl_) + { + curl_easy_cleanup(curl_); + curl_ = nullptr; + } + + if (hdr_) + { + curl_slist_free_all(hdr_); + hdr_ = nullptr; + } + + curl_global_cleanup(); + + return XErrorOk; +} + +XErrorCode XLibcurlTransport::curlPost(const string & body) +{ + XMemoryStuct tMem; + XMemoryStuct rMem; + + tMem.memory = (char*)body.c_str(); + tMem.size = body.size(); + + rMem.size = 0; + rMem.memory = (char*)malloc(1); + + XAutoCleanUp cleanup([&rMem]() { if (rMem.memory) + { + free(rMem.memory); + rMem.memory = nullptr; + } + }); + + auto rc = curl_easy_setopt(curl_, CURLOPT_WRITEDATA, &rMem); + if (rc != CURLE_OK) + { + SPDLOG_ERROR("Fail to init libcurl with write data:({}) {}.", rc, curlErrorBuf_); + return XErrorNok; + } + + rc = curl_easy_setopt(curl_, CURLOPT_READDATA, &tMem); + if (rc != CURLE_OK) + { + SPDLOG_ERROR("Fail to init libcurl with read data:({}) {}.", rc, curlErrorBuf_); + return XErrorNok; + } + + rc = curl_easy_setopt(curl_, CURLOPT_POSTFIELDSIZE, tMem.size); + if (rc != CURLE_OK) + { + SPDLOG_ERROR("Fail to set length of post data for libcurl:({}) {}.", rc, curlErrorBuf_); + return XErrorNok; + } + + SPDLOG_DEBUG("Libcurl posting event:{}.", body); + rc = curl_easy_perform(curl_); + + if (rc != CURLE_OK) + { + SPDLOG_ERROR("Libcurl fails to post event:({}) {}.", rc, curlErrorBuf_); + if (rc == CURLE_OPERATION_TIMEDOUT) + { + return XErrorTimeout; + } + return XErrorNok; + } + + int httpResponseCode = 0; + curl_easy_getinfo(curl_, CURLINFO_RESPONSE_CODE, &httpResponseCode); + SPDLOG_DEBUG("Post event:{}, response code: ({}), response body:{}.", body, httpResponseCode, rMem.size>0 ? rMem.memory : "None"); + + if ((httpResponseCode / 100) == 2) + { + return XErrorOk; + } + + if ((httpResponseCode / 100) == 4) + { + return XErrorClientError; + } + + if ((httpResponseCode / 100) == 5) + { + return XErrorServerError; + } + + return XErrorNok; +} + +size_t XLibcurlTransport::readCallback(void * ptr, size_t size, size_t nmemb, void * userp) +{ + size_t rtn = 0; + size_t bytesToWrite = 0; + XMemoryStuct* tMem = (XMemoryStuct*)userp; + + bytesToWrite = std::min(size*nmemb, tMem->size); + + if (bytesToWrite > 0) + { + strncpy((char *)ptr, tMem->memory, bytesToWrite); + tMem->memory += bytesToWrite; + tMem->size -= bytesToWrite; + rtn = bytesToWrite; + } + return rtn; +} + +size_t XLibcurlTransport::writeCallback(void * ptr, size_t size, size_t nmemb, void * userp) +{ + size_t realSize = size * nmemb; + XMemoryStuct* rMem = (XMemoryStuct*)userp; + rMem->memory = (char*)realloc(rMem->memory, rMem->size + realSize + 1); + if (rMem->memory == nullptr) + { + SPDLOG_ERROR("No enough memory."); + return 0; + } + + memcpy(&(rMem->memory[rMem->size]), ptr, realSize); + rMem->size += realSize; + rMem->memory[rMem->size] = 0; + + return realSize; +} + diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XLibcurlTransport.h b/veslibrary/ves_cpplibrary/src/lib/transport/XLibcurlTransport.h new file mode 100755 index 0000000..544a5c4 --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/XLibcurlTransport.h @@ -0,0 +1,41 @@ +#pragma once + +#include +#include +#include +#include +#include +#include "XTransport.h" + +namespace vagt +{ + namespace transport + { + class XLibcurlTransport: public XTransport + { + public: + XLibcurlTransport(const XTransportOption& option); + + virtual ~XLibcurlTransport() {} + + virtual XErrorCode start() override; + + virtual XErrorCode stop() override; + + virtual XErrorCode post(const std::string& event) override; + + private: + XErrorCode initLibcurl(); + XErrorCode cleanupLibcurl(); + XErrorCode curlPost(const std::string& body); + + static char curlErrorBuf_[CURL_ERROR_SIZE]; + static size_t readCallback(void *ptr, size_t size, size_t nmemb, void *userp); + static size_t writeCallback(void *ptr, size_t size, size_t nmemb, void *userp); + + XTransportOption option_; + CURL* curl_; + struct curl_slist* hdr_; + }; + } +} diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XMemQueue.cpp b/veslibrary/ves_cpplibrary/src/lib/transport/XMemQueue.cpp new file mode 100755 index 0000000..443ab40 --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/XMemQueue.cpp @@ -0,0 +1,46 @@ +#include "XMemQueue.h" +#ifndef SPDLOG_ACTIVE_LEVEL +#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE +#endif +#include "spdlog/spdlog.h" + +using namespace std; +using namespace vagt::queue; + +vagt::queue::XMemQueue::XMemQueue(int capacity):capacity_(capacity),size_(0) +{ + +} + +bool vagt::queue::XMemQueue::empty() +{ + return 0 == size_; +} + +XErrorCode vagt::queue::XMemQueue::push(const std::string & val) +{ + if (size_ >= capacity_) + { + queue_.pop(); + SPDLOG_WARN("Drop from queue."); + size_--; + } + + queue_.push(val); + size_++; + + return XErrorOk; +} + +void vagt::queue::XMemQueue::pop() +{ + //cal empty() before cal pop() + queue_.pop(); + size_--; +} + +std::string vagt::queue::XMemQueue::front() +{ + //cal empty() before cal pop() + return queue_.front(); +} diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XMemQueue.h b/veslibrary/ves_cpplibrary/src/lib/transport/XMemQueue.h new file mode 100755 index 0000000..0a1a651 --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/XMemQueue.h @@ -0,0 +1,24 @@ +#pragma once + +#include "XQueue.h" +#include + +namespace vagt +{ + namespace queue + { + class XMemQueue: public XQueue + { + public: + XMemQueue(int capacity); + virtual bool empty() override; + virtual XErrorCode push(const std::string & val) override; + virtual void pop() override; + virtual std::string front() override; + private: + int capacity_; + int size_; + std::queue queue_; + }; + } +} diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XQueue.cpp b/veslibrary/ves_cpplibrary/src/lib/transport/XQueue.cpp new file mode 100755 index 0000000..a91de1e --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/XQueue.cpp @@ -0,0 +1,15 @@ +#include "XQueue.h" +#include "XMemQueue.h" +#include "XDiskQueue.h" + +using namespace vagt::queue; + +std::shared_ptr vagt::queue::XQueue::create(int capacity) +{ + return std::make_shared(capacity); +} + +std::shared_ptr vagt::queue::XQueue::create(const std::string & path) +{ + return std::make_shared(path); +} diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XRetryTransport.cpp b/veslibrary/ves_cpplibrary/src/lib/transport/XRetryTransport.cpp new file mode 100755 index 0000000..90db0a9 --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/XRetryTransport.cpp @@ -0,0 +1,156 @@ +#include "XRetryTransport.h" +#ifndef SPDLOG_ACTIVE_LEVEL +#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE +#endif +#include "spdlog/spdlog.h" + +using namespace std; +using namespace vagt; +using namespace vagt::transport; +using namespace spdlog; + +XRetryTransport::XRetryTransport(std::shared_ptr transport, std::chrono::milliseconds retryInterval, int tryTimes): + transport_(transport), + retryInterval_(retryInterval), + tryTimes_(tryTimes), + isRetryRunning_(false) +{ +} + +XErrorCode XRetryTransport::start() +{ + if (!transport_) + { + SPDLOG_ERROR("Empty transport."); + return XErrorNok; + } + + auto rc = transport_->start(); + + allowPost(); + + return rc; +} + +XErrorCode XRetryTransport::stop() +{ + cancelPost(); + if (!transport_) + { + SPDLOG_ERROR("Empty transport."); + return XErrorNok; + } + + while (isRetryRunning_.load(std::memory_order_acquire)) + { + SPDLOG_DEBUG("Waiting for retry exit."); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + return transport_->stop(); +} + +XErrorCode XRetryTransport::post(const std::string& event) +{ + if (shouldCancelPost()) + { + return XErrorCanceled; + } + + if (event.empty()) + { + SPDLOG_WARN("Trying post empty event."); + return XErrorClientError; + } + + if (!transport_) + { + SPDLOG_ERROR("Empty transport."); + return XErrorNok; + } + + XErrorCode rc = XErrorNok; + + isRetryRunning_.store(true, std::memory_order_release); + + int idx = 0; + do + { + SPDLOG_DEBUG("Posting event:{}.", event); + rc = transport_->post(event); + SPDLOG_DEBUG("Post event:{} ({}).", event, rc); + if (rc == XErrorOk || rc == XErrorCanceled) + { + isRetryRunning_.store(false, std::memory_order_release); + return rc; + } + else if (rc == XErrorClientError) + { + isRetryRunning_.store(false, std::memory_order_release); + SPDLOG_ERROR("Post event:({}).", rc); + return rc; + } + //else if (rc == XErrorTimeout || rc == XErrorServerError || rc == XErrorNetworkError) + else + { + ++idx; + if (shouldCancelPost() || idx>=tryTimes_) + { + break; + } + + SPDLOG_INFO("Stopping transport for retry:({}), time:({}).", rc, idx); + transport_->stop(); + { + unique_lock lk(lock_); + bool cancel = cond_.wait_for(lk, retryInterval_, [this]() { return this->cancel_.load(); }); + if (cancel) + { + isRetryRunning_.store(false, std::memory_order_release); + SPDLOG_INFO("Cancel post event:({}).", rc); + return rc; + } + } + + SPDLOG_INFO("Starting transport for retry:({}), time:({}).", rc, idx); + transport_->start(); + } + } + while(true); + + + isRetryRunning_.store(false, std::memory_order_release); + SPDLOG_ERROR("Post event:({}).", rc); + return rc; +} + +void vagt::transport::XRetryTransport::cancelPost() +{ + XTransport::cancelPost(); + + { + unique_lock lk(lock_); + cancel_.store(true, std::memory_order_release); + cond_.notify_one(); + } + + if (transport_) + { + transport_->cancelPost(); + } +} + +void vagt::transport::XRetryTransport::allowPost() +{ + if (transport_) + { + transport_->allowPost(); + } + XTransport::allowPost(); +} + +bool vagt::transport::XRetryTransport::shouldCancelPost() +{ + return XTransport::shouldCancelPost(); +} + diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XRetryTransport.h b/veslibrary/ves_cpplibrary/src/lib/transport/XRetryTransport.h new file mode 100755 index 0000000..c6d8521 --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/XRetryTransport.h @@ -0,0 +1,43 @@ +#pragma once + +#include +#include +#include +#include +#include "XTransport.h" + +namespace vagt +{ + namespace transport + { + class XRetryTransport : public XTransport + { + public: + XRetryTransport(std::shared_ptr transport, std::chrono::milliseconds retryInterval, int retryTimes); + + virtual ~XRetryTransport() {} + + virtual XErrorCode start() override; + + virtual XErrorCode stop() override; + + virtual XErrorCode post(const std::string& event) override; + + virtual void cancelPost() override; + + virtual void allowPost() override; + + virtual bool shouldCancelPost() override; + + private: + std::shared_ptr transport_; + std::chrono::milliseconds retryInterval_; + int tryTimes_; + + std::atomic isRetryRunning_; + + std::mutex lock_; + std::condition_variable cond_; + }; + } +} \ No newline at end of file diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XRpcClientTransport.cpp b/veslibrary/ves_cpplibrary/src/lib/transport/XRpcClientTransport.cpp new file mode 100755 index 0000000..a63fc00 --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/XRpcClientTransport.cpp @@ -0,0 +1,183 @@ +#include "XRpcClientTransport.h" +#ifndef SPDLOG_ACTIVE_LEVEL +#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE +#endif +#include "spdlog/spdlog.h" +#include "thrift/TOutput.h" + +using namespace std; +using namespace vagt; +using namespace vagt::transport; +using namespace vagt::transport::rpc; +using namespace ::apache::thrift; +using namespace ::apache::thrift::protocol; +using namespace ::apache::thrift::transport; + +void DummyOutput(const char* msg) +{ + SPDLOG_INFO("{}", msg); +} + +vagt::transport::XRpcClientTransport::XRpcClientTransport(const XTransportOption & option): + option_(option) +{ +} + +vagt::transport::XRpcClientTransport::~XRpcClientTransport() +{ +} + +XErrorCode vagt::transport::XRpcClientTransport::start() +{ + auto rc = startRPC(); + + allowPost(); + + return rc; +} + +XErrorCode vagt::transport::XRpcClientTransport::stop() +{ + cancelPost(); + return stopRPC(); +} + +XErrorCode vagt::transport::XRpcClientTransport::post(const std::string & event) +{ + if (shouldCancelPost()) + { + return XErrorCanceled; + } + + if (event.empty()) + { + SPDLOG_WARN("Trying post empty event."); + return XErrorClientError; + } + + if (!rpcClient_) + { + SPDLOG_ERROR("RPC client is not started."); + return XErrorNok; + } + + try + { + return (XErrorCode)(rpcClient_->post(event)); + } + catch (TTransportException ex) + { + SPDLOG_ERROR("Fail to post event:({}).", ex.what()); + return XErrorNetworkError; + } + catch (TException ex) + { + SPDLOG_ERROR("Fail to post event:({}).", ex.what()); + return XErrorNetworkError; + } + catch (...) + { + SPDLOG_ERROR("Fail to post event."); + return XErrorNetworkError; + } +} + +void vagt::transport::XRpcClientTransport::cancelPost() +{ + XTransport::cancelPost(); +} + +void vagt::transport::XRpcClientTransport::allowPost() +{ + XTransport::allowPost(); +} + +bool vagt::transport::XRpcClientTransport::shouldCancelPost() +{ + return XTransport::shouldCancelPost(); +} + +XErrorCode vagt::transport::XRpcClientTransport::startRPC() +{ + GlobalOutput.setOutputFunction(DummyOutput); + + string host("127.0.0.1"); + if (!option_.host_.empty()) + { + host = option_.host_; + } + + int port = 5678; + if (option_.port_ > 1024) + { + port = option_.port_; + } + + SPDLOG_INFO("Connecting to RPC server:({}:{}).", host, port); + + int timeout = 1000; + if (option_.timeOut_.count() >0) + { + timeout = option_.timeOut_.count(); + } + + auto socket = std::make_shared(host, port); + socket->setConnTimeout(timeout); + socket->setRecvTimeout(timeout); + socket->setSendTimeout(timeout); + rpcTransport_ = std::make_shared(socket); + auto protocol = std::make_shared(rpcTransport_); + rpcClient_ = std::make_shared(protocol); + + try + { + rpcTransport_->open(); + } + catch (TTransportException ex) + { + SPDLOG_ERROR("Fail to start rpc client:({}).", ex.what()); + return XErrorNok; + } + catch (TException ex) + { + SPDLOG_ERROR("Fail to start rpc client:({}).", ex.what()); + return XErrorNok; + } + catch (...) + { + SPDLOG_ERROR("Fail to start rpc client."); + return XErrorNok; + } + + return XErrorOk; +} + +XErrorCode vagt::transport::XRpcClientTransport::stopRPC() +{ + if (!rpcTransport_) + { + return XErrorNok; + } + + try + { + rpcTransport_->close(); + rpcTransport_.reset(); + } + catch (TTransportException ex) + { + SPDLOG_ERROR("Fail to stop rpc client:({}).", ex.what()); + return XErrorNok; + } + catch (TException ex) + { + SPDLOG_ERROR("Fail to stop rpc client:({}).", ex.what()); + return XErrorNok; + } + catch (...) + { + SPDLOG_ERROR("Fail to stop rpc client."); + return XErrorNok; + } + return XErrorOk; +} diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XRpcClientTransport.h b/veslibrary/ves_cpplibrary/src/lib/transport/XRpcClientTransport.h new file mode 100755 index 0000000..8284d7b --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/XRpcClientTransport.h @@ -0,0 +1,38 @@ +#pragma once + +#include "XTransport.h" +#include "thrift/transport/TSocket.h" +#include "thrift/protocol/TCompactProtocol.h" +#include "thrift/transport/TBufferTransports.h" +#include "gen-cpp/rpc_constants.h" +#include "gen-cpp/rpc_types.h" +#include "gen-cpp/XRpcTransport.h" + +namespace vagt +{ + namespace transport + { + class XRpcClientTransport : public XTransport + { + public: + XRpcClientTransport(const XTransportOption& option); + virtual ~XRpcClientTransport(); + + virtual XErrorCode start() override; + virtual XErrorCode stop() override; + virtual XErrorCode post(const std::string& event) override; + + virtual void cancelPost() override; + virtual void allowPost() override; + virtual bool shouldCancelPost() override; + private: + XErrorCode startRPC(); + XErrorCode stopRPC(); + + XTransportOption option_; + + std::shared_ptr rpcTransport_; + std::shared_ptr rpcClient_; + }; + } +} diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XRpcServertTransport.cpp b/veslibrary/ves_cpplibrary/src/lib/transport/XRpcServertTransport.cpp new file mode 100755 index 0000000..c18db8b --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/XRpcServertTransport.cpp @@ -0,0 +1,222 @@ +#include "XRpcServertTransport.h" +#include "thrift/transport/TServerSocket.h" +#include "thrift/server/TThreadedServer.h" +#include "thrift/protocol/TCompactProtocol.h" +#include "thrift/transport/TBufferTransports.h" +#include "thrift/transport/TSocket.h" +#ifndef SPDLOG_ACTIVE_LEVEL +#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE +#endif +#include "spdlog/spdlog.h" + +using namespace std; +using namespace vagt::transport; +using namespace vagt::transport::rpc; +using namespace apache::thrift; +using namespace apache::thrift::protocol; +using namespace apache::thrift::transport; +using namespace apache::thrift::server; + +constexpr auto XRpcServerIdleTime = 60 * 60 * 1000; + +class XRpcTransportImp:public XRpcTransportIf +{ +public: + XRpcTransportImp(XTransport* transport):transport_(transport) + { + } + + virtual int16_t post(const std::string & data) override + { + if (transport_) + { + return transport_->post(data); + } + } +private: + XTransport* transport_; +}; + +class XRpcTransportFactory : public XRpcTransportIfFactory +{ +public: + XRpcTransportFactory(XTransport* transport):transport_(transport) + { + } + + XRpcTransportIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) + { + shared_ptr sock = dynamic_pointer_cast(connInfo.transport); + if (sock) + { + auto pt = new XRpcTransportImp(transport_); + SPDLOG_INFO("New RPC client connection from:({}:{}), handler:({:x}).", sock->getPeerAddress(), sock->getPeerPort(), (unsigned long)pt); + return pt; + } + return nullptr; + } + + void releaseHandler(XRpcTransportIf* handler) + { + SPDLOG_INFO("Clean RPC client handler:({:x}).", (unsigned long)handler); + + if (handler) + { + delete handler; + handler = nullptr; + } + } +private: + XTransport* transport_; +}; + +vagt::transport::XRpcServerTransport::XRpcServerTransport(std::shared_ptr transport, const XTransportOption & option): + transport_(transport), + option_(option) +{ +} + +vagt::transport::XRpcServerTransport::~XRpcServerTransport() +{ +} + +XErrorCode vagt::transport::XRpcServerTransport::start() +{ + if (!transport_) + { + SPDLOG_ERROR("Empty transport."); + return XErrorNok; + } + + transport_->start(); + + startRPC(); + + allowPost(); + + return XErrorOk; +} + +XErrorCode vagt::transport::XRpcServerTransport::stop() +{ + cancelPost(); + if (!transport_) + { + SPDLOG_ERROR("Empty transport."); + return XErrorNok; + } + + stopRPC(); + + future_.get(); + + return transport_->stop(); +} + +XErrorCode vagt::transport::XRpcServerTransport::post(const std::string & event) +{ + if (shouldCancelPost()) + { + return XErrorCanceled; + } + + if (event.empty()) + { + SPDLOG_WARN("Trying post empty event."); + return XErrorClientError; + } + + if (!transport_) + { + SPDLOG_ERROR("Empty transport."); + return XErrorNok; + } + + return transport_->post(event); +} + +void vagt::transport::XRpcServerTransport::cancelPost() +{ + XTransport::cancelPost(); + + if (transport_) + { + transport_->cancelPost(); + } +} + +void vagt::transport::XRpcServerTransport::allowPost() +{ + if (transport_) + { + transport_->allowPost(); + } + XTransport::allowPost(); +} + +bool vagt::transport::XRpcServerTransport::shouldCancelPost() +{ + return XTransport::shouldCancelPost(); +} + +XErrorCode vagt::transport::XRpcServerTransport::startRPC() +{ + future_ = std::async(launch::async, [this]() {return this->worker(); }); + return XErrorOk; +} + +XErrorCode vagt::transport::XRpcServerTransport::stopRPC() +{ + if (rpcServer_) + { + rpcServer_->stop(); + } + return XErrorOk; +} + +void vagt::transport::XRpcServerTransport::worker() +{ + string source("0.0.0.0"); + if (!option_.sourceIp_.empty()) + { + source = option_.sourceIp_; + } + + int port = 5678; + if (option_.port_ > 1024) + { + port = option_.port_; + } + + SPDLOG_INFO("Start RPC transport thread:({}:{}).", source, port); + + auto serverSocket = make_shared(source, port); + serverSocket->setRecvTimeout(XRpcServerIdleTime); + rpcServer_ = make_shared( + make_shared(make_shared(this)), + serverSocket, + make_shared(), + make_shared()); + + while (!shouldCancelPost()) + { + try + { + rpcServer_->serve(); + } + catch (apache::thrift::TException ex) + { + SPDLOG_ERROR("{}", ex.what()); + this_thread::sleep_for(chrono::seconds(1)); + continue; + } + catch (...) + { + SPDLOG_ERROR("Unknown Exception in thrift."); + this_thread::sleep_for(chrono::seconds(1)); + continue; + } + } + + SPDLOG_INFO("Quit RPC transport thread."); +} diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XRpcServertTransport.h b/veslibrary/ves_cpplibrary/src/lib/transport/XRpcServertTransport.h new file mode 100755 index 0000000..14eab07 --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/XRpcServertTransport.h @@ -0,0 +1,39 @@ +#pragma once + +#include "XTransport.h" +#include "thrift/server/TServer.h" +#include "gen-cpp/rpc_constants.h" +#include "gen-cpp/rpc_types.h" +#include "gen-cpp/XRpcTransport.h" +#include + +namespace vagt +{ + namespace transport + { + class XRpcServerTransport : public XTransport + { + public: + XRpcServerTransport(std::shared_ptr transport, const XTransportOption& option); + virtual ~XRpcServerTransport(); + + virtual XErrorCode start() override; + virtual XErrorCode stop() override; + virtual XErrorCode post(const std::string& event) override; + + virtual void cancelPost() override; + virtual void allowPost() override; + virtual bool shouldCancelPost() override; + private: + XErrorCode startRPC(); + XErrorCode stopRPC(); + void worker(); + std::future future_; + + std::shared_ptr transport_; + XTransportOption option_; + + std::shared_ptr rpcServer_; + }; + } +} diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XSwitchableTransport.cpp b/veslibrary/ves_cpplibrary/src/lib/transport/XSwitchableTransport.cpp new file mode 100755 index 0000000..26e44ac --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/XSwitchableTransport.cpp @@ -0,0 +1,172 @@ +#include "XSwitchableTransport.h" +#ifndef SPDLOG_ACTIVE_LEVEL +#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE +#endif +#include "spdlog/spdlog.h" + +using namespace std; +using namespace vagt; +using namespace vagt::transport; + +XSwitchableTransport::XSwitchableTransport(std::vector>& processors): + transports_(processors), + transportIndex_(0) +{ +} + +void XSwitchableTransport::cancelPost() +{ + XTransport::cancelPost(); + for (auto transport : transports_) + { + if (transport) + { + transport->cancelPost(); + } + } +} + +void XSwitchableTransport::allowPost() +{ + for (auto transport : transports_) + { + if (transport) + { + transport->allowPost(); + } + } + XTransport::allowPost(); +} + +bool XSwitchableTransport::shouldCancelPost() +{ + return XTransport::shouldCancelPost(); +} + +XErrorCode XSwitchableTransport::tryTransport(const std::string& event) +{ + auto transport = transports_[transportIndex_]; + if (!transport) + { + SPDLOG_ERROR("Empty transport."); + return XErrorNok; + } + + SPDLOG_DEBUG("Transport({}) posting event:{}.", transportIndex_, event); + return transport->post(event); +} + +void XSwitchableTransport::switchTransport(XErrorCode reason) +{ + SPDLOG_INFO("Switching transport:({}).", reason); + + auto transport = transports_[transportIndex_]; + if (!transport) + { + SPDLOG_ERROR("Empty transport."); + return; + } + + transport->stop(); + + ++transportIndex_; + if (transportIndex_ >= transports_.size()) + { + transportIndex_ = 0; + } + + SPDLOG_INFO("Switch to transport, index:({}).", transportIndex_); + transport = transports_[transportIndex_]; + if (!transport) + { + SPDLOG_ERROR("Empty transport({}).", transportIndex_); + return; + } + + transport->start(); +} + +XErrorCode XSwitchableTransport::start() +{ + if (transportIndex_ >= transports_.size()) + { + SPDLOG_ERROR("Transport index ({}) >= transport number ({}).", transportIndex_, transports_.size()); + return XErrorNok; + } + + auto transport = transports_[transportIndex_]; + XErrorCode rc = XErrorNok; + if (transport) + { + rc = transport->start(); + } + + allowPost(); + + return rc; +} + +XErrorCode XSwitchableTransport::stop() +{ + cancelPost(); + + if (transportIndex_ >= transports_.size()) + { + SPDLOG_ERROR("Transport index ({}) >= transport number ({}).", transportIndex_, transports_.size()); + return XErrorNok; + } + + auto transport = transports_[transportIndex_]; + XErrorCode rc = XErrorNok; + if (transport) + { + rc = transport->stop(); + } + + return rc; +} + +XErrorCode XSwitchableTransport::post(const std::string& event) +{ + if (shouldCancelPost()) + { + return XErrorCanceled; + } + + if (event.empty()) + { + SPDLOG_WARN("Trying post empty event."); + return XErrorClientError; + } + + if (transportIndex_ >= transports_.size()) + { + SPDLOG_ERROR("Transport index ({}) >= transport number ({}).", transportIndex_, transports_.size()); + return XErrorNok; + } + + auto rc = XErrorNok; + for (auto i = 0; + !shouldCancelPost() && i < transports_.size(); + ++i) + { + rc = tryTransport(event); + if (rc == XErrorOk || + rc == XErrorClientError || + rc == XErrorCanceled) + { + return rc; + } + else + { + if (shouldCancelPost()) + { + break; + } + switchTransport(rc); + } + } + + return rc; +} + diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XSwitchableTransport.h b/veslibrary/ves_cpplibrary/src/lib/transport/XSwitchableTransport.h new file mode 100755 index 0000000..e4bb82b --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/XSwitchableTransport.h @@ -0,0 +1,37 @@ +#pragma once + +#include +#include "XTransport.h" + +namespace vagt +{ + namespace transport + { + class XSwitchableTransport : public XTransport + { + public: + XSwitchableTransport(std::vector>& processors); + + virtual ~XSwitchableTransport() {} + + virtual XErrorCode start() override; + + virtual XErrorCode stop() override; + + virtual XErrorCode post(const std::string& event) override; + + virtual void cancelPost() override; + + virtual void allowPost() override; + + virtual bool shouldCancelPost() override; + + private: + XErrorCode tryTransport(const std::string& event); + void switchTransport(vagt::transport::XErrorCode reason); + + std::vector> transports_; + unsigned int transportIndex_; + }; + } +} diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XSynchronizedTransport.cpp b/veslibrary/ves_cpplibrary/src/lib/transport/XSynchronizedTransport.cpp new file mode 100755 index 0000000..88f4dfa --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/XSynchronizedTransport.cpp @@ -0,0 +1,115 @@ +#include "XSynchronizedTransport.h" +#ifndef SPDLOG_ACTIVE_LEVEL +#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE +#endif +#include "spdlog/spdlog.h" + +using namespace std; +using namespace vagt; +using namespace vagt::transport; + +XSynchronizedTransport::XSynchronizedTransport(std::shared_ptr transport): + transport_(transport), + status_(XStopped) +{ +} + +XErrorCode XSynchronizedTransport::start() +{ + unique_lock lk(controlLock_); + + if (status_ == XStarted) + { + return XErrorOk; + } + + XErrorCode rc = XErrorNok; + if (transport_) + { + rc = transport_->start(); + } + + if (rc == XErrorOk) + { + status_ = XStarted; + } + + allowPost(); + + return rc; +} + +XErrorCode XSynchronizedTransport::stop() +{ + unique_lock lk(controlLock_); + + if (status_ == XStopped) + { + return XErrorOk; + } + + cancelPost(); + + if (transport_) + { + transport_->cancelPost(); + } + + unique_lock lkPost(postLock_); + XErrorCode rc = XErrorNok; + if (transport_) + { + rc = transport_->stop(); + } + + if (rc == XErrorOk) + { + status_ = XStopped; + } + return rc; +} + +XErrorCode XSynchronizedTransport::post(const std::string& event) +{ + if (shouldCancelPost()) + { + return XErrorCanceled; + } + + if (event.empty()) + { + SPDLOG_WARN("Trying post empty event."); + return XErrorClientError; + } + + unique_lock lk(postLock_); + if (transport_) + { + return transport_->post(event); + } + return XErrorNok; +} + +void XSynchronizedTransport::cancelPost() +{ + XTransport::cancelPost(); + + if (transport_) + { + transport_->cancelPost(); + } +} + +void XSynchronizedTransport::allowPost() +{ + if (transport_) + { + transport_->allowPost(); + } + XTransport::allowPost(); +} + +bool XSynchronizedTransport::shouldCancelPost() +{ + return XTransport::shouldCancelPost(); +} diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XSynchronizedTransport.h b/veslibrary/ves_cpplibrary/src/lib/transport/XSynchronizedTransport.h new file mode 100755 index 0000000..85b16d9 --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/XSynchronizedTransport.h @@ -0,0 +1,41 @@ +#pragma once +#include +#include "XTransport.h" + +namespace vagt +{ + namespace transport + { + class XSynchronizedTransport : public XTransport + { + public: + enum XStatus : unsigned short + { + XStarted, + XStopped, + }; + + XSynchronizedTransport(std::shared_ptr transport); + + virtual ~XSynchronizedTransport() {} + + virtual XErrorCode start() override; + + virtual XErrorCode stop() override; + + virtual XErrorCode post(const std::string& event) override; + + virtual void cancelPost() override; + + virtual void allowPost() override; + + virtual bool shouldCancelPost() override; + + private: + std::mutex postLock_; + std::mutex controlLock_; + std::shared_ptr transport_; + XStatus status_; + }; + } +} diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/XTransport.cpp b/veslibrary/ves_cpplibrary/src/lib/transport/XTransport.cpp new file mode 100755 index 0000000..ac61acc --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/XTransport.cpp @@ -0,0 +1,64 @@ +#include "XTransport.h" +#include "XLibcurlTransport.h" +#include "XRetryTransport.h" +#include "XSwitchableTransport.h" +#include "XSynchronizedTransport.h" +#include "XBufferedTransport.h" +#include "XRpcClientTransport.h" +#include "XRpcServertTransport.h" + +using namespace vagt::transport; + +XTransport::XTransport():cancel_(true) +{ +} + +void XTransport::cancelPost() +{ + cancel_.store(true, std::memory_order_release); +} + +void XTransport::allowPost() +{ + cancel_.store(false, std::memory_order_release); +} + +bool XTransport::shouldCancelPost() +{ + return cancel_.load(std::memory_order_acquire); +} + +std::shared_ptr vagt::transport::XTransport::LibCurlTransport(const XTransportOption & option) +{ + return std::make_shared(option); +} + +std::shared_ptr vagt::transport::XTransport::RetryTransport(std::shared_ptr transport, std::chrono::milliseconds retryInterval, int retryTimes) +{ + return std::make_shared(transport, retryInterval, retryTimes); +} + +std::shared_ptr vagt::transport::XTransport::SwitchableTransport(std::vector>& transports) +{ + return std::make_shared(transports); +} + +std::shared_ptr vagt::transport::XTransport::SynchronizedTransport(std::shared_ptr transport) +{ + return std::make_shared(transport); +} + +std::shared_ptr vagt::transport::XTransport::BufferedTransport(std::shared_ptr transport, std::shared_ptr queue) +{ + return std::make_shared(transport, queue); +} + +std::shared_ptr vagt::transport::XTransport::RpcClientTransport(const XTransportOption& option) +{ + return std::make_shared(option); +} + +std::shared_ptr vagt::transport::XTransport::RpcServerTransport(std::shared_ptr transport, const XTransportOption& option) +{ + return std::make_shared(transport, option); +} diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/XRpcTransport.cpp b/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/XRpcTransport.cpp new file mode 100755 index 0000000..0cdd75b --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/XRpcTransport.cpp @@ -0,0 +1,421 @@ +/** + * Autogenerated by Thrift Compiler (0.12.0) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +#include "XRpcTransport.h" + +namespace vagt { namespace transport { namespace rpc { + + +XRpcTransport_post_args::~XRpcTransport_post_args() throw() { +} + + +uint32_t XRpcTransport_post_args::read(::apache::thrift::protocol::TProtocol* iprot) { + + ::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 1: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->data); + this->__isset.data = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t XRpcTransport_post_args::write(::apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("XRpcTransport_post_args"); + + xfer += oprot->writeFieldBegin("data", ::apache::thrift::protocol::T_STRING, 1); + xfer += oprot->writeString(this->data); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + + +XRpcTransport_post_pargs::~XRpcTransport_post_pargs() throw() { +} + + +uint32_t XRpcTransport_post_pargs::write(::apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("XRpcTransport_post_pargs"); + + xfer += oprot->writeFieldBegin("data", ::apache::thrift::protocol::T_STRING, 1); + xfer += oprot->writeString((*(this->data))); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + + +XRpcTransport_post_result::~XRpcTransport_post_result() throw() { +} + + +uint32_t XRpcTransport_post_result::read(::apache::thrift::protocol::TProtocol* iprot) { + + ::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 0: + if (ftype == ::apache::thrift::protocol::T_I16) { + xfer += iprot->readI16(this->success); + this->__isset.success = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t XRpcTransport_post_result::write(::apache::thrift::protocol::TProtocol* oprot) const { + + uint32_t xfer = 0; + + xfer += oprot->writeStructBegin("XRpcTransport_post_result"); + + if (this->__isset.success) { + xfer += oprot->writeFieldBegin("success", ::apache::thrift::protocol::T_I16, 0); + xfer += oprot->writeI16(this->success); + xfer += oprot->writeFieldEnd(); + } + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + + +XRpcTransport_post_presult::~XRpcTransport_post_presult() throw() { +} + + +uint32_t XRpcTransport_post_presult::read(::apache::thrift::protocol::TProtocol* iprot) { + + ::apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 0: + if (ftype == ::apache::thrift::protocol::T_I16) { + xfer += iprot->readI16((*(this->success))); + this->__isset.success = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +int16_t XRpcTransportClient::post(const std::string& data) +{ + send_post(data); + return recv_post(); +} + +void XRpcTransportClient::send_post(const std::string& data) +{ + int32_t cseqid = 0; + oprot_->writeMessageBegin("post", ::apache::thrift::protocol::T_CALL, cseqid); + + XRpcTransport_post_pargs args; + args.data = &data; + args.write(oprot_); + + oprot_->writeMessageEnd(); + oprot_->getTransport()->writeEnd(); + oprot_->getTransport()->flush(); +} + +int16_t XRpcTransportClient::recv_post() +{ + + int32_t rseqid = 0; + std::string fname; + ::apache::thrift::protocol::TMessageType mtype; + + iprot_->readMessageBegin(fname, mtype, rseqid); + if (mtype == ::apache::thrift::protocol::T_EXCEPTION) { + ::apache::thrift::TApplicationException x; + x.read(iprot_); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + throw x; + } + if (mtype != ::apache::thrift::protocol::T_REPLY) { + iprot_->skip(::apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + } + if (fname.compare("post") != 0) { + iprot_->skip(::apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + } + int16_t _return; + XRpcTransport_post_presult result; + result.success = &_return; + result.read(iprot_); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + + if (result.__isset.success) { + return _return; + } + throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "post failed: unknown result"); +} + +bool XRpcTransportProcessor::dispatchCall(::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, const std::string& fname, int32_t seqid, void* callContext) { + ProcessMap::iterator pfn; + pfn = processMap_.find(fname); + if (pfn == processMap_.end()) { + iprot->skip(::apache::thrift::protocol::T_STRUCT); + iprot->readMessageEnd(); + iprot->getTransport()->readEnd(); + ::apache::thrift::TApplicationException x(::apache::thrift::TApplicationException::UNKNOWN_METHOD, "Invalid method name: '"+fname+"'"); + oprot->writeMessageBegin(fname, ::apache::thrift::protocol::T_EXCEPTION, seqid); + x.write(oprot); + oprot->writeMessageEnd(); + oprot->getTransport()->writeEnd(); + oprot->getTransport()->flush(); + return true; + } + (this->*(pfn->second))(seqid, iprot, oprot, callContext); + return true; +} + +void XRpcTransportProcessor::process_post(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext) +{ + void* ctx = NULL; + if (this->eventHandler_.get() != NULL) { + ctx = this->eventHandler_->getContext("XRpcTransport.post", callContext); + } + ::apache::thrift::TProcessorContextFreer freer(this->eventHandler_.get(), ctx, "XRpcTransport.post"); + + if (this->eventHandler_.get() != NULL) { + this->eventHandler_->preRead(ctx, "XRpcTransport.post"); + } + + XRpcTransport_post_args args; + args.read(iprot); + iprot->readMessageEnd(); + uint32_t bytes = iprot->getTransport()->readEnd(); + + if (this->eventHandler_.get() != NULL) { + this->eventHandler_->postRead(ctx, "XRpcTransport.post", bytes); + } + + XRpcTransport_post_result result; + try { + result.success = iface_->post(args.data); + result.__isset.success = true; + } catch (const std::exception& e) { + if (this->eventHandler_.get() != NULL) { + this->eventHandler_->handlerError(ctx, "XRpcTransport.post"); + } + + ::apache::thrift::TApplicationException x(e.what()); + oprot->writeMessageBegin("post", ::apache::thrift::protocol::T_EXCEPTION, seqid); + x.write(oprot); + oprot->writeMessageEnd(); + oprot->getTransport()->writeEnd(); + oprot->getTransport()->flush(); + return; + } + + if (this->eventHandler_.get() != NULL) { + this->eventHandler_->preWrite(ctx, "XRpcTransport.post"); + } + + oprot->writeMessageBegin("post", ::apache::thrift::protocol::T_REPLY, seqid); + result.write(oprot); + oprot->writeMessageEnd(); + bytes = oprot->getTransport()->writeEnd(); + oprot->getTransport()->flush(); + + if (this->eventHandler_.get() != NULL) { + this->eventHandler_->postWrite(ctx, "XRpcTransport.post", bytes); + } +} + +::apache::thrift::stdcxx::shared_ptr< ::apache::thrift::TProcessor > XRpcTransportProcessorFactory::getProcessor(const ::apache::thrift::TConnectionInfo& connInfo) { + ::apache::thrift::ReleaseHandler< XRpcTransportIfFactory > cleanup(handlerFactory_); + ::apache::thrift::stdcxx::shared_ptr< XRpcTransportIf > handler(handlerFactory_->getHandler(connInfo), cleanup); + ::apache::thrift::stdcxx::shared_ptr< ::apache::thrift::TProcessor > processor(new XRpcTransportProcessor(handler)); + return processor; +} + +int16_t XRpcTransportConcurrentClient::post(const std::string& data) +{ + int32_t seqid = send_post(data); + return recv_post(seqid); +} + +int32_t XRpcTransportConcurrentClient::send_post(const std::string& data) +{ + int32_t cseqid = this->sync_.generateSeqId(); + ::apache::thrift::async::TConcurrentSendSentry sentry(&this->sync_); + oprot_->writeMessageBegin("post", ::apache::thrift::protocol::T_CALL, cseqid); + + XRpcTransport_post_pargs args; + args.data = &data; + args.write(oprot_); + + oprot_->writeMessageEnd(); + oprot_->getTransport()->writeEnd(); + oprot_->getTransport()->flush(); + + sentry.commit(); + return cseqid; +} + +int16_t XRpcTransportConcurrentClient::recv_post(const int32_t seqid) +{ + + int32_t rseqid = 0; + std::string fname; + ::apache::thrift::protocol::TMessageType mtype; + + // the read mutex gets dropped and reacquired as part of waitForWork() + // The destructor of this sentry wakes up other clients + ::apache::thrift::async::TConcurrentRecvSentry sentry(&this->sync_, seqid); + + while(true) { + if(!this->sync_.getPending(fname, mtype, rseqid)) { + iprot_->readMessageBegin(fname, mtype, rseqid); + } + if(seqid == rseqid) { + if (mtype == ::apache::thrift::protocol::T_EXCEPTION) { + ::apache::thrift::TApplicationException x; + x.read(iprot_); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + sentry.commit(); + throw x; + } + if (mtype != ::apache::thrift::protocol::T_REPLY) { + iprot_->skip(::apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + } + if (fname.compare("post") != 0) { + iprot_->skip(::apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + + // in a bad state, don't commit + using ::apache::thrift::protocol::TProtocolException; + throw TProtocolException(TProtocolException::INVALID_DATA); + } + int16_t _return; + XRpcTransport_post_presult result; + result.success = &_return; + result.read(iprot_); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + + if (result.__isset.success) { + sentry.commit(); + return _return; + } + // in a bad state, don't commit + throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "post failed: unknown result"); + } + // seqid != rseqid + this->sync_.updatePending(fname, mtype, rseqid); + + // this will temporarily unlock the readMutex, and let other clients get work done + this->sync_.waitForWork(seqid); + } // end while(true) +} + +}}} // namespace + diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/XRpcTransport.h b/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/XRpcTransport.h new file mode 100755 index 0000000..124702e --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/XRpcTransport.h @@ -0,0 +1,296 @@ +/** + * Autogenerated by Thrift Compiler (0.12.0) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +#ifndef XRpcTransport_H +#define XRpcTransport_H + +#include +#include +#include "rpc_types.h" + +namespace vagt { namespace transport { namespace rpc { + +#ifdef _MSC_VER + #pragma warning( push ) + #pragma warning (disable : 4250 ) //inheriting methods via dominance +#endif + +class XRpcTransportIf { + public: + virtual ~XRpcTransportIf() {} + virtual int16_t post(const std::string& data) = 0; +}; + +class XRpcTransportIfFactory { + public: + typedef XRpcTransportIf Handler; + + virtual ~XRpcTransportIfFactory() {} + + virtual XRpcTransportIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) = 0; + virtual void releaseHandler(XRpcTransportIf* /* handler */) = 0; +}; + +class XRpcTransportIfSingletonFactory : virtual public XRpcTransportIfFactory { + public: + XRpcTransportIfSingletonFactory(const ::apache::thrift::stdcxx::shared_ptr& iface) : iface_(iface) {} + virtual ~XRpcTransportIfSingletonFactory() {} + + virtual XRpcTransportIf* getHandler(const ::apache::thrift::TConnectionInfo&) { + return iface_.get(); + } + virtual void releaseHandler(XRpcTransportIf* /* handler */) {} + + protected: + ::apache::thrift::stdcxx::shared_ptr iface_; +}; + +class XRpcTransportNull : virtual public XRpcTransportIf { + public: + virtual ~XRpcTransportNull() {} + int16_t post(const std::string& /* data */) { + int16_t _return = 0; + return _return; + } +}; + +typedef struct _XRpcTransport_post_args__isset { + _XRpcTransport_post_args__isset() : data(false) {} + bool data :1; +} _XRpcTransport_post_args__isset; + +class XRpcTransport_post_args { + public: + + XRpcTransport_post_args(const XRpcTransport_post_args&); + XRpcTransport_post_args& operator=(const XRpcTransport_post_args&); + XRpcTransport_post_args() : data() { + } + + virtual ~XRpcTransport_post_args() throw(); + std::string data; + + _XRpcTransport_post_args__isset __isset; + + void __set_data(const std::string& val); + + bool operator == (const XRpcTransport_post_args & rhs) const + { + if (!(data == rhs.data)) + return false; + return true; + } + bool operator != (const XRpcTransport_post_args &rhs) const { + return !(*this == rhs); + } + + bool operator < (const XRpcTransport_post_args & ) const; + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot); + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; + +}; + + +class XRpcTransport_post_pargs { + public: + + + virtual ~XRpcTransport_post_pargs() throw(); + const std::string* data; + + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; + +}; + +typedef struct _XRpcTransport_post_result__isset { + _XRpcTransport_post_result__isset() : success(false) {} + bool success :1; +} _XRpcTransport_post_result__isset; + +class XRpcTransport_post_result { + public: + + XRpcTransport_post_result(const XRpcTransport_post_result&); + XRpcTransport_post_result& operator=(const XRpcTransport_post_result&); + XRpcTransport_post_result() : success(0) { + } + + virtual ~XRpcTransport_post_result() throw(); + int16_t success; + + _XRpcTransport_post_result__isset __isset; + + void __set_success(const int16_t val); + + bool operator == (const XRpcTransport_post_result & rhs) const + { + if (!(success == rhs.success)) + return false; + return true; + } + bool operator != (const XRpcTransport_post_result &rhs) const { + return !(*this == rhs); + } + + bool operator < (const XRpcTransport_post_result & ) const; + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot); + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; + +}; + +typedef struct _XRpcTransport_post_presult__isset { + _XRpcTransport_post_presult__isset() : success(false) {} + bool success :1; +} _XRpcTransport_post_presult__isset; + +class XRpcTransport_post_presult { + public: + + + virtual ~XRpcTransport_post_presult() throw(); + int16_t* success; + + _XRpcTransport_post_presult__isset __isset; + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot); + +}; + +class XRpcTransportClient : virtual public XRpcTransportIf { + public: + XRpcTransportClient(apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> prot) { + setProtocol(prot); + } + XRpcTransportClient(apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> iprot, apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> oprot) { + setProtocol(iprot,oprot); + } + private: + void setProtocol(apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> prot) { + setProtocol(prot,prot); + } + void setProtocol(apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> iprot, apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> oprot) { + piprot_=iprot; + poprot_=oprot; + iprot_ = iprot.get(); + oprot_ = oprot.get(); + } + public: + apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> getInputProtocol() { + return piprot_; + } + apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> getOutputProtocol() { + return poprot_; + } + int16_t post(const std::string& data); + void send_post(const std::string& data); + int16_t recv_post(); + protected: + apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot_; + apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot_; + ::apache::thrift::protocol::TProtocol* iprot_; + ::apache::thrift::protocol::TProtocol* oprot_; +}; + +class XRpcTransportProcessor : public ::apache::thrift::TDispatchProcessor { + protected: + ::apache::thrift::stdcxx::shared_ptr iface_; + virtual bool dispatchCall(::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, const std::string& fname, int32_t seqid, void* callContext); + private: + typedef void (XRpcTransportProcessor::*ProcessFunction)(int32_t, ::apache::thrift::protocol::TProtocol*, ::apache::thrift::protocol::TProtocol*, void*); + typedef std::map ProcessMap; + ProcessMap processMap_; + void process_post(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext); + public: + XRpcTransportProcessor(::apache::thrift::stdcxx::shared_ptr iface) : + iface_(iface) { + processMap_["post"] = &XRpcTransportProcessor::process_post; + } + + virtual ~XRpcTransportProcessor() {} +}; + +class XRpcTransportProcessorFactory : public ::apache::thrift::TProcessorFactory { + public: + XRpcTransportProcessorFactory(const ::apache::thrift::stdcxx::shared_ptr< XRpcTransportIfFactory >& handlerFactory) : + handlerFactory_(handlerFactory) {} + + ::apache::thrift::stdcxx::shared_ptr< ::apache::thrift::TProcessor > getProcessor(const ::apache::thrift::TConnectionInfo& connInfo); + + protected: + ::apache::thrift::stdcxx::shared_ptr< XRpcTransportIfFactory > handlerFactory_; +}; + +class XRpcTransportMultiface : virtual public XRpcTransportIf { + public: + XRpcTransportMultiface(std::vector >& ifaces) : ifaces_(ifaces) { + } + virtual ~XRpcTransportMultiface() {} + protected: + std::vector > ifaces_; + XRpcTransportMultiface() {} + void add(::apache::thrift::stdcxx::shared_ptr iface) { + ifaces_.push_back(iface); + } + public: + int16_t post(const std::string& data) { + size_t sz = ifaces_.size(); + size_t i = 0; + for (; i < (sz - 1); ++i) { + ifaces_[i]->post(data); + } + return ifaces_[i]->post(data); + } + +}; + +// The 'concurrent' client is a thread safe client that correctly handles +// out of order responses. It is slower than the regular client, so should +// only be used when you need to share a connection among multiple threads +class XRpcTransportConcurrentClient : virtual public XRpcTransportIf { + public: + XRpcTransportConcurrentClient(apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> prot) { + setProtocol(prot); + } + XRpcTransportConcurrentClient(apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> iprot, apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> oprot) { + setProtocol(iprot,oprot); + } + private: + void setProtocol(apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> prot) { + setProtocol(prot,prot); + } + void setProtocol(apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> iprot, apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> oprot) { + piprot_=iprot; + poprot_=oprot; + iprot_ = iprot.get(); + oprot_ = oprot.get(); + } + public: + apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> getInputProtocol() { + return piprot_; + } + apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> getOutputProtocol() { + return poprot_; + } + int16_t post(const std::string& data); + int32_t send_post(const std::string& data); + int16_t recv_post(const int32_t seqid); + protected: + apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot_; + apache::thrift::stdcxx::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot_; + ::apache::thrift::protocol::TProtocol* iprot_; + ::apache::thrift::protocol::TProtocol* oprot_; + ::apache::thrift::async::TConcurrentClientSyncInfo sync_; +}; + +#ifdef _MSC_VER + #pragma warning( pop ) +#endif + +}}} // namespace + +#endif diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_constants.cpp b/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_constants.cpp new file mode 100755 index 0000000..8fa4367 --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_constants.cpp @@ -0,0 +1,17 @@ +/** + * Autogenerated by Thrift Compiler (0.12.0) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +#include "rpc_constants.h" + +namespace vagt { namespace transport { namespace rpc { + +const rpcConstants g_rpc_constants; + +rpcConstants::rpcConstants() { +} + +}}} // namespace + diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_constants.h b/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_constants.h new file mode 100755 index 0000000..56fde2a --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_constants.h @@ -0,0 +1,24 @@ +/** + * Autogenerated by Thrift Compiler (0.12.0) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +#ifndef rpc_CONSTANTS_H +#define rpc_CONSTANTS_H + +#include "rpc_types.h" + +namespace vagt { namespace transport { namespace rpc { + +class rpcConstants { + public: + rpcConstants(); + +}; + +extern const rpcConstants g_rpc_constants; + +}}} // namespace + +#endif diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_types.cpp b/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_types.cpp new file mode 100755 index 0000000..02a866e --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_types.cpp @@ -0,0 +1,16 @@ +/** + * Autogenerated by Thrift Compiler (0.12.0) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +#include "rpc_types.h" + +#include +#include + +#include + +namespace vagt { namespace transport { namespace rpc { + +}}} // namespace diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_types.h b/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_types.h new file mode 100755 index 0000000..7ace455 --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/gen-cpp/rpc_types.h @@ -0,0 +1,25 @@ +/** + * Autogenerated by Thrift Compiler (0.12.0) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +#ifndef rpc_TYPES_H +#define rpc_TYPES_H + +#include + +#include +#include +#include +#include +#include + +#include + + +namespace vagt { namespace transport { namespace rpc { + +}}} // namespace + +#endif diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/include/XQueue.h b/veslibrary/ves_cpplibrary/src/lib/transport/include/XQueue.h new file mode 100755 index 0000000..c2a08b6 --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/include/XQueue.h @@ -0,0 +1,40 @@ +#pragma once + +#include +#include + +namespace vagt +{ + namespace queue + { + /*************************************************************************************************//** + * @brief Error codes + *****************************************************************************************************/ + enum XErrorCode : unsigned short + { + XErrorOk, /**< The operation is successful */ + XErrorNok, /**< General failure */ + XErrorFull, /**< The buffer is full */ + XErrorEmpty, /**< The buffer is empty */ + }; + + class XQueue + { + public: + virtual bool empty() = 0; + virtual XErrorCode push(const std::string& val) = 0; + virtual void pop() = 0; + virtual std::string front() = 0; + + /*************************************************************************************************//** + * Create a queue in memory with specified capacity. + *****************************************************************************************************/ + static std::shared_ptr create(int capacity); + + /*************************************************************************************************//** + * Create a queue on disk. + *****************************************************************************************************/ + static std::shared_ptr create(const std::string& path); + }; + } +} \ No newline at end of file diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/include/XTransport.h b/veslibrary/ves_cpplibrary/src/lib/transport/include/XTransport.h new file mode 100755 index 0000000..9416cf4 --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/include/XTransport.h @@ -0,0 +1,151 @@ +#pragma once +/*************************************************************************************************//** +@file XTransport.h +@brief Transport API, used to send string to collector +*****************************************************************************************************/ +#include +#include +#include +#include +#include +#include "XQueue.h" + +namespace vagt +{ + namespace transport + { + /*************************************************************************************************//** + * @brief Error codes + *****************************************************************************************************/ + enum XErrorCode: unsigned short + { + XErrorOk, /**< The operation is successful */ + XErrorNok, /**< General failure */ + XErrorTimeout, /**< Timeout */ + XErrorUnauthorized, /**< Unauthorized */ + XErrorCanceled, /**< The operation is canceled */ + XErrorClientError, /**< Client error, e.g. HTTP 404 */ + XErrorServerError, /**< Server error, e.g. HTTP 500 */ + XErrorNetworkError, /**< Network error */ + }; + + /*************************************************************************************************//** + * @brief Transport option + *****************************************************************************************************/ + class XTransportOption + { + public: + std::string host_; /**< Remote RPC server hostname or IP address*/ + int port_; /**< Remote RPC server port */ + std::chrono::milliseconds timeOut_; /**< Timeout */ + std::string url_; /**< URL of Collector */ + std::string sourceIp_; /**< Source IP Address */ + bool secure_; /**< Enable TLS or not */ + std::string caInfo_; /**< CA INFO for TLS */ + std::string caFilePath_; /**< CA pATH for TLS */ + std::string certFilePath_; /**< CERT for TLS */ + std::string keyFilePath_; /**< KEY for TLS */ + long verifyPeer_; /**< Refer to CURLOPT_SSL_VERIFYPEER */ + long verifyHost_; /**< Refer to CURLOPT_SSL_VERIFYHOST */ + std::string userName_; /**< User name */ + std::string userPasswd_; /**< Password */ + + XTransportOption():secure_(false),verifyPeer_(1),verifyHost_(2) + {} + }; + + /*************************************************************************************************//** + * @brief Transport interface + *****************************************************************************************************/ + class XTransport + { + public: + XTransport(); + + /*************************************************************************************************//** + * Start Transport and the decorated transports. + * + * @note This method will call allowPost(). + *****************************************************************************************************/ + virtual XErrorCode start() = 0; + + /*************************************************************************************************//** + * Stop Transport and the decorated transports. + * + * @note This method will call cancelPost(). + *****************************************************************************************************/ + virtual XErrorCode stop() = 0; + + /*************************************************************************************************//** + * Post event string. + *****************************************************************************************************/ + virtual XErrorCode post(const std::string& event) = 0; + + /*************************************************************************************************//** + * Cancel the running and subsequent post() until allowPost() is called. + * It is useful especially when transport is wrapped by SynchronizedTransport. + *****************************************************************************************************/ + virtual void cancelPost(); + + /*************************************************************************************************//** + * Allow the post() method. + *****************************************************************************************************/ + virtual void allowPost(); + + /*************************************************************************************************//** + * Check if the post() method will be canceled. + *****************************************************************************************************/ + virtual bool shouldCancelPost(); + + /*************************************************************************************************//** + * Create a libcurl based transport. + *****************************************************************************************************/ + static std::shared_ptr LibCurlTransport(const XTransportOption& option); + + /*************************************************************************************************//** + * Create a retry transport decorator. + * The retry will be performed if the decorated post() does NOT return + * XErrorOk, XErrorCanceled or XErrorClientError. + *****************************************************************************************************/ + static std::shared_ptr RetryTransport(std::shared_ptr transport, + std::chrono::milliseconds retryInterval, + int retryTimes); + + /*************************************************************************************************//** + * Create a switchable transport decorator. + * The transport will be switched if the decorated post() does NOT return + * XErrorOk, XErrorCanceled or XErrorClientError. + *****************************************************************************************************/ + static std::shared_ptr SwitchableTransport(std::vector>& transports); + + /*************************************************************************************************//** + * Create a synchronized transport decorator. + * The decorated transport will be thread safe. + *****************************************************************************************************/ + static std::shared_ptr SynchronizedTransport(std::shared_ptr transport); + + /*************************************************************************************************//** + * Create a bufferd(FIFO) transport decorator. + * The event string will be buffered first and then post via decorated transport. + *****************************************************************************************************/ + static std::shared_ptr BufferedTransport(std::shared_ptr transport, std::shared_ptr queue); + + /*************************************************************************************************//** + * Create a RPC client transport. + *****************************************************************************************************/ + static std::shared_ptr RpcClientTransport(const XTransportOption& option); + + /*************************************************************************************************//** + * Create a RPC server transport. + *****************************************************************************************************/ + static std::shared_ptr RpcServerTransport(std::shared_ptr transport, const XTransportOption& option); + + protected: + std::atomic cancel_; + + private: + XTransport(const XTransport&) = delete; + XTransport& operator=(const XTransport&) = delete; + }; + } +} diff --git a/veslibrary/ves_cpplibrary/src/lib/transport/rpc.idl b/veslibrary/ves_cpplibrary/src/lib/transport/rpc.idl new file mode 100755 index 0000000..de4979b --- /dev/null +++ b/veslibrary/ves_cpplibrary/src/lib/transport/rpc.idl @@ -0,0 +1,6 @@ +namespace cpp vagt.transport.rpc + +service XRpcTransport +{ + i16 post(1: string data) +} \ No newline at end of file -- cgit 1.2.3-korg