You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

109 lines
3.6 KiB

3 days ago
import asyncio
import functools
import inspect
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

import numpy as np
import yfinance as yf
####################################################################
# Fetch stock data (I/O-bound task)
def fetch_stock_data(stock):
    """Download the closing prices of one ticker and return them as an array.

    Args:
        stock: Ticker symbol forwarded to ``yf.download`` (e.g. ``"AAPL"``).

    Returns:
        NumPy array built from the ``Close`` column of the frame returned by
        ``yf.download`` for ``start="2020-01-01"``, ``end="2023-01-01"``.
        A single-column 2-D result is flattened to 1-D.
    """
    print(f"Fetching data for {stock}")
    data = yf.download(stock, start="2020-01-01", end="2023-01-01", auto_adjust=False)
    closes = data['Close'].to_numpy()
    # Depending on the yfinance version, the frame may carry (field, ticker)
    # MultiIndex columns even for one ticker; data['Close'] is then a
    # one-column DataFrame and the array has shape (n, 1). Collapse that case
    # so callers always get the price series as a 1-D array over time.
    if closes.ndim == 2 and closes.shape[1] == 1:
        closes = closes[:, 0]
    return closes
# Fetch stock data asynchronously
async def async_fetch_stock_data(stock):
    """Download the closing prices of one ticker without blocking the event loop.

    The blocking ``yf.download`` call is run through ``asyncio.to_thread`` and
    awaited.

    Args:
        stock: Ticker symbol forwarded to ``yf.download`` (e.g. ``"AAPL"``).

    Returns:
        NumPy array built from the ``Close`` column of the frame returned by
        ``yf.download`` for ``start="2020-01-01"``, ``end="2023-01-01"``.
        A single-column 2-D result is flattened to 1-D.
    """
    print(f"Fetching data for {stock}")
    data = await asyncio.to_thread(
        yf.download, stock, start="2020-01-01", end="2023-01-01", auto_adjust=False
    )
    closes = data['Close'].to_numpy()
    # Depending on the yfinance version, the frame may carry (field, ticker)
    # MultiIndex columns even for one ticker; data['Close'] is then a
    # one-column DataFrame and the array has shape (n, 1). Collapse that case
    # so callers always get the price series as a 1-D array over time.
    if closes.ndim == 2 and closes.shape[1] == 1:
        closes = closes[:, 0]
    return closes
# Fourier transform (CPU-bound task)
def fourier_transform(data):
    """Return the magnitude of the discrete Fourier transform of ``data``.

    The transform is taken along axis 0. For 1-D input this is the only axis,
    so the result matches a plain ``np.fft.fft`` call. For 2-D input such as a
    ``(n, 1)`` column of samples, each column is transformed over its rows
    instead of performing a trivial length-1 FFT on every row.

    Args:
        data: Array-like of samples, with the sample index on axis 0.

    Returns:
        Array of the same shape as ``data`` holding ``|FFT(data)|``.
    """
    print("Performing Fourier Transform")
    spectrum = np.fft.fft(data, axis=0)
    return np.abs(spectrum)
def timeit(message):
    """Build a decorator that prints how long each call of a function takes.

    Works for both plain functions and coroutine functions: a coroutine
    function is wrapped in an ``async def`` wrapper that awaits it, anything
    else in a regular wrapper. Either wrapper keeps the wrapped function's
    metadata via ``functools.wraps``.

    Args:
        message: Label printed in front of the elapsed time, as
            ``"{message}: {elapsed:.2f} seconds"``.

    Returns:
        A decorator. The decorated callable returns whatever the original
        returns. If the original raises, the exception propagates and no
        timing line is printed.
    """
    def decorator(func):
        if inspect.iscoroutinefunction(func):
            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                # perf_counter is the clock intended for measuring intervals.
                start_time = time.perf_counter()
                result = await func(*args, **kwargs)
                elapsed = time.perf_counter() - start_time
                print(f"{message}: {elapsed:.2f} seconds")
                return result

            return async_wrapper

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            start_time = time.perf_counter()
            result = func(*args, **kwargs)
            elapsed = time.perf_counter() - start_time
            print(f"{message}: {elapsed:.2f} seconds")
            return result

        return sync_wrapper

    return decorator
####################################################################
# Ticker symbols processed by each execution strategy in this script
stocks = ["AAPL", "MSFT", "GOOG"]
# Serial execution
@timeit("串行执行")
def serial_execution():
    """Fetch and Fourier-transform every ticker in ``stocks``, one at a time."""
    for ticker in stocks:
        # The spectrum is not used yet; visualization etc. would go here.
        fourier_transform(fetch_stock_data(ticker))
# Multithreaded execution (optimizes the I/O-bound task)
@timeit("多线程执行")
def threaded_execution():
    """Fetch all tickers in ``stocks`` on a thread pool, then transform them.

    The downloads run on up to three worker threads; the Fourier transforms
    run afterwards in the calling thread, one ticker at a time.
    """
    with ThreadPoolExecutor(max_workers=3) as pool:
        # list() waits for every download before any transform starts.
        closes_per_stock = list(pool.map(fetch_stock_data, stocks))
    for closes in closes_per_stock:
        # The spectrum is not used yet; visualization etc. would go here.
        fourier_transform(closes)
# Multiprocess execution (optimizes the CPU-bound task)
@timeit("多进程执行")
def multiprocessing_execution():
    """Fetch and Fourier-transform every ticker in ``stocks`` on a process pool.

    One pool of up to three worker processes serves both stages, so the
    workers are started and shut down once rather than once per stage.
    """
    with ProcessPoolExecutor(max_workers=3) as pool:
        # list() drains each result iterator, so every task of a stage has
        # finished (and any worker exception has surfaced) before moving on.
        closes_per_stock = list(pool.map(fetch_stock_data, stocks))
        spectra = list(pool.map(fourier_transform, closes_per_stock))
    # Visualization etc. of `spectra` would go here.
# Asynchronous execution (optimizes the I/O-bound task)
@timeit("异步执行")
async def async_execution():
    """Fetch all tickers in ``stocks`` concurrently, then transform them.

    The downloads are awaited together with ``asyncio.gather``; the Fourier
    transforms then run one after another inside this coroutine.
    """
    closes_per_stock = await asyncio.gather(
        *(async_fetch_stock_data(ticker) for ticker in stocks)
    )
    for closes in closes_per_stock:
        # The spectrum is not used yet; visualization etc. would go here.
        fourier_transform(closes)
# Mixed execution (async downloads, process pool for the transforms)
@timeit("混合执行")
async def mixed_execution():
    """Fetch tickers concurrently, then transform them on a process pool.

    The downloads are awaited together with ``asyncio.gather``. Each Fourier
    transform is then submitted to a pool of up to three worker processes via
    ``loop.run_in_executor`` and awaited, so the event loop is not blocked
    while the workers compute.
    """
    closes_per_stock = await asyncio.gather(
        *(async_fetch_stock_data(ticker) for ticker in stocks)
    )
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor(max_workers=3) as pool:
        spectra = await asyncio.gather(
            *(
                loop.run_in_executor(pool, fourier_transform, closes)
                for closes in closes_per_stock
            )
        )
    # Visualization etc. of `spectra` would go here.
if __name__ == "__main__":
    # Run each strategy in turn, announcing it first. The @timeit decorator
    # on every strategy prints the elapsed time when it finishes.
    print("串行执行:")
    serial_execution()

    print("\n多线程执行:")
    threaded_execution()

    print("\n多进程执行:")
    multiprocessing_execution()

    print("\n异步执行:")
    asyncio.run(async_execution())

    print("\n混合执行:")
    asyncio.run(mixed_execution())