You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
CodePattern/B 高性能模式/多线程的应用场景.md

7.1 KiB

Python的多线程时间切片间隔可以通过 sys.setswitchinterval() 设置。其他切换触发条件

  • 当线程等待I/O操作如网络请求或磁盘读写GIL会被释放允许其他线程运行。
  • 某些函数(如 time.sleep()会显式释放GIL。
  • 线程主动释放GIL。

异步编程特别适合高并发的 I/O 密集型任务(如 Web 服务器、爬虫、实时通信), 特别是大量并发连接的任务。 对于 I/O 密集型任务异步编程 通常比 多线程 是更好的选择。 多线程相对编程简单 。

以下场景更适合使用 多线程

场景:GUI 应用程序

在 GUI图形用户界面应用程序中主线程负责处理用户交互而其他任务如文件读写、网络请求需要在后台运行以避免阻塞主线程导致界面卡顿。多线程可以与 GUI 主线程共享内存方便更新界面状态。线程间通信简单适合处理后台任务。GUI 框架(如 PyQt、Tkinter通常有自己的事件循环与异步编程的事件循环冲突。


import sys
import requests
from PyQt5.QtWidgets import QApplication, QWidget, QVBoxLayout, QPushButton, QLabel
from PyQt5.QtCore import QThread, pyqtSignal

# 工作线程:负责下载文件
class DownloadThread(QThread):
    # 自定义信号,用于通知主线程下载进度
    progress_signal = pyqtSignal(str)

    def __init__(self, url):
        super().__init__()
        self.url = url

    def run(self):
        self.progress_signal.emit("开始下载...")
        try:
            response = requests.get(self.url, stream=True)
            total_size = int(response.headers.get("content-length", 0))
            downloaded_size = 0
            with open("downloaded_file", "wb") as file:
                for chunk in response.iter_content(chunk_size=1024):
                    file.write(chunk)
                    downloaded_size += len(chunk)
                    progress = f"已下载: {downloaded_size / 1024:.2f} KB / {total_size / 1024:.2f} KB"
                    self.progress_signal.emit(progress)
            self.progress_signal.emit("下载完成!")
        except Exception as e:
            self.progress_signal.emit(f"下载失败: {str(e)}")

#### 主窗口
class MainWindow(QWidget):
    def __init__(self):
        super().__init__()
        self.init_ui()

    def init_ui(self):
        self.setWindowTitle("多线程下载示例")
        self.setGeometry(100, 100, 300, 150)

        # 布局
        layout = QVBoxLayout()

        # 下载按钮
        self.download_button = QPushButton("开始下载", self)
        self.download_button.clicked.connect(self.start_download)
        layout.addWidget(self.download_button)

        # 状态标签
        self.status_label = QLabel("点击按钮开始下载", self)
        layout.addWidget(self.status_label)

        self.setLayout(layout)

    def start_download(self):
        # 禁用按钮,防止重复点击
        self.download_button.setEnabled(False)
        self.status_label.setText("准备下载...")

        # 创建工作线程
        self.download_thread = DownloadThread("https://example.com/large_file.zip")
        self.download_thread.progress_signal.connect(self.update_status)
        self.download_thread.finished.connect(self.on_download_finished)
        self.download_thread.start()

    def update_status(self, message):
        # 更新状态标签
        self.status_label.setText(message)

    def on_download_finished(self):
        # 下载完成后启用按钮
        self.download_button.setEnabled(True)

if __name__ == "__main__":
    app = QApplication(sys.argv)
    window = MainWindow()
    window.show()
    sys.exit(app.exec_())

场景:与阻塞式 API 交互

某些库或 API 是阻塞式的(如某些数据库驱动、硬件接口库),无法直接使用异步编程。在这种情况下,多线程可以避免阻塞主线程。

import threading
import time
import sqlite3

def query_database():
    # 模拟阻塞式数据库查询
    conn = sqlite3.connect("example.db")
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users")
    results = cursor.fetchall()
    print("查询完成,结果:", results)
    conn.close()

def main():
    print("主线程开始")
    # 创建线程执行数据库查询
    thread = threading.Thread(target=query_database)
    thread.start()
    # 主线程继续执行其他任务
    for i in range(5):
        print(f"主线程运行中... {i}")
        time.sleep(1)
    thread.join()
    print("主线程结束")

main()

场景:任务队列与线程池

在需要处理大量短期任务的场景中(如 Web 服务器的请求处理),使用线程池可以简单编程实现高效地管理任务。 特别类似上面场景,有些任务是阻塞式的,不支持异步

import concurrent.futures
import time

def process_task(task):
    print(f"开始处理任务: {task}")
    time.sleep(2)  # 模拟任务处理时间
    print(f"完成处理任务: {task}")

def main():
    print("主线程开始")
    tasks = ["task1", "task2", "task3", "task4", "task5"]
    # 使用线程池处理任务
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        executor.map(process_task, tasks)
    print("主线程结束")

main()

场景:与 C/C++ 扩展交互

某些 Python 库是基于 C/C++ 扩展实现的(如 numpypandas),这些扩展可能释放了 GIL允许在多线程中并行运行。 多线程常常更快 。

import threading
import numpy as np

def compute_task(data):
    result = np.sum(data)
    print(f"计算结果: {result}")

def main():
    print("主线程开始")
    data = np.random.rand(1000000)  # 生成随机数据
    # 创建多个线程并行计算
    threads = []
    for i in range(4):
        thread = threading.Thread(target=compute_task, args=(data,))
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()
    print("主线程结束")

main()

场景:需要共享状态

在某些场景中,多个任务需要频繁共享和修改状态(如缓存、计数器),使用多线程可以方便地共享内存。 线程之间可以直接访问和修改共享变量。 异步任务之间是独立的,不能直接共享变量或状态。 如果需要在任务之间共享状态,必须通过显式的机制(如队列、回调函数)来传递数据。

import threading

class Counter:
    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()

    def increment(self):
        with self.lock:
            self.value += 1
            print(f"计数器值: {self.value}")

def worker(counter):
    for _ in range(5):
        counter.increment()

def main():
    print("主线程开始")
    counter = Counter()
    # 创建多个线程共享计数器
    threads = []
    for i in range(3):
        thread = threading.Thread(target=worker, args=(counter,))
        thread.start()
        threads.append(thread)
    for thread in threads:
        thread.join()
    print("主线程结束")

main()