多线程

Python的标准库提供了两个模块:_thread和threading,_thread是低级模块,threading是高级模块,对_thread进行了封装。绝大多数情况下,我们只需要使用threading这个高级模块。

创建一个线程

import threading

def test():
    print(threading.currentThread().name + '中运行')
    print(threading.current_thread().name + '中运行')

# 任何进程默认就会启动一个线程,我们把该线程称为主线程,主线程又可以启动新的线程
print(threading.currentThread().name + '中运行')
t = threading.Thread(target=test, name='threadTest')
t.start()
t.join()
print("执行结束")

Lock

多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享

多线程开发中,通常使用线程锁来防止变量的污染,但是锁会使得多线程程序变成串行运行,效率变低,以及不当的使用锁,会导致锁交叉互锁,形成死锁

import threading

money = 0

lock = threading.Lock()


def transaction(amount):
    global money
    money = money + amount
    money = money - amount


def test(amount):
    for i in range(1, 1000000):
        lock.acquire()
        try:
            transaction(amount)
        finally:
            lock.release()


t1 = threading.Thread(target=test, args=(10,))
t2 = threading.Thread(target=test, args=(3,))
t1.start()
t2.start()
t1.join()
t2.join()

# 预期的值是0,如果不使用锁,2个线程同时交叉加减,会使得money的值跟预期不同
# 在test方法中增加锁,会使得 transaction 方法中的 money 每次都能完整运行加减相同的值(不交叉),保证符合预期值
print(money)

ThreadLocal

多线程中的全局变量会互相干扰(需要加锁),多线程中的局部变量虽然是互不干扰的,但是层级多的话,传递起来比较麻烦,所以 ThreadLocal 应运而生

import threading

threadLocal = threading.local()

# 实际运行线程
def run():
    print(threadLocal.name + ' in ' + threading.currentThread().name)

# 绑定 threadLocal
def processThread(name):
    threadLocal.name = name
    run()

t1 = threading.Thread(target=processThread, args=('张三',),name='thread1')
t2 = threading.Thread(target=processThread, args=('李四',),name='thread2')
t1.start()
t2.start()
t1.join()
t2.join()

Global Interpreter Lock

GIL(Global Interpreter Lock),任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。

GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython。

Python中如何使用多核:

  1. 重写一个不带GIL的解释器
  2. 通过C扩展来实现
  3. 使用多进程+多线程的模式
# 以下代码只能打满一个CPU
import threading, multiprocessing

def loop():
    x = 0
    while True:
        x = x ^ 1

for i in range(multiprocessing.cpu_count()):
    t = threading.Thread(target=loop)
    t.start()

定时任务

import concurrent.futures
import time

m = list(range(0, 61))


def task(params):
    print(params)
    # 如果需要改动,复制一下
    args = params.copy()
    args['foo'] = 'bar'
    print(args)
    time.sleep(5)


def main():
    # 使用线程池
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        while True:
            # 获取当前时间
            current_time = time.localtime()
            # 检查当前时间是否满足执行任务的条件
            if current_time.tm_min in m:
                # 提交任务到线程池并并发执行
                futures = [executor.submit(task, {"foo":"foo","bar":"bar"}) for _ in range(10)]
                # 等待所有任务完成
                # 下面这行代码能保证不会内存溢出,但是如果某个任务执行超过1分钟,会导致下一分钟的定时任务不执行
                # 如果定时任务安排合理,电脑内存预留充足,可以不加下面这行,保证每分钟的任务都能被执行
                concurrent.futures.wait(futures)
                # 等待一段时间,避免重复执行
                time.sleep(60)  # 等待60秒,避免在同一分钟内重复执行任务
            else:
                # 如果当前时间不满足条件,等待一段时间再检查
                time.sleep(10)  # 等待10秒后再次检查


if __name__ == "__main__":
    main()

results matching ""

    No results matching ""