Compare commits
No commits in common. 'dev' and 'dev' have entirely different histories.
@ -1,39 +0,0 @@
|
||||
from cppy.cp_util import *
|
||||
from dataclasses import dataclass
|
||||
from collections import Counter
|
||||
import re
|
||||
|
||||
# 对象属性是现代 Python编程喜欢的风格
|
||||
# 这里使用了dataclass来简化代码
|
||||
|
||||
@dataclass
class WordFrequency:
    """Count word frequencies in a text, skipping stop words and short tokens.

    Attributes:
        text: the raw text to analyse.
        stop_words: collection of words to ignore; when left as None it is
            filled in from get_stopwords() after construction.
    """

    text: str
    stop_words: set = None

    def __post_init__(self):
        # No stop-word collection supplied: fall back to the shared default.
        if self.stop_words is None:
            self.stop_words = get_stopwords()

    def tokenize(self):
        """Return the lower-cased words of ``text`` in order of appearance.

        Words that are stop words or have two characters or fewer are dropped.
        """
        # stop_words may be a list; build a set once so every membership
        # test is O(1) instead of a linear scan per token.
        ignored = frozenset(self.stop_words)
        words = re.findall(r'\b\w+\b', self.text.lower())
        return [word for word in words if len(word) > 2 and word not in ignored]

    def get_top_n(self, n=10):
        """Return the ``n`` most common tokens as ``(word, count)`` pairs."""
        return Counter(self.tokenize()).most_common(n)
|
||||
|
||||
|
||||
# 使用示例
|
||||
if __name__ == '__main__':
    # Load the sample text. read_file() takes the path as a required
    # argument, so pass the sample path exported by cppy.cp_util explicitly.
    text = read_file(testfilepath)
    word_freq = WordFrequency(text)

    # Compute and print the most frequent words.
    top_words = word_freq.get_top_n()
    print_word_freqs(top_words)
|
@ -1,20 +0,0 @@
|
||||
from cppy.cp_util import *
|
||||
|
||||
#
|
||||
# 生成器 是一种简单异步实现
|
||||
#
|
||||
def non_stop_words(testfilepath):
    """Yield, one at a time, every word of the file that is not a stop word.

    The file is read with read_file() and split with re_split(); stop words
    come from get_stopwords().
    """
    excluded = get_stopwords()
    tokens = re_split(read_file(testfilepath))
    # Hand out the surviving words lazily, one per request.
    yield from (token for token in tokens if token not in excluded)
|
||||
|
||||
|
||||
# Tally every word produced by the generator.
freqs = {}
for token in non_stop_words(testfilepath):
    if token in freqs:
        freqs[token] += 1
    else:
        freqs[token] = 1

# Order by frequency and print the result.
data = sort_dict(freqs)
print_word_freqs(data)
|
@ -1,3 +0,0 @@
|
||||
from cppy.cp_util import *
|
||||
|
||||
# Same pipeline as a single nested call, spelled out step by step.
words = extract_file_words(testfilepath)
frequencies = get_frequencies(words)
ranked = sort_dict(frequencies)
print_word_freqs(ranked)
|
@ -1,24 +0,0 @@
|
||||
from cppy.cp_util import *
|
||||
|
||||
# 这个例子没有实际意义,是用来帮助理解其他例子
|
||||
# 主程序只需要启动第一个动作,后面的顺序逻辑写到各个函数里面了
|
||||
|
||||
def readfile(file_path, func):
    """Read ``file_path`` and pass its text to ``func``.

    ``frequencies`` is handed over as the step that ``func`` should call next.
    """
    func(read_file(file_path), frequencies)
|
||||
|
||||
def extractwords(str_data,func):
    """Extract the words of ``str_data`` and pass them to ``func``.

    ``sort`` is handed over as the step that ``func`` should call next.
    """
    words = extract_str_words(str_data)
    func(words, sort)
|
||||
|
||||
def frequencies(word_list, func):
    """Count the words in ``word_list`` and pass the counts to ``func``.

    ``printall`` is handed over as the step that ``func`` should call next.
    """
    func(get_frequencies(word_list), printall)
|
||||
|
||||
def sort(wf, func):
    """Sort the frequency mapping ``wf`` and pass the result to ``func``.

    This is the last link of the chain, so ``None`` is passed as the next step.
    """
    ordered = sort_dict(wf)
    func(ordered, None)
|
||||
|
||||
def printall(word_freqs, _):
    """Print ``word_freqs``; the second argument (next step) is ignored."""
    print_word_freqs(word_freqs)
|
||||
|
||||
if __name__ == "__main__":
    # Only the first step is started here; each function triggers the next.
    readfile(file_path=testfilepath, func=extractwords)
|
@ -1 +0,0 @@
|
||||
异常主要发生在参数传递和代码块执行过程中。一种原则是:软件不能挂掉——检查参数合理性、检查代码块执行中可能出现的错误,并对结果进行合理补齐,保持程序继续运行【 1 软件不能挂掉 】;另一种情况是发生异常就抛出,然后终止程序【 2 时间停止在那一刻 】;或者由上层函数接住,集中统一处理【 3 预判可能的错误 】。
|
@ -1,27 +0,0 @@
|
||||
from cppy.cp_util import *
|
||||
|
||||
#
|
||||
# 用断言从事发点给出出错的准确信息
|
||||
#
|
||||
def extractWords(path_to_file):
    """Return the words of the file at ``path_to_file``.

    Raises:
        AssertionError: if ``path_to_file`` is not a ``str`` or is empty.
    """
    assert type(path_to_file) is str, "Must be a string"
    assert path_to_file, "Must be a non-empty string"
    words = extract_file_words(path_to_file)
    return words
|
||||
|
||||
def frequencies(word_list):
    """Return the frequency mapping for ``word_list``.

    Raises:
        AssertionError: if ``word_list`` is not a ``list`` or is empty.
    """
    assert type(word_list) is list, "Must be a list"
    assert len(word_list) > 0, "Must be a non-empty list"
    counts = get_frequencies(word_list)
    return counts
|
||||
|
||||
def sort(word_freqs):
    """Return ``word_freqs`` sorted via sort_dict().

    Raises:
        AssertionError: if ``word_freqs`` is not a ``dict`` or is empty.
    """
    assert type(word_freqs) is dict, "Must be a dictionary"
    assert len(word_freqs) > 0, "Must be a non-empty dictionary"
    ordered = sort_dict(word_freqs)
    return ordered
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # Any failure, including a failed assertion, is reported in one place.
    try:
        words = extractWords(testfilepath)
        counted = frequencies(words)
        word_freqs = sort(counted)
        print_word_freqs(word_freqs)
    except Exception as e:
        print(" Something wrong: {0}".format(e) )
|
@ -1,36 +0,0 @@
|
||||
# 创建对象是消耗资源的,如果发现对象已经存在,可以返回引用,不创造新对象 。设计模式中这个做法叫享元
|
||||
# 可以降低资源需求和提升响应速度。更常见的该模式使用场景是各种资源池。
|
||||
|
||||
from cppy.cp_util import *
|
||||
|
||||
#享元类
|
||||
class WordFrequencyController():
    """Flyweight object holding the sorted word frequencies of one file."""

    def __init__(self, controllertype, filepath):
        # controllertype is later used as the number of entries to print.
        self.number = controllertype
        counts = get_frequencies(extract_file_words(filepath))
        self.word_freq = sort_dict(counts)

    def print_word_freqs(self):
        """Print the first ``self.number`` entries of the sorted frequencies."""
        # Calls the module-level print_word_freqs helper, not this method.
        print_word_freqs(self.word_freq, self.number)
|
||||
|
||||
#享元工厂
|
||||
class WordFrequencyControllerFactory():
    """Flyweight factory: creates each controller once and reuses it afterwards."""

    def __init__(self):
        # Cache of already-built controllers, keyed by (number, file path).
        self.types = {}

    def get_WordFrequencyController(self, number, testfilepath):
        """Return the controller for ``number`` and ``testfilepath``.

        A new controller is built only the first time a given combination is
        requested; later requests get the cached object back.
        """
        # The file path is part of the key: a controller's data depends on
        # the file it was built from, so keying on the number alone would
        # return stale results for a different file.
        key = (number, testfilepath)
        if key not in self.types:
            self.types[key] = WordFrequencyController(number, testfilepath)  # create a new object
            print('new obj: ','*'*30,number)
        else:
            print('ref obj: ','*'*30,number)
        return self.types[key]  # reuse the existing object
|
||||
|
||||
|
||||
if __name__ == "__main__":
    factory = WordFrequencyControllerFactory()
    # Repeated numbers are served from the factory's cache.
    requested = [ 1,3,5,3,5,7 ]
    for number in requested:
        controller = factory.get_WordFrequencyController(number,testfilepath)
        controller.print_word_freqs()
|
||||
|
@ -1,105 +0,0 @@
|
||||
'''
|
||||
把观察者挂到自己的处理队列上
|
||||
'''
|
||||
|
||||
import os
|
||||
import re
|
||||
import threading
|
||||
from queue import Queue
|
||||
from collections import Counter
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
# 观察者接口
|
||||
class Observer(ABC):
    """Observer interface: subclasses must implement ``update``."""

    @abstractmethod
    def update(self, word_counts: Counter):
        """Receive the word counts from the subject; does nothing by default."""
|
||||
|
||||
# 具体观察者:打印前 10 高频词
|
||||
class PrintTopWordsObserver(Observer):
    """Concrete observer that prints the ten most frequent words."""

    def update(self, word_counts: Counter):
        """Print a header followed by one ``word: count`` line per entry."""
        print("Top 10 高频词:")
        for entry in word_counts.most_common(10):
            word, count = entry
            print(f"{word}: {count}")
|
||||
|
||||
# 具体观察者:保存词频到文件
|
||||
class SaveToFileObserver(Observer):
    """Concrete observer that writes the ten most frequent words to a file."""

    def __init__(self, output_file):
        self.output_file = output_file

    def update(self, word_counts: Counter):
        """Overwrite ``output_file`` with one ``word: count`` line per entry.

        Any exception raised while writing is reported on stdout, not raised.
        """
        try:
            with open(self.output_file, 'w', encoding='utf-8') as out:
                top_entries = word_counts.most_common(10)
                for word, count in top_entries:
                    out.write(f"{word}: {count}\n")
            print(f"词频已保存到 {self.output_file}")
        except Exception as e:
            print(f"保存失败: {e}")
|
||||
|
||||
# 词频统计器(主题)
|
||||
class WordFrequencyCounter:
    """Subject of the observer pattern: counts words across files with threads.

    File paths are put on an internal queue, worker threads drain it and merge
    their words into ``counter``, then every registered observer is notified.
    """

    def __init__(self):
        self.observers = []
        self.counter = Counter()
        self.queue = Queue()
        self.lock = threading.Lock()  # guards updates to self.counter

    def add_observer(self, observer: "Observer"):
        """Register ``observer`` to be notified after counting."""
        self.observers.append(observer)

    def remove_observer(self, observer: "Observer"):
        """Unregister ``observer``; raises ValueError if it is not registered."""
        self.observers.remove(observer)

    def notify_observers(self):
        """Call ``update(self.counter)`` on every registered observer."""
        for observer in self.observers:
            observer.update(self.counter)

    def process_file(self):
        """Worker loop: take file paths off the queue until it is empty."""
        # Local import: the file imports only Queue from the queue module.
        from queue import Empty

        while True:
            try:
                file_path = self.queue.get_nowait()
            except Empty:
                # Only an empty queue ends the worker; anything else is a
                # real error and must not be swallowed silently.
                break
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    text = f.read().lower()
                words = re.findall(r'\b\w+\b', text)
                with self.lock:
                    self.counter.update(words)
            except Exception as e:
                # Best effort: report the failing file and keep going.
                print(f"Error processing {file_path}: {e}")
            finally:
                self.queue.task_done()

    def count_words(self, files, num_threads=4):
        """Count the words of ``files`` using ``num_threads`` worker threads.

        Observers are notified once all workers have finished.
        """
        # Enqueue every file path.
        for file_path in files:
            self.queue.put(file_path)

        # Start the workers and wait for all of them.
        threads = [threading.Thread(target=self.process_file) for _ in range(num_threads)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # Tell every observer about the result.
        self.notify_observers()
|
||||
|
||||
def main():
    """Count words in every ``.txt`` file under ``data`` and notify observers."""
    data_dir = 'data'
    files = []
    for name in os.listdir(data_dir):
        if name.endswith('.txt'):
            files.append(os.path.join(data_dir, name))

    counter = WordFrequencyCounter()

    # Register one observer that prints and one that saves to a file.
    for observer in (PrintTopWordsObserver(), SaveToFileObserver("word_frequency.txt")):
        counter.add_observer(observer)

    # Counting finishes by notifying the observers.
    counter.count_words(files)


if __name__ == '__main__':
    main()
|
@ -1,5 +0,0 @@
|
||||
|
||||
|
||||
[Plugins]
|
||||
;; Options: plugins/f1.pyc, plugins/f2.pyc
|
||||
frequencies = plugins/f2.pyc
|
@ -1,34 +0,0 @@
|
||||
|
||||
# 插件模式提供一种个别扩展性开发和系统核心开发无关的松耦合结构。
|
||||
# 简单说,第三方开发者在没有核心框架源码下也能扩展或者改造系统功能
|
||||
|
||||
import configparser, importlib.machinery
|
||||
from cppy.cp_util import *
|
||||
|
||||
class PluginManager:
    """Load the plugins named in ``config.ini`` and hand them out by name."""

    def __init__(self):
        # Loaded plugin modules, keyed by plugin name.
        self.plugins = {}

    def load_plugins(self):
        """Load the compiled plugin configured under ``[Plugins] frequencies``.

        The working directory is switched to this file's directory so that
        ``config.ini`` and the relative plugin path resolve from there. The
        loaded module is stored under the key ``'word_freqs'``.
        """
        # Local import: the file itself only imports importlib.machinery.
        from importlib import machinery, util

        _dir = os.path.dirname(os.path.abspath(__file__))
        os.chdir(_dir)

        config = configparser.ConfigParser()
        config.read("config.ini")

        frequencies_plugin = config.get("Plugins", "frequencies")

        # Load the .pyc through a module spec: Loader.load_module() is
        # deprecated, and the module gets a real name taken from the file
        # instead of an empty string.
        module_name = os.path.splitext(os.path.basename(frequencies_plugin))[0]
        loader = machinery.SourcelessFileLoader(module_name, frequencies_plugin)
        spec = util.spec_from_file_location(module_name, frequencies_plugin, loader=loader)
        module = util.module_from_spec(spec)
        loader.exec_module(module)
        self.plugins['word_freqs'] = module

    def get_plugin(self, name):
        """Return the plugin registered as ``name``, or None if there is none."""
        return self.plugins.get(name)
|
||||
|
||||
|
||||
# 创建 PluginManager 实例
|
||||
# Build the manager and load the configured plugin.
plugin_manager = PluginManager()
plugin_manager.load_plugins()

# Extract the words, let the plugin rank them, then print the result.
wordlist = extract_file_words(testfilepath)
frequency_plugin = plugin_manager.get_plugin('word_freqs')
word_freqs = frequency_plugin.top_word(wordlist)
print_word_freqs(word_freqs)
|
@ -1,28 +0,0 @@
|
||||
import py_compile
|
||||
|
||||
# Compile both plugin sources to byte code.
py_compile.compile('f1.py')
py_compile.compile('f2.py')

import os
import shutil

# Source: the __pycache__ directory next to this script.
# Target: the plugins directory one level up.
source_dir = os.path.join(os.path.dirname(__file__), '__pycache__')
target_dir = os.path.join(os.path.dirname(__file__), '..', 'plugins')

# Make sure the target directory exists.
os.makedirs(target_dir, exist_ok=True)

# Move every compiled file into the plugins directory under its module name.
for filename in os.listdir(source_dir):
    if filename.endswith('.pyc'):
        # Keep everything before the first dot as the module name, so the
        # result does not depend on names being exactly two characters long.
        new_filename = filename.split('.', 1)[0]
        source_file = os.path.join(source_dir, filename)
        target_file = os.path.join(target_dir, new_filename + '.pyc')
        # Copy, then delete the original.
        shutil.copyfile(source_file, target_file)
        os.remove(source_file)
        print(f"Copied {filename} to {target_file} and removed original file.")
|
@ -1,8 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import collections
|
||||
|
||||
def top_word(word_list):
    """Return the ten most common words as ``(word, count)`` pairs."""
    return collections.Counter(word_list).most_common(10)
|
||||
|
Binary file not shown.
@ -1,25 +0,0 @@
|
||||
import requests
|
||||
from cppy.cp_util import *
|
||||
|
||||
def _post_json(url, payload, key, timeout=30):
    """POST ``payload`` as JSON to ``url`` and return ``key`` from the JSON reply."""
    # A timeout stops a dead service from hanging the client forever, and
    # raise_for_status() reports HTTP errors instead of a later KeyError.
    response = requests.post(url, json=payload, timeout=timeout)
    response.raise_for_status()
    return response.json()[key]


def main():
    """Run the word-frequency pipeline against the three local services."""
    # Read the sample file; read_file() needs the path passed explicitly.
    content = read_file(testfilepath)

    # Tokenize.
    words = _post_json("http://localhost:7770/tokenize", {"text": content}, "words")

    # Count.
    word_count = _post_json("http://localhost:7771/count", {"words": words}, "word_count")

    # Sort.
    top_10_words = _post_json("http://localhost:7772/sort", {"word_count": word_count}, "top_10_words")

    print("Top 10 words:")
    print_word_freqs(top_10_words)


if __name__ == "__main__":
    main()
|
@ -1,14 +0,0 @@
|
||||
from fastapi import FastAPI
|
||||
from collections import Counter
|
||||
from cppy.cp_util import *
|
||||
import uvicorn
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
@app.post("/count")
async def count(words_list: dict):  # {"words": ["word1", "word2", ...]}
    """Count the words under the ``"words"`` key and return them as a dict."""
    tally = Counter()
    tally.update(words_list["words"])
    return {"word_count": dict(tally)}


if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=7771)
|
@ -1,13 +0,0 @@
|
||||
from fastapi import FastAPI
|
||||
import uvicorn
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
@app.post("/sort")
async def sort(word_count_dict: dict):
    """Return the ten highest-count entries of ``word_count_dict["word_count"]``."""
    pairs = list(word_count_dict["word_count"].items())
    pairs.sort(key=lambda pair: pair[1], reverse=True)
    return {"top_10_words": pairs[:10]}


if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=7772)
|
@ -1,13 +0,0 @@
|
||||
from fastapi import FastAPI
|
||||
from cppy.cp_util import *
|
||||
import uvicorn
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
from fastapi import Body  # needed to read ``text`` from the JSON body


@app.post("/tokenize")
async def tokenize(text: str = Body(..., embed=True)):  # {"text": "..."}
    """Split ``text`` into words with extract_str_words().

    A bare ``text: str`` parameter is read from the query string, but the
    client posts JSON ``{"text": ...}``. ``Body(..., embed=True)`` makes the
    endpoint read ``text`` from the JSON body, matching the other services.
    """
    words = extract_str_words(text)
    return {"words": words}


if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=7770)
|
Binary file not shown.
@ -1,20 +0,0 @@
|
||||
|
||||
# Python 作为弱类型语言希望拥有强类型语言类似的规范工整工程性的优点,牺牲一些代码的自由度。
|
||||
# 可以理解为更好的代码注释和更多的工程约束 。
|
||||
|
||||
import cppy.cp_util as util
|
||||
|
||||
|
||||
def extract_words(path_to_file:str) -> list:
    """Return the words of the file at ``path_to_file`` via the util module."""
    words = util.extract_file_words(path_to_file)
    return words
|
||||
|
||||
def frequencies( word_list:list ) -> dict :
    """Return the frequency mapping of ``word_list`` via the util module."""
    counts = util.get_frequencies(word_list)
    return counts
|
||||
|
||||
def sort(word_freq:dict) -> list :
    """Return ``word_freq`` sorted via the util module."""
    ordered = util.sort_dict(word_freq)
    return ordered
|
||||
|
||||
|
||||
if __name__ == "__main__":
    words = extract_words(util.testfilepath)
    counted = frequencies(words)
    word_freqs = sort(counted)
    util.print_word_freqs(word_freqs)
|
@ -1,51 +0,0 @@
|
||||
import re
|
||||
from collections import Counter
|
||||
|
||||
# 提供一个命令行交互方式来驱动程序运行
|
||||
|
||||
|
||||
# 清洗文本,移除标点符号并转换为小写
|
||||
def clean_text(text):
    """Remove every character that is neither a word character nor
    whitespace, then lower-case the result."""
    stripped = re.sub(r'[^\w\s]', '', text)
    return stripped.lower()
|
||||
|
||||
# 统计词频
|
||||
def count_frequencies(text):
    """Return a Counter of the whitespace-separated words of the cleaned text."""
    words = clean_text(text).split()
    return Counter(words)
|
||||
|
||||
# 交互式提示用户输入文件路径和前n个单词的数量
|
||||
def interactive_mode():
    """Ask for a file path and a count ``n``, then print the top ``n`` words.

    Invalid counts, a missing file and any other error are reported on stdout.
    """
    file_path = input("请输入文件路径 >> ")
    try:
        n = int(input("请输入你想要输出的前n个最常见单词的数量 >> "))
        if not n > 0:
            raise ValueError("数量必须大于0。")
    except ValueError as e:
        print(f"输入错误:{e}")
        return

    try:
        # Read the whole file.
        with open(file_path, 'r', encoding='utf-8') as file:
            text = file.read()

        # Count, pick the n most common words, and print them.
        most_common = count_frequencies(text).most_common(n)
        for word, freq in most_common:
            print(f"{word}: {freq}")
    except FileNotFoundError:
        print(f"文件未找到: {file_path}")
    except Exception as e:
        print(f"发生错误: {e}")
|
||||
|
||||
# 主函数
|
||||
def main():
    """Print the welcome banner and start the interactive prompt."""
    banner = "欢迎使用词频统计工具。"
    print(banner)
    interactive_mode()


if __name__ == "__main__":
    main()
|
@ -1,30 +0,0 @@
|
||||
from flask import Flask, render_template, request, redirect, url_for
|
||||
from collections import Counter
|
||||
from cppy.cp_util import *
|
||||
import os
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
@app.route('/', methods=['GET', 'POST'])
def index():
    """Show the upload form (GET) or the word counts of the upload (POST)."""
    if request.method == 'POST':
        # Local import: tempfile is not imported at file level.
        import tempfile

        # The uploaded file.
        file = request.files['file']

        # Never build the path from the client-supplied file name: it is
        # untrusted and could point outside the intended directory. Let
        # tempfile choose a unique path in the system temp directory.
        fd, filename = tempfile.mkstemp(suffix='.txt')
        os.close(fd)
        try:
            file.save(filename)
            words = extract_file_words(filename)
        finally:
            # Remove the temporary file even when saving or parsing fails.
            os.remove(filename)

        word_counts = Counter(words)
        return render_template('result.html', word_counts=word_counts.most_common())

    return render_template('index.html')


if __name__ == '__main__':
    # NOTE(review): debug=True is for local development only - confirm it is
    # never used for a deployed instance.
    app.run(debug=True)
|
@ -1,14 +0,0 @@
|
||||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<title>Upload Text File</title>
|
||||
</head>
|
||||
<body>
|
||||
<h1>Upload a Text File to Count Word Frequencies</h1>
|
||||
<form action="/" method="post" enctype="multipart/form-data">
|
||||
<input type="file" name="file">
|
||||
<input type="submit" value="Submit">
|
||||
</form>
|
||||
</body>
|
||||
</html>
|
@ -1,16 +0,0 @@
|
||||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<title>Word Frequencies</title>
|
||||
</head>
|
||||
<body>
|
||||
<h1>Top Word Frequencies:</h1>
|
||||
<ul>
|
||||
{% for word, count in word_counts %}
|
||||
<li>{{ word }}: {{ count }}</li>
|
||||
{% endfor %}
|
||||
</ul>
|
||||
<a href="{{ url_for('index') }}">Back to Upload</a>
|
||||
</body>
|
||||
</html>
|
@ -1,23 +0,0 @@
|
||||
# 装饰器模式允许我们在不修改原有类的基础上,动态地添加额外的功能。
|
||||
# 就增加功能来说,装饰器模式比生成子类更为灵活。
|
||||
# 餐吧的顾客可以选择为他们的咖啡添加额外的调料。
|
||||
class Beverage:
    """A drink with a description and a price that starts at 0.0."""

    def __init__(self, description):
        self.price = 0.0
        self.description = description

    def cost(self):
        """Return the current price."""
        return self.price
|
||||
|
||||
class CondimentDecorator(Beverage):
    """Decorator that wraps a beverage and adds a condiment to it."""

    def __init__(self, beverage, description, price_increase):
        # Initialise the Beverage part too, so the decorator has every
        # attribute a plain Beverage has (including ``price``).
        super().__init__(f"{beverage.description}, {description}")
        self.beverage = beverage
        self.price_increase = price_increase

    def cost(self):
        """Return the wrapped beverage's cost plus this condiment's surcharge."""
        return self.beverage.cost() + self.price_increase
|
||||
|
||||
# 使用装饰器模式
|
||||
coffee = Beverage(description="Espresso")
coffee_with_chocolate = CondimentDecorator(
    beverage=coffee,
    description="Chocolate",
    price_increase=0.50,
)
|
@ -1,60 +0,0 @@
|
||||
import os
|
||||
import threading
|
||||
from queue import Queue
|
||||
from collections import Counter
|
||||
import re
|
||||
|
||||
# 共享队列和词频统计器
|
||||
file_queue = Queue()
|
||||
word_counter = Counter()
|
||||
lock = threading.Lock() # 确保线程安全更新 Counter
|
||||
|
||||
# 读取文件并分词的函数
|
||||
def process_file():
    """Worker loop: take paths off ``file_queue`` and add their words to
    ``word_counter`` until the queue is empty."""
    # Local import: the file imports only Queue from the queue module.
    from queue import Empty

    while True:
        try:
            # Non-blocking fetch of the next file path.
            file_path = file_queue.get_nowait()
        except Empty:
            # Only an empty queue ends the worker; other errors must not be
            # swallowed by a bare except.
            break
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                text = f.read().lower()
            # Simple tokenisation that drops punctuation.
            words = re.findall(r'\b\w+\b', text)
            # Update the shared counter under the lock.
            with lock:
                word_counter.update(words)
        except Exception as e:
            # Best effort: report the failing file and keep going.
            print(f"Error processing {file_path}: {e}")
        finally:
            file_queue.task_done()
|
||||
|
||||
def main():
    """Count words of every ``.txt`` file under ``data`` with four threads
    and print the ten most frequent ones."""
    data_dir = 'data'
    files = []
    for name in os.listdir(data_dir):
        if name.endswith('.txt'):
            files.append(os.path.join(data_dir, name))

    # Enqueue the work.
    for path in files:
        file_queue.put(path)

    # Start the workers, then wait for all of them.
    num_threads = 4
    threads = [threading.Thread(target=process_file) for _ in range(num_threads)]
    for worker in threads:
        worker.start()
    for worker in threads:
        worker.join()

    # Report the ten most frequent words.
    print("Top 10 高频词:")
    for entry in word_counter.most_common(10):
        print(f"{entry[0]}: {entry[1]}")


if __name__ == '__main__':
    main()
|
@ -1,45 +0,0 @@
|
||||
import os
|
||||
import re
|
||||
from collections import Counter
|
||||
from multiprocessing import Pool, Manager
|
||||
|
||||
def process_file(file_path, shared_counter=None):
    """Count the words of one file.

    Args:
        file_path: path of the UTF-8 text file to read.
        shared_counter: optional dict-like object; when given, the counts of
            this file are added into it. The read-modify-write is not atomic
            across processes, so prefer using the return value.

    Returns:
        A Counter with the word counts of this file; it is empty when the
        file could not be processed (the error is printed).
    """
    counts = Counter()
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            text = f.read().lower()
        # Simple tokenisation that drops punctuation.
        counts.update(re.findall(r'\b\w+\b', text))
        if shared_counter is not None:
            # dict.update() with a list of words is not a counter update,
            # so add the per-word counts explicitly.
            for word, count in counts.items():
                shared_counter[word] = shared_counter.get(word, 0) + count
    except Exception as e:
        print(f"Error processing {file_path}: {e}")
    return counts


def main():
    """Count words of every ``.txt`` file under ``data`` with a process pool
    and print the ten most frequent ones."""
    data_dir = 'data'
    files = [os.path.join(data_dir, f) for f in os.listdir(data_dir) if f.endswith('.txt')]

    # Each worker returns its own Counter and the parent merges them, so no
    # state is shared between processes and nothing can race.
    final_counter = Counter()
    with Pool(processes=4) as pool:  # number of processes can be adjusted
        for partial in pool.imap_unordered(process_file, files):
            final_counter.update(partial)

    # Report the ten most frequent words.
    print("Top 10 高频词:")
    for word, count in final_counter.most_common(10):
        print(f"{word}: {count}")


if __name__ == '__main__':
    main()
|
@ -1,7 +0,0 @@
|
||||
|
||||
data_dir: "./data"
|
||||
output_file: "./results.csv"
|
||||
stop_words_file: "./config/stopwords.txt"
|
||||
top_n: 10
|
||||
tokenizer: "simple" # 可选 simple/jieba
|
||||
|
@ -1,10 +0,0 @@
|
||||
from pathlib import Path
|
||||
import csv
|
||||
|
||||
def save_results(results: dict, output_path: str) -> None:
    """Write ``results`` to ``output_path`` as CSV with a ``word,count`` header.

    Missing parent directories are created. The file is written as UTF-8 so
    that non-ASCII words do not depend on the platform's default encoding.
    """
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(["word", "count"])
        writer.writerows(results.items())
|
@ -1,36 +0,0 @@
|
||||
import pytest
|
||||
from src.core import WordCounter
|
||||
from src.file_io import save_results
|
||||
import csv
|
||||
|
||||
@pytest.fixture
def sample_data(tmp_path):
    """Create a ``data`` directory with two small text files and return it."""
    data_dir = tmp_path / "data"
    data_dir.mkdir()
    contents = {
        "test1.txt": "apple banana apple",
        "test2.txt": "banana cherry",
    }
    for name, text in contents.items():
        (data_dir / name).write_text(text)
    return data_dir
|
||||
|
||||
def test_full_pipeline(sample_data, tmp_path):
    """Run counting and saving end to end and check the CSV rows."""
    stop_words_path = tmp_path / "stopwords.txt"
    output_path = tmp_path / "results.csv"
    config = {
        "data_dir": str(sample_data),
        "stop_words_file": str(stop_words_path),
        "output_file": str(output_path),
        "top_n": 2,
        "tokenizer": "simple"
    }

    # Create the stop-word file.
    stop_words_path.write_text("cherry")

    # Run the whole pipeline.
    counter = WordCounter()
    counter.config = config
    results = counter.process_files()
    save_results(results, config['output_file'])

    # Check the output, ignoring the header row.
    with open(config['output_file']) as f:
        reader = csv.reader(f)
        next(reader)  # Skip header
        rows = list(reader)
        assert rows == [["apple", "2"], ["banana", "2"]]
|
@ -1,34 +0,0 @@
|
||||
import pytest
|
||||
from src.core import WordCounter
|
||||
import tempfile
|
||||
import random
|
||||
|
||||
@pytest.fixture(scope="module")
def large_data():
    """Create ten text files of 100,000 random words each and yield their
    directory; the directory is removed when the module's tests finish."""
    # Path is used below but never imported at file level.
    from pathlib import Path

    words = [f"word{i}" for i in range(1000)]
    with tempfile.TemporaryDirectory() as tmpdir:
        data_dir = Path(tmpdir) / "data"
        data_dir.mkdir()
        for i in range(10):
            with open(data_dir / f"bigfile{i}.txt", 'w') as f:
                content = " ".join(random.choices(words, k=100000))
                f.write(content)
        # Yield inside the with-block so the files still exist during the tests.
        yield str(data_dir)
|
||||
|
||||
def test_processing_performance(benchmark, large_data):
    """Benchmark process_files() and require a mean run time under one second."""
    settings = {
        "data_dir": large_data,
        "stop_words_file": "nonexistent",
        "output_file": "/dev/null",
        "top_n": 10,
        "tokenizer": "simple"
    }
    counter = WordCounter()
    counter.config = settings

    # Run the benchmark.
    benchmark(counter.process_files)

    # Mean execution time must stay below one second.
    assert benchmark.stats['mean'] < 1.0
|
@ -1,16 +0,0 @@
|
||||
import pytest
|
||||
from src.core import SimpleTokenizer, JiebaTokenizer
|
||||
|
||||
@pytest.fixture
def stop_words():
    """Small English stop-word set shared by the tokenizer tests."""
    words = {"the", "and", "a"}
    return words
|
||||
|
||||
def test_simple_tokenizer(stop_words):
    """SimpleTokenizer drops the stop words and keeps the rest in order."""
    expected = ["quick", "brown", "fox", "dog"]
    tokens = SimpleTokenizer().tokenize("the quick brown fox and a dog", stop_words)
    assert tokens == expected
|
||||
|
||||
def test_jieba_tokenizer(stop_words):
    """JiebaTokenizer segments a Chinese sentence; no stop words are applied."""
    expected = ["我", "爱", "北京", "天安门"]
    tokens = JiebaTokenizer().tokenize("我爱北京天安门", set())
    assert tokens == expected
|
@ -1,35 +0,0 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"id": "13037781-7175-4a52-9d26-6c7d9f068b5f",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"单元测试、集成测试、性能测试"
|
||||
]
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
"kernelspec": {
|
||||
"display_name": "Python 3 (ipykernel)",
|
||||
"language": "python",
|
||||
"name": "python3"
|
||||
},
|
||||
"language_info": {
|
||||
"codemirror_mode": {
|
||||
"name": "ipython",
|
||||
"version": 3
|
||||
},
|
||||
"file_extension": ".py",
|
||||
"mimetype": "text/x-python",
|
||||
"name": "python",
|
||||
"nbconvert_exporter": "python",
|
||||
"pygments_lexer": "ipython3",
|
||||
"version": "3.12.7"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 5
|
||||
}
|
@ -1,34 +0,0 @@
|
||||
本文旨在通过一个案例(读取 data 目录下 100 篇小说文本,统计词频并输出前 10 高频词)来说明如何提升代码工程质量。
|
||||
教案将逐步展示不同编程技术的应用,并分析其对代码可读性、可维护性、可扩展性和复用性的提升。
|
||||
|
||||
本案例不做性能提升方面的考量。
|
||||
|
||||
|
||||
## 起点:基础实现
|
||||
|
||||
```
|
||||
import os
|
||||
|
||||
files = os.listdir('data')
|
||||
word_count = {}
|
||||
for file in files:
|
||||
with open('data/' + file, 'r', encoding='utf-8') as f:
|
||||
text = f.read()
|
||||
words = text.split() # 假设简单按空格分词
|
||||
for word in words:
|
||||
if word in word_count:
|
||||
word_count[word] += 1
|
||||
else:
|
||||
word_count[word] = 1
|
||||
|
||||
# 排序并输出前10
|
||||
sorted_words = sorted(word_count.items(), key=lambda x: x[1], reverse=True)
|
||||
for i in range(10):
|
||||
print(sorted_words[i])
|
||||
```
|
||||
|
||||
## 问题分析
|
||||
- 可读性差:没有清晰的功能划分,代码逻辑混杂,难以阅读理解维护。
|
||||
- 扩展性差:如果需要更改分词逻辑、文件路径或输出格式,需修改多处代码。
|
||||
- 容错性差:未处理文件读取失败、空文件等问题。
|
||||
- 复用性低:逻辑无法直接复用在其他类似任务中。
|
@ -0,0 +1,72 @@
|
||||
|
||||
import site
|
||||
import os,re
|
||||
import string,operator
|
||||
|
||||
################################################################################
|
||||
# 变量
|
||||
################################################################################
|
||||
# Sample text used by the examples. Other file names that were assigned here
# before being overridden: 'test.txt', 'pride-and-prejudice.txt'.
testfilename = 'Prey.txt'

db_filename = "tf.db"

# Fallback base directory used when no site-packages path matches below.
# NOTE(review): assumes this module lives in <base>/cppy/ - confirm.
basePath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# Prefer a site-packages directory; as before, the last matching entry wins.
# getsitepackages() can be missing in some virtual environments.
site_packages = getattr(site, 'getsitepackages', lambda: [])()
for package in site_packages:
    if 'package' in package:
        basePath = package
stopwordfilepath = os.path.join(basePath, 'cppy','data','stop_words.txt')
testfilepath = os.path.join(basePath, 'cppy','data',testfilename )
|
||||
|
||||
|
||||
################################################################################
|
||||
# 函数
|
||||
################################################################################
|
||||
def read_file(path_to_file):
    """Return the whole content of the UTF-8 text file at ``path_to_file``."""
    with open(path_to_file, encoding='utf-8') as handle:
        return handle.read()
|
||||
|
||||
def re_split( data ):
    """Return the lower-cased words of ``data``.

    Every run of non-word characters and underscores counts as a separator.
    """
    # Raw string: '\W' in a normal string literal is an invalid escape
    # sequence that newer Python versions warn about.
    return re.sub(r'[\W_]+', ' ', data).lower().split()
|
||||
|
||||
def get_stopwords( path_to_file = stopwordfilepath ):
    """Return the stop words as a list.

    The file content is split on commas and the 26 lower-case ASCII letters
    are appended.
    """
    with open(path_to_file,encoding='utf-8') as handle:
        words = handle.read().split(',')
    words.extend(string.ascii_lowercase)
    return words
|
||||
|
||||
def extract_file_words(path_to_file):
    """Return the words of the file at ``path_to_file``.

    Stop words and words shorter than three characters are dropped. The
    filtering is shared with extract_str_words() rather than duplicated.
    """
    return extract_str_words(read_file(path_to_file))
|
||||
|
||||
def extract_str_words(data_str):
    """Return the words of ``data_str``, dropping stop words and words
    shorter than three characters."""
    word_list = re_split( data_str )
    # get_stopwords() returns a list; convert it once so each membership
    # test is O(1) instead of a scan of the whole list.
    stop_words = set(get_stopwords())
    return [ w for w in word_list if len(w) >= 3 and w not in stop_words ]
|
||||
|
||||
def count_word(word, word_freqs, stopwords):
    """Increment ``word`` in ``word_freqs`` unless it is in ``stopwords``."""
    if word in stopwords:
        return
    current = word_freqs.get(word, 0)
    word_freqs[word] = current + 1
|
||||
|
||||
def get_frequencies(word_list):
    """Return a dict mapping each word of ``word_list`` to its occurrence
    count, in order of first appearance."""
    counts = {}
    for token in word_list:
        if token in counts:
            counts[token] += 1
        else:
            counts[token] = 1
    return counts
|
||||
|
||||
def sort_dict (word_freq):
    """Return the ``(word, count)`` items of ``word_freq`` as a list sorted
    by count, highest first."""
    pairs = list(word_freq.items())
    pairs.sort(key=lambda pair: pair[1], reverse=True)
    return pairs
|
||||
|
||||
def print_word_freqs( word_freqs, n = 10):
    """Print the first ``n`` pairs of ``word_freqs`` as ``word - count`` lines."""
    for pair in word_freqs[:n]:
        w, c = pair
        print(w, '-', c)
|
||||
|
||||
|
||||
def test():
    """Print a greeting to show that the package can be imported."""
    greeting = 'cppy welcome'
    print(greeting)
|
@ -0,0 +1,46 @@
|
||||
import string
|
||||
from cppy.cp_util import *
|
||||
|
||||
# 准备词和停用词表
|
||||
# Monolithic word-frequency script: everything is done inline with basic
# control flow and a list of [word, count] pairs.

# Prepare the result list and the stop-word list.
word_freqs = []
with open( stopwordfilepath,encoding='utf-8' ) as f:
    stop_words = f.read().split(',')
stop_words.extend(list(string.ascii_lowercase))

for line in open( testfilepath ,encoding='utf-8' ):
    start_char = None  # index where the current word starts, None between words
    i = 0              # index of the current character within the line
    for c in line:
        if start_char == None:
            if c.isalnum():
                # A word starts here.
                start_char = i
        else:
            if not c.isalnum():
                # A word ends here.
                found = False
                word = line[start_char:i].lower()
                # Skip stop words.
                if word not in stop_words:
                    pair_index = 0
                    # Look for an existing entry for this word.
                    for pair in word_freqs:
                        if word == pair[0]:
                            pair[1] += 1
                            found = True
                            break
                        pair_index += 1
                    if not found:
                        word_freqs.append([word, 1])
                    elif len(word_freqs) > 1:
                        # Move the updated entry towards the front while its
                        # count exceeds that of entries before it.
                        for n in reversed(range(pair_index)):
                            if word_freqs[pair_index][1] > word_freqs[n][1]:
                                # Swap.
                                word_freqs[n], word_freqs[pair_index] = word_freqs[pair_index], word_freqs[n]
                                pair_index = n
                # Reset the start marker.
                start_char = None
        i += 1

# Print the first ten entries.
for tf in word_freqs[0:10]:
    print(tf[0], '-', tf[1])
|
@ -0,0 +1,45 @@
|
||||
import sys, collections
|
||||
from cppy.cp_util import *
|
||||
|
||||
class WordFrequenciesModel:
    """Model: holds the word frequencies of the current file in ``freqs``."""

    def __init__(self, path_to_file):
        self.update(path_to_file)

    def update(self, path_to_file):
        """Recount the words of ``path_to_file``.

        On IOError a message is printed and ``freqs`` becomes an empty dict.
        """
        try:
            words = extract_file_words(path_to_file)
        except IOError:
            print("File not found")
            self.freqs = {}
        else:
            self.freqs = collections.Counter(words)
|
||||
|
||||
|
||||
class WordFrequenciesView:
    """View: prints the frequencies held by the model."""

    def __init__(self, model):
        self._model = model

    def render(self):
        """Sort the model's frequencies and print them."""
        print_word_freqs(sort_dict(self._model.freqs))
|
||||
|
||||
|
||||
class WordFrequencyController:
    """Controller: reads file paths from stdin and drives model and view."""

    def __init__(self, model, view):
        self._model, self._view = model, view
        # Show the initial state right away.
        view.render()

    def run(self):
        """Prompt for file paths until ``q`` is entered or stdin is closed.

        Each path is handed to the model, then the view is rendered again.
        """
        while True:
            print("Enter the file path (or 'q' to quit): ", file=sys.stderr, flush=True)
            line = sys.stdin.readline()
            if not line:
                # readline() returns '' at end of input; without this check
                # the loop would never end once stdin is closed.
                break
            filename = line.strip()
            if filename.lower() == 'q':
                break
            self._model.update(filename)
            self._view.render()
|
||||
|
||||
|
||||
# Wire model, view and controller together and start the input loop.
m = WordFrequenciesModel(path_to_file=testfilepath)
v = WordFrequenciesView(model=m)
c = WordFrequencyController(model=m, view=v)
c.run()
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue