进程间通信IPC---队列、生产者消费者模型、生产者消费者模型_joinableQueue(五)教程
\# 队列
# 队列 先进先出<br></br># IPC<br></br># from multiprocessing import Queue<br></br># q = Queue(5)<br></br># q.put(1)<br></br># q.put(2)<br></br># q.put(3)<br></br># q.put(4)<br></br># q.put(5)<br></br># print(q.full()) # 队列是否满了,已满话再次放入会阻塞<br></br># print(q.get())<br></br># print(q.get())<br></br># print(q.get())<br></br># print(q.get())<br></br># print(q.get())<br></br># print(q.empty()) # 队列是否空了,空了再去会阻塞<br></br># while True: #不阻塞处理<br></br># try:<br></br># q.get_nowait()<br></br># except:<br></br># print('队列已空')<br></br># time.sleep(0.5)<br></br># for i in range(6):<br></br># q.put(i)<br></br><br></br>from multiprocessing import Queue,Process<br></br>def produce(q):<br></br> q.put('hello')<br></br><br></br>def consume(q):<br></br> print(q.get())<br></br><br></br>if __name__ == '__main__': #在win下才需要这段代码<br></br> q = Queue()<br></br> p = Process(target=produce,args=(q,))<br></br> p.start()<br></br> c = Process(target=consume, args=(q,))<br></br> c.start()<br></br>
\# 生产者消费者模型
# 队列<br></br># 生产者消费者模型<br></br><br></br># 生产者 进程<br></br># 消费者 进程<br></br>import time<br></br>import random<br></br>from multiprocessing import Process,Queue<br></br>def consumer(q,name):<br></br> while True:<br></br> food = q.get()<br></br> if food is None: #用q.empty()) 不可靠,也许在上报空后,另外又有生产者放入东西<br></br> print('%s获取到了一个空'%name)<br></br> break<br></br> print('\033[31m%s消费了%s\033[0m' % (name,food))<br></br> time.sleep(random.randint(1,3))<br></br><br></br>def producer(name,food,q):<br></br> for i in range(4):<br></br> time.sleep(random.randint(1,3))<br></br> f = '%s生产了%s%s'%(name,food,i)<br></br> print(f)<br></br> q.put(f)<br></br><br></br>if __name__ == '__main__':<br></br> q = Queue(20)<br></br> p1 = Process(target=producer,args=('Egon','包子',q))<br></br> p2 = Process(target=producer, args=('wusir','泔水', q))<br></br> c1 = Process(target=consumer, args=(q,'alex'))<br></br> c2 = Process(target=consumer, args=(q,'jinboss'))<br></br> p1.start() 异步<br></br> p2.start()<br></br> c1.start()<br></br> c2.start() 异步<br></br> p1.join() # 这里非异步转同步,而是判断生产者是否结束<br></br> p2.join()<br></br> q.put(None)<br></br> q.put(None)<br></br><br></br> 当取到None时为什么有阻塞情况,未显示程序执行完,因为两个人其中一个人拿到None,<br></br> 另一个人就取不到值出现等待情况,有几个人就put几个None就解决了
\# 生产者消费者模型\_joinableQueue(解决一个None,多人get阻塞问题)
import time<br></br>import random<br></br>from multiprocessing import Process,<strong>JoinableQueue<br></br></strong>def consumer(q,name):<br></br> while True:<br></br> food = q.get()<br></br> print('\033[31m%s消费了%s\033[0m' % (name,food))<br></br> time.sleep(random.randint(1,3))<br></br> q.task_done() # count - 1<br></br><br></br>def producer(name,food,q):<br></br> for i in range(4):<br></br> time.sleep(random.randint(1,3))<br></br> f = '%s生产了%s%s'%(name,food,i)<br></br> print(f)<br></br> q.put(f)<br></br> q.join() # 阻塞 直到一个队列中的所有数据 全部被处理完毕<br></br><br></br>if __name__ == '__main__':<br></br> q = JoinableQueue(20)<br></br> p1 = Process(target=producer,args=('Egon','包子',q))<br></br> p2 = Process(target=producer, args=('wusir','泔水', q))<br></br> c1 = Process(target=consumer, args=(q,'alex'))<br></br> c2 = Process(target=consumer, args=(q,'jinboss'))<br></br> p1.start()<br></br> p2.start()<br></br> c1.daemon = True # 设置为守护进程 主进程中的代码执行完毕之后,子进程自动结束
<br></br> c2.daemon = True
<br></br> c1.start()<br></br> c2.start()<br></br> p1.join()<br></br> p2.join() # 感知一个进程的结束<br></br><br></br># 在消费者这一端:<br></br> # 每次获取一个数据<br></br> # 处理一个数据<br></br> # 发送一个记号 : 标志一个数据被处理成功<br></br><br></br># 在生产者这一端:<br></br> # 每一次生产一个数据,<br></br> # 且每一次生产的数据都放在队列中<br></br> # 在队列中刻上一个记号<br></br> # 当生产者全部生产完毕之后,<br></br> # join信号 : 已经停止生产数据了<br></br> # 且要等待之前被刻上的记号都被消费完<br></br> # 当数据都被处理完时,join阻塞结束<br></br><br></br># consumer 中把所有的任务消耗完<br></br># producer 端 的 join感知到,停止阻塞<br></br># 所有的producer进程结束<br></br># 主进程中的p.join结束<br></br># 主进程中代码结束<br></br># 守护进程(消费者的进程)结束