diff --git a/11 概念认知/对象化/1 类对象.py b/11 概念认知/对象化/1 类对象.py index 93cad5b..b9d5c94 100644 --- a/11 概念认知/对象化/1 类对象.py +++ b/11 概念认知/对象化/1 类对象.py @@ -4,9 +4,8 @@ from cppy.cp_util import * class DataStorageManager: """ 数据模型 """ - def __init__(self, path_to_file): - data = read_file(path_to_file) - self._data = re_split( data ) + def __init__(self, path_to_file): + self._data = re_split( read_file(path_to_file) ) def words(self): return self._data diff --git a/11 概念认知/流式调用/1 嵌套调用.py b/11 概念认知/流式调用/1 嵌套调用.py index 8276209..777e689 100644 --- a/11 概念认知/流式调用/1 嵌套调用.py +++ b/11 概念认知/流式调用/1 嵌套调用.py @@ -1,29 +1,3 @@ -import re from cppy.cp_util import * - -def extractwords(str_data): - pattern = re.compile('[\W_]+') - word_list = pattern.sub(' ', str_data).lower().split() - stop_words = get_stopwords() - return [w for w in word_list if not w in stop_words] - -def frequencies(word_list): - word_freqs = {} - for word in word_list: - word_freqs[word] = word_freqs.get(word, 0) + 1 - return word_freqs - -def sort(word_freq): - return sorted( word_freq.items(), key=lambda x: x[1], reverse=True ) - -def printall(word_freqs, n = 10 ): - for word, freq in word_freqs[ :n ]: - print(word, '-', freq) - - -if __name__ == "__main__": - printall(sort(frequencies( - extractwords( - read_file( testfilepath )))) - ) \ No newline at end of file +print_word_freqs( sort_dict ( get_frequencies ( extract_file_words(testfilepath) ))) \ No newline at end of file diff --git a/12 语言特性/异步.py b/12 语言特性/异步.py index 5d12735..f23060f 100644 --- a/12 语言特性/异步.py +++ b/12 语言特性/异步.py @@ -5,7 +5,7 @@ from cppy.cp_util import * # -# 协程 +# 协程: 有点复杂 # async def read_file(file_path): async with aiofiles.open(file_path, 'r', encoding='utf-8') as file: diff --git a/13 计算设备/存储/内存队列.py b/13 计算设备/存储/内存队列.py new file mode 100644 index 0000000..4154687 --- /dev/null +++ b/13 计算设备/存储/内存队列.py @@ -0,0 +1,46 @@ +import threading, queue +from cppy.cp_util import * +from collections import Counter + +stop_words = get_stopwords() + +# 待处理数据放一个队列,多个线程轮流计数,最后合并统一计数 +class WordFrequencyCounter: + def __init__(self, input_file): + self.word_space = queue.Queue() + self.freq_space = queue.Queue() + for chunk in get_chunks(input_file,3000): + self.word_space.put(chunk) + + def process_words(self): + while not self.word_space.empty(): + try: + chunk = self.word_space.get_nowait() # 不使用超时,持续获取数据 + except queue.Empty: + break # 队列为空,退出循环 + # print(f"Worker thread ID: {threading.get_ident()}",len(chunk)) + words = [ w for w in chunk if w not in stop_words and len(w) >= 3 ] + word_freqs = Counter(words) + self.freq_space.put(dict(word_freqs)) # 将Counter对象转换为字典 + + def run(self): + workers = [ threading.Thread(target=self.process_words) for _ in range(5)] + for worker in workers: worker.start() + for worker in workers: worker.join() + + word_freqs = Counter() # 初始化一个空的Counter对象 + while not self.freq_space.empty(): + freqs = self.freq_space.get() + if freqs: # 确保freqs非空 + word_freqs.update(freqs) + + print_word_freqs ( sort_dict (word_freqs) ) + + +@timing_decorator +def main(): + counter = WordFrequencyCounter( testfilepath ) + counter.run() + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/13 计算设备/存储/内存队列_共享数据.py b/13 计算设备/存储/内存队列_共享数据.py deleted file mode 100644 index b927d9e..0000000 --- a/13 计算设备/存储/内存队列_共享数据.py +++ /dev/null @@ -1,37 +0,0 @@ -import threading, queue -from cppy.cp_util import * - -class WordFrequencyCounter: - def __init__(self, input_file): - self.word_space = queue.Queue() - self.freq_space = queue.Queue() - for word in extract_file_words(input_file): - self.word_space.put(word) - - def process_words(self): - word_freqs = {} - while not self.word_space.empty(): - try: - word = self.word_space.get(timeout=1) - word_freqs[word] = word_freqs.get(word, 0) + 1 - except queue.Empty: - break - self.freq_space.put(word_freqs) - - def run(self): - workers = [threading.Thread(target=self.process_words) for _ in range(5)] - for worker in workers: worker.start() - for worker in workers: worker.join() - - word_freqs = {} - while not self.freq_space.empty(): - freqs = self.freq_space.get() - for word, count in freqs.items(): - word_freqs[word] = word_freqs.get(word, 0) + count - - print_word_freqs ( sort_dict (word_freqs) ) - - -if __name__ == '__main__': - counter = WordFrequencyCounter( testfilepath ) - counter.run() \ No newline at end of file diff --git a/13 计算设备/存储/多线程_共享数据.py b/13 计算设备/存储/多线程_共享数据.py deleted file mode 100644 index d295b0a..0000000 --- a/13 计算设备/存储/多线程_共享数据.py +++ /dev/null @@ -1,51 +0,0 @@ -import threading, queue -from cppy.cp_util import * - -# 能否简单的共享全局变量 ? - -# 处理单词 -def process_words(word_space, freq_space, stopwords): - word_freqs = {} - while True: - try: - word = word_space.get(timeout=1) - except queue.Empty: - break - count_word(word, word_freqs, stopwords) - freq_space.put(word_freqs) - -# 创建并启动线程 -def start_threads(word_space, freq_space, stopwords): - workers = [] - for i in range(5): - worker = threading.Thread(target=process_words, - args=(word_space, freq_space, stopwords)) - worker.start() - workers.append(worker) - return workers - - -if __name__ == "__main__": - stopwords = get_stopwords() - word_space = queue.Queue() - freq_space = queue.Queue() - - # 将数据压入 word_space - for word in extract_file_words(testfilepath): - word_space.put(word) - - # 创建并启动线程 - workers = start_threads(word_space, freq_space, stopwords) - - # 等待所有线程完成 - for worker in workers: worker.join() - - # 合并处理结果 - word_freqs = {} - while not freq_space.empty(): - freqs = freq_space.get() - for (k, v) in freqs.items(): - word_freqs[k] = word_freqs.get(k,0) + v - - # 打印 - print_word_freqs ( sort_dict (word_freqs) ) \ No newline at end of file diff --git a/13 计算设备/数据分包/多进程.py b/13 计算设备/数据分包/多进程.py index 3df8890..7809bb7 100644 --- a/13 计算设备/数据分包/多进程.py +++ b/13 计算设备/数据分包/多进程.py @@ -5,7 +5,7 @@ from cppy.cp_util import * # -# 多进程 +# 多进程: 因为创建进程相比计算过程开销太大,结果最慢 # stop_words = get_stopwords()