dev
zj3D 8 months ago
parent 41a14b6705
commit 7db531d2fc

@@ -1,56 +1,56 @@
# -*- coding: utf-8 -*-
from collections import Counter
from cppy.cp_util import *
from multiprocessing.pool import ThreadPool

#
# Multithreading
#
def process_chunk(chunk):
    # Filter out stop words and short tokens
    stop_words = get_stopwords()
    words = [w for w in chunk if w not in stop_words and len(w) >= 3]
    return Counter(words)

def merge_counts(counts_list):
    # Merge multiple Counter objects into one total
    total_counts = Counter()
    for counts in counts_list:
        total_counts += counts
    return total_counts

def thread_function(chunk, counts_list):
    # Alternative worker for manually managed threads; unused when
    # pool.map drives the chunks below
    word_count = process_chunk(chunk)
    counts_list.append(word_count)

@timing_decorator
def main():
    # Read and tokenize the file contents
    content = re_split(read_file(testfilepath))
    chunk_size = 1000  # adjust the chunk size to fit the workload
    chunks = [content[i:i + chunk_size] for i in range(0, len(content), chunk_size)]

    # Use a thread pool; each thread processes one chunk
    pool = ThreadPool(len(content) // chunk_size + 1)
    counts_list = pool.map(process_chunk, chunks)
    pool.close()
    pool.join()

    # Merge the per-chunk counts
    total_counts = merge_counts(counts_list)

    # Print the n most frequent words
    print_word_freqs(total_counts.most_common(10))

if __name__ == '__main__':
    main()
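
Note: because CPython's GIL serializes pure-Python bytecode, the thread pool above mostly overlaps I/O rather than speeding up the counting itself. For reference, a minimal sketch of the same split/map/merge pattern with the standard-library concurrent.futures API; the inline word list is a stand-in for the tokenized file contents.

# Sketch: same pattern via concurrent.futures (stdlib only).
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

def count_chunk(chunk):
    return Counter(chunk)

words = ['pride', 'prejudice', 'pride', 'truth', 'truth', 'pride']
chunk_size = 2
chunks = [words[i:i + chunk_size] for i in range(0, len(words), chunk_size)]

# The executor closes and joins its threads automatically on exit.
with ThreadPoolExecutor(max_workers=4) as executor:
    counts_list = list(executor.map(count_chunk, chunks))

total = sum(counts_list, Counter())   # merge the per-chunk Counters
print(total.most_common(2))           # [('pride', 3), ('truth', 2)]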

@@ -1,49 +1,49 @@
# -*- coding: utf-8 -*-
import multiprocessing
from collections import Counter
from cppy.cp_util import *

#
# Multiprocessing
#
def process_chunk(chunk):
    # Filter out stop words and short tokens
    stop_words = get_stopwords()
    words = [w for w in chunk if w not in stop_words and len(w) >= 3]
    return Counter(words)

def merge_counts(counts_list):
    # Merge multiple Counter objects into one total
    total_counts = Counter()
    for counts in counts_list:
        total_counts += counts
    return total_counts

@timing_decorator
def main():
    # Read and tokenize the file contents
    content = re_split(read_file(testfilepath))

    # Split the contents into chunks; each chunk is handled by one process
    chunk_size = 1000  # adjust the chunk size to fit the workload
    chunks = [content[i:i + chunk_size] for i in range(0, len(content), chunk_size)]

    # Process the chunks with a pool of worker processes
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    counts_list = pool.map(process_chunk, chunks)
    pool.close()
    pool.join()

    # Merge the per-chunk counts
    total_counts = merge_counts(counts_list)

    # Print the n most frequent words
    print_word_freqs(total_counts.most_common(10))

if __name__ == '__main__':
    main()
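
Unlike the thread version, worker processes bypass the GIL, so the counting genuinely runs in parallel; the cost is pickling the chunks to and from the children. A minimal sketch of the same pattern with Pool used as a context manager, which closes and joins the pool automatically; the synthetic word list stands in for real input.

# Sketch: Pool as a context manager. Worker functions must live at module
# top level so child processes can import them.
import multiprocessing
from collections import Counter

def count_chunk(chunk):
    return Counter(chunk)

if __name__ == '__main__':
    words = ['pride', 'prejudice', 'pride', 'truth'] * 1000  # 4000 tokens
    chunk_size = 1000
    chunks = [words[i:i + chunk_size] for i in range(0, len(words), chunk_size)]

    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        counts_list = pool.map(count_chunk, chunks)

    total = sum(counts_list, Counter())
    print(total.most_common(3))  # [('pride', 2000), ('prejudice', 1000), ('truth', 1000)]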

@@ -1,77 +1,77 @@
import re, operator
from collections import Counter

# The TextProcessor class processes the text and computes word frequencies.
# When processing finishes, it notifies all registered observers through the
# notify method. WordFrequencyObserver is a concrete observer: it implements
# update to receive the frequencies and print the 10 most common words.

# Subject maintains the list of subscribers (observers): attach, detach,
# and notify on update.
class Subject:
    def __init__(self):
        # Kept private so the observer list cannot be mutated arbitrarily
        self._observers = []

    def attach(self, observer):
        self._observers.append(observer)

    def detach(self, observer):
        self._observers.remove(observer)

    def notify(self, word_freqs):
        for observer in self._observers:
            observer.update(word_freqs)

# An abstract Observer; concrete observers implement update
class Observer:
    def update(self, word_freqs):
        pass

# A concrete Observer
class WordFrequencyObserver(Observer):
    def update(self, word_freqs):
        print("Word frequencies have been updated:")
        self.print_word_freqs(word_freqs)

    def print_word_freqs(self, word_freqs):
        sorted_freqs = sorted(word_freqs.items(), key=operator.itemgetter(1), reverse=True)
        for (w, c) in sorted_freqs[:10]:
            print(f"{w}: {c}")

# Analyzes the text
class TextProcessor:
    def __init__(self, subject: Subject):
        # subject is annotated as a Subject (the annotation could also be
        # written separately)
        self._subject = subject
        self._stop_words: set = set()  # a set of stop-word strings, not a str

    def load_stop_words(self, path_to_file):
        with open(path_to_file, encoding='utf-8') as f:
            self._stop_words = set(line.strip().lower() for line in f)

    def process_text(self, path_to_file):
        with open(path_to_file, encoding='utf-8') as f:
            data = f.read()
        word_list = self.re_split(data)
        filtered_words = self.filter_words(word_list)
        word_freqs = self.count_frequencies(filtered_words)
        self._subject.notify(word_freqs)

    def re_split(self, data):
        pattern = re.compile(r'[\W_]+')
        return pattern.sub(' ', data).lower().split()

    def filter_words(self, word_list):
        return [w for w in word_list if w not in self._stop_words and len(w) >= 3]

    def count_frequencies(self, word_list):
        return Counter(word_list)

# Run a quick test
if __name__ == "__main__":
    stopwordfilepath = r'C:\Users\asus\Desktop\cppy余悦批注\cppy\data\stop_words.txt'
    testfilepath = r'C:\Users\asus\Desktop\cppy余悦批注\cppy\data\pride-and-prejudice.txt'

    # Create and wire up the instances
    subject = Subject()
    observer = WordFrequencyObserver()
    subject.attach(observer)
    text_processor = TextProcessor(subject)
    text_processor.load_stop_words(stopwordfilepath)
    text_processor.process_text(testfilepath)
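
The payoff of the indirection is that new observers can be added or removed without touching Subject or TextProcessor. A minimal sketch, assuming it runs in the same script after the code above; TotalWordsObserver is a hypothetical name, and re-running the pipeline notifies both observers.

# Hypothetical second observer: reports the number of distinct words
# instead of printing the top 10.
class TotalWordsObserver(Observer):
    def update(self, word_freqs):
        print(f"Distinct words: {len(word_freqs)}")

total_observer = TotalWordsObserver()
subject.attach(total_observer)             # both observers now receive notify()
text_processor.process_text(testfilepath)
subject.detach(total_observer)             # back to the frequency printer only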

@@ -1,59 +1,59 @@
# -*- coding: utf-8 -*-
import cppy.cp_util as util
from collections import Counter

class WordFrequencyStateMachine:
    def __init__(self, file_path):
        self.file_path = file_path
        self.content = None
        self.words = None
        self.word_freq = None
        self.state = 'IDLE'

    def transition_to_read_file(self):
        try:
            with open(self.file_path, 'r', encoding='utf-8') as file:
                self.content = file.read()
            self.state = 'WORDS_SPLIT'
        except FileNotFoundError:
            print(f"File {self.file_path} not found.")
            self.state = 'DONE'  # stop the machine so run() does not loop forever
        except Exception as e:
            print(f"Error while reading the file: {e}")
            self.state = 'DONE'

    def transition_to_split_words(self):
        if self.content is not None:
            self.words = util.extract_str_words(self.content)
            self.state = 'CALCULATE_FREQ'
        else:
            print("The file contents are empty; cannot split into words.")

    def transition_to_calculate_freq(self):
        if self.words is not None:
            self.word_freq = Counter(self.words)
            self.state = 'DONE'
        else:
            print("The word list is empty; cannot compute frequencies.")

    def run(self):
        while self.state != 'DONE':
            if self.state == 'IDLE':
                self.transition_to_read_file()
            elif self.state == 'WORDS_SPLIT':
                self.transition_to_split_words()
            elif self.state == 'CALCULATE_FREQ':
                self.transition_to_calculate_freq()
            else:
                print(f"Unknown state: {self.state}")
                break
        return self.word_freq

# Compute word frequencies with the state machine
state_machine = WordFrequencyStateMachine(util.testfilepath)
word_frequencies = state_machine.run()

# Print the result
util.print_word_freqs(word_frequencies.most_common(10))
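
The if/elif chain in run() grows with every new state. A dictionary mapping state names to transition methods keeps the dispatch flat; a minimal sketch, assuming the WordFrequencyStateMachine class above (run_with_table is a hypothetical helper, not part of the original).

# Sketch: table-driven dispatch replacing the if/elif chain.
def run_with_table(machine):
    transitions = {
        'IDLE': machine.transition_to_read_file,
        'WORDS_SPLIT': machine.transition_to_split_words,
        'CALCULATE_FREQ': machine.transition_to_calculate_freq,
    }
    while machine.state != 'DONE':
        handler = transitions.get(machine.state)
        if handler is None:
            print(f"Unknown state: {machine.state}")
            break
        handler()  # each handler advances machine.state itself
    return machine.word_freq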

@@ -1,33 +1,33 @@
# -*- coding: utf-8 -*-
import cppy.cp_util as util

# Each column is a data cell plus a formula; the first column holds the
# input data, so it has no formula.
all_words = [(), None]
non_stop_words = [(), util.extract_str_words]
frequencies = [(), util.get_frequencies]
sorted_data = [(), util.sort_dict]

# The whole spreadsheet
all_columns = [all_words, non_stop_words, frequencies, sorted_data]

# Call this after every change to the input data
def update():
    for c in all_columns[1:]:
        if c[1] is util.extract_str_words:
            c[0] = c[1](all_words[0])
        elif c[1] is util.get_frequencies:
            c[0] = c[1](non_stop_words[0])
        elif c[1] is util.sort_dict:
            c[0] = c[1](frequencies[0])

# Load the fixed input data into the first column
all_words[0] = util.read_file(util.testfilepath)

# Run update() to recompute the derived columns
update()

# Print the result
util.print_word_freqs(sorted_data[0])
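
The point of the spreadsheet style is that the formulas are data: change the input cell, call update(), and every downstream column is recomputed in dependency order. A minimal self-contained sketch with stdlib-only formulas; the names words, freqs, and top3 are illustrative, not from the original.

# Sketch: a three-column "spreadsheet" over plain lists.
from collections import Counter

words = [None, None]                       # input column: no formula
freqs = [None, lambda: Counter(words[0])]  # derived: word -> count
top3  = [None, lambda: freqs[0].most_common(3)]

def update():
    for col in (freqs, top3):
        col[0] = col[1]()                  # re-evaluate each formula in order

words[0] = ['a', 'b', 'a', 'c', 'a']
update()
print(top3[0])                             # [('a', 3), ('b', 1), ('c', 1)]

words[0] = ['x', 'x', 'y']
update()                                   # recompute after the input changed
print(top3[0])                             # [('x', 2), ('y', 1)]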