首页 > 教程 > iceoryx源码阅读(四)——共享内存通信(二)

iceoryx源码阅读(四)——共享内存通信(二)

时间:2024-05-29 | 来源: | 阅读:200

话题: C 阅读 通信 CEO

目录0 导引1 队列数据结构2 共享内存获取2.1 PublisherImpl::loan2.2 PublisherImpl::loanSample2.3 PublisherPortUser::tryAllocateChunk2.4 ChunkSender::tryAllocate3 消息发送逻辑3

0 导引

  • iceoryx源码阅读(一)——全局概览

  • iceoryx源码阅读(二)——共享内存管理

  • iceoryx源码阅读(三)——共享内存管理(一)

  • iceoryx源码阅读(四)——共享内存通信(二)

  • iceoryx源码阅读(五)——共享内存通信(三)

  • iceoryx源码阅读(六)——共享内存创建

  • iceoryx源码阅读(七)——服务发现机制

  • iceoryx源码阅读(八)——IPC通信机制

本文阅读与共享内存通信相关的逻辑。发布者首先获取一块共享内存,往其中写入数据,然后向消息队列中推入消息描述数据,订阅者从消息队列中读取消息描述数据。本文从四方面进行解读:队列数据结构、共享内存获取、消息发送逻辑、消息接收逻辑。

1 队列数据结构

根据前文知道,队列元素为 ShmSafeUnmanagedChunk ,其中存放的是 ChunkManagement 所在共享内存段的id和相对该共享内存首地址的偏移,具体如下所示:

消息队列由如下代码定义:

struct ChunkQueueData : public LockingPolicy
{
    // ...
    static constexpr uint64_t MAX_CAPACITY = ChunkQueueDataProperties_t::MAX_QUEUE_CAPACITY;
    cxx::VariantQueue m_queue;
    // ...
};

struct ChunkDistributorData : public LockingPolicy
{
    // ...
    using QueueContainer_t =
    cxx::vector, ChunkDistributorDataProperties_t::MAX_QUEUES>;
    QueueContainer_t m_queues;
    // ...
};

struct ChunkReceiverData : public ChunkQueueDataType
{
    // ...
};
  • ChunkDistributorData 是发布者所持有的队列数据结构,由于一个发布者会分发至多个订阅端,所以持有多个队列。

  • ChunkReceiverData 是订阅者的组件,它继承自 ChunkQueueData ,内部只有一个队列,队列元素类型为 ShmSafeUnmanagedChunk

上述代码中,队列数据结构的类型为 cxx::VariantQueue 。从类名看,是一个变长数组,但实际上这是一个定长数组,以下是相关数据结构定义:

enum class VariantQueueTypes : uint64_t
{
    FiFo_SingleProducerSingleConsumer = 0,
    SoFi_SingleProducerSingleConsumer = 1,
    FiFo_MultiProducerSingleConsumer = 2,
    SoFi_MultiProducerSingleConsumer = 3
};

template 
class VariantQueue
{
public:
    using fifo_t = variant,
                           concurrent::SoFi,
                           concurrent::ResizeableLockFreeQueue,
                           concurrent::ResizeableLockFreeQueue>;
    // ...

private:
    VariantQueueTypes m_type;
    fifo_t m_fifo;
};

fifo_t 是队列底层结构类型,可能是 concurrent::FiFo concurrent::SoFi concurrent::ResizeableLockFreeQueue 之一,至于使用哪一种,由枚举值 m_type 确定。这三个内部会依赖以下数据结构:

template 
struct NonZeroedBuffer
{
    struct alignas(ElementType) element_t
    {
        cxx::byte_t data[sizeof(ElementType)];
    };
    element_t value[Capacity];
};

上面这一结构本质就是一个数组,其元素类型类型为Element。

2 共享内存获取

发送数据前,应用程序首先需要先获取一块合适大小的Chunk,往其中写入数据,然后调用消息发送接口进行发送。

2.1 PublisherImpl::loan

职责:

获取一块共享内存,并调用构造函数进行初始化。

入参:

args:模板变参,用于调用待传类型的构造函数,也可以不传。

返回:

Sample类型实例,本质是对用户可操作的共享内存段的封装。

template 
template 
inline cxx::expected, AllocationError>
PublisherImpl::loan(Args&&... args) noexcept
{
    return std::move(loanSample().and_then([&](auto& sample) { new (sample.get()) T(std::forward(args)...); }));
}

整体代码分析:

首先调用loanSample方法获取共享内存,然后调用构造函数进行初始化,这里使用Placement new语法。需要指出的是,loanSample返回的是将用于存放用户数据的首地址,而不是Chunk的首地址。

2.2 PublisherImpl::loanSample

职责:

分配共享内存,并将其转换为Sample类型,并返回。

返回:

Sample类型实例。

template 
inline cxx::expected, AllocationError> PublisherImpl::loanSample() noexcept
{
    static constexpr uint32_t USER_HEADER_SIZE{std::is_same::value ? 0U : sizeof(H)};

    auto result = port().tryAllocateChunk(sizeof(T), alignof(T), USER_HEADER_SIZE, alignof(H));
    if (result.has_error())
    {
        return cxx::error(result.get_error());
    }
    else
    {
        return cxx::success>(convertChunkHeaderToSample(result.value()));
    }
}

整体代码分析:

首先调用 tryAllocateChunk 获得一块共享内存,并构造Sample实例。

2.3 PublisherPortUser::tryAllocateChunk

职责:

分配共享内存,并将其转换为Sample类型,并返回。

入参:

4个用于计算所需共享内存大小的参数,这里不展开介绍了。

返回值:

共享内存首地址(类型为 ChunkHeader * ,见 4.1 Chunk管理结构 )

cxx::expected
PublisherPortUser::tryAllocateChunk(const uint32_t userPayloadSize,
                                    const uint32_t userPayloadAlignment,
                                    const uint32_t userHeaderSize,
                                    const uint32_t userHeaderAlignment) noexcept
{
    return m_chunkSender.tryAllocate(
        getUniqueID(), userPayloadSize, userPayloadAlignment, userHeaderSize, userHeaderAlignment);
}

整体代码分析:

上述函数只是简单地调用 ChunkSender tryAllocate 方法。

2.4 ChunkSender::tryAllocate

职责:

调用 MemoryManager的成员方法getChunk 得到共享内存块或复用最后一次使用的共享内存块。

入参:

同上(略)

返回值:

指向共享内存块首地址的指针,类型为 ChunkHeader

template 
inline cxx::expected
ChunkSender::tryAllocate(const UniquePortId originId,
                                              const uint32_t userPayloadSize,
                                              const uint32_t userPayloadAlignment,
                                              const uint32_t userHeaderSize,
                                              const uint32_t userHeaderAlignment) noexcept
{
    const auto chunkSettingsResult =
        mepoo::ChunkSettings::create(userPayloadSize, userPayloadAlignment, userHeaderSize, userHeaderAlignment);
    if (chunkSettingsResult.has_error())
    {
        return cxx::error(AllocationError::INVALID_PARAMETER_FOR_USER_PAYLOAD_OR_USER_HEADER);
    }

    const auto& chunkSettings = chunkSettingsResult.value();
    const uint32_t requiredChunkSize = chunkSettings.requiredChunkSize();

    auto& lastChunkUnmanaged = getMembers()->m_lastChunkUnmanaged;
    mepoo::ChunkHeader* lastChunkChunkHeader =
        lastChunkUnmanaged.isNotLogicalNullptrAndHasNoOtherOwners() ? lastChunkUnmanaged.getChunkHeader() : nullptr;

    if (lastChunkChunkHeader && (lastChunkChunkHeader->chunkSize() >= requiredChunkSize))
    {
        /* * * * *  见代码段2-4-1:复用最近一次分配的共享内存  * * * * */
    }
    else
    {
        /* * * * *  见代码段2-4-2:分配一块新的未使用的共享内存 * * * * */
    }
}

逐段代码分析:

  • LINE 09 ~ LINE 17: 计算所需共享内存大小。

  • LINE 19 ~ LINE 30: 判断最近一次分配的共享内存块是否所有订阅者都已读取,并且大小超过所需大小,则复用最近一次分配的共享内存块,否则新分配共享内存块。

代码段2-4-1:复用最近一次分配的共享内存

auto sharedChunk = lastChunkUnmanaged.cloneToSharedChunk();
if (getMembers()->m_chunksInUse.insert(sharedChunk))
{
    auto chunkSize = lastChunkChunkHeader->chunkSize();
    lastChunkChunkHeader->~ChunkHeader();
    new (lastChunkChunkHeader) mepoo::ChunkHeader(chunkSize, chunkSettings);
    lastChunkChunkHeader->setOriginId(originId);
    return cxx::success(lastChunkChunkHeader);
}
else
{
    return cxx::error(AllocationError::TOO_MANY_CHUNKS_ALLOCATED_IN_PARALLEL);
}

整体代码分析:

如果正在使用的共享内存块未满,则插入,并析构之前的数据,同时在这块内存上构造新的 ChunkHeader ;否则返回错误。

代码段2-4-2:分配一块新的未使用的共享内存

auto getChunkResult = getMembers()->m_memoryMgr->getChunk(chunkSettings);

if (!getChunkResult.has_error())
{
    auto& chunk = getChunkResult.value();

    // if the application allocated too much chunks, return no more chunks
    if (getMembers()->m_chunksInUse.insert(chunk))
    {
        // END of critical section
        chunk.getChunkHeader()->setOriginId(originId);
        return cxx::success(chunk.getChunkHeader());
    }
    else
    {
        // release the allocated chunk
        chunk = nullptr;
        return cxx::error(AllocationError::TOO_MANY_CHUNKS_ALLOCATED_IN_PARALLEL);
    }
}
else
{
    /// @todo iox-#1012 use cxx::error::from(E1); once available
    return cxx::error(cxx::into(getChunkResult.get_error()));
}

整体代码分析:

调用MemoryManager的成员方法getChunk获取共享内存块,如果获取成功,存入数组 m_chunksInUse 。如果获取失败或数组已满,则返回获取失败,此时根据RAII原理, SharedChunk 的析构函数会自动将共享内存块返还给 MemPool

m_chunksInUse 内部封装的数组元素的类型为我们在 上一篇文章 中介绍的 ShmSafeUnmanagedChunk ,这个类型不具有引用计数,为什么退出作用域不会被析构?

为什么要存 m_chunksInUse 数组?原因如下:我们看到 tryAllocate 返回的是消息内存块的指针,而消息发送的时候需要使用 SharedChunk ,我们无法将前者转换为后者。所以,此处存入数组,消息发送函数中通过消息内存块的指针查找对应数组元素,恢复出 SharedChunk 实例, 具体见3.3 。

3 消息发送逻辑

本质是往消息队列推入消息描述结构 ShmSafeUnmanagedChunk

3.1 PublisherImpl::publish

职责:

上层应用程序调用此方法推送消息。

入参:

sample :用户负载数据的封装实例。

template 
inline void PublisherImpl::publish(Sample&& sample) noexcept
{
    auto userPayload = sample.release(); // release the Samples ownership of the chunk before publishing
    auto chunkHeader = mepoo::ChunkHeader::fromUserPayload(userPayload);
    port().sendChunk(chunkHeader);
}

整体代码分析:

上述代码从 sample 中取出用户负载数据指针,据此计算 Chunk 首地址,然后调用 sendChunk 进行发送。

根据用户负载数据指针计算 Chunk 首地址其实就是减去一个偏移量,具体计算方法如下:

ChunkHeader* ChunkHeader::fromUserPayload(void* const userPayload) noexcept
{
    if (userPayload == nullptr)
    {
        return nullptr;
    }
    uint64_t userPayloadAddress = reinterpret_cast(userPayload);
    auto backOffset = reinterpret_cast(userPayloadAddress - sizeof(UserPayloadOffset_t));
    return reinterpret_cast(userPayloadAddress - *backOffset);
}

其中偏移放在payload之前,即: *backOffset

3.2 PublisherPortUser::sendChunk

职责:

发送用户数据。

入参:

chunkHeader ChunkHeader 类型的指针, Chunk 的首地址。

void PublisherPortUser::sendChunk(mepoo::ChunkHeader* const chunkHeader) noexcept
{
    const auto offerRequested = getMembers()->m_offeringRequested.load(std::memory_order_relaxed);

    if (offerRequested)
    {
        m_chunkSender.send(chunkHeader);
    }
    else
    {
        m_chunkSender.pushToHistory(chunkHeader);
    }
}

整体代码分析:

3.3 ChunkSender::send

职责:

发送用户数据。

入参:

chunkHeader ChunkHeader 指针, Chunk 的首地址。

template 
inline uint64_t ChunkSender::send(mepoo::ChunkHeader* const chunkHeader) noexcept
{
    uint64_t numberOfReceiverTheChunkWasDelivered{0};
    mepoo::SharedChunk chunk(nullptr);
    // BEGIN of critical section, chunk will be lost if the process terminates in this section
    if (getChunkReadyForSend(chunkHeader, chunk))
    {
        numberOfReceiverTheChunkWasDelivered = this->deliverToAllStoredQueues(chunk);

        getMembers()->m_lastChunkUnmanaged.releaseToSharedChunk();
        getMembers()->m_lastChunkUnmanaged = chunk;
    }
    // END of critical section

    return numberOfReceiverTheChunkWasDelivered;
}

逐段代码分析:

  • LINE 05 ~ LINE 07: 根据 chunkHeader 指针和 m_chunksInUse 数组,恢复 SharedChunk 实例;

  • LINE 09 ~ LINE 09: 调用基类的成员方法 deliverToAllStoredQueues 向各队列发送(推入)消息;

  • LINE 11 ~ LINE 12: 更新 m_lastChunkUnmanaged 实例,以提升性能。

3.4 ChunkDistributor::deliverToAllStoredQueues

template 
inline uint64_t ChunkDistributor::deliverToAllStoredQueues(mepoo::SharedChunk chunk) noexcept
{
    uint64_t numberOfQueuesTheChunkWasDeliveredTo{0U};
    typename ChunkDistributorDataType::QueueContainer_t remainingQueues;

    /* * * * *  见代码段3-3-1:向队列发送消息,失败入remainingQueues  * * * * */

    /* * * * *  见代码段3-3-2:发送失败的不断尝试重新发送  * * * * */

    addToHistoryWithoutDelivery(chunk);

    return numberOfQueuesTheChunkWasDeliveredTo;
}

整体代码分析:

这部分没有什么内容,主要实现在代码段3-3-1和代码段3-3-2。

代码段3-3-1:

{
    {
    typename MemberType_t::LockGuard_t lock(*getMembers());

    bool willWaitForConsumer = getMembers()->m_consumerTooSlowPolicy == ConsumerTooSlowPolicy::WAIT_FOR_CONSUMER;
    // send to all the queues
    for (auto& queue : getMembers()->m_queues)
    {
        bool isBlockingQueue = (willWaitForConsumer && queue->m_queueFullPolicy == QueueFullPolicy::BLOCK_PRODUCER);

        if (pushToQueue(queue.get(), chunk))
        {
            ++numberOfQueuesTheChunkWasDeliveredTo;
        }
        else
        {
            if (isBlockingQueue)
            {
                remainingQueues.emplace_back(queue);
            }
            else
            {
                ++numberOfQueuesTheChunkWasDeliveredTo;
                ChunkQueuePusher_t(queue.get()).lostAChunk();
            }
        }
    }
}

整体代码分析:

这段代码整体上是遍历所有订阅者队列,调用 pushToQueue 向消息队列推入消息,实现消息发送。但是消息队列的长度是有限的,如果由于订阅者处理速度太慢,队列满了应该怎么处理,根据设置,可以选择两种应对策略:

  • 将队列保存下来(LINE 17 ~ LINE 20),后续对这些队列不断尝试发送,直到所有队列推送成功,见代码段3-3-2;

  • 将队列标记为 有消息丢失 (LINE 22 ~ LINE 25):

template 
inline void ChunkQueuePusher::lostAChunk() noexcept
{
    getMembers()->m_queueHasLostChunks.store(true, std::memory_order_relaxed);
}

代码段3-3-2:不断尝试发送,直到所有消息发送成功

cxx::internal::adaptive_wait adaptiveWait;
while (!remainingQueues.empty())
{
    adaptiveWait.wait();
    {
        typename MemberType_t::LockGuard_t lock(*getMembers());

        /* * * * *  见代码段3-3-3:与活跃队列求交  * * * * */

        for (uint64_t i = remainingQueues.size() - 1U; !remainingQueues.empty(); --i)
        {
            if (pushToQueue(remainingQueues[i].get(), chunk))
            {
                remainingQueues.erase(remainingQueues.begin() + i);
                ++numberOfQueuesTheChunkWasDeliveredTo;
            }

            if (i == 0U)
            {
                break;
            }
        }
    }
}

整体代码分析:

这部分代码就是对剩余未发送成功的队列进行重新发送,直到所有队列发送成功。每轮尝试中间会使用yield或sleep函数等待一段时间,以免不必要的性能浪费。同时,发送过程中,还会与当前活跃队列求交,如下:

代码段3-3-3:与活跃队列求交

typename ChunkDistributorDataType::QueueContainer_t queueIntersection(remainingQueues.size());
auto greaterThan = [](memory::RelativePointer& a,
                  memory::RelativePointer& b) -> bool {
return reinterpret_cast(a.get()) > reinterpret_cast(b.get());
};
std::sort(getMembers()->m_queues.begin(), getMembers()->m_queues.end(), greaterThan);
std::sort(remainingQueues.begin(), remainingQueues.end(), greaterThan);

auto iter = std::set_intersection(getMembers()->m_queues.begin(),
                              getMembers()->m_queues.end(),
                              remainingQueues.begin(),
                              remainingQueues.end(),
                              queueIntersection.begin(),
                              greaterThan);
queueIntersection.resize(static_cast(iter - queueIntersection.begin()));
remainingQueues = queueIntersection;

整体代码分析:

上面这段代码就是求解 remainingQueues 和当前活跃队列 m_queues 交集,以免发生无限循环。 set_intersection 是C++标准库函数,详见: https://en.cppreference.com/w/cpp/algorithm/set_intersection

至此,消息发送的流程分析完毕。

4 小结

本文介绍了消息发布者获取共享内存块和发送逻辑,下文将介绍消息订阅者的接收逻辑。


湘ICP备2022002427号-10湘公网安备:43070202000427号
© 2013~2019 haote.com 好特网