进程、线程与协程
一、基本概念
1.进程和线程
1.1 进程
-
进程:运行中的程序。每个进行拥有自己的地址空间、内存、数据栈等。多进程之间的数据不共享。进程也可以通过派生(fork 或 spawn)新的进程来执行其他任务,进程间采用进程间通信(IPC,Inter Process communication)的方式共享信息。
-
僵尸进程
-
孤儿进程
- 守护进程
1.2 线程
在一个主进程或“主线程”中并行运行的一些“迷你进程”。能被操作系统调度的最小单位。线程之间数据共享,
-
全局解释器锁:对 Python 虚拟机的访问是由全局解释器锁(GIL,global interpreter lock)控制的。这个锁就是用来保证同时只能有一个线程运行的。作用是为了完成gc的回收机制,对不同线程的引用计数的变化记录更加精准。导致了同一个进程中的多个进程只能有一个线程真正被CPU执行。
-
守护线程:
-
池:在程序开始之前先创建几个线程或进程放在一个池中,开好的进程和线程会一直存在在池中,可以被多个任务反复利用,可以减少调度开销,减轻操作系统的负担。
2. 同步和异步
描述的任务的提交方法
2.1 同步
任务提交以后,原地等待任务的返回结果
2.2 异步
任务提交以后,不原地等待任务的返回结果
3. 阻塞和非阻塞
描述的任务的运行状态
3.3 阻塞
3.4 非阻塞
二、多进程
1. 开启子进程
1.1 方法一
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 | from multiprocessing import Process
import os
def func(): # 子进程
print(__name__, 'func: ppid: %s, pid: %s' % (os.getppid(), os.getpid()))
print(__name__, 'main: ppid: %s, pid: %s' % (os.getppid(), os.getpid()))
p = Process(target=func) # 创建一个对象
p.start() # 启动一个子进程
# 输出结果:
# __main__ main: ppid: 72950, pid: 83952
# __main__ func: ppid: 83952, pid: 83953
|
如果在Windows下执行会报错:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 | __main__ main: ppid: 10740, pid: 22656
__mp_main__ main: ppid: 22656, pid: 24816
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
|
出错原因:Windows启动子进程时会自动import
启动它的这个文件,而在import
的时候会执行这些语句。无限递归创建子进程报错。但是在multiprocessing.Process的源码中是对子进程再次产生子进程是做了限制的,是不允许的,于是出现如上的错误提示 。Linux下开启子进程会copy主进程的内存空间到子进程,父子进程之间隔离。
解决方法:
1
2
3
4
5
6
7
8
9
10
11
12 | from multiprocessing import Process
import os
def func(): # 子进程
print(__name__, 'func: ppid: %s, pid: %s' % (os.getppid(), os.getpid()))
if __name__ == '__main__':
print(__name__, 'main: ppid: %s, pid: %s' % (os.getppid(), os.getpid()))
p = Process(target=func) # 创建一个对象
p.start() # 启动一个子进程
|
用多进程实现socket server连接多个client
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 | #!/usr/bin/env python3
# _*_ coding:utf-8 _*_
# filename: server.py
import socket
from multiprocessing import Process
def talk(conn):
while True:
msg = conn.recv(1024).decode("utf-8")
ret = msg.upper().encode("utf-8")
conn.send(ret)
conn.close()
if __name__ == '__main__':
sock = socket.socket()
sock.bind(("127.0.0.1", 8080))
sock.listen()
while True:
conn, addr = sock.accept()
Process(target=talk, args=(conn,)).start()
sock.close()
|
client端
1
2
3
4
5
6
7
8
9
10
11
12
13
14 | #!/usr/bin/env python3
# _*_ coding:utf-8 _*_
# filename: client.py
import socket
import time
sock = socket.socket()
sock.connect(("127.0.0.1", 8080))
while True:
sock.send(b'hello')
content = sock.recv(1024).decode('utf-8')
print(content)
time.sleep(1)
|
1.2 方法二
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 | from multiprocessing import Process
import os
class MyProcess(Process):
def __init__(self, name, age):
super().__init__()
self.name = name
self.age = age
def run(self):
print('func: ppid: %s, pid: %s' % (os.getppid(), os.getpid()))
print('func:', self.name, self.age)
if __name__ == '__main__':
print('main: ppid: %s, pid: %s' % (os.getppid(), os.getpid()))
p = MyProcess('Jerry', 18)
p.start()
#main: ppid: 10740, pid: 22884
#func: ppid: 22884, pid: 21232
#func: Jerry 18
|
2. join
等待子进程结束后继续执行。
join():同步阻塞--直至启动的线程终止之前一直挂起;除非给出了 timeout(秒),否则会一直阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 | from multiprocessing import Process
import os
import time
def func():
time.sleep(2)
print('%s func: ppid:%s, pid: %s' % (time.strftime("%Y-%m-%d %H:%M:%S"), os.getppid(),os .getpid()))
if __name__ == '__main__':
print('%s main: ppid:%s, pid: %s' % (time.strftime("%Y-%m-%d %H:%M:%S"), os.getppid(),os .getpid()))
p = Process(target=func)
p.start() # 异步非阻塞
p.join() # 同步阻塞 直至启动的线程终止之前一直挂起;除非给出了 timeout(秒),否则会一直阻塞
print(time.strftime("%Y-%m-%d %H:%M:%S"),'end')
# 输出结果
# 2020-02-17 11:38:47 main: ppid:19674, pid: 20103
# 2020-02-17 11:38:49 func: ppid:20103, pid: 20104
# 2020-02-17 11:38:49 end
#### 如果没有join输出结果 ####
#2020-02-17 11:38:57 main: ppid:19674, pid: 20115
#2020-02-17 11:38:57 end
#2020-02-17 11:38:59 func: ppid:20115, pid: 20116
|
多个子进程join
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 | from multiprocessing import Process
import os
import time
def func(n):
time.sleep(2)
print('%s func%d: ppid:%s, pid: %s' % (time.strftime("%Y-%m-%d %H:%M:%S"), n,os.getppid(),os .getpid()))
if __name__ == '__main__':
print('%s main: ppid:%s, pid: %s' % (time.strftime("%Y-%m-%d %H:%M:%S"), os.getppid(),os .getpid()))
p_list = []
for arg in range(1,5):
p = Process(target=func,args=(arg,))
p.start()
p_list.append(p)
for p in p_list:
p.join()
time.sleep(1)
print(time.strftime("%Y-%m-%d %H:%M:%S"),'end')
# 输出结果
# 2020-02-17 12:15:53 main: ppid:83012, pid: 79880
# 2020-02-17 12:15:56 func1: ppid:79880, pid: 76312
# 2020-02-17 12:15:56 func2: ppid:79880, pid: 83728
# 2020-02-17 12:15:56 func3: ppid:79880, pid: 72620
# 2020-02-17 12:15:56 func4: ppid:79880, pid: 72696
# 2020-02-17 12:15:57 end
|
3.属性和方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 | from multiprocessing import Process
import os
import time
class MyProcess(Process):
def run(self):
print('func: ppid: %s, pid: %s' % (os.getppid(), os.getpid()))
if __name__ == '__main__':
print('main: ppid: %s, pid: %s' % (os.getppid(), os.getpid()))
p = MyProcess()
p.start()
print(p.ident, p.pid) # 512 512
print(p.name) # MyProcess-1
print(p.is_alive()) # True
p.terminate() # 强制结束一个子进程,异步非阻塞,需要操作系统响应请求后执行
time.sleep(0.01)
print(p.is_alive()) # False
|
4. 守护进程
- 主进程为了回收子进程的资源,会等待所有子进程结束才结束。
- 守护进程会等待主进程代码结束之后才结束。
- 如果主进程代码结束之后还有其他子进程在运行,守护进程不守护
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 | from multiprocessing import Process
import time
def func1():
while True:
print('func1')
time.sleep(1)
def func2():
for i in range(5):
print('func2', i)
time.sleep(1)
if __name__ == '__main__':
p1 = Process(target=func1)
p1.daemon = True # p1设置为守护进程
p1.start() # p1启动新的进程开始执行
p2 = Process(target=func2)
p2.start() # p2启动新的进程开始执行
time.sleep(3) # 主进程继续执行,3秒之后主进程的代码执行完成,p1守护进程随之结束,但是p2进程会继续执行,直到p2结束之后,脚本执行结束。
# 输出结果
func1
func2 0
func1
func2 1
func1
func2 2
func2 3
func2 4
|
5. 锁
加锁降低了程序的执行效率,保证了数据的安全
5.1 不使用锁的情况
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 | #!/usr/bin/env python3
# coding:utf-8
from multiprocessing import Process
import time
import json
import random
def search_ticket(i):
# 查询余票
with open("ticket.db", 'r', encoding='utf-8') as f:
ticket = json.load(f)
print('用户%s查询余票,当前余票:%s张.' % (i, ticket.get('num')), end='')
def buy_ticket(i):
with open("ticket.db", 'r', encoding='utf-8') as f:
ticket = json.load(f)
# 模拟网络延时
time.sleep(random.randint(1, 3))
# 判断当前是否有票
if ticket.get('num') > 0:
# 修改数据库
ticket['num'] -= 1
# 写入数据
with open('ticket.db', mode='w', encoding='utf-8') as f:
json.dump(ticket, f)
print('user%s购票成功!' % i)
else:
print('余票不足,购票失败!')
def get_ticket(i):
search_ticket(i)
buy_ticket(i)
def create_db():
ticket = {'num': 1}
with open('ticket.db', mode='w', encoding='utf-8') as f:
json.dump(ticket, f)
if __name__ == '__main__':
create_db()
for i in range(1, 11):
p = Process(target=get_ticket, args=(i,))
p.start()
######################################
用户3查询余票,当前余票:1张.user3购票成功!
用户4查询余票,当前余票:1张.user4购票成功!
用户6查询余票,当前余票:1张.user6购票成功!
用户10查询余票,当前余票:1张.user10购票成功!
用户5查询余票,当前余票:1张.user5购票成功!
用户7查询余票,当前余票:1张.user7购票成功!
用户1查询余票,当前余票:1张.user1购票成功!
用户2查询余票,当前余票:1张.user2购票成功!
用户9查询余票,当前余票:1张.user9购票成功!
用户8查询余票,当前余票:1张.user8购票成功!
|
5.2 互斥锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63 | #!/usr/bin/env python3
# coding:utf-8
from multiprocessing import Process, Lock
import time
import json
import random
def search_ticket(i):
# 查询余票
with open("ticket.db", 'r', encoding='utf-8') as f:
ticket = json.load(f)
print('用户%s查询余票,当前余票:%s张.' % (i, ticket.get('num')), end='')
def buy_ticket(i):
with open("ticket.db", 'r', encoding='utf-8') as f:
ticket = json.load(f)
# 判断当前是否有票
if ticket.get('num') > 0:
# 修改数据库
ticket['num'] -= 1
# 模拟网络延时
time.sleep(random.randint(1,3))
# 写入数据
with open('ticket.db', mode='w', encoding='utf-8') as f:
json.dump(ticket, f)
print('user%s购票成功!' % i)
else:
print('余票不足,购票失败!')
def get_ticket(i, lock):
search_ticket(i)
#lock.acquire() # 加锁
#buy_ticket(i)
#lock.release() # 释放锁
with lock: # with 在执行函数前加锁,执行函数后释放锁,with 可以保证在函数执行失败后释放锁。
buy_ticket(i)
def create_db():
ticket={'num':3}
with open('ticket.db', mode='w', encoding='utf-8') as f:
json.dump(ticket, f)
if __name__ == '__main__':
create_db()
lock = Lock()
for i in range(1,11):
p = Process(target=get_ticket, args=(i, lock))
p.start()
##################################
# 虽然每个人都查到余票有3张,但只有三个人买票成功
用户1查询余票,当前余票:3张.user1购票成功!
用户2查询余票,当前余票:3张.user2购票成功!
用户4查询余票,当前余票:3张.user4购票成功!
用户5查询余票,当前余票:3张.余票不足,购票失败!
用户7查询余票,当前余票:3张.余票不足,购票失败!
用户8查询余票,当前余票:3张.余票不足,购票失败!
用户9查询余票,当前余票:3张.余票不足,购票失败!
用户3查询余票,当前余票:3张.余票不足,购票失败!
用户6查询余票,当前余票:3张.余票不足,购票失败!
用户10查询余票,当前余票:3张.余票不足,购票失败!
|
互斥锁不能acquire多次
6. 进程间通信(IPC)
- 基于文件 同一机器的多个进程之间通信 (基于socket)
- 基于网络 同一或不同机器多进程之间通信 (memcheck, redis, rabbitmq,kafka)
6.1 队列Queue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 | from multiprocessing import Queue, Process
def foo(q):
print(q.get()) # hello world!
def bar(q):
q.put('hello world!')
if __name__ == '__main__':
q = Queue()
p1 = Process(target=bar, args=(q,))
p1.start()
p2 = Process(target=foo, args=(q,))
p2.start()
|
队列中put一个,get一个,如果get数量超过put,get会一直等待。
6.2 通过队列实现生产者消费者模型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35 | from multiprocessing import Process
from multiprocessing import Queue
import time
import random
def consumer(q):
while True:
food = q.get()
if food:
print("-C消费了%s" % food)
else:
break
def producer(q, name, food):
for i in range(10):
ifood = "第%s个%s" % (i, food)
print("+%s生产了%s" % (name, ifood))
time.sleep(random.random())
q.put(ifood)
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer, args=(q, "A", "饺子"))
p2 = Process(target=producer, args=(q, "B", "包子"))
c = Process(target=consumer, args=(q,))
p1.start()
p2.start()
c.start()
p1.join()
p2.join()
q.put(None)
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 | from multiprocessing import Process
from multiprocessing import Queue
import time
def comsumor(q):
print('-消费第%s个产品' % q.get())
def productor(q,i):
print('+生产第%s个产品' % (i,))
q.put((i))
time.sleep(0.5)
if __name__ == '__main__':
q = Queue()
for i in range(10):
p = Process(target=productor,args=(q,i))
p.start()
for i in range(10):
c = Process(target=comsumor, args=(q,))
c.start()
|
7. 进程间数据共享
进程之间是数据隔离的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 | from multiprocessing import Process
num = 0
def func():
global num
num += 1
if __name__ == '__main__':
p_l = []
for i in range(100): # 开100个进程空间
p = Process(target=func)
p.start()
p_l.append(p)
for p in p_l:
p.join()
print(num) # 0
|
python中可以通过Manager
类加锁的方法实现进程间的数据共享,但是带来的性能代价很高,需要对共享的数据进行加锁,不推荐使用这种方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 | from multiprocessing import Process, Manager, Lock
def change_dic(dic, lock):
with lock:
dic['count'] -= 1
if __name__ == '__main__':
m = Manager()
dic = m.dict({'count': 100})
lock = Lock()
p_l = []
for i in range(100):
p = Process(target=change_dic, args=(dic, lock))
p.start()
p_l.append(p)
for p in p_l:
p.join()
print(dic) # {'count': 0}
|
2. 进程池
2.1.创建线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 | from concurrent.futures import ProcessPoolExecutor
import os
import time
import random
def func(i):
print('start: %s %s' % (i, os.getpid()))
time.sleep(random.randint(1, 3))
print('end: %s %s' % (i, os.getpid()))
if __name__ == "__main__":
tp = ProcessPoolExecutor(4) # 实例进程池,传入进程池中的进程数量
for i in range(10):
tp.submit(func, i)
|
2.2 从进程池中获取结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 | from concurrent.futures import ProcessPoolExecutor
import os
import time
import random
def func(i):
print('start: %s %s' % (i, os.getpid()))
time.sleep(random.randint(1, 3))
print('end: %s %s' % (i, os.getpid()))
return i ** 2
if __name__ == "__main__":
tp = ProcessPoolExecutor(4)
future_l = {}
for i in range(20): # 异步非阻塞
ret = tp.submit(func, i) # ret --> <Future at 0x166e360f5c0 state=running>
# print(ret.result()) # 同步执行,如果在这里获取结果:执行完一个进程等待收到结果之后继续执行
future_l[i] = ret
for k, v in future_l.items(): # 同步获取结果,按照提交顺序获取结果,等到获取到结果之后才能获取下一个任务的结果。
print(k, '-->', v.result())
|
使用map
方法,适用于传递简单的参数,且参数时一个可迭代的类型。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 | from concurrent.futures import ProcessPoolExecutor
import os
import time
import random
def func(i):
print('start: %s %s' % (i, os.getpid()))
time.sleep(random.randint(1, 3))
print('end: %s %s' % (i, os.getpid()))
return i ** 2
if __name__ == "__main__":
tp = ProcessPoolExecutor(4)
ret = tp.map(func, range(10)) # map(函数,可迭代对象)
for k in ret:
print(k)
|
2.3 回调函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 | from concurrent.futures import ProcessPoolExecutor
import os
import time
import random
def func(i):
print('start: %s %s' % (i, os.getpid()))
time.sleep(random.randint(1, 3))
print('end: %s %s' % (i, os.getpid()))
return i ** 2
def get_ret(ret): # 异步阻塞
print(ret.result()) # 先执行完的先打印
if __name__ == "__main__":
tp = ProcessPoolExecutor(4)
future_l = {}
for i in range(20): # 异步非阻塞
ret = tp.submit(func, i)
ret.add_done_callback(get_ret) # 异步阻塞
# 回调函数,给ret对象绑定一个回调函数,
# 等待ret对应的任务有了结果之后立即调用get_ret函数对结果立即进行处理,而不用按照顺序接收处理结果
|
三、多线程
1. 多线程
- 线程是能被操作系统调度的最小单位
- 同一个进程中的线程可以同时被CPU执行
- 线程之间数据共享
- 线程的切换开销较小
- 线程只能自己执行完代码后关闭
CPython中的线程:
- gc垃圾回收:引用计数+分代回收
- 全局解释器锁:为了完成gc的回收机制。导致一个进程中的多个线程只有一个线程被执行。
pypy 也使用gc回收,同样不能使用多核。
2.开启线程
2.1 方法一 面向过程
1
2
3
4
5
6
7
8
9
10
11
12
13
14 | from threading import Thread
import time
def func(i):
print('start:%s' % i)
time.sleep(1)
print('end:%s' % i)
if __name__ == '__main__':
for i in range(2):
t = Thread(target=func, args=(i,))
t.start()
|
2.1 方法二 面向对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 | from threading import Thread
import time
class MyThread(Thread):
def __init__(self, i):
self.i = i
super().__init__()
def run(self):
print('start:%s' % self.i)
time.sleep(1)
print('end:%s' % self.i)
if __name__ == '__main__':
for i in range(2):
t = MyThread(i)
t.start()
|
3 join
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 | from threading import Thread
import time
def func(i):
print('start:%s' % i)
time.sleep(1)
print('end:%s' % i)
if __name__ == '__main__':
t_l = []
for i in range(3):
t = Thread(target=func, args=(i,))
t.start()
t_l.append(t)
for t in t_l:
t.join()
print("所有线程执行完成")
# start:0
# start:1
# start:2
# end:1
# end:2
# end:0
# 所有线程执行完成
|
4. 属性和方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 | from threading import Thread
from threading import current_thread
from threading import enumerate
from threading import active_count
import time
def func(i):
print('%s --> %s 线程id: %s' % (i, current_thread().name, current_thread().ident)) # # 在执行的线程中返回当前线程的id
time.sleep(1)
if __name__ == '__main__':
t_l = []
for i in range(2):
t = Thread(target=func, args=(i,))
t.start()
t_l.append(t)
print('进程列表:',
enumerate()) # 进程列表: [<_MainThread(MainThread, started 30544)>, <Thread(Thread-1, started 23060)>, <Thread(Thread-2, started 28636)>, <Thread(Thread-3, started 28932)>]
print('当前存活的进程数:', active_count()) # 4
for t in t_l:
t.join()
print("所有线程执行完成")
|
5,.线程之间数据共享
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 | from threading import Thread
num = 100
def func():
global num
num -= 1
if __name__ == '__main__':
t_l = []
for i in range(100):
t = Thread(target=func)
t.start()
t_l.append(t)
for r in t_l:
t.join()
print(num) # 0
|
3. 守护线程
主线程会等待子线程结束之后才结束。
- 守护线程随着主线程的结束而结束。
- 守护线程会在主线程的代码结束之后还有其他子线程在运行,继续守护其他子线程。
- 其他子线程结束-->主线程结束-->整个进程中所有资源回收-->守护进程也会被回收
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 | from threading import Thread
import time
def func1():
while True:
print('执行子线程1...')
time.sleep(1)
def func2():
for i in range(3):
print('执行子线程2...')
time.sleep(1)
t1 = Thread(target=func1)
t1.daemon = True
t1.start()
t2 = Thread(target=func2)
t2.start()
# 输出结果
执行子线程1...
执行子线程2...
执行子线程1...
执行子线程2...
执行子线程1...
执行子线程2...
执行子线程1...
|
子线程func2结束之后,不等待线程func1,主线程结束。
4. 锁
线程之间也存在数据不安全。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 | from threading import Thread
n = 0
def add():
for _ in range(100000):
global n
n += 1
def sub():
for _ in range(100000):
global n
n -= 1
t_l = []
for i in range(2):
t1 = Thread(target=add)
t1.start()
t2 = Thread(target=sub)
t2.start()
t_l.append(t1)
t_l.append(t2)
for t in t_l:
t.join()
print(n) # -15778 可能出现数据不安全
|
引起数据不安全的原因:
多个线程在运算和赋值之间发生线程的切换
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 | import dis
n = 0
def add():
global n
n += 1
dis.dis(add)
# 8 0 LOAD_GLOBAL 0 (n)
# 2 LOAD_CONST 1 (1)
# 4 INPLACE_ADD
# 6 STORE_GLOBAL 0 (n)
# 8 LOAD_CONST 0 (None)
# 10 RETURN_VALUE
|
引起数据不安全的操作:
- 在
+=
、-=
、*=
、/=
运算中,计算和赋值是两个操作,其他先计算后赋值的表达式都会引起数据不安全
if
、while
判断同样会引起数据不安全
- 多个线程同时操作全部变量、静态变量会引起数据不安全
避免引起数据不安全的方法:
-
尽量避免操作全局变量
-
不要在类里操作静态变量
4.1 互斥锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 | from threading import Thread,Lock
n = 0
def add(lock):
for i in range(30000):
global n
with lock:
n += 1
def sub(lock):
for i in range(30000):
global n
with lock:
n -= 1
t_l = []
lock = Lock()
for i in range(2):
t1 = Thread(target=add, args=(lock,))
t1.start()
t2 = Thread(target=sub, args=(lock,))
t2.start()
t_l.append(t1)
t_l.append(t2)
for t in t_l:
t.join()
print(n)
|
4.2 死锁
产生的原因:多把(互斥/递归)锁,在再多个线程中交叉使用
解决死锁的方法:当使用互斥锁出现死锁现象时,可以把所有的互斥锁都改成一把递归锁,但是执行的效率队降低。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37 | from threading import Thread, Lock
import time
mutexA = Lock()
mutexB = Lock()
def func1():
mutexA.acquire()
print("func1 抢到A锁")
time.sleep(2)
mutexB.acquire()
print("func1 抢到B锁")
mutexB.release()
mutexA.release()
def func2():
mutexB.acquire()
print("func2 抢到B锁")
time.sleep(2)
mutexA.acquire()
print("func2 抢到A锁")
mutexA.release()
mutexB.release()
t1 = Thread(target=func1)
t2 = Thread(target=func2)
t1.start()
t2.start()
# func1 抢到A锁
# func2 抢到B锁
# 到这里阻塞住了
|
4.3 递归锁
同一个线程中可以lock.acquire多次,同时需要lock.release多次才能释放锁。但是只能被第一个抢到锁的执行上述操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 | from threading import Thread, RLock
import time
mutexA = mutexB = RLock()
def func1():
mutexA.acquire()
print("func1 抢到A锁")
time.sleep(2)
mutexB.acquire()
print("func1 抢到B锁")
mutexB.release()
mutexA.release()
def func2():
mutexB.acquire()
print("func2 抢到B锁")
time.sleep(2)
mutexA.acquire()
print("func2 抢到A锁")
mutexA.release()
mutexB.release()
t1 = Thread(target=func1)
t2 = Thread(target=func2)
t1.start()
t2.start()
|
递归锁的层级越多效率越低。
信号量
semaphore信号量可以通过内置计数器来控制同时运行线程的数量,启动线程(消耗信号量)内置计数器会自动减一,线程结束(释放信号量)内置计数器会自动加一;内置计数器为零,启动线程会阻塞,直到有本线程结束或者其他线程结束为止;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 | from threading import Thread, Semaphore
import time
import random
semaphore = Semaphore(5) # # 添加一个计数器,最大并发线程数量5(最多同时运行5个线程)
def task(name):
semaphore.acquire()
print("%s号正在执行..." % name)
time.sleep(random.randint(1, 5))
semaphore.release()
if __name__ == '__main__':
for i in range(20):
t = Thread(target=task, args=(i,))
t.start()
|
Event事件
事件是用于线程间通信的对象。多个进程拥有同一个Event实例,当调用wait()时进入到阻塞状态,同时会设置阻塞标记为False(待阻塞标记为True后才会解除阻塞状态),此时另一个进程可以继续工作,并且通过set()方法将阻塞标记设置为True,这样之前阻塞的进程会继续执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 | from threading import Thread, Event
import time
event = Event()
def light():
print('红灯亮')
time.sleep(3)
print('绿灯亮')
event.set() # 绿灯亮
def car(name):
print('车%s正在等绿灯' % name)
event.wait() # 等灯绿 此时event为False,直到event.set()将其值设置为True,才会继续运行.
print('车%s通行' % name)
if __name__ == '__main__':
t1 = Thread(target=light)
t1.start()
for i in range(5):
t = Thread(target=car, args=(i,))
t.start()
|
5. 线程队列
5.1 先进先出队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14 | import queue # 数据之间线程安全的队列容器
q = queue.Queue(3) # FIFO 先进先出,可以设置队列大小
q.put(1)
q.put(2)
q.put(3)
print(q.get()) # 1
print(q.get()) # 2
print(q.get()) # 3
try:
q.get_nowait() # 取不到数据时不阻塞,抛出异常
except queue.Empty:
pass
|
5.2 后进先出队列
1
2
3
4
5
6
7
8
9
10
11
12
13 | from queue import LifoQueue
lq = LifoQueue()
for i in range(3):
lq.put(i)
for i in range(3):
print(lq.get())
# 输出结果
# 2
# 1
# 0
|
5.3 优先级队列
1
2
3
4
5
6
7
8
9
10
11
12 | from queue import PriorityQueue
pq = PriorityQueue()
# (优先级,数据内容 )
pq.put((8,"A"))
pq.put((2,"B"))
print(pq.get())
print(pq.get())
# 输出结果
# (2, 'B')
# (8, 'A')
|
优先级根据ASCII排序
线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 | from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import time
import random
def func(i):
print("第%s个任务,进程ID是:%s" % (i, current_thread().ident))
time.sleep(random.randint(1, 3))
return i * 2
tp = ThreadPoolExecutor(4) # 实例化线程池,传入线程池中线程数量
t_list = []
for i in range(10):
res = tp.submit(func, i)
# print(res.result()) # 同步提交
t_list.append(res)
tp.shutdown() # 关闭线程池,等待线程池中所有的任务运行完毕
for t in t_list:
print('>>>', t.result())
|
四. 协程
单线程下实现并发。减轻了操作系统的负担,可以多争取一些时间片来被CPU执行,提高程序的效率。
1. gevent
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 | from gevent import monkey
monkey.patch_all()
# gevent模块本身无法检测常见的IO操作,需要导入monkey
import gevent
import time
def func():
print('start...')
time.sleep(1)
print('end...')
g = gevent.spawn(func)
# g.join() # 阻塞直到协程g任务执行结束
gevent.joinall([g, ])
|
基于gevent实现socket并发
server 端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 | #!/usr/bin/env python3
# _*_ coding:utf-8 _*_
# filename: server.py
from gevent import monkey
monkey.patch_all()
import socket
import gevent
def talk(conn):
while True:
try:
msg = conn.recv(1024).decode('utf-8')
conn.send(msg.upper().encode('utf-8'))
except ConnectionResetError:
break
conn.close()
def server():
sock = socket.socket()
sock.bind(("127.0.0.1", 8080))
sock.listen()
while True:
conn, addr = sock.accept()
gevent.spawn(talk, conn)
sock.close()
if __name__ == '__main__':
gevent.spawn(server).join()
|
client端
1
2
3
4
5
6
7
8
9
10
11
12
13
14 | #!/usr/bin/env python3
# _*_ coding:utf-8 _*_
# filename: client.py
import socket
import time
sock = socket.socket()
sock.connect(("127.0.0.1", 8080))
while True:
sock.send(b'hello')
content = sock.recv(1024).decode('utf-8')
print(content)
time.sleep(1)
|
2. asyncio
2.1 用法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 | import asyncio
# 定义协程函数
async def func():
pass
# 创建协程对象
task = func()
# 将协程对象交给事件循环
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
# python3.7以后
# asyncio.run(task)
|
2.2 await
await + 可等待的对象(协程对象、Future、Task对象)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 | import asyncio
async def task():
print('start')
await asyncio.sleep(1)
print('end')
async def main():
result = await task()
print(result)
asyncio.run(main())
|
2.3 Task对象
创建多个任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 | import asyncio
async def task():
print('task start')
await asyncio.sleep(2) # await 可能会发生阻塞的方法
print('task end')
return "hello world"
async def main():
print("main start")
task_list = [
asyncio.create_task(task(), name="task-1"),
asyncio.create_task(task(), name="task-2")
]
done, pending = await asyncio.wait(task_list)
print("main end")
print(done)
asyncio.run(main())
|
执行结果
| main start
task start
task start
task end
task end
main end
{<Task finished name='task-1' coro=<task() done, defined at E:\Project\pythonProject\main.py:4> result='hello world'>, <Task finished name='task-2' coro=<task() done, defined at E:\Project\pythonProject\main.py:4> result='hello world'>}
|
2.4 Future
Task继承了Future
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 | import asyncio
async def task(fut):
print('task start')
await asyncio.sleep(2)
print('task end')
fut.set_result(123)
async def main():
loop = asyncio.get_running_loop()
fut = loop.create_future()
await loop.create_task(task(fut))
data = await fut
print(data)
asyncio.run(main())
|
执行结果
如果任务不支持异步
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 | import time
import asyncio
def task(i):
time.sleep(2)
return i*i
async def main():
loop = asyncio.get_running_loop()
fut = loop.run_in_executor(None, task, 2)
data = await fut
print(data)
asyncio.run(main())
|
|
数据隔离 |
安全性 |
切换方式 |
开销 |
利用多核 |
特点 |
进程 |
隔离 |
数据不安全 |
操作系统 |
*** |
是 |
|
线程 |
共享 |
数据不安全 |
操作系统 |
** |
否 |
一些和文件操作相关的IO只有操作系统能够感知 |
协程 |
共享 |
数据安全 |
用户 |
* |
否 |
只有在用户级别能够感知到的IO操作才会用协程来做切换来规避 |