python多线程和多进程教程
文章目录
多线程和多进程的区分
多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响;而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。
python中,多线程并没有实际用户,并不能充分利用cpu资源 ,而多进程可以,所有,下面记录一下多进程的使用
多进程的基本使用
Process([group [, target [, name [, args [, kwargs]]]]])
- 传参:
target <-> 函数名a
name <-> 子进程的别名(没有写的必要)
args <-> a函数要传入的参数,为元组类型
kwargs <-> 表示调用对象的字典
group <-> 分组,实际上不会使用
- 示例:
import multiprocessing
import time
def process(num):
time.sleep(num)
print('process ', num)
if __name__ == '__main__':
for i in range(5):
p = multiprocessing.Process(target=process, args=(1,))
p.start()
print('cpu number:', multiprocessing.cpu_count())
for p in multiprocessing.active_children():
print('child process name: {} id: {}'.format(p.name, str(p.pid)))
print("Process Ended")
daemon,join()
p.daemon = True
,使父进程结束时,所有子进程也都被关闭p.join()
, 设置父进程等待所有子进程结束后再结束- 示例:
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self, loop):
Process.__init__(self)
self.loop = loop
def run(self):
print(self.loop)
for count in range(self.loop):
time.sleep(1)
print('Pid: ' + str(self.pid) + ' LoopCount: ' + str(count))
if __name__ == '__main__':
for i in range(2, 5):
p = MyProcess(i)
# 设置daemon为True,程序将会在主进程完成后终止
p.daemon = True
p.start()
# 设置join则让主(父)进程等待所有的子进程完成后结束
p.join()
print('Main process Ended')
Lock
- Lock()加锁,使每个子进程在执行时,其他子进程处于等待状态
- 示例:
class MyProcess(Process):
def __init__(self, loop, lock):
Process.__init__(self)
self.loop = loop
self.lock = lock
def run(self):
for count in range(self.loop):
time.sleep(0.1)
print('Pid = ', self.pid, 'LoopCount = ', count)
if __name__ == '__main__':
lock = Lock()
for i in range(10, 15):
p = MyProcess(i, lock)
p.start()
Semaphore
- semaphore是一个内置的计数器
- 每当调用acquire()时,内置计数器-1
- 每当调用release()时,内置计数器+1
- 计数器不能小于0,当计数器为0时,acquire()将阻塞线程直到其他线程调用release()
- 计数器的作用:
如果在主机执行IO密集型任务的时候再执行这种类型的程序时,计算机就有很大可能会宕机。这时候就可以为这段程序添加一个计数器功能,来限制一个时间点内的线程数量。
- 示例:
```python
import time
import threading
s1=threading.Semaphore(5) #添加一个计数器
def foo():
s1.acquire() #计数器获得锁
time.sleep(2) #程序休眠2秒
print("ok",time.ctime())
s1.release() #计数器释放锁
for i in range(20):
t1=threading.Thread(target=foo,args=()) #创建线程
t1.start() #启动线程
```
进程间得通信
Queue
- 进程间通信必须通过Queue队列,不然起不到效果, 这里的队列可以使用
multiprocessing.Queue
- 示例:
from multiprocessing import Process, Queue
import os, time, random
# 写数据进程执行的代码:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
# 读数据进程执行的代码:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)
if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 启动子进程pr,读取:
pr.start()
# 等待pw结束:
pw.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
pr.terminate()
Pipe
- 介绍
Pipe可以是单向(half-duplex),也可以是双向(duplex)。我们通过mutiprocessing.Pipe(duplex=False)
创建单向管道 (默认为双向)。一个进程从PIPE一端输入对象,然后被PIPE另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。
- 示例:
from multiprocessing import Process, Pipe
class Consumer(Process):
def __init__(self, pipe):
Process.__init__(self)
self.pipe = pipe
def run(self):
self.pipe.send('Consumer Words')
print('Consumer Received:', self.pipe.recv())
class Producer(Process):
def __init__(self, pipe):
Process.__init__(self)
self.pipe = pipe
def run(self):
print('Producer Received:', self.pipe.recv())
self.pipe.send('Producer Words')
if __name__ == '__main__':
pipe = Pipe()
p = Producer(pipe[0])
c = Consumer(pipe[1])
p.daemon = c.daemon = True
p.start()
c.start()
p.join()
c.join()
print('Ended!')
Pool
- 在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
Pool的用法有阻塞和非阻塞两种方式。非阻塞即为添加进程后,不一定非要等到改进程执行完就添加其他进程运行,阻塞则相反。
个人理解: 开启多个Process,还需要通过Semaphore
来控制阻塞或非阻塞,而使用Pool可以方便得解决这个问题,省去手动限制进程数量的工作。
- 示例:
from multiprocessing import Lock, Pool
import time
def function(index):
print 'Start process: ', index
time.sleep(3)
print 'End process', index
if __name__ == '__main__':
pool = Pool(processes=3)
for i in xrange(4):
pool.apply_async(function, (i,)) # 非阻塞的用法
# pool.apply(function, (i,)) # 阻塞的用法
print "Started processes"
pool.close()
pool.join()
print "Subprocess done."
感谢
本文是自己学习多线程和多进程过程中记录的知识点,内容大多来自下面两个大大的文章