Python3 多线程怎么用
回答 11
核心机制
Python3的多线程基于threading模块,核心是Thread类和Lock、Semaphore等同步原语。但需要明确:CPython解释器有GIL(全局解释器锁),这意味着多线程无法利用多核CPU并行执行CPU密集型任务。对于I/O密集型场景,GIL释放频繁,多线程非常有效。
基本用法
最简洁的方式是继承Thread类或传递目标函数:
import threading
import time
def worker(name, delay):
"""模拟I/O操作"""
time.sleep(delay)
print(f"线程 {name} 完成")
# 创建并启动线程
threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(f"T-{i}", 1))
threads.append(t)
t.start()
# 等待所有线程结束
for t in threads:
t.join()
print("所有线程执行完毕")线程安全与锁
共享数据必须加锁,否则会出现竞态条件:
import threading
counter = 0
lock = threading.Lock()
def increment():
global counter
for _ in range(100000):
with lock: # 上下文管理器自动获取和释放锁
counter += 1
threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"期望值: 1000000, 实际值: {counter}")线程池
手动管理线程繁琐且低效,推荐使用concurrent.futures.ThreadPoolExecutor:
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
def fetch_url(url):
response = requests.get(url, timeout=5)
return response.status_code
urls = [
"https://api.github.com",
"https://httpbin.org/get",
"https://jsonplaceholder.typicode.com/posts/1"
]
with ThreadPoolExecutor(max_workers=4) as executor:
# 方式1: submit返回Future对象
futures = {executor.submit(fetch_url, url): url for url in urls}
for future in as_completed(futures):
url = futures[future]
try:
status = future.result()
print(f"{url} 状态码: {status}")
except Exception as e:
print(f"{url} 异常: {e}")
# 方式2: map方法批量执行
# results = executor.map(fetch_url, urls)线程间通信
使用queue.Queue实现生产者-消费者模式,这是最安全的通信方式:
import threading
import queue
import time
import random
def producer(q, event):
while not event.is_set():
item = random.randint(1, 100)
q.put(item)
print(f"生产: {item}")
time.sleep(random.uniform(0.1, 0.5))
print("生产者退出")
def consumer(q, event, name):
while not event.is_set() or not q.empty():
try:
item = q.get(timeout=0.5)
print(f"消费者 {name} 消费: {item}")
q.task_done()
except queue.Empty:
continue
print(f"消费者 {name} 退出")
q = queue.Queue(maxsize=10)
stop_event = threading.Event()
# 启动生产者和多个消费者
producer_thread = threading.Thread(target=producer, args=(q, stop_event))
consumers = [threading.Thread(target=consumer, args=(q, stop_event, f"C-{i}"))
for i in range(3)]
producer_thread.start()
for c in consumers:
c.start()
time.sleep(3)
stop_event.set() # 通知所有线程停止
producer_thread.join()
for c in consumers:
c.join()注意事项
GIL限制:CPU密集型任务用multiprocessing模块或concurrent.futures.ProcessPoolExecutor。I/O密集型任务用asyncio协程可能更高效。
线程安全:threading.local()可创建线程局部变量,避免共享数据。threading.Event用于线程间信号通知,比轮询更高效。
异常处理:子线程的异常不会自动传播到主线程,需要在目标函数内部捕获或通过Future.result()获取。
资源泄漏:始终使用with语句管理锁和线程池,确保资源释放。线程默认不是守护线程,主线程退出前需等待所有非守护线程结束。
调试技巧
使用threading.enumerate()列出所有活跃线程,threading.current_thread().name获取当前线程名。复杂场景下考虑concurrent.futures模块,它封装了更高级的错误处理和结果收集。
用threading模块 
import threading
def task():
print("任务执行中")
t = threading.Thread(target=task)
t.start()threading模块 
Thread(target=func).start()

import threading
def func():
print("Hello")
t = threading.Thread(target=func)
t.start()Thread(target=函数).start()

threading模块
用 threading 模块 
threading模块,或者concurrent.futures。
threading模块,Thread类。
threading.Thread(target=func).start() 
黑柿AI
