龙空技术网

C++ 并发编程(四):生产者 - 消费者

Red果果 676

前言:

目前我们对“生产者消费者c语言编程”都比较重视,看官们都想要学习一些“生产者消费者c语言编程”的相关内容。那么小编同时在网上搜集了一些有关“生产者消费者c语言编程””的相关内容,希望各位老铁们能喜欢,同学们快快来了解一下吧!

生产者 - 消费者(Producer-Consumer),也叫有限缓冲(Bounded-Buffer),是多线程同步的经典问题之一。详见 Wikipedia。

代码改写自 Boost.Thread 自带的示例(libs/thread/example/condition.cpp),以「条件变量」实现同步。

头文件

#include <condition_variable>#include <iostream>#include <mutex>#include <thread>#include <vector>
有限缓冲类
class BoundedBuffer {public:  BoundedBuffer(const BoundedBuffer& rhs) = delete;  BoundedBuffer& operator=(const BoundedBuffer& rhs) = delete;  BoundedBuffer(std::size_t size)      : begin_(0), end_(0), buffered_(0), circular_buffer_(size) {  }  void Produce(int n) {    {      std::unique_lock<std::mutex> lock(mutex_);      // 等待缓冲不为满。      not_full_cv_.wait(lock, [=] { return buffered_ < circular_buffer_.size(); });      // 插入新的元素,更新下标。      circular_buffer_[end_] = n;      end_ = (end_ + 1) % circular_buffer_.size();      ++buffered_;    }  // 通知前,自动解锁。    // 通知消费者。    not_empty_cv_.notify_one();  }  int Consume() {    std::unique_lock<std::mutex> lock(mutex_);    // 等待缓冲不为空。    not_empty_cv_.wait(lock, [=] { return buffered_ > 0; });    // 移除一个元素。    int n = circular_buffer_[begin_];    begin_ = (begin_ + 1) % circular_buffer_.size();    --buffered_;    // 通知前,手动解锁。    lock.unlock();    // 通知生产者。    not_full_cv_.notify_one();    return n;  }private:  std::size_t begin_;  std::size_t end_;  std::size_t buffered_;  std::vector<int> circular_buffer_;  std::condition_variable not_full_cv_;  std::condition_variable not_empty_cv_;  std::mutex mutex_;};

生产者与消费者线程共享的缓冲。g_io_mutex 是用来同步输出的。

BoundedBuffer g_buffer(2);boost::mutex g_io_mutex;
生产者

生产 100000 个元素,每 10000 个打印一次。

void Producer() {  int n = 0;  while (n < 100000) {    g_buffer.Produce(n);    if ((n % 10000) == 0) {      std::unique_lock<std::mutex> lock(g_io_mutex);      std::cout << "Produce: " << n << std::endl;    }    ++n;  }  g_buffer.Produce(-1);}
消费者

每消费到 10000 的倍数,打印一次。

void Consumer() {  std::thread::id thread_id = std::this_thread::get_id();  int n = 0;  do {    n = g_buffer.Consume();    if ((n % 10000) == 0) {      std::unique_lock<std::mutex> lock(g_io_mutex);      std::cout << "Consume: " << n << " (" << thread_id << ")" << std::endl;    }  } while (n != -1);  // -1 表示缓冲已达末尾。  // 往缓冲里再放一个 -1,这样其他消费者才能结束。  g_buffer.Produce(-1);}
主程序

一个生产者线程,三个消费者线程。

int main() {  std::vector<std::thread> threads;  threads.push_back(std::thread(&Producer));  threads.push_back(std::thread(&Consumer));  threads.push_back(std::thread(&Consumer));  threads.push_back(std::thread(&Consumer));  for (auto& t : threads) {    t.join();  }  return 0;}

输出(括号中为线程 ID):

Produce: 0Consume: 0 (13c0)Produce: 10000Consume: 10000 (15fc)Produce: 20000Consume: 20000 (2558)Produce: 30000Consume: 30000 (13c0)Produce: 40000Consume: 40000 (15fc)Produce: 50000Consume: 50000 (13c0)Produce: 60000Consume: 60000 (15fc)Produce: 70000Consume: 70000 (13c0)Produce: 80000Consume: 80000 (15fc)Produce: 90000Consume: 90000 (15fc)
分析

考虑一个生产者和一个消费者的情形,假定缓冲的大小为 2,来看看三个成员变量如何变化。

            buffered_    begin_      end_ 初始           0          0          0 生产           1          0          1 消费           0          1          1 消费          等待 buffered_ > 0 ... 生产           1          1          0 ...

参考:

Wikipedia: Producer–consumer problemStackOverflow: Bounded Buffers (Producer Consumer)StackOverflow: Empty element in array-based bounded buffer

标签: #生产者消费者c语言编程