From 8e070076fc4cadea4905af5fd5b6abed0d0d5aa7 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Fri, 4 Feb 2022 16:44:44 +0800 Subject: [PATCH 001/272] =?UTF-8?q?=E6=B6=88=E9=99=A4vs=E7=BC=96=E8=AF=91?= =?UTF-8?q?=E6=97=B6=E7=9A=84warning?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/libipc/waiter.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/libipc/waiter.h b/src/libipc/waiter.h index 2e13dc39..ce4e5924 100644 --- a/src/libipc/waiter.h +++ b/src/libipc/waiter.h @@ -62,12 +62,12 @@ class waiter { } bool notify() noexcept { - std::lock_guard{lock_}; // barrier + { IPC_UNUSED_ std::lock_guard guard{lock_}; } // barrier return cond_.notify(lock_); } bool broadcast() noexcept { - std::lock_guard{lock_}; // barrier + { IPC_UNUSED_ std::lock_guard guard{lock_}; } // barrier return cond_.broadcast(lock_); } From a2e5cc78047670201e5e24571c72a7b31136c8ba Mon Sep 17 00:00:00 2001 From: mutouyun Date: Fri, 4 Feb 2022 17:48:24 +0800 Subject: [PATCH 002/272] =?UTF-8?q?=E4=BF=AE=E6=AD=A3=E5=92=8C=E6=94=B9?= =?UTF-8?q?=E8=BF=9Bposix=20errno=E6=89=93=E5=8D=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/libipc/ipc.h | 2 +- src/libipc/platform/posix/get_error.h | 14 ++++++++++++++ src/libipc/platform/posix/semaphore_impl.h | 15 ++++++++------- src/libipc/platform/posix/shm_posix.cpp | 19 ++++++++++--------- src/libipc/waiter.h | 4 ++-- 5 files changed, 35 insertions(+), 19 deletions(-) create mode 100644 src/libipc/platform/posix/get_error.h diff --git a/include/libipc/ipc.h b/include/libipc/ipc.h index 64b262c9..ceb26eac 100755 --- a/include/libipc/ipc.h +++ b/include/libipc/ipc.h @@ -89,7 +89,7 @@ class chan_wrapper { } chan_wrapper clone() const { - return chan_wrapper { name(), mode_ }; + return chan_wrapper {name(), mode_}; } /** diff --git a/src/libipc/platform/posix/get_error.h b/src/libipc/platform/posix/get_error.h new file mode 100644 index 00000000..c552f226 --- /dev/null +++ b/src/libipc/platform/posix/get_error.h @@ -0,0 +1,14 @@ +#pragma once + +#include +#include + +namespace ipc { +namespace detail { + +inline char const *curr_error() noexcept { + return ::strerror(errno); +} + +} // namespace detail +} // namespace ipc diff --git a/src/libipc/platform/posix/semaphore_impl.h b/src/libipc/platform/posix/semaphore_impl.h index d48bcd4b..94a5350a 100644 --- a/src/libipc/platform/posix/semaphore_impl.h +++ b/src/libipc/platform/posix/semaphore_impl.h @@ -11,6 +11,7 @@ #include "libipc/shm.h" #include "get_wait_time.h" +#include "get_error.h" namespace ipc { namespace detail { @@ -40,7 +41,7 @@ class semaphore { } h_ = ::sem_open(name, O_CREAT, 0666, static_cast(count)); if (h_ == SEM_FAILED) { - ipc::error("fail sem_open[%d]: %s\n", errno, name); + ipc::error("fail sem_open[%s]: name = %s\n", curr_error(), name); return false; } return true; @@ -49,14 +50,14 @@ class semaphore { void close() noexcept { if (!valid()) return; if (::sem_close(h_) != 0) { - ipc::error("fail sem_close[%d]: %s\n", errno); + ipc::error("fail sem_close[%s]\n", curr_error()); } h_ = SEM_FAILED; if (shm_.name() != nullptr) { std::string name = shm_.name(); if (shm_.release() <= 1) { if (::sem_unlink(name.c_str()) != 0) { - ipc::error("fail sem_unlink[%d]: %s, name: %s\n", errno, name.c_str()); + ipc::error("fail sem_unlink[%s]: name = %s\n", curr_error(), name.c_str()); } } } @@ -66,15 +67,15 @@ class semaphore { if (!valid()) return false; if (tm == invalid_value) { if (::sem_wait(h_) != 0) { - ipc::error("fail sem_wait[%d]: %s\n", errno); + ipc::error("fail sem_wait[%s]\n", curr_error()); return false; } } else { auto ts = detail::make_timespec(tm); if (::sem_timedwait(h_, &ts) != 0) { if (errno != ETIMEDOUT) { - ipc::error("fail sem_timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", - errno, tm, ts.tv_sec, ts.tv_nsec); + ipc::error("fail sem_timedwait[%s]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", + curr_error(), tm, ts.tv_sec, ts.tv_nsec); } return false; } @@ -86,7 +87,7 @@ class semaphore { if (!valid()) return false; for (std::uint32_t i = 0; i < count; ++i) { if (::sem_post(h_) != 0) { - ipc::error("fail sem_post[%d]: %s\n", errno); + ipc::error("fail sem_post[%s]\n", curr_error()); return false; } } diff --git a/src/libipc/platform/posix/shm_posix.cpp b/src/libipc/platform/posix/shm_posix.cpp index 7f70b070..7ed98c87 100644 --- a/src/libipc/platform/posix/shm_posix.cpp +++ b/src/libipc/platform/posix/shm_posix.cpp @@ -4,7 +4,6 @@ #include #include #include -#include #include #include @@ -18,6 +17,8 @@ #include "libipc/utility/log.h" #include "libipc/memory/resource.h" +#include "get_error.h" + namespace { struct info_t { @@ -70,7 +71,7 @@ id_t acquire(char const * name, std::size_t size, unsigned mode) { S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH); if (fd == -1) { - ipc::error("fail shm_open[%d]: %s\n", errno, name); + ipc::error("fail shm_open[%s]: name = %s\n", curr_error(), name); return nullptr; } auto ii = mem::alloc(); @@ -119,12 +120,12 @@ void * get_mem(id_t id, std::size_t * size) { ipc::error("fail get_mem: invalid id (fd = -1)\n"); return nullptr; } + struct stat st; + if (::fstat(fd, &st) != 0) { + ipc::error("fail fstat[%s]: name = %s, size = %zd\n", curr_error(), ii->name_.c_str(), ii->size_); + return nullptr; + } if (ii->size_ == 0) { - struct stat st; - if (::fstat(fd, &st) != 0) { - ipc::error("fail fstat[%d]: %s, size = %zd\n", errno, ii->name_.c_str(), ii->size_); - return nullptr; - } ii->size_ = static_cast(st.st_size); if ((ii->size_ <= sizeof(info_t)) || (ii->size_ % sizeof(info_t))) { ipc::error("fail get_mem: %s, invalid size = %zd\n", ii->name_.c_str(), ii->size_); @@ -134,13 +135,13 @@ void * get_mem(id_t id, std::size_t * size) { else { ii->size_ = calc_size(ii->size_); if (::ftruncate(fd, static_cast(ii->size_)) != 0) { - ipc::error("fail ftruncate[%d]: %s, size = %zd\n", errno, ii->name_.c_str(), ii->size_); + ipc::error("fail ftruncate[%s]: name = %s, size = %zd\n", curr_error(), ii->name_.c_str(), ii->size_); return nullptr; } } void* mem = ::mmap(nullptr, ii->size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); if (mem == MAP_FAILED) { - ipc::error("fail mmap[%d]: %s, size = %zd\n", errno, ii->name_.c_str(), ii->size_); + ipc::error("fail mmap[%s]: name = %s, size = %zd\n", curr_error(), ii->name_.c_str(), ii->size_); return nullptr; } ::close(fd); diff --git a/src/libipc/waiter.h b/src/libipc/waiter.h index ce4e5924..130bc984 100644 --- a/src/libipc/waiter.h +++ b/src/libipc/waiter.h @@ -62,12 +62,12 @@ class waiter { } bool notify() noexcept { - { IPC_UNUSED_ std::lock_guard guard{lock_}; } // barrier + { IPC_UNUSED_ std::lock_guard guard {lock_}; } // barrier return cond_.notify(lock_); } bool broadcast() noexcept { - { IPC_UNUSED_ std::lock_guard guard{lock_}; } // barrier + { IPC_UNUSED_ std::lock_guard guard {lock_}; } // barrier return cond_.broadcast(lock_); } From 912c1bfc64435863653f5b2173a0a24bda9c7589 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Fri, 4 Feb 2022 18:08:17 +0800 Subject: [PATCH 003/272] =?UTF-8?q?=E8=BF=98=E5=8E=9Fposix=20fstat?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E6=AE=B5=E7=9A=84=E4=BD=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/libipc/platform/posix/shm_posix.cpp | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/libipc/platform/posix/shm_posix.cpp b/src/libipc/platform/posix/shm_posix.cpp index 7ed98c87..9bf09ca0 100644 --- a/src/libipc/platform/posix/shm_posix.cpp +++ b/src/libipc/platform/posix/shm_posix.cpp @@ -120,19 +120,18 @@ void * get_mem(id_t id, std::size_t * size) { ipc::error("fail get_mem: invalid id (fd = -1)\n"); return nullptr; } - struct stat st; - if (::fstat(fd, &st) != 0) { - ipc::error("fail fstat[%s]: name = %s, size = %zd\n", curr_error(), ii->name_.c_str(), ii->size_); - return nullptr; - } if (ii->size_ == 0) { + struct stat st; + if (::fstat(fd, &st) != 0) { + ipc::error("fail fstat[%s]: name = %s, size = %zd\n", curr_error(), ii->name_.c_str(), ii->size_); + return nullptr; + } ii->size_ = static_cast(st.st_size); if ((ii->size_ <= sizeof(info_t)) || (ii->size_ % sizeof(info_t))) { ipc::error("fail get_mem: %s, invalid size = %zd\n", ii->name_.c_str(), ii->size_); return nullptr; } - } - else { + } else { ii->size_ = calc_size(ii->size_); if (::ftruncate(fd, static_cast(ii->size_)) != 0) { ipc::error("fail ftruncate[%s]: name = %s, size = %zd\n", curr_error(), ii->name_.c_str(), ii->size_); From 20168fb8693d37f62900bbd8981fe8fb09d84dd4 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 27 Feb 2022 13:47:25 +0800 Subject: [PATCH 004/272] =?UTF-8?q?=E5=B0=9D=E8=AF=95=E5=8E=BB=E9=99=A4?= =?UTF-8?q?=E6=81=B6=E5=BF=83=E7=9A=84=E8=BF=9E=E6=8E=A5=E6=A3=80=E6=B5=8B?= =?UTF-8?q?=EF=BC=88TBD=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/libipc/ipc.h | 19 --- src/libipc/circ/elem_array.h | 17 +- src/libipc/circ/elem_def.h | 34 +--- src/libipc/ipc.cpp | 26 --- src/libipc/prod_cons.h | 316 +++++++++++------------------------ src/libipc/queue.h | 30 +--- test/test_ipc.cpp | 2 - test/test_queue.cpp | 25 ++- 8 files changed, 122 insertions(+), 347 deletions(-) diff --git a/include/libipc/ipc.h b/include/libipc/ipc.h index ceb26eac..0b4459f6 100755 --- a/include/libipc/ipc.h +++ b/include/libipc/ipc.h @@ -31,8 +31,6 @@ struct IPC_EXPORT chan_impl { static bool send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm); static buff_t recv(ipc::handle_t h, std::uint64_t tm); - - static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm); static buff_t try_recv(ipc::handle_t h); }; @@ -128,9 +126,6 @@ class chan_wrapper { return chan_wrapper(name).wait_for_recv(r_count, tm); } - /** - * If timeout, this function would call 'force_push' to send the data forcibly. - */ bool send(void const * data, std::size_t size, std::uint64_t tm = default_timeout) { return detail_t::send(h_, data, size, tm); } @@ -141,23 +136,9 @@ class chan_wrapper { return this->send(str.c_str(), str.size() + 1, tm); } - /** - * If timeout, this function would just return false. - */ - bool try_send(void const * data, std::size_t size, std::uint64_t tm = default_timeout) { - return detail_t::try_send(h_, data, size, tm); - } - bool try_send(buff_t const & buff, std::uint64_t tm = default_timeout) { - return this->try_send(buff.data(), buff.size(), tm); - } - bool try_send(std::string const & str, std::uint64_t tm = default_timeout) { - return this->try_send(str.c_str(), str.size() + 1, tm); - } - buff_t recv(std::uint64_t tm = invalid_value) { return detail_t::recv(h_, tm); } - buff_t try_recv() { return detail_t::try_recv(h_); } diff --git a/src/libipc/circ/elem_array.h b/src/libipc/circ/elem_array.h index 0b21f486..5febcba3 100755 --- a/src/libipc/circ/elem_array.h +++ b/src/libipc/circ/elem_array.h @@ -120,20 +120,15 @@ class elem_array : public ipc::circ::conn_head { return head_.cursor(); } - template - bool push(Q* que, F&& f) { - return head_.push(que, std::forward(f), block_); + template + bool push(F&& f) { + return head_.push(std::forward(f), block_); } - template - bool force_push(Q* que, F&& f) { - return head_.force_push(que, std::forward(f), block_); - } - - template - bool pop(Q* que, cursor_t* cur, F&& f, R&& out) { + template + bool pop(cursor_t* cur, F&& f) { if (cur == nullptr) return false; - return head_.pop(que, *cur, std::forward(f), std::forward(out), block_); + return head_.pop(*cur, std::forward(f), block_); } }; diff --git a/src/libipc/circ/elem_def.h b/src/libipc/circ/elem_def.h index 40039480..e14ef01a 100755 --- a/src/libipc/circ/elem_def.h +++ b/src/libipc/circ/elem_def.h @@ -50,40 +50,8 @@ class conn_head_base { } }; -template ::is_broadcast> -class conn_head; - -template -class conn_head : public conn_head_base { -public: - cc_t connect() noexcept { - for (unsigned k = 0;; ipc::yield(k)) { - cc_t curr = this->cc_.load(std::memory_order_acquire); - cc_t next = curr | (curr + 1); // find the first 0, and set it to 1. - if (next == 0) { - // connection-slot is full. - return 0; - } - if (this->cc_.compare_exchange_weak(curr, next, std::memory_order_release)) { - return next ^ curr; // return connected id - } - } - } - - cc_t disconnect(cc_t cc_id) noexcept { - return this->cc_.fetch_and(~cc_id, std::memory_order_acq_rel) & ~cc_id; - } - - std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept { - cc_t cur = this->cc_.load(order); - cc_t cnt; // accumulates the total bits set in cc - for (cnt = 0; cur; ++cnt) cur &= cur - 1; - return cnt; - } -}; - template -class conn_head : public conn_head_base { +class conn_head : public conn_head_base { public: cc_t connect() noexcept { return this->cc_.fetch_add(1, std::memory_order_relaxed) + 1; diff --git a/src/libipc/ipc.cpp b/src/libipc/ipc.cpp index c864a1b1..58d7979f 100755 --- a/src/libipc/ipc.cpp +++ b/src/libipc/ipc.cpp @@ -484,27 +484,6 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s } static bool send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm) { - return send([tm](auto info, auto que, auto msg_id) { - return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { - if (!wait_for(info->wt_waiter_, [&] { - return !que->push( - [](void*) { return true; }, - info->cc_id_, msg_id, remain, data, size); - }, tm)) { - ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size); - if (!que->force_push( - clear_message, - info->cc_id_, msg_id, remain, data, size)) { - return false; - } - } - info->rd_waiter_.broadcast(); - return true; - }; - }, h, data, size); -} - -static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm) { return send([tm](auto info, auto que, auto msg_id) { return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { if (!wait_for(info->wt_waiter_, [&] { @@ -676,11 +655,6 @@ buff_t chan_impl::recv(ipc::handle_t h, std::uint64_t tm) { return detail_impl>::recv(h, tm); } -template -bool chan_impl::try_send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm) { - return detail_impl>::try_send(h, data, size, tm); -} - template buff_t chan_impl::try_recv(ipc::handle_t h) { return detail_impl>::try_recv(h); diff --git a/src/libipc/prod_cons.h b/src/libipc/prod_cons.h index 28d99bda..1763209e 100755 --- a/src/libipc/prod_cons.h +++ b/src/libipc/prod_cons.h @@ -37,8 +37,8 @@ struct prod_cons_impl> { return 0; } - template - bool push(W* /*wrapper*/, F&& f, E* elems) { + template + bool push(F&& f, E* elems) { auto cur_wt = circ::index_of(wt_.load(std::memory_order_relaxed)); if (cur_wt == circ::index_of(rd_.load(std::memory_order_acquire) - 1)) { return false; // full @@ -48,24 +48,13 @@ struct prod_cons_impl> { return true; } - /** - * In single-single-unicast, 'force_push' means 'no reader' or 'the only one reader is dead'. - * So we could just disconnect all connections of receiver, and return false. - */ - template - bool force_push(W* wrapper, F&&, E*) { - wrapper->elems()->disconnect_receiver(~static_cast(0u)); - return false; - } - - template - bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E* elems) { + template + bool pop(circ::u2_t& /*cur*/, F&& f, E* elems) { auto cur_rd = circ::index_of(rd_.load(std::memory_order_relaxed)); if (cur_rd == circ::index_of(wt_.load(std::memory_order_acquire))) { return false; // empty } std::forward(f)(&(elems[cur_rd].data_)); - std::forward(out)(true); rd_.fetch_add(1, std::memory_order_release); return true; } @@ -75,26 +64,17 @@ template <> struct prod_cons_impl> : prod_cons_impl> { - template - bool force_push(W* wrapper, F&&, E*) { - wrapper->elems()->disconnect_receiver(1); - return false; - } - - template class E, std::size_t DS, std::size_t AS> - bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E* elems) { - byte_t buff[DS]; + bool pop(circ::u2_t& /*cur*/, F&& f, E* elems) { for (unsigned k = 0;;) { auto cur_rd = rd_.load(std::memory_order_relaxed); if (circ::index_of(cur_rd) == circ::index_of(wt_.load(std::memory_order_acquire))) { return false; // empty } - std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff)); if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) { - std::forward(f)(buff); - std::forward(out)(true); + std::forward(f)(&(elems[circ::index_of(cur_rd)].data_)); return true; } ipc::yield(k); @@ -116,8 +96,8 @@ struct prod_cons_impl> alignas(cache_line_size) std::atomic ct_; // commit index - template - bool push(W* /*wrapper*/, F&& f, E* elems) { + template + bool push(F&& f, E* elems) { circ::u2_t cur_ct, nxt_ct; for (unsigned k = 0;;) { cur_ct = ct_.load(std::memory_order_relaxed); @@ -153,16 +133,9 @@ struct prod_cons_impl> return true; } - template - bool force_push(W* wrapper, F&&, E*) { - wrapper->elems()->disconnect_receiver(1); - return false; - } - - template class E, std::size_t DS, std::size_t AS> - bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E* elems) { - byte_t buff[DS]; + bool pop(circ::u2_t& /*cur*/, F&& f, E* elems) { for (unsigned k = 0;;) { auto cur_rd = rd_.load(std::memory_order_relaxed); auto cur_wt = wt_.load(std::memory_order_acquire); @@ -179,15 +152,11 @@ struct prod_cons_impl> } k = 0; } - else { - std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff)); - if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) { - std::forward(f)(buff); - std::forward(out)(true); - return true; - } - ipc::yield(k); + else if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) { + std::forward(f)(&(elems[id_rd].data_)); + return true; } + ipc::yield(k); } } }; @@ -195,234 +164,147 @@ struct prod_cons_impl> template <> struct prod_cons_impl> { - using rc_t = std::uint64_t; - - enum : rc_t { - ep_mask = 0x00000000ffffffffull, - ep_incr = 0x0000000100000000ull - }; + using flag_t = std::uint64_t; template struct elem_t { std::aligned_storage_t data_ {}; - std::atomic rc_ { 0 }; // read-counter + std::atomic f_rc_ { 0 }; // read-flag }; alignas(cache_line_size) std::atomic wt_; // write index - alignas(cache_line_size) rc_t epoch_ { 0 }; // only one writer circ::u2_t cursor() const noexcept { return wt_.load(std::memory_order_acquire); } - template - bool push(W* wrapper, F&& f, E* elems) { - E* el; - for (unsigned k = 0;;) { - circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed); - if (cc == 0) return false; // no reader - el = elems + circ::index_of(wt_.load(std::memory_order_relaxed)); - // check all consumers have finished reading this element - auto cur_rc = el->rc_.load(std::memory_order_acquire); - circ::cc_t rem_cc = cur_rc & ep_mask; - if ((cc & rem_cc) && ((cur_rc & ~ep_mask) == epoch_)) { - return false; // has not finished yet - } - // consider rem_cc to be 0 here - if (el->rc_.compare_exchange_weak( - cur_rc, epoch_ | static_cast(cc), std::memory_order_release)) { - break; - } - ipc::yield(k); + template + bool push(F&& f, E* elems) { + E* el = elems + circ::index_of(wt_.load(std::memory_order_relaxed)); + auto cur_rc = el->f_rc_.exchange(~0ull, std::memory_order_acq_rel); + // check for consumers to read this element + if (cur_rc != 0) { + return false; // full } std::forward(f)(&(el->data_)); wt_.fetch_add(1, std::memory_order_release); return true; } - template - bool force_push(W* wrapper, F&& f, E* elems) { - E* el; - epoch_ += ep_incr; - for (unsigned k = 0;;) { - circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed); - if (cc == 0) return false; // no reader - el = elems + circ::index_of(wt_.load(std::memory_order_relaxed)); - // check all consumers have finished reading this element - auto cur_rc = el->rc_.load(std::memory_order_acquire); - circ::cc_t rem_cc = cur_rc & ep_mask; - if (cc & rem_cc) { - ipc::log("force_push: k = %u, cc = %u, rem_cc = %u\n", k, cc, rem_cc); - cc = wrapper->elems()->disconnect_receiver(rem_cc); // disconnect all invalid readers - if (cc == 0) return false; // no reader - } - // just compare & exchange - if (el->rc_.compare_exchange_weak( - cur_rc, epoch_ | static_cast(cc), std::memory_order_release)) { - break; - } - ipc::yield(k); - } + template + bool pop(circ::u2_t& cur, F&& f, E* elems) { + if (cur == cursor()) return false; // empty + E* el = elems + circ::index_of(cur++); std::forward(f)(&(el->data_)); - wt_.fetch_add(1, std::memory_order_release); + el->f_rc_.store(0, std::memory_order_release); return true; } - - template - bool pop(W* wrapper, circ::u2_t& cur, F&& f, R&& out, E* elems) { - if (cur == cursor()) return false; // acquire - auto* el = elems + circ::index_of(cur++); - std::forward(f)(&(el->data_)); - for (unsigned k = 0;;) { - auto cur_rc = el->rc_.load(std::memory_order_acquire); - if ((cur_rc & ep_mask) == 0) { - std::forward(out)(true); - return true; - } - auto nxt_rc = cur_rc & ~static_cast(wrapper->connected_id()); - if (el->rc_.compare_exchange_weak(cur_rc, nxt_rc, std::memory_order_release)) { - std::forward(out)((nxt_rc & ep_mask) == 0); - return true; - } - ipc::yield(k); - } - } }; template <> struct prod_cons_impl> { - using rc_t = std::uint64_t; using flag_t = std::uint64_t; - enum : rc_t { - rc_mask = 0x00000000ffffffffull, - ep_mask = 0x00ffffffffffffffull, - ep_incr = 0x0100000000000000ull, - ic_mask = 0xff000000ffffffffull, - ic_incr = 0x0000000100000000ull + enum : flag_t { + pushing = 1ull, + pushed = ~0ull, + popped = 0ull, }; template struct elem_t { std::aligned_storage_t data_ {}; - std::atomic rc_ { 0 }; // read-counter - std::atomic f_ct_ { 0 }; // commit flag + std::atomic f_rc_ { 0 }; // read-flag + std::atomic f_ct_ { 0 }; // commit-flag }; alignas(cache_line_size) std::atomic ct_; // commit index - alignas(cache_line_size) std::atomic epoch_ { 0 }; + alignas(cache_line_size) std::atomic wt_; // write index circ::u2_t cursor() const noexcept { - return ct_.load(std::memory_order_acquire); - } - - constexpr static rc_t inc_rc(rc_t rc) noexcept { - return (rc & ic_mask) | ((rc + ic_incr) & ~ic_mask); - } - - constexpr static rc_t inc_mask(rc_t rc) noexcept { - return inc_rc(rc) & ~rc_mask; + return wt_.load(std::memory_order_acquire); } - template - bool push(W* wrapper, F&& f, E* elems) { + template + bool push(F&& f, E* elems) { E* el; - circ::u2_t cur_ct; - rc_t epoch = epoch_.load(std::memory_order_acquire); + circ::u2_t cur_ct, nxt_ct; for (unsigned k = 0;;) { - circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed); - if (cc == 0) return false; // no reader - el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed)); - // check all consumers have finished reading this element - auto cur_rc = el->rc_.load(std::memory_order_relaxed); - circ::cc_t rem_cc = cur_rc & rc_mask; - if ((cc & rem_cc) && ((cur_rc & ~ep_mask) == epoch)) { - return false; // has not finished yet - } - else if (!rem_cc) { - auto cur_fl = el->f_ct_.load(std::memory_order_acquire); - if ((cur_fl != cur_ct) && cur_fl) { - return false; // full + auto cac_ct = ct_.load(std::memory_order_relaxed); + cur_ct = cac_ct; + nxt_ct = cur_ct + 1; + el = elems + circ::index_of(cac_ct); + for (unsigned k = 0;;) { + auto cur_rc = el->f_rc_.load(std::memory_order_acquire); + switch (cur_rc) { + // helper + case pushing: + ct_.compare_exchange_strong(cac_ct, nxt_ct, std::memory_order_release); + goto try_next; + // full + case pushed: + return false; + // writable + default: + break; } + if (el->f_rc_.compare_exchange_weak(cur_rc, pushing, std::memory_order_release)) { + break; + } + ipc::yield(k); } - // consider rem_cc to be 0 here - if (el->rc_.compare_exchange_weak( - cur_rc, inc_mask(epoch | (cur_rc & ep_mask)) | static_cast(cc), std::memory_order_relaxed) && - epoch_.compare_exchange_weak(epoch, epoch, std::memory_order_acq_rel)) { - break; - } + ct_.compare_exchange_strong(cac_ct, nxt_ct, std::memory_order_relaxed); + el->f_rc_.store(pushed, std::memory_order_relaxed); + std::atomic_thread_fence(std::memory_order_release); + break; + try_next: ipc::yield(k); } - // only one thread/process would touch here at one time - ct_.store(cur_ct + 1, std::memory_order_release); std::forward(f)(&(el->data_)); // set flag & try update wt el->f_ct_.store(~static_cast(cur_ct), std::memory_order_release); - return true; - } - - template - bool force_push(W* wrapper, F&& f, E* elems) { - E* el; - circ::u2_t cur_ct; - rc_t epoch = epoch_.fetch_add(ep_incr, std::memory_order_release) + ep_incr; - for (unsigned k = 0;;) { - circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed); - if (cc == 0) return false; // no reader - el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed)); - // check all consumers have finished reading this element - auto cur_rc = el->rc_.load(std::memory_order_acquire); - circ::cc_t rem_cc = cur_rc & rc_mask; - if (cc & rem_cc) { - ipc::log("force_push: k = %u, cc = %u, rem_cc = %u\n", k, cc, rem_cc); - cc = wrapper->elems()->disconnect_receiver(rem_cc); // disconnect all invalid readers - if (cc == 0) return false; // no reader + while (1) { + auto cac_ct = el->f_ct_.load(std::memory_order_acquire); + if (cur_ct != wt_.load(std::memory_order_relaxed)) { + return true; } - // just compare & exchange - if (el->rc_.compare_exchange_weak( - cur_rc, inc_mask(epoch | (cur_rc & ep_mask)) | static_cast(cc), std::memory_order_relaxed)) { - if (epoch == epoch_.load(std::memory_order_acquire)) { - break; - } - else if (push(wrapper, std::forward(f), elems)) { - return true; - } - epoch = epoch_.fetch_add(ep_incr, std::memory_order_release) + ep_incr; + if ((~cac_ct) != cur_ct) { + return true; } - ipc::yield(k); + if (!el->f_ct_.compare_exchange_strong(cac_ct, 0, std::memory_order_relaxed)) { + return true; + } + wt_.store(nxt_ct, std::memory_order_release); + cur_ct = nxt_ct; + nxt_ct = cur_ct + 1; + el = elems + circ::index_of(cur_ct); } - // only one thread/process would touch here at one time - ct_.store(cur_ct + 1, std::memory_order_release); - std::forward(f)(&(el->data_)); - // set flag & try update wt - el->f_ct_.store(~static_cast(cur_ct), std::memory_order_release); return true; } - template - bool pop(W* wrapper, circ::u2_t& cur, F&& f, R&& out, E(& elems)[N]) { - auto* el = elems + circ::index_of(cur); - auto cur_fl = el->f_ct_.load(std::memory_order_acquire); - if (cur_fl != ~static_cast(cur)) { - return false; // empty - } - ++cur; - std::forward(f)(&(el->data_)); + template + bool pop(circ::u2_t& cur, F&& f, E(& elems)[N]) { for (unsigned k = 0;;) { - auto cur_rc = el->rc_.load(std::memory_order_acquire); - if ((cur_rc & rc_mask) == 0) { - std::forward(out)(true); - el->f_ct_.store(cur + N - 1, std::memory_order_release); - return true; - } - auto nxt_rc = inc_rc(cur_rc) & ~static_cast(wrapper->connected_id()); - bool last_one = false; - if ((last_one = (nxt_rc & rc_mask) == 0)) { - el->f_ct_.store(cur + N - 1, std::memory_order_release); + auto cur_wt = wt_.load(std::memory_order_acquire); + auto id_rd = circ::index_of(cur); + auto id_wt = circ::index_of(cur_wt); + if (id_rd == id_wt) { + auto* el = elems + id_wt; + auto cac_ct = el->f_ct_.load(std::memory_order_acquire); + if ((~cac_ct) != cur_wt) { + return false; // empty + } + if (el->f_ct_.compare_exchange_weak(cac_ct, 0, std::memory_order_relaxed)) { + wt_.store(cur_wt + 1, std::memory_order_release); + } + k = 0; } - if (el->rc_.compare_exchange_weak(cur_rc, nxt_rc, std::memory_order_release)) { - std::forward(out)(last_one); + else { + ++cur; + auto* el = elems + id_rd; + std::forward(f)(&(el->data_)); + el->f_rc_.store(popped, std::memory_order_release); return true; } ipc::yield(k); diff --git a/src/libipc/queue.h b/src/libipc/queue.h index f66860b8..5fe77a2e 100755 --- a/src/libipc/queue.h +++ b/src/libipc/queue.h @@ -158,27 +158,19 @@ class queue_base : public queue_conn { template bool push(F&& prep, P&&... params) { if (elems_ == nullptr) return false; - return elems_->push(this, [&](void* p) { + return elems_->push([&](void* p) { if (prep(p)) ::new (p) T(std::forward

(params)...); }); } - template - bool force_push(F&& prep, P&&... params) { - if (elems_ == nullptr) return false; - return elems_->force_push(this, [&](void* p) { - if (prep(p)) ::new (p) T(std::forward

(params)...); - }); - } - - template - bool pop(T& item, F&& out) { + template + bool pop(T& item) { if (elems_ == nullptr) { return false; } - return elems_->pop(this, &(this->cursor_), [&item](void* p) { + return elems_->pop(&(this->cursor_), [&item](void* p) { ::new (&item) T(std::move(*static_cast(p))); - }, std::forward(out)); + }); } }; @@ -198,18 +190,8 @@ class queue final : public detail::queue_base(std::forward

(params)...); } - template - bool force_push(P&&... params) { - return base_t::template force_push(std::forward

(params)...); - } - bool pop(T& item) { - return base_t::pop(item, [](bool) {}); - } - - template - bool pop(T& item, F&& out) { - return base_t::pop(item, std::forward(out)); + return base_t::pop(item); } }; diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index a1f01083..695e6743 100755 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -73,11 +73,9 @@ void test_basic(char const * name) { que_t que1 { name }; EXPECT_FALSE(que1.send(test1)); - EXPECT_FALSE(que1.try_send(test2)); que_t que2 { que1.name(), ipc::receiver }; ASSERT_TRUE(que1.send(test1)); - ASSERT_TRUE(que1.try_send(test2)); EXPECT_EQ(que2.recv(), test1); EXPECT_EQ(que2.recv(), test2); diff --git a/test/test_queue.cpp b/test/test_queue.cpp index e85d39fa..1d2c7321 100755 --- a/test/test_queue.cpp +++ b/test/test_queue.cpp @@ -5,7 +5,6 @@ #include #include #include -#include // CHAR_BIT #include "libipc/prod_cons.h" #include "libipc/policy.h" @@ -143,6 +142,8 @@ TEST(Queue, el_connection) { for (std::size_t i = 0; i < 10000; ++i) { ASSERT_TRUE(el.connect_sender()); } + el.disconnect_sender(); + EXPECT_TRUE(el.connect_sender()); } { elems_t el; @@ -156,12 +157,13 @@ TEST(Queue, el_connection) { } { elems_t el; - for (std::size_t i = 0; i < (sizeof(ipc::circ::cc_t) * CHAR_BIT); ++i) { - ASSERT_NE(el.connect_receiver(), 0); - } + auto cc = el.connect_receiver(); + EXPECT_EQ(cc, 1); for (std::size_t i = 0; i < 10000; ++i) { - ASSERT_EQ(el.connect_receiver(), 0); + ASSERT_NE(el.connect_receiver(), 0); } + EXPECT_EQ(el.disconnect_receiver(cc), 10000); + EXPECT_EQ(el.connect_receiver(), 10000 + cc); } } @@ -227,25 +229,18 @@ TEST(Queue, connection) { for (std::size_t i = 0; i < 10000; ++i) { ASSERT_TRUE(que.connect()); } - for (std::size_t i = 1; i < (sizeof(ipc::circ::cc_t) * CHAR_BIT); ++i) { - queue_t que{&el}; - ASSERT_TRUE(que.connect()); - } for (std::size_t i = 0; i < 10000; ++i) { queue_t que{&el}; - ASSERT_FALSE(que.connect()); + ASSERT_TRUE(que.connect()); } ASSERT_TRUE(que.disconnect()); for (std::size_t i = 0; i < 10000; ++i) { ASSERT_FALSE(que.disconnect()); } - { - queue_t que{&el}; - ASSERT_TRUE(que.connect()); - } for (std::size_t i = 0; i < 10000; ++i) { queue_t que{&el}; - ASSERT_FALSE(que.connect()); + ASSERT_TRUE(que.connect()); + ASSERT_TRUE(que.disconnect()); } } } From f18c27ec29449840157ece7a9a57f5f817623933 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 27 Feb 2022 17:13:28 +0800 Subject: [PATCH 005/272] =?UTF-8?q?=E5=BC=80=E5=A7=8B=E9=87=8D=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 72 ++- include/libipc/buffer.h | 68 -- include/libipc/condition.h | 39 -- include/libipc/def.h | 69 +- include/libipc/detect_plat.h | 155 +++++ include/libipc/export.h | 70 +- include/libipc/ipc.h | 173 ----- include/libipc/mutex.h | 39 -- include/libipc/pool_alloc.h | 103 --- include/libipc/rw_lock.h | 171 ----- include/libipc/semaphore.h | 37 -- include/libipc/shm.h | 59 -- include/libipc/spin_lock.h | 179 ++++++ performance.xlsx | Bin 75641 -> 0 bytes src/CMakeLists.txt | 19 +- src/libipc/buffer.cpp | 87 --- src/libipc/circ/elem_array.h | 136 ---- src/libipc/circ/elem_def.h | 77 --- src/libipc/ipc.cpp | 669 +------------------- src/libipc/memory/alloc.h | 424 ------------- src/libipc/memory/allocator_wrapper.h | 121 ---- src/libipc/memory/resource.h | 90 --- src/libipc/memory/wrapper.h | 327 ---------- src/libipc/platform/detail.h | 136 ---- src/libipc/platform/linux/a0/LICENSE | 24 - src/libipc/platform/linux/a0/README.md | 213 ------- src/libipc/platform/linux/a0/atomic.h | 36 -- src/libipc/platform/linux/a0/clock.h | 60 -- src/libipc/platform/linux/a0/empty.h | 13 - src/libipc/platform/linux/a0/err.c | 50 -- src/libipc/platform/linux/a0/err.h | 33 - src/libipc/platform/linux/a0/err_macro.h | 52 -- src/libipc/platform/linux/a0/ftx.h | 111 ---- src/libipc/platform/linux/a0/inline.h | 8 - src/libipc/platform/linux/a0/mtx.c | 420 ------------ src/libipc/platform/linux/a0/mtx.h | 57 -- src/libipc/platform/linux/a0/strconv.c | 64 -- src/libipc/platform/linux/a0/strconv.h | 31 - src/libipc/platform/linux/a0/thread_local.h | 10 - src/libipc/platform/linux/a0/tid.c | 30 - src/libipc/platform/linux/a0/tid.h | 16 - src/libipc/platform/linux/a0/time.c | 124 ---- src/libipc/platform/linux/a0/time.h | 97 --- src/libipc/platform/linux/a0/unused.h | 7 - src/libipc/platform/linux/condition.h | 66 -- src/libipc/platform/linux/get_wait_time.h | 46 -- src/libipc/platform/linux/mutex.h | 204 ------ src/libipc/platform/linux/sync_obj_impl.h | 69 -- src/libipc/platform/platform.c | 13 - src/libipc/platform/platform.cpp | 9 - src/libipc/platform/posix/condition.h | 141 ----- src/libipc/platform/posix/get_error.h | 14 - src/libipc/platform/posix/get_wait_time.h | 39 -- src/libipc/platform/posix/mutex.h | 237 ------- src/libipc/platform/posix/semaphore_impl.h | 100 --- src/libipc/platform/posix/shm_posix.cpp | 197 ------ src/libipc/platform/win/condition.h | 119 ---- src/libipc/platform/win/get_sa.h | 35 - src/libipc/platform/win/mutex.h | 96 --- src/libipc/platform/win/semaphore.h | 74 --- src/libipc/platform/win/shm_win.cpp | 135 ---- src/libipc/platform/win/to_tchar.h | 74 --- src/libipc/policy.h | 25 - src/libipc/pool_alloc.cpp | 17 - src/libipc/prod_cons.h | 315 --------- src/libipc/queue.h | 198 ------ src/libipc/shm.cpp | 103 --- src/libipc/sync/condition.cpp | 72 --- src/libipc/sync/mutex.cpp | 72 --- src/libipc/sync/semaphore.cpp | 66 -- src/libipc/utility/concept.h | 29 - src/libipc/utility/id_pool.h | 103 --- src/libipc/utility/log.h | 39 -- src/libipc/utility/pimpl.h | 64 -- src/libipc/utility/scope_guard.h | 64 -- src/libipc/utility/utility.h | 64 -- src/libipc/waiter.h | 81 --- test/CMakeLists.txt | 15 +- test/test.h | 86 --- test/test_detect_plat.cpp | 79 +++ test/test_ipc.cpp | 183 ------ test/test_mem.cpp | 218 ------- test/test_platform.cpp | 31 - test/test_queue.cpp | 299 --------- test/test_shm.cpp | 102 --- test/test_sync.cpp | 208 ------ test/test_thread_utility.cpp | 203 ------ test/test_waiter.cpp | 68 -- test/thread_pool.h | 123 ---- 89 files changed, 503 insertions(+), 8868 deletions(-) delete mode 100755 include/libipc/buffer.h delete mode 100644 include/libipc/condition.h create mode 100644 include/libipc/detect_plat.h delete mode 100755 include/libipc/ipc.h delete mode 100644 include/libipc/mutex.h delete mode 100755 include/libipc/pool_alloc.h delete mode 100755 include/libipc/rw_lock.h delete mode 100644 include/libipc/semaphore.h delete mode 100755 include/libipc/shm.h create mode 100644 include/libipc/spin_lock.h delete mode 100644 performance.xlsx delete mode 100755 src/libipc/buffer.cpp delete mode 100755 src/libipc/circ/elem_array.h delete mode 100755 src/libipc/circ/elem_def.h delete mode 100755 src/libipc/memory/alloc.h delete mode 100755 src/libipc/memory/allocator_wrapper.h delete mode 100755 src/libipc/memory/resource.h delete mode 100755 src/libipc/memory/wrapper.h delete mode 100755 src/libipc/platform/detail.h delete mode 100644 src/libipc/platform/linux/a0/LICENSE delete mode 100644 src/libipc/platform/linux/a0/README.md delete mode 100644 src/libipc/platform/linux/a0/atomic.h delete mode 100644 src/libipc/platform/linux/a0/clock.h delete mode 100644 src/libipc/platform/linux/a0/empty.h delete mode 100644 src/libipc/platform/linux/a0/err.c delete mode 100644 src/libipc/platform/linux/a0/err.h delete mode 100644 src/libipc/platform/linux/a0/err_macro.h delete mode 100644 src/libipc/platform/linux/a0/ftx.h delete mode 100644 src/libipc/platform/linux/a0/inline.h delete mode 100644 src/libipc/platform/linux/a0/mtx.c delete mode 100644 src/libipc/platform/linux/a0/mtx.h delete mode 100644 src/libipc/platform/linux/a0/strconv.c delete mode 100644 src/libipc/platform/linux/a0/strconv.h delete mode 100644 src/libipc/platform/linux/a0/thread_local.h delete mode 100644 src/libipc/platform/linux/a0/tid.c delete mode 100644 src/libipc/platform/linux/a0/tid.h delete mode 100644 src/libipc/platform/linux/a0/time.c delete mode 100644 src/libipc/platform/linux/a0/time.h delete mode 100644 src/libipc/platform/linux/a0/unused.h delete mode 100644 src/libipc/platform/linux/condition.h delete mode 100644 src/libipc/platform/linux/get_wait_time.h delete mode 100644 src/libipc/platform/linux/mutex.h delete mode 100644 src/libipc/platform/linux/sync_obj_impl.h delete mode 100644 src/libipc/platform/platform.c delete mode 100644 src/libipc/platform/platform.cpp delete mode 100644 src/libipc/platform/posix/condition.h delete mode 100644 src/libipc/platform/posix/get_error.h delete mode 100644 src/libipc/platform/posix/get_wait_time.h delete mode 100644 src/libipc/platform/posix/mutex.h delete mode 100644 src/libipc/platform/posix/semaphore_impl.h delete mode 100644 src/libipc/platform/posix/shm_posix.cpp delete mode 100644 src/libipc/platform/win/condition.h delete mode 100644 src/libipc/platform/win/get_sa.h delete mode 100644 src/libipc/platform/win/mutex.h delete mode 100644 src/libipc/platform/win/semaphore.h delete mode 100755 src/libipc/platform/win/shm_win.cpp delete mode 100755 src/libipc/platform/win/to_tchar.h delete mode 100755 src/libipc/policy.h delete mode 100755 src/libipc/pool_alloc.cpp delete mode 100755 src/libipc/prod_cons.h delete mode 100755 src/libipc/queue.h delete mode 100755 src/libipc/shm.cpp delete mode 100644 src/libipc/sync/condition.cpp delete mode 100644 src/libipc/sync/mutex.cpp delete mode 100644 src/libipc/sync/semaphore.cpp delete mode 100755 src/libipc/utility/concept.h delete mode 100755 src/libipc/utility/id_pool.h delete mode 100755 src/libipc/utility/log.h delete mode 100755 src/libipc/utility/pimpl.h delete mode 100644 src/libipc/utility/scope_guard.h delete mode 100755 src/libipc/utility/utility.h delete mode 100644 src/libipc/waiter.h delete mode 100755 test/test.h create mode 100644 test/test_detect_plat.cpp delete mode 100755 test/test_ipc.cpp delete mode 100755 test/test_mem.cpp delete mode 100644 test/test_platform.cpp delete mode 100755 test/test_queue.cpp delete mode 100755 test/test_shm.cpp delete mode 100644 test/test_sync.cpp delete mode 100755 test/test_thread_utility.cpp delete mode 100755 test/test_waiter.cpp delete mode 100755 test/thread_pool.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 6d9b2a57..1eb24a0a 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -10,29 +10,31 @@ set(CMAKE_POSITION_INDEPENDENT_CODE ON) set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -DNDEBUG") if(NOT MSVC) - set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O2") + set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O2") endif() +add_definitions(-DLIBIPC_CPP_14 -DLIBIPC_CPP_17) + if (MSVC) - set(CompilerFlags - CMAKE_CXX_FLAGS - CMAKE_CXX_FLAGS_DEBUG - CMAKE_CXX_FLAGS_RELEASE - CMAKE_C_FLAGS - CMAKE_C_FLAGS_DEBUG - CMAKE_C_FLAGS_RELEASE - ) - if (LIBIPC_USE_STATIC_CRT) - foreach(CompilerFlag ${CompilerFlags}) - string(REPLACE "/MD" "/MT" ${CompilerFlag} "${${CompilerFlag}}") - string(REPLACE "/MDd" "/MTd" ${CompilerFlag} "${${CompilerFlag}}") - endforeach() - else() - foreach(CompilerFlag ${CompilerFlags}) - string(REPLACE "/MT" "/MD" ${CompilerFlag} "${${CompilerFlag}}") - string(REPLACE "/MTd" "/MDd" ${CompilerFlag} "${${CompilerFlag}}") - endforeach() - endif() + set(CompilerFlags + CMAKE_CXX_FLAGS + CMAKE_CXX_FLAGS_DEBUG + CMAKE_CXX_FLAGS_RELEASE + CMAKE_C_FLAGS + CMAKE_C_FLAGS_DEBUG + CMAKE_C_FLAGS_RELEASE + ) + if (LIBIPC_USE_STATIC_CRT) + foreach(CompilerFlag ${CompilerFlags}) + string(REPLACE "/MD" "/MT" ${CompilerFlag} "${${CompilerFlag}}") + string(REPLACE "/MDd" "/MTd" ${CompilerFlag} "${${CompilerFlag}}") + endforeach() + else() + foreach(CompilerFlag ${CompilerFlags}) + string(REPLACE "/MT" "/MD" ${CompilerFlag} "${${CompilerFlag}}") + string(REPLACE "/MTd" "/MDd" ${CompilerFlag} "${${CompilerFlag}}") + endforeach() + endif() endif() set(LIBRARY_OUTPUT_PATH ${CMAKE_BINARY_DIR}/bin) @@ -45,23 +47,23 @@ add_definitions(-DUNICODE -D_UNICODE) add_subdirectory(src) if (LIBIPC_BUILD_TESTS) - set(GOOGLETEST_VERSION 1.10.0) - if (LIBIPC_USE_STATIC_CRT) - set(gtest_force_shared_crt OFF) - else() - set(gtest_force_shared_crt ON) - endif() - add_subdirectory(3rdparty/gtest) - add_subdirectory(test) + set(GOOGLETEST_VERSION 1.10.0) + if (LIBIPC_USE_STATIC_CRT) + set(gtest_force_shared_crt OFF) + else() + set(gtest_force_shared_crt ON) + endif() + add_subdirectory(3rdparty/gtest) + add_subdirectory(test) endif() -if (LIBIPC_BUILD_DEMOS) - add_subdirectory(demo/chat) - add_subdirectory(demo/msg_que) - add_subdirectory(demo/send_recv) -endif() +# if (LIBIPC_BUILD_DEMOS) +# add_subdirectory(demo/chat) +# add_subdirectory(demo/msg_que) +# add_subdirectory(demo/send_recv) +# endif() install( - DIRECTORY "include/" - DESTINATION "include" + DIRECTORY "include/" + DESTINATION "include" ) diff --git a/include/libipc/buffer.h b/include/libipc/buffer.h deleted file mode 100755 index 3f8c229b..00000000 --- a/include/libipc/buffer.h +++ /dev/null @@ -1,68 +0,0 @@ -#pragma once - -#include -#include -#include -#include - -#include "libipc/export.h" -#include "libipc/def.h" - -namespace ipc { - -class IPC_EXPORT buffer { -public: - using destructor_t = void (*)(void*, std::size_t); - - buffer(); - - buffer(void* p, std::size_t s, destructor_t d); - buffer(void* p, std::size_t s, destructor_t d, void* additional); - buffer(void* p, std::size_t s); - - template - explicit buffer(byte_t const (& data)[N]) - : buffer(data, sizeof(data)) { - } - explicit buffer(char const & c); - - buffer(buffer&& rhs); - ~buffer(); - - void swap(buffer& rhs); - buffer& operator=(buffer rhs); - - bool empty() const noexcept; - - void * data() noexcept; - void const * data() const noexcept; - - template - T get() const { return T(data()); } - - std::size_t size() const noexcept; - - std::tuple to_tuple() { - return std::make_tuple(data(), size()); - } - - std::tuple to_tuple() const { - return std::make_tuple(data(), size()); - } - - std::vector to_vector() const { - return { - get(), - get() + size() - }; - } - - friend IPC_EXPORT bool operator==(buffer const & b1, buffer const & b2); - friend IPC_EXPORT bool operator!=(buffer const & b1, buffer const & b2); - -private: - class buffer_; - buffer_* p_; -}; - -} // namespace ipc diff --git a/include/libipc/condition.h b/include/libipc/condition.h deleted file mode 100644 index a4e2ac30..00000000 --- a/include/libipc/condition.h +++ /dev/null @@ -1,39 +0,0 @@ -#pragma once - -#include // std::uint64_t - -#include "libipc/export.h" -#include "libipc/def.h" -#include "libipc/mutex.h" - -namespace ipc { -namespace sync { - -class IPC_EXPORT condition { - condition(condition const &) = delete; - condition &operator=(condition const &) = delete; - -public: - condition(); - explicit condition(char const *name); - ~condition(); - - void const *native() const noexcept; - void *native() noexcept; - - bool valid() const noexcept; - - bool open(char const *name) noexcept; - void close() noexcept; - - bool wait(ipc::sync::mutex &mtx, std::uint64_t tm = ipc::invalid_value) noexcept; - bool notify(ipc::sync::mutex &mtx) noexcept; - bool broadcast(ipc::sync::mutex &mtx) noexcept; - -private: - class condition_; - condition_* p_; -}; - -} // namespace sync -} // namespace ipc diff --git a/include/libipc/def.h b/include/libipc/def.h index 8c1a72ba..b3ebd6f7 100755 --- a/include/libipc/def.h +++ b/include/libipc/def.h @@ -1,68 +1,19 @@ +/** + * @file def.h + * @author mutouyun (orz@orzz.org) + * @brief Define the trivial configuration information + * @date 2022-02-27 + */ #pragma once #include #include -#include // std::numeric_limits -#include -#include -namespace ipc { +#define LIBIPC_NAMESPACE_BEG_ namespace ipc { +#define LIBIPC_NAMESPACE_END_ } -// types - -using byte_t = std::uint8_t; - -template -struct uint; - -template <> struct uint<8 > { using type = std::uint8_t ; }; -template <> struct uint<16> { using type = std::uint16_t; }; -template <> struct uint<32> { using type = std::uint32_t; }; -template <> struct uint<64> { using type = std::uint64_t; }; - -template -using uint_t = typename uint::type; +LIBIPC_NAMESPACE_BEG_ // constants -enum : std::uint32_t { - invalid_value = (std::numeric_limits::max)(), - default_timeout = 100, // ms -}; - -enum : std::size_t { - data_length = 64, - large_msg_limit = data_length, - large_msg_align = 1024, - large_msg_cache = 32, -}; - -enum class relat { // multiplicity of the relationship - single, - multi -}; - -enum class trans { // transmission - unicast, - broadcast -}; - -// producer-consumer policy flag - -template -struct wr {}; - -template -struct relat_trait; - -template -struct relat_trait> { - constexpr static bool is_multi_producer = (Rp == relat::multi); - constexpr static bool is_multi_consumer = (Rc == relat::multi); - constexpr static bool is_broadcast = (Ts == trans::broadcast); -}; - -template