python多任务编程---进程教程
多任务编程
- 意义:充分利用计算机CPU的多和资源,同时处理多个应用程序任务,一次提高程序的运行效率.
- 实现方案:多进程,多线程
进程(process)
进程理论基础
定义:程序在计算机中的一次运行.
- 程序是一个可执行的文件,是静态的战友磁盘.
- 进程是一个动态的过程描述,占有计算机运行资源,有一定的生命周期
系统中如何产生一个进程
- 用户空间通过调用程序接口或者命令发起请求
- 操作系统接受用户请求,开始创建进程
- 操作系统调配计算机资源,确定进程状态等
- 操作系统将创建的进程提供给用户使用
进程基本概念
- CPU时间片:如果一个进程占有CPU内核则称这个进程在cpu时间片上.
- PCB(进程控制块):在内存中开辟的一块空间,用于存放进程的基本信息,也用于系统查找识别进程
- 进程ID(PID):系统为每个进程分配的一个大于0的整数,作为进程ID.每个ID不重复
Linux查看进程ID:ps - aux
- 父子进程:系统中每一个进程(除了系统初始化进程)都有唯一的父进程,可以有0个或者多个子进程.父子进程关系便于进程管理
查看进程树: pstree
- 进程状态
- 三态
- 就绪态:进程具备执行条件,等待分配CPU资源
- 运行态:进程占有CPU时间片正在运行
- 等待态:进程暂时停止运行,让出CPU
五态(在三态基础上增加新建和终止)
- 新建: 创建一个进程,获取资源过程
- 终止:进程结束,释放资源过程
状态查看命令: ps - aux --->STAT列
S---等待态
R---执行态
Z---僵尸
+---前台进程
l---有多线程
进程的运行特征
- 多进程可以更充分使用计算机多核资源
- 进程之间的运行互不影响,各自独立
- 每个进程拥有独立的空间,各自使用自己空间资源
面试要求:
什么是进程,进程和程序有什么区别?
进程有哪些状态,状态之间如何转化?
基于fork的多进程编程
fork使用
pdi = os.fork()
功能:创建新的进程
返回值:整数,如果创建进程失败返回一个负数,如果成功则在原有进程中返回新进程的PID,在新进程中返回0
注意:
- 子进程会复制父进程全部内存空间,从fork下一句开始执行
- 父子进程各自独立运行,运行顺序不一定
- 利用父子进程fork返回值的区别,配合if结构让父子进程执行不同的内容几乎是固定搭配.
- 父子进程有各自特有特征比如PID,PCB命令集等.
- 父进程fork之前开辟的空间子进程同样拥有,父子进程对各自空间的操作不会相互影响
fork进程代码示例:
1 import os
2 from time import sleep
3
4 # 创建子进程
5 pid = os.fork()
6
7 if pid < 0:
8 print("Create process failed")
9 elif pid == 0:
10 # 只有子进程执行
11 sleep(3)
12 print("The new process")
13 else:
14 # 只有父进程执行
15 sleep(4)
16 print("The old process")
17
18 # 父子进程都执行
19 print("process test over")
View Code
fork进程代码示例细节:
1 import os
2 from time import sleep
3
4 print("=========================")
5 a = 1
6 def fun():
7 print("fun .... ")
8
9 pid = os.fork()
10
11 if pid < 0:
12 print("Create process failed")
13 elif pid == 0:
14 print("Child process")
15 print("a = ",a) # 从父进程空间拷贝了变量
16 fun()
17 a = 10000 # 只是修改了自己空间的a
18 else:
19 sleep(1)
20 print("Parent process")
21 print("a:",a)
22
23 print("All a ->",a)
View Code
进程相关函数
os.getpid()
功能:获取一个进程的PID值
返回值:返回当前进程的PID
os.getppid()
功能:获取父进程的PID值
返回值:返回父进程PID
os.\_exit(status)
功能:结束一个进程
参数:进程的终止状态
sys.exit([status])
功能:退出进程
参数:整数 表示退出状态
字符串 表示退出时打印内容
获取进程pid号代码示例:
1 import os
2 from time import sleep
3
4 pid = os.fork()
5
6 if pid < 0:
7 print("Error")
8 elif pid == 0:
9 sleep(1)
10 print("Child PID:",os.getpid()) # 自己pid
11 print("Get parent PID:",os.getppid()) # 父pid
12 else:
13 print("Parent PID:", os.getpid()) # 自己pid
14 print("Get child PID:",pid)
View Code
进程退出代码示例:
1 import os,sys
2
3 pid = os.fork()
4
5 # 父子进程退出不会影响对方继续执行
6 if pid < 0:
7 print("Error")
8 elif pid == 0:
9 # os._exit(0) # 子进程退出
10 print("Child process")
11 else:
12 sys.exit("退出父进程")
13 print("Parent process")
View Code
孤儿和僵尸
- 孤儿进程:父进程先于子进程退出,此时子进程成为孤儿进程
特点:孤儿进程会被系统进程收养,此时系统进程就会成为孤儿进程新的父进程,孤儿进程退出该进程会自动处理
- 僵尸进程:子进程先与父进程退出,父进程又没有处理子进程的退出状态,此时子进程就会称为僵尸进程
特点:僵尸进程虽然结束,但是会存留部分PCD在内存中,大量的僵尸进程会浪费系统的内存资源
- 如何避免僵尸进程产生
- 使用wait函数处理子进程退出
模拟僵尸进程产生以及处理示例:
1 import os,sys
2 import signal
3
4 # 忽略子进程的退出行为,子进程退出自动由系统处理
5 signal.signal(signal.SIGCHLD,signal.SIG_IGN)
6
7 pid = os.fork()
8 if pid < 0:
9 print("Error")
10 elif pid == 0:
11 print("Child PID:",os.getpid())
12 sys.exit(2)
13 else:
14 """
15 os.wait() 处理僵尸
16 """
17 # pid,status = os.wait()
18 # print("pid:",pid)
19 # print('status:',os.WEXITSTATUS(status))
20 while True: # 让父进程不退出
21 pass
View Code
- 创建二级子进程处理僵尸
- 父进程创建子进程,等待回收子进程
- 子进程创建二级子进程然后退出
- 二级子进程称为孤儿进程,和原来父进程一同执行事件
代码示例:
1 import os
2 from time import sleep
3
4 def f1():
5 for i in range(3):
6 sleep(2)
7 print("写代码")
8
9 def f2():
10 for i in range(2):
11 sleep(4)
12 print("测代码")
13
14 pid = os.fork()
15 if pid == 0:
16 p = os.fork() # 创建二级子进程
17 if p == 0:
18 f1()
19 else:
20 os._exit(0) # 一级子进程退出
21 else:
22 os.wait() # 等待回收一级子进程
23 f2()
View Code
通过信号处理子进程退出
原理:子进程退出时会发送信号给父进程,如果父进程忽略子进程信号,则系统就会自动处理子进程退出
方法:使用signal模块在父进程创建子进程前写如下语句:
import signal
signal.signal(signal.SIGCHLD,signal.SIG\_IGN)
特点:非阻塞,不会影响父进程运行,可以处理所有子进程退出
multiprocessing模块创建进程
进程创建方法
- 流程特点
- 将需要子进程执行的事件封装为函数
- 通过模块的Process类创建进程对象,关联函数
- 可以通过进程对象设置进程信息及属性
- 通过进程对象调用start启动进程
- 通过进程对象调用join回收进程
- 基本接口使用
Process()
功能 : 创建进程对象
参数 : target 绑定要执行的目标函数
args 元组,用于给target函数位置传参
kwargs 字典,给target函数键值传参
p.start()
功能 : 启动进程
注意:启动进程此时target绑定函数开始执行,该函数作为子进程执行内容,此时进程真正被创建
<pre class="brush:python;gutter:true;">p.join([timeout])
功能:阻塞等待回收进程
参数:超时时间
注意:
- 使用multiprocessing创建进程同样是子进程复制父进程空间代码段,父子进程运行互不影响。
- 子进程只运行target绑定的函数部分,其余内容均是父进程执行内容。
- multiprocessing中父进程往往只用来创建子进程回收子进程,具体事件由子进程完成。
- multiprocessing创建的子进程中无法使用标准输入
进程对象
p.name 进程名称
p.pid 对应子进程的PID号
p.is\_alive() 查看子进程是否在生命周期
p.daemon 设置父子进程的退出关系
- 如果设置为True则子进程会随父进程的退出而结束
- 要求必须在start()前设置
- 如果daemon设置成True通常就不会使用join()
进程对象代码示例:
1 from multiprocessing import Process
2 import time
3
4 def tm():
5 for i in range(3):
6 print(time.ctime())
7 time.sleep(2)
8
9 p = Process(target = tm,name = 'Tarena')
10
11 # 父进程退出,其所有子进程也退出
12 p.daemon = True
13
14 p.start() # 进程真正产生
15
16 print("Name:",p.name) # 进程名
17 print("PID:",p.pid) # pid号
18 print("is alive:",p.is_alive()) # 是否在生命周期
View Code
自定义进程类
- 创建步骤
- 继承Process类
- 重写\_\_init\_\_方法添加自己的属性,使用super()加载父类属性
- 重写run()方法
- 使用方法
- 实例化对象
- 调用start自动执行run方法
- 调用join()回收进程
自定义进程类代码示例:
1 from multiprocessing import Process
2 from time import *
3
4 # 自定义进程类
5 class MyProcess(Process):
6 def __init__(self,value):
7 self.value = value
8 super().__init__() # 加载父类init
9
10 def f1(self):
11 print("步骤1")
12 def f2(self):
13 print("步骤2")
14
15 # 作为流程启动函数
16 def run(self):
17 for i in range(self.value):
18 self.f1()
19 self.f2()
20
21 if __name__ == '__main__':
22 p = MyProcess(2)
23 p.start()
24 p.join()
View Code
进程池实现
- 必要性
- 进程的创建和销毁过程消耗的资源较多
- 当任务量众多,每个任务在很短时间内完成时,需要频繁的创建和销毁进程.此时对计算机压力较大
- 进程池技术很好的解决了以上问题
- 原理
创建一定数量的进程来处理事件,事件处理完进程不退出而是继续处理其他时间,直到所有事件全都处理完毕统一销毁.增加进程的重复利用,降低资源消耗.
- 进程池实现
1.创建进程池对象,放入适当的进程
from multiprocessing import Pool
Pool(processes)
功能: 创建进程池对象
参数: 指定进程数量,默认根据系统自动判定
2.将事件加入进程池队列执行
pool.apply_async(func,args,kwds)
功能: 使用进程池执行 func事件
参数: func 事件函数
args 元组 给func按位置传参
kwds 字典 给func按照键值传参
返回值: 返回函数事件对象
3.关闭进程池
pool.close()
功能: 关闭进程池
4.回收进程池中进程
pool.join()
功能: 回收进程池中进程
进程池代码示例:
1 from multiprocessing import Pool
2 from time import sleep,ctime
3
4 # 进程池事件
5 def worker(msg):
6 sleep(2)
7 print(ctime(),'--',msg)
8
9 # 创建进程池
10 pool = Pool(4)
11
12 # 向进程池队列中添加事件
13 for i in range(10):
14 msg = "Tedu %d"%i
15 pool.apply_async(func=worker,args=(msg,))
16
17 # 关闭进程池
18 pool.close()
19
20 # 回收进程池
21 pool.join()
View Code
管道通信
- 通信原理
在内存中开辟管道空间,生成管道操作对象,多个进程使用同一管道对象,进行读写即可实现通信
- 实现方法
from multiprocessing import Pipe
fd1,fd2 = Pipe(duplex = True)
功能: 创建管道
参数:默认表示双向管道
如果为False 表示单向管道
返回值:表示管道两端的读写对象
如果是双向管道均可读写
如果是单向管道fd1只读 fd2只写
fd.recv()
功能 : 从管道获取内容
返回值:获取到的数据
fd.send(data) 可以是任何内容,如:整数,浮点数,字符串等
功能: 向管道写入内容
参数: 要写入的数据
管道操作
注意:
- multiprocessing中提供的通信只用于亲缘关系进程间通信
- 管道在父进程中创建,子进程从父进程中获取管道对象
管道操作代码示例:
1 from multiprocessing import Process,Pipe
2
3 # 创建管道对象
4 # 参数False 表示fd1 只能 recv , fd2 只能 send
5 fd1,fd2 = Pipe()
6
7 # APP1可以使用app2提供的信息登录
8 def app1():
9 print("启动app1,请登录")
10 print("请求app2授权")
11 # 写管道
12 fd1.send("app1 可以用你的账号登录吗?")
13 data = fd1.recv()
14 if data:
15 print("登录成功:",data)
16
17 def app2():
18 request = fd2.recv() # 阻塞等待读取管道
19 print(request)
20 fd2.send(('Joy','123')) # 发送python数据类型
21
22 p1 = Process(target=app1)
23 p2 = Process(target=app2)
24 p1.start()
25 p2.start()
26 p1.join()
27 p2.join()
28
29 代码示例
View Code
消息队列
- 通信原理
在内存中建立队列模型,进程通过队列将消息存入,或者从队列取出完成进程间通信.
- 实现方法
<pre class="brush:python;gutter:true;">from multiprocessing import Queue
q = Queue(maxsize=0)
功能: 创建队列对象
参数:最多存放消息个数
返回值:队列对象
q.put(data,[block,timeout])
功能:向队列存入消息
参数:data 要存入的内容
block 设置是否阻塞 False为非阻塞
timeout 超时检测
q.get([block,timeout])
功能:从队列取出消息
参数:block 设置是否阻塞 False为非阻塞
timeout 超时检测
返回值: 返回获取到的内容
q.full() 判断队列是否为满
q.empty() 判断队列是否为空
q.qsize() 获取队列中消息个数
q.close() 关闭队列
消息队列代码示例:
1 from multiprocessing import Queue,Process
2 from time import sleep
3 from random import randint
4
5 # 创建队列
6 q = Queue(5) # 最大存储5个消息
7
8 def request():
9 for i in range(10):
10 x = randint(1,100)
11 y = randint(1,100)
12 q.put((x,y)) # 写消息队列
13 print("=============")
14
15 def handle():
16 while not q.empty():
17 data = q.get() # 读消息队列
18 print("x + y = ",data[0]+data[1])
19 sleep(2)
20
21 p1 = Process(target=request)
22 p2 = Process(target=handle)
23 p1.start()
24 p2.start()
25 p1.join()
26 p2.join()
27
28 代码示例
View Code
共享内存
- 通信原理:在内存中开辟一块空间,进程可以写入内容和读取内容完成通信,但是每次写入内容会覆盖之前内容
- 实现方法
<pre class="brush:python;gutter:false;">from multiprocessing import Value,Array
obj = Value(ctype,data)
功能 : 开辟共享内存
参数 : ctype 表示共享内存空间类型 'i' 'f' 'c'
data 共享内存空间初始数据
返回值:共享内存对象
obj.value 对该属性的修改查看即对共享内存读写
obj = Array(ctype,data)
功能: 开辟共享内存空间
参数: ctype 表示共享内存数据类型
data 整数则表示开辟空间的大小,其他数据类型表示开辟空间存放的初始化数据
返回值:共享内存对象
Array共享内存读写: 通过遍历obj可以得到每个值,直接可以通过索引序号修改任意值。
* 可以使用obj.value直接打印共享内存中的字节串
共享内存代码示例:
注意:共享内存中只能有一个值
1 from multiprocessing import Process,Value
2 import time
3 import random
4
5 # 创建共享内存
6 money = Value('i',5000)
7
8 def man():
9 for i in range(30):
10 time.sleep(0.2)
11 # 修改共享内存
12 money.value += random.randint(1,1000)
13
14 def girl():
15 for i in range(30):
16 time.sleep(0.15)
17 money.value -= random.randint(100,800)
18
19 p1 = Process(target=man)
20 p2 = Process(target=girl)
21 p1.start()
22 p2.start()
23 p1.join()
24 p2.join()
25 print("一个月余额:",money.value) #读取共享内存
View Code
共享内存存放列表,字节串:
1 from multiprocessing import Process,Array
2
3 # 创建共享内存,初始数据 [1,2,3,4]
4 # shm = Array('i',[1,2,3,4])
5 # shm = Array('i',4) # 开辟4个整形的列表空间
6 shm = Array('c',b'hello')
7
8 def fun():
9 # 共享内存对象可以迭代
10 for i in shm:
11 print(i)
12 shm[0] = b'H' # 修改共享内存
13
14 p = Process(target=fun)
15 p.start()
16 p.join()
17 for i in shm:
18 print(i)
19 print(shm.value) # 整体打印字节串
View Code
信号量(信号灯集)
- 通信原理
给定一个数量对多个进程可见.多个进程都可以操作改数量增减,并根据数量值决定自己的行为
- 实现方法
<pre class="brush:python;gutter:true;">from multiprocessing import Semaphore
sem = Semaphore(num)
功能 : 创建信号量对象
参数 : 信号量的初始值
返回值 : 信号量对象
sem.acquire() 将信号量减1 当信号量为0时阻塞
sem.release() 将信号量加1
sem.get_value() 获取信号量数量
信号量代码示例:
注意: 信号量可以当做是一种资源,执行任务需要消耗信号量资源,
这样可以控制进程执行行为
1 from multiprocessing import Process,Semaphore
2 from time import sleep
3 import os
4
5 # 创建信号量资源
6 sem = Semaphore(3)
7
8 # 任务函数 (系统中最多能够同时运行三个该任务)
9 def handle():
10 sem.acquire() # 消耗一个信号量
11 print("%s执行任务"%os.getpid())
12 sleep(2)
13 print("%s 拯救了宇宙"%os.getpid())
14 sem.release() # 增加一个信号量
15
16 jobs = []
17 for i in range(10):
18 p = Process(target = handle)
19 jobs.append(p)
20 p.start()
21
22 for i in jobs:
23 i.join()
View Code