豪翔天下

Change My World by Program

0%

Python 进程、线程与协程

基本概念

  • 进程: 资源分配的最小单位。
  • 线程: CPU调度的最小单位。
  • Python里面的多线程只能利用CPU的一个核(全局解释锁的历史原因)
  • 多线程一般来说比多进程快,毕竟共享内存,但是多线程也更危险,以为一个线程崩溃可能导致整个程序崩溃。

多线程

线程安全与线程不安全: 多个线程同时访问一个方法,得到的结果一样就是线程安全的,不一样则是线程不安全的。gevent库是基于事件驱动模型,它的线程是否安全完全看多线程程序是怎么写的,如果仅仅只有gevent一个线程那么不存在线程安全问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 创建子线程
class Thread(threading.Thread):
def __init__(self, 变量):
threading.Thread.__init__(self)
self.变量 = 变量
def run(self):
逻辑
thread = Thread(参数) # 定义一个线程
thread.start() # 开启一个线程,此时子线程已经开始执行了,主线程不会被阻塞
thread.join() # 这样可以让主线程阻塞,一直等到子线程退出位置才会继续往下执行

# 常用方法
threading.activeCount() # 获取当前线程数量,可以用这个来控制线程最大的数量
threading.currentThread() # 获取当前线程对象
threading.currentThread().getName() # 获取当前线程的名称
exit() # 终止当前线程,网上好多人问怎么没有API,后来发现exit就行了...并不会影响到其它线程和主线程

# 线程锁,如果要修改全局变量,可以给全局变量加锁。
lock = threading.Lock()
lock.acquire()
xxxxx
lock.release()
# 更方便的使用:
lock = threading.Lock()
with lock:
xxxxxxx

# 局部变量。虽然局部变量简单的使用直接用就行,但是如果要在run里面进行各个函数之间的传递那就麻烦了,所以提供了ThreadLocal来将线程内部的局部变量变为一个字典,其它函数直接调用即可
LOCAL = threading.local() # 在全局定义,每个线程引用该值结果都仅仅会得到自己的私有变量
# 在Thread类里面的run函数赋值,不能在__init__里面定义,因为那时候线程还没启起来
LOCAL.变量名 = 值 # 就这样

多进程

os.fork()

直接fork一个进程

multiprocessing库

线程池/进程池

Pool

协程(asyncio)

位于标准库中,使用协程来编写单线程的并发,通过IO多路复用技术访问套接字。进程和线程都面临着内核态和用户态的切换问题而耗费许多切换时间,而协程则是用户自己控制切换时机,不需要进入内核态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import asyncio

# 用装饰器来标记作为协程的函数
@asyncio.coroutine
def countdown(number, n):
while n > 0:
print('T-minus', n, '({})'.format(number))
yield from asyncio.sleep(1) # 这里会返回一个asyncio.Future对象并将其传递给时间循环,同时暂停这一协程的执行,时间循环监听这一对象,1秒钟后,时间循环会选择刚刚这个协程,将future对象的结果返回给它,然后协程继续执行。这一过程会持续到所有的协程程序全部完成。
n -= 1

loop = asyncio.get_event_loop()
tasks = [
asyncio.ensure_future(countdown("A", 2)),
asyncio.ensure_future(countdown("B", 3))]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

# 在3.5里面,有了新的语法,添加了types.coroutine修饰器
async def slow_operation(n): # 以这种方式定义协程,在协程里面不能有yield语句,只有return和await可以用于返回
await asyncio.sleep(1) # await接受的对象必须是awaitable对象,必须是定义了__await__()方法且这一方法必须返回一个不是协程的迭代器,协程本身也被认为是awaitable对象
print('Slow operation {} complete'.format(n))

async def main():
await asyncio.wait([
slow_operation(1),
slow_operation(2),
slow_operation(3),
])

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

asyncio中使用requests

1
2
3
4
5
6
7
8
9
10
11
async def req(url):
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, requests.get, url)

async def req2(url):
await req(url)

tasks = [req2(url1), req2(url2)]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.await(tasks))

asyncio动态添加任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
async def produce_task():
while True:
url = redis.pop()
asyncio.run_coroutine_threadsafe(req2(url), thread_loop) # 这里的req2参考上面的定义,是一个异步任务函数thread_loop是我们单独的一个事件循环

def start_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()

# 单独启动了一个线程来做消费者
thread_loop = asyncio.new_event_loop()
run_loop_thread = Thrad(target=start_loop, args=(thread_loop,))
run_loop_thread.start()

# 主线程则直接进行事件循环
loop = asyncio.get_event_loop()
loop.run_until_complete(produce_task())

并发框架

concurrent.futures

该库通过相同的API同时支持线程与协程,以POOL的方式进行并行任务的管理。其中Executors用于管理workers,而futures则用于表示一个异步计算的结果,当调用future时会被立即返回,但是不一定就是最终结果。

ThreadPoolExcutor改为ProcessPoolExecutor就是线程了,真方便。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# 来源: https://pymotw.com/3/concurrent.futures/
# 该例表示一个最大执行进程数为2个的执行器,同时处理5个任务
def task(n):
print('{}: sleeping {}'.format(
threading.current_thread().name,
n)
)
time.sleep(n / 10)
print('{}: done with {}'.format(
threading.current_thread().name,
n)
)
return n / 10

ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
results = ex.map(task, range(5, 0, -1))
print('main: unprocessed results {}'.format(results)) # 这里返回的是一个迭代器
print('main: waiting for real results')
real_results = list(results)
print('main: results: {}'.format(real_results)) # 返回真实的结果

# map中函数如果需要多参数,可以这样做(只需要替换lambda中的函数名即可)
for result in ex.map(lambda p: task(*p), arguements_list):
print(result)

# 用map只能处理相同的任务,可以通过submit来执行需要执行的任务
from concurrent import futures
import threading
import time

def task(n):
print('{}: sleeping {}'.format(
threading.current_thread().name,
n)
)
time.sleep(n / 10)
print('{}: done with {}'.format(
threading.current_thread().name,
n)
)
return n / 10

ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
f = ex.submit(task, 5)
print('main: future: {}'.format(f))
print('main: waiting for results')
result = f.result()
print('main: result: {}'.format(result))
print('main: future after result: {}'.format(f))

# 不按照顺序来获取结果,只要有个任务完成,就执行输出结果,上面那几个方法必须等所有任务执行完了顺序输出,而这个则是只要完成一个就输出一个
wait_for = [
ex.submit(task, i)
for i in range(5, 0, -1)
]
for f in futures.as_completed(wait_for):
print('main: result: {}'.format(f.result()))

回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from concurrent import futures
import time

def task(n):
print('{}: sleeping'.format(n))
time.sleep(0.5)
print('{}: done'.format(n))
return n / 10

def done(fn):
if fn.cancelled():
print('{}: canceled'.format(fn.arg))
elif fn.done():
error = fn.exception()
if error:
print('{}: error returned: {}'.format(
fn.arg, error))
else:
result = fn.result()
print('{}: value returned: {}'.format(
fn.arg, result))

if __name__ == '__main__':
ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
f = ex.submit(task, 5)
f.arg = 5
f.add_done_callback(done)
result = f.result()

任务的取消

Future只要还未开始就能被取消,f.cancel()

任务的异常

通过f.exception()可以获取到任务抛出了什么样的异常

gevent

协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 最简单的使用
import gevent

def foo():
print('Running in foo')
gevent.sleep(0)
print('Explicit context switch to foo again')

def bar(abc):
print('Explicit context to bar')
gevent.sleep(0)
print('Implicit context switch back to bar')

gevent.joinall([ # 把gevent.spawn()放到joinall过后才会真正开始执行
gevent.spawn(foo),
gevent.spawn(bar, '参数'),
])

print('abc') # 这行代码会等待所有协程执行完了过后才会执行

多进程/线程下的常见问题

主线程监听子线程状态

我的解决方法是利用共享队列将子线程的信息等传入queue,类似创建一个flag变量,然后在主线程进行监听,例如,代码来自https://stackoverflow.com/questions/2829329/catch-a-threads-exception-in-the-caller-thread-in-python/2830127#2830127

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class ExcThread(threading.Thread):
def __init__(self, bucket):
threading.Thread.__init__(self)
self.bucket = bucket
def run(self):
try:
raise Exception('An error occured here.')
except Exception:
self.bucket.put(sys.exc_info())
def main():
bucket = Queue.Queue()
thread_obj = ExcThread(bucket)
thread_obj.start()
while True:
try:
exc = bucket.get(block=False)
except Queue.Empty:
pass
else:
exc_type, exc_obj, exc_trace = exc
# deal with the exception
print exc_type, exc_obj
print exc_trace
thread_obj.join(0.1)
if thread_obj.isAlive():
continue
else:
break
if __name__ == '__main__':
main()
坚持原创技术分享,谢谢支持

欢迎关注我的其它发布渠道