1、预处理操作、禁用拷贝和移动语义
#pragma once
// 禁用拷贝构造和赋值操作符以及其移动语义
#define OF_DISALLOW_COPY(ClassName) \
ClassName(const ClassName&) = delete; \
ClassName& operator=(const ClassName&) = delete
#define OF_DISALLOW_MOVE(ClassName) \
ClassName(ClassName&&) = delete; \
ClassName& operator=(ClassName&&) = delete
#define OF_DISALLOW_COPY_AND_MOVE(ClassName) \
OF_DISALLOW_COPY(ClassName); \
OF_DISALLOW_MOVE(ClassName)
2、多线程之间发送与接收信息的核心代码
在这个核心代码中,我们使用了模板编程、禁止继承、禁止拷贝与移动语义、锁、以及条件变量控制多线程并发等知识点。
#pragma once
#ifndef CHANNEL_H_
#define CHANNEL_H_
#include <queue>
#include <mutex>
#include "util.hpp"
// 接收与发送是否成功的状态
enum ChannelStatus { kChannelStatusSuccess = 0, kChannelStatusErrorClosed };
// 禁止继承
template<typename T>
class Channel final {
public:
// 禁止拷贝和移动语义
OF_DISALLOW_COPY_AND_MOVE(Channel);
Channel() : is_closed_(false) {}
~Channel() = default;
/*
* 发送
**/
template<typename U>
ChannelStatus Send(U&& item);
/*
* 接收
**/
ChannelStatus Receive(T* item);
ChannelStatus ReceiveMany(std::queue<T>* items);
/*
* 停止发送与接收
*/
void Close();
private:
// 队列
std::queue<T> queue_;
// 锁
std::mutex mutex_;
// 是否继续发送与接收
bool is_closed_;
// 条件变量,控制多线程并发操作
std::condition_variable cond_;
};
template<typename T>
template<typename U>
ChannelStatus Channel<T>::Send(U&& item) {
// 是否需要唤醒等待的线程
bool notify;
{
std::unique_lock<std::mutex> lock(mutex_);
if (is_closed_) { return kChannelStatusErrorClosed; }
notify = queue_.empty();
// 通过完美转发将数据存到队列
queue_.push(std::forward<U>(item));
}
// 如果队列为空,则需要唤醒一个线程
if (notify) { cond_.notify_one(); }
return kChannelStatusSuccess;
}
template<typename T>
ChannelStatus Channel<T>::Receive(T* item) {
std::unique_lock<std::mutex> lock(mutex_);
// 如果lambda表达式为TRUE,则不阻塞;如果为FALSE,则阻塞
cond_.wait(lock, [this]() {return (!queue_.empty()) || is_closed_;});
// 获取数据时,队列为空,则反馈错误状态
if (queue_.empty()) { return kChannelStatusErrorClosed; }
// 通过移动语义来获取队列中的数据
*item = std::move(queue_.front());
queue_.pop();// 将队列的数据pop
return kChannelStatusSuccess;
}
template<typename T>
ChannelStatus Channel<T>::ReceiveMany(std::queue<T>* items) {
std::unique_lock<std::mutex> lock(mutex_);
cond_.wait(lock, [this]() {return (!queue_.empty()) || is_closed_; });
if (queue_.empty()) { return kChannelStatusErrorClosed; }
// 将队列中的数据全部存储到items中,并且将队列的数据全部pop
while (!queue_.empty()) {
items->push(std::move(queue_.front()));
queue_.pop();
}
return kChannelStatusSuccess;
}
template<typename T>
void Channel<T>::Close() {
std::unique_lock<std::mutex> lock(mutex_);
is_closed_ = true;
cond_.notify_all();
}
#endif
3、对上述核心代码用30个发送线程和40个接收线程进行测试
通过自定义线程发送函数和线程接收函数来了解多线程之间是如何进行交互处理的。
#include "channel.hpp"
#include <thread>
// 发送线程函数
void CallFromSenderThread(Channel<int>* channel, int nSize) {
// 发送数据
for (int i = 0; i < nSize; ++i) {
if (channel->Send(i) != kChannelStatusSuccess) { break; }
}
}
// 接收线程函数
void CallFromReceiverThread(std::vector<int>* visit, Channel<int>* channel) {
int num = -1;
int* num_ptr = #
// 接收数据
while (channel->Receive(num_ptr) == kChannelStatusSuccess){
++visit->at(*num_ptr);
}
}
// 测试函数
void test_30sender40receiver() {
// 核心通信
Channel<int> channel;
// 发送者线程
std::vector<std::thread> senders;
// 接收者线程
std::vector<std::thread> receivers;
// 定义发送线程与接收线程的数量
int sender_num = 30;
int receiver_num = 40;
int range_num = 200;
std::vector<std::vector<int>> visits;
// 设置每一个接收者线程统计的任务
for (int i = 0; i < receiver_num; ++i) {
std::vector<int> visit_i;
for (int j = 0; j < range_num; j++) {
visit_i.emplace_back(0);
}
visits.emplace_back(visit_i);
}
// 将发送者线程与线程函数绑定
for (int i = 0; i < sender_num; ++i) {
senders.emplace_back(CallFromSenderThread, &channel, range_num);
}
// 将接收者线程与线程函数绑定
for (int i = 0; i < receiver_num; ++i) {
receivers.emplace_back(CallFromReceiverThread, &visits[i], &channel);
}
// 等待每一个发送者线程执行完毕
for (std::thread& this_thread : senders) { this_thread.join(); }
// 保证每一个接收者线程全部执行
channel.Close();
// 等待每一个接收者线程执行完毕
for (std::thread& this_thread : receivers) { this_thread.join(); }
// 统计所有接收者线程总共执行的任务量,以及每一个接收者线程执行的任务量
int visit_count = 0;
for (int i = 0; i < range_num; ++i) {
for (int j = 0; j < receiver_num; j++) {
std::cout << "visits[j][i] = " << visits[j][i] << std::endl;
visit_count += visits[j][i];
}
}
std::cout << "visit_count:" << visit_count << std::endl;
}
int main() {
test_30sender40receiver();
}
4、总结
通过这个例子的学习,会使我们了解C++高级编程更多的知识点,让我们深入理解与运用C++,了解C++线程如何与线程函数进行绑定,以及线程函数参数的传递等等。
本文暂时没有评论,来添加一个吧(●'◡'●)