多线程
Python的标准库提供了两个模块:_thread和threading,_thread是低级模块,threading是高级模块,对_thread进行了封装。绝大多数情况下,我们只需要使用threading这个高级模块。
创建一个线程
import threading
def test():
print(threading.currentThread().name + '中运行')
print(threading.current_thread().name + '中运行')
# 任何进程默认就会启动一个线程,我们把该线程称为主线程,主线程又可以启动新的线程
print(threading.currentThread().name + '中运行')
t = threading.Thread(target=test, name='threadTest')
t.start()
t.join()
print("执行结束")
Lock
多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享
多线程开发中,通常使用线程锁来防止变量的污染,但是锁会使得多线程程序变成串行运行,效率变低,以及不当的使用锁,会导致锁交叉互锁,形成死锁
import threading
money = 0
lock = threading.Lock()
def transaction(amount):
global money
money = money + amount
money = money - amount
def test(amount):
for i in range(1, 1000000):
lock.acquire()
try:
transaction(amount)
finally:
lock.release()
t1 = threading.Thread(target=test, args=(10,))
t2 = threading.Thread(target=test, args=(3,))
t1.start()
t2.start()
t1.join()
t2.join()
# 预期的值是0,如果不使用锁,2个线程同时交叉加减,会使得money的值跟预期不同
# 在test方法中增加锁,会使得 transaction 方法中的 money 每次都能完整运行加减相同的值(不交叉),保证符合预期值
print(money)
ThreadLocal
多线程中的全局变量会互相干扰(需要加锁),多线程中的局部变量虽然是互不干扰的,但是层级多的话,传递起来比较麻烦,所以 ThreadLocal 应运而生
import threading
threadLocal = threading.local()
# 实际运行线程
def run():
print(threadLocal.name + ' in ' + threading.currentThread().name)
# 绑定 threadLocal
def processThread(name):
threadLocal.name = name
run()
t1 = threading.Thread(target=processThread, args=('张三',),name='thread1')
t2 = threading.Thread(target=processThread, args=('李四',),name='thread2')
t1.start()
t2.start()
t1.join()
t2.join()
Global Interpreter Lock
GIL(Global Interpreter Lock),任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。
GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython。
Python中如何使用多核:
- 重写一个不带GIL的解释器
- 通过C扩展来实现
- 使用多进程+多线程的模式
# 以下代码只能打满一个CPU
import threading, multiprocessing
def loop():
x = 0
while True:
x = x ^ 1
for i in range(multiprocessing.cpu_count()):
t = threading.Thread(target=loop)
t.start()
定时任务
import concurrent.futures
import time
m = list(range(0, 61))
def task(params):
print(params)
# 如果需要改动,复制一下
args = params.copy()
args['foo'] = 'bar'
print(args)
time.sleep(5)
def main():
# 使用线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
while True:
# 获取当前时间
current_time = time.localtime()
# 检查当前时间是否满足执行任务的条件
if current_time.tm_min in m:
# 提交任务到线程池并并发执行
futures = [executor.submit(task, {"foo":"foo","bar":"bar"}) for _ in range(10)]
# 等待所有任务完成
# 下面这行代码能保证不会内存溢出,但是如果某个任务执行超过1分钟,会导致下一分钟的定时任务不执行
# 如果定时任务安排合理,电脑内存预留充足,可以不加下面这行,保证每分钟的任务都能被执行
concurrent.futures.wait(futures)
# 等待一段时间,避免重复执行
time.sleep(60) # 等待60秒,避免在同一分钟内重复执行任务
else:
# 如果当前时间不满足条件,等待一段时间再检查
time.sleep(10) # 等待10秒后再次检查
if __name__ == "__main__":
main()