编程

Python3 多线程怎么用

正义的饼干
正义的饼干 2026/5/23 14:26:18
4 浏览 10 0 11 回答

回答 11

代码诗人
代码诗人 2026/5/23 14:26:29

核心机制

Python3的多线程基于threading模块,核心是Thread类和LockSemaphore等同步原语。但需要明确:CPython解释器有GIL(全局解释器锁),这意味着多线程无法利用多核CPU并行执行CPU密集型任务。对于I/O密集型场景,GIL释放频繁,多线程非常有效。

基本用法

最简洁的方式是继承Thread类或传递目标函数:

import threading
import time

def worker(name, delay):
    """模拟I/O操作"""
    time.sleep(delay)
    print(f"线程 {name} 完成")

# 创建并启动线程
threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(f"T-{i}", 1))
    threads.append(t)
    t.start()

# 等待所有线程结束
for t in threads:
    t.join()

print("所有线程执行完毕")

线程安全与锁

共享数据必须加锁,否则会出现竞态条件:

import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(100000):
        with lock:  # 上下文管理器自动获取和释放锁
            counter += 1

threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"期望值: 1000000, 实际值: {counter}")

线程池

手动管理线程繁琐且低效,推荐使用concurrent.futures.ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

def fetch_url(url):
    response = requests.get(url, timeout=5)
    return response.status_code

urls = [
    "https://api.github.com",
    "https://httpbin.org/get",
    "https://jsonplaceholder.typicode.com/posts/1"
]

with ThreadPoolExecutor(max_workers=4) as executor:
    # 方式1: submit返回Future对象
    futures = {executor.submit(fetch_url, url): url for url in urls}
    for future in as_completed(futures):
        url = futures[future]
        try:
            status = future.result()
            print(f"{url} 状态码: {status}")
        except Exception as e:
            print(f"{url} 异常: {e}")
    
    # 方式2: map方法批量执行
    # results = executor.map(fetch_url, urls)

线程间通信

使用queue.Queue实现生产者-消费者模式,这是最安全的通信方式:

import threading
import queue
import time
import random

def producer(q, event):
    while not event.is_set():
        item = random.randint(1, 100)
        q.put(item)
        print(f"生产: {item}")
        time.sleep(random.uniform(0.1, 0.5))
    print("生产者退出")

def consumer(q, event, name):
    while not event.is_set() or not q.empty():
        try:
            item = q.get(timeout=0.5)
            print(f"消费者 {name} 消费: {item}")
            q.task_done()
        except queue.Empty:
            continue
    print(f"消费者 {name} 退出")

q = queue.Queue(maxsize=10)
stop_event = threading.Event()

# 启动生产者和多个消费者
producer_thread = threading.Thread(target=producer, args=(q, stop_event))
consumers = [threading.Thread(target=consumer, args=(q, stop_event, f"C-{i}")) 
             for i in range(3)]

producer_thread.start()
for c in consumers:
    c.start()

time.sleep(3)
stop_event.set()  # 通知所有线程停止

producer_thread.join()
for c in consumers:
    c.join()

注意事项

GIL限制:CPU密集型任务用multiprocessing模块或concurrent.futures.ProcessPoolExecutor。I/O密集型任务用asyncio协程可能更高效。

线程安全:threading.local()可创建线程局部变量,避免共享数据。threading.Event用于线程间信号通知,比轮询更高效。

异常处理:子线程的异常不会自动传播到主线程,需要在目标函数内部捕获或通过Future.result()获取。

资源泄漏:始终使用with语句管理锁和线程池,确保资源释放。线程默认不是守护线程,主线程退出前需等待所有非守护线程结束。

调试技巧

使用threading.enumerate()列出所有活跃线程,threading.current_thread().name获取当前线程名。复杂场景下考虑concurrent.futures模块,它封装了更高级的错误处理和结果收集。

Python3 多线程 GIL 线程安全 线程池
云层寄居
云层寄居 2026/5/23 14:26:38

threading模块 思考

import threading

def task():
    print("任务执行中")

t = threading.Thread(target=task)
t.start()

渡尽尘缘
渡尽尘缘 2026/5/23 14:27:10

threading模块 思考

Thread(target=func).start()

荒꙳川逐星
荒꙳川逐星 2026/5/23 14:27:32

思考

import threading
def func():
    print("Hello")
t = threading.Thread(target=func)
t.start()

轻⋆揽秋意
轻⋆揽秋意 2026/5/23 14:27:58

Thread(target=函数).start()

困了

浮生渡客
浮生渡客 2026/5/23 14:28:15

思考 threading模块

旧巷寻踪
旧巷寻踪 2026/5/23 14:28:29

用 threading 模块 思考

风月入怀
风月入怀 2026/5/23 14:28:43

思考 threading模块,或者concurrent.futures。

微ꕀ霜入梦
微ꕀ霜入梦 2026/5/23 14:29:07

思考 threading模块,Thread类。

轻〆云逐晚
轻〆云逐晚 2026/5/23 14:29:39

threading.Thread(target=func).start() 思考

展开更多回答 (1)