from concurrent.futures import ThreadPoolExecutor, as_completed import threading import util @util.measure_performance def compute_task(): threads = [threading.Thread(target=util.compute_task) for _ in range(5)] for thread in threads: thread.start() for thread in threads: thread.join() @util.measure_performance def compute_task_threadpool(): with ThreadPoolExecutor(max_workers=5) as executor: tasks = [executor.submit(util.compute_task) for _ in range(5)] for future in as_completed(tasks): future.result() @util.measure_performance def io_task(): threads = [ threading.Thread(target=util.fetch_url, args=(url, )) for url in util.urls ] for thread in threads: thread.start() for thread in threads: thread.join() # 多线程+线程池优化 @util.measure_performance def io_task_threadpool(): with ThreadPoolExecutor(max_workers=5) as executor: tasks = [executor.submit(util.fetch_url, url) for url in util.urls] for future in as_completed(tasks): future.result() def main(): compute_task() compute_task_threadpool() io_task() io_task_threadpool() if __name__ == "__main__": main()