Compare commits
3 Commits
Author | SHA1 | Date |
9cad806031 | 1 year ago |
a66617dcce | 1 year ago |
34e76e8db6 | 1 year ago |
@ -1,3 +0,0 @@
from cppy.cp_util import *
print_word_freqs( sort_dict ( get_frequencies ( extract_file_words(testfilepath) )))
@ -1,20 +0,0 @@
from cppy.cp_util import *
# 生成器
def non_stop_words(testfilepath):
stopwords = get_stopwords()
data_str = read_file(testfilepath)
wordlist = re_split( data_str )
for word in wordlist:
if word not in stopwords:
yield word # 弹出一个非停用词
freqs = {}
for word in non_stop_words(testfilepath):
freqs[word] = freqs.get(word, 0) + 1
data = sort_dict(freqs)
Binary file not shown.
@ -1,76 +0,0 @@
# -*- coding: utf-8 -*-
from flask import Flask, request, jsonify, abort
from functools import lru_cache
from cppy.cp_util import *
from functools import cache
app = Flask(__name__)
# 模拟数据库
books_db = []
# 用于缓存用户数据库的装饰器
def get_books_db():
return books_db
@app.route('/books', methods=['GET'])
def get_books():
return jsonify(get_books_db())
@app.route('/books/<int:book_id>', methods=['GET'])
def get_book(book_id):
book = next((book for book in get_books_db() if book['id'] == book_id), None)
if book is None:
return jsonify(book['content'])
# 创建或更新新资源
@app.route('/books/<int:book_id>', methods=['PUT'])
def update_book(book_id):
global books_db
book_to_update = request.json
books_db = get_books_db()
book = next((book for book in books_db if book['id'] == book_id), None)
if book is None:
# 如果资源不存在,创建新资源
# 如果资源存在,更新资源
# 清除缓存的数据库
return jsonify(books_db), 200
@app.route('/books/<int:book_id>/word_frequency', methods=['GET'])
def word_frequency(book_id):
global books_db
book = next((book for book in get_books_db() if book['id'] == book_id), None)
filepath = book['content']
word_list = extract_file_words(filepath)
word_frequency = get_frequencies(word_list)
word_frequency = sort_dict(word_frequency)
return jsonify(word_frequency), 200
@app.route('/books/<int:book_id>', methods=['DELETE'])
def delete_book(book_id):
global books_db
books_db = [book for book in books_db if book['id'] != book_id]
if len(books_db) == len([l for l in books_db if l['id'] == book_id]):
abort(404) # 用户不存在
return jsonify({'message': f'book {book_id} deleted'}), 200
if __name__ == '__main__':
@ -1,42 +0,0 @@
# -*- coding: utf-8 -*-
import multiprocessing
from collections import Counter
from cppy.cp_util import *
# 多进程: 因为创建进程相比计算过程开销太大,结果最慢
stop_words = get_stopwords()
def process_chunk(chunk):
# 过滤停用词
words = [ w for w in chunk if ( not w in stop_words ) and len(w) >= 3 ]
return Counter(words)
def merge_counts(counts_list):
return sum(counts_list, Counter())
def main():
# 读取文件内容,分割文件内容为多个块,每个块由一个进程处理
chunks = get_chunks(testfilepath,1000)
# 使用多进程处理每个块
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
counts_list =, chunks)
# 合并计数
total_counts = merge_counts(counts_list)
# 输出最高频的n个词
if __name__ == '__main__':
@ -1,48 +0,0 @@
import re
from collections import Counter
# 清洗文本,移除标点符号并转换为小写
def clean_text(text):
return re.sub(r'[^\w\s]', '', text).lower()
# 统计词频
def count_frequencies(text):
return Counter(word for word in clean_text(text).split())
# 交互式提示用户输入文件路径和前n个单词的数量
def interactive_mode():
file_path = input("请输入文件路径 >> ")
n = int(input("请输入你想要输出的前n个最常见单词的数量 >> "))
if n <= 0:
raise ValueError("数量必须大于0。")
except ValueError as e:
# 打开文件并读取内容
with open(file_path, 'r', encoding='utf-8') as file:
text =
# 统计词频
frequencies = count_frequencies(text)
# 获取前n个最常见的单词
most_common = frequencies.most_common(n)
# 输出结果
for word, freq in most_common:
print(f"{word}: {freq}")
except FileNotFoundError:
print(f"文件未找到: {file_path}")
except Exception as e:
print(f"发生错误: {e}")
# 主函数
def main():
if __name__ == "__main__":
@ -1,30 +0,0 @@
from flask import Flask, render_template, request, redirect, url_for
from collections import Counter
from cppy.cp_util import *
import os
app = Flask(__name__)
@app.route('/', methods=['GET', 'POST'])
def index():
if request.method == 'POST':
# 获取上传的文件
file = request.files['file']
# 保存临时文件并读取内容
filename = os.path.join('/temp', file.filename)
# 计算词频
words = extract_file_words(filename)
word_counts = Counter(words)
# 删除临时文件
return render_template('result.html', word_counts=word_counts.most_common())
return render_template('index.html')
if __name__ == '__main__':
@ -1,14 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<meta charset="UTF-8">
<title>Upload Text File</title>
<h1>Upload a Text File to Count Word Frequencies</h1>
<form action="/" method="post" enctype="multipart/form-data">
<input type="file" name="file">
<input type="submit" value="Submit">
@ -1,16 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<meta charset="UTF-8">
<title>Word Frequencies</title>
<h1>Top Word Frequencies:</h1>
{% for word, count in word_counts %}
<li>{{ word }}: {{ count }}</li>
{% endfor %}
<a href="{{ url_for('index') }}">Back to Upload</a>
@ -1,34 +0,0 @@
# 创建对象是消耗资源的,如果发现对象已经存在,可以返回引用,不创造新对象 。设计模式中这个做法叫享元
from cppy.cp_util import *
class WordFrequencyController():
def __init__(self, controllertype,filepath ):
word_list = extract_file_words(filepath)
word_freq = get_frequencies(word_list)
self.word_freq = sort_dict(word_freq)
self.number = controllertype
def print_word_freqs( self ):
print_word_freqs( self.word_freq,self.number)
class WordFrequencyControllerFactory():
def __init__(self):
self.types = {}
def get_WordFrequencyController(self, number,testfilepath):
if number not in self.types:
self.types[number] = WordFrequencyController(number,testfilepath) # 创建新的对象
print('new obj: ','*'*30,number)
print('ref obj: ','*'*30,number)
return self.types[number] # 重复使用已存在的对象
if __name__ == "__main__":
factory = WordFrequencyControllerFactory()
for number in [ 1,3,5,3,5,7 ]:
WordFrequency = factory.get_WordFrequencyController(number,testfilepath)
# print(flush=True)
@ -1,9 +0,0 @@
- 解耦合:通过回调函数,可以将不同部分的代码逻辑分离,降低模块之间的耦合度。
- 主动通信:注册回调模式实现了下层模块与上层模块之间的主动通信。当下层模块发生特定事件或满足特定条件时,可以主动调用上层模块注册的回调函数,而不需要上层模块不停地轮询下层模块的状态。
- 异步处理:回调函数常用于异步操作的响应处理,可以在主线程之外执行耗时操作,提升程序的效率和响应速度。
- 简化设计:在某些情况下,使用回调函数可以避免复杂的控制流设计,使代码更加简洁明了。
- 适应变化:随着项目的发展,需求可能会发生变化。注册回调模式使得在不影响现有代码的基础上,容易添加新功能或修改现有逻辑。
@ -1,24 +0,0 @@
from cppy.cp_util import *
# 这个例子没有实际意义,是用来帮助理解其他例子
# 主程序只需要启动第一个动作,后面的顺序逻辑写到各个函数里面了
def readfile(file_path, func):
data = read_file(file_path)
func(data, frequencies)
def extractwords(str_data,func):
func(extract_str_words(str_data), sort)
def frequencies(word_list, func):
wf = get_frequencies(word_list)
func(wf, printall)
def sort(wf, func):
func(sort_dict(wf), None)
def printall(word_freqs, _ ):
if __name__ == "__main__":
readfile(testfilepath, extractwords)
@ -1,25 +0,0 @@
import requests
from cppy.cp_util import *
def main():
# 读测试文件的内容
content = read_file()
# 抽词
tokenize_response ="http://localhost:7770/tokenize", json={"text": content})
words = tokenize_response.json()["words"]
# 计算词频
count_response ="http://localhost:7771/count", json={"words": words})
word_count = count_response.json()["word_count"]
# 排序
sort_response ="http://localhost:7772/sort", json={"word_count": word_count})
top_10_words = sort_response.json()["top_10_words"]
print("Top 10 words:")
if __name__ == "__main__":
@ -1,14 +0,0 @@
from fastapi import FastAPI
from collections import Counter
from cppy.cp_util import *
import uvicorn
app = FastAPI()
async def count(words_list: dict): # {"words": ["word1", "word2", ...]}
word_count = Counter(words_list["words"])
return {"word_count": dict(word_count)}
if __name__ == "__main__":
||||, host="", port= 7771)
@ -1,13 +0,0 @@
from fastapi import FastAPI
import uvicorn
app = FastAPI()
async def sort(word_count_dict: dict):
sorted_word_count = sorted(word_count_dict["word_count"].items(), key=lambda x: x[1], reverse=True)
top_10_words = sorted_word_count[:10]
return {"top_10_words": top_10_words}
if __name__ == "__main__":
||||, host="", port= 7772)
@ -1,13 +0,0 @@
from fastapi import FastAPI
from cppy.cp_util import *
import uvicorn
app = FastAPI()
async def tokenize(text: str):
words = extract_str_words(text)
return {"words": words}
if __name__ == "__main__":
||||, host="", port= 7770)
@ -1,5 +0,0 @@
;; Options: plugins/f1.pyc, plugins/f2.pyc
frequencies = plugins/f2.pyc
@ -1,30 +0,0 @@
import configparser, importlib.machinery
from cppy.cp_util import *
class PluginManager:
def __init__(self):
self.plugins = {}
def load_plugins(self):
_dir = os.path.dirname(os.path.abspath(__file__))
config = configparser.ConfigParser()
frequencies_plugin = config.get("Plugins", "frequencies")
# 加载插件
self.plugins['word_freqs'] = importlib.machinery.SourcelessFileLoader('', frequencies_plugin).load_module()
def get_plugin(self, name):
return self.plugins.get(name)
# 创建 PluginManager 实例
plugin_manager = PluginManager()
wordlist = extract_file_words(testfilepath) # 提取文件中的单词
word_freqs = plugin_manager.get_plugin('word_freqs').top_word(wordlist) # 调用实例方法
print_word_freqs(word_freqs) # 打印词频
@ -1,28 +0,0 @@
import py_compile
import os
import shutil
# 设置源目录和目标目录
source_dir = os.path.join(os.path.dirname(__file__), '__pycache__') # 当前目录下的 __pycache__ 目录
target_dir = os.path.join(os.path.dirname(__file__), '..', 'plugins') # 上一级目录下的 plugins 目录
# 确保目标目录存在
os.makedirs(target_dir, exist_ok=True)
# 遍历源目录中的所有 .pyc 文件
for filename in os.listdir(source_dir):
if filename.endswith('.pyc'):
# 提取文件名的前两个字符
new_filename = filename[:2]
# 构建源文件和目标文件的完整路径
source_file = os.path.join(source_dir, filename)
target_file = os.path.join(target_dir, new_filename + '.pyc')
# 拷贝文件
shutil.copyfile(source_file, target_file)
# 删除原始文件
print(f"Copied {filename} to {target_file} and removed original file.")
@ -1,8 +0,0 @@
# -*- coding: utf-8 -*-
import collections
def top_word(word_list):
counts = collections.Counter( word_list )
return counts.most_common(10)
Binary file not shown.
@ -1,16 +0,0 @@
import cppy.cp_util as util
def extract_words(path_to_file:str) -> list:
return util.extract_file_words(path_to_file)
def frequencies( word_list:list ) -> dict :
return util.get_frequencies(word_list)
def sort(word_freq:dict) -> list :
return util.sort_dict(word_freq)
if __name__ == "__main__":
word_freqs = sort( frequencies(extract_words( util.testfilepath )) )
@ -1,36 +0,0 @@
from cppy.cp_util import *
from dataclasses import dataclass
from collections import Counter
import re
class WordFrequency:
text: str
stop_words: set = None
def __post_init__(self):
# 如果未提供停用词表
if self.stop_words is None:
self.stop_words = get_stopwords()
def tokenize(self):
# 分词并去除停用词
words = re.findall(r'\b\w+\b', self.text.lower())
filtered_words = [word for word in words if word not in self.stop_words and len(word)>2]
return filtered_words
def get_top_n(self, n=10):
# 计算词频
word_freqs = Counter(self.tokenize())
return word_freqs.most_common(n)
# 使用示例
if __name__ == '__main__':
# 创建WordFrequency实例
text = read_file()
word_freq = WordFrequency( text )
# 获取并打印词频
top_words = word_freq.get_top_n()
@ -1,25 +0,0 @@
from cppy.cp_util import *
def extractWords(path_to_file):
assert(type(path_to_file) is str), "Must be a string"
assert(path_to_file), "Must be a non-empty string"
return extract_file_words(path_to_file)
def frequencies(word_list):
assert(type(word_list) is list), "Must be a list"
assert(word_list != []), "Must be a non-empty list"
return get_frequencies(word_list)
def sort(word_freqs):
assert(type(word_freqs) is dict), "Must be a dictionary"
assert(word_freqs != {}), "Must be a non-empty dictionary"
return sort_dict(word_freqs)
if __name__ == '__main__':
word_freqs = sort(frequencies(extractWords( testfilepath )))
except Exception as e:
print(" Something wrong: {0}".format(e) )
@ -1,4 +0,0 @@
## 任务
@ -1,23 +0,0 @@
# 装饰器模式允许我们在不修改原有类的基础上,动态地添加额外的功能。
# 就增加功能来说,装饰器模式比生成子类更为灵活。
# 餐吧的顾客可以选择为他们的咖啡添加额外的调料。
class Beverage:
def __init__(self, description):
self.description = description
self.price = 0.0
def cost(self):
return self.price
class CondimentDecorator(Beverage): # 进行装饰
def __init__(self, beverage, description, price_increase):
self.beverage = beverage
self.description = f"{beverage.description}, {description}"
self.price_increase = price_increase
def cost(self):
return self.beverage.cost() + self.price_increase
# 使用装饰器模式
coffee = Beverage("Espresso")
coffee_with_chocolate = CondimentDecorator(coffee, "Chocolate", 0.50)
@ -0,0 +1,72 @@
import site
import os,re
import string,operator
# 变量
testfilename = 'test.txt'
testfilename = 'pride-and-prejudice.txt'
testfilename = 'Prey.txt'
db_filename = "tf.db"
site_packages = site.getsitepackages()
for package in site_packages:
if 'package' in package:
basePath = package
stopwordfilepath = os.path.join(basePath, 'cppy','data','stop_words.txt')
testfilepath = os.path.join(basePath, 'cppy','data',testfilename )
# 函数
def read_file(path_to_file):
with open(path_to_file,encoding='utf-8') as f:
data =
return data
def re_split( data ):
pattern = re.compile('[\W_]+')
data = pattern.sub(' ', data).lower()
return data.split()
def get_stopwords( path_to_file = stopwordfilepath ):
with open(path_to_file,encoding='utf-8') as f:
data =',')
return data
def extract_file_words(path_to_file):
word_list = re_split( read_file(path_to_file) )
stop_words = get_stopwords()
return [ w for w in word_list if ( not w in stop_words ) and len(w) >= 3 ]
def extract_str_words(data_str):
word_list = re_split( data_str )
stop_words = get_stopwords()
return [ w for w in word_list if ( not w in stop_words ) and len(w) >= 3 ]
def count_word(word, word_freqs, stopwords):
if word not in stopwords:
word_freqs[word] = word_freqs.get(word, 0) + 1
def get_frequencies(word_list):
word_freqs = {}
for word in word_list:
word_freqs[word] = word_freqs.get(word, 0) + 1
return word_freqs
def sort_dict (word_freq):
return sorted(word_freq.items(), key=operator.itemgetter(1), reverse=True)
# return sorted( word_freq, key=lambda x: x[1], reverse=True )
def print_word_freqs( word_freqs, n = 10):
for (w, c) in word_freqs[ :n ]:
print( w, '-', c )
def test():
print( 'cppy welcome' )
@ -1,14 +0,0 @@
## 代码为啥要这样写,我要这样写代码
A 代码模式
B 面向对象设计模式
C 高性能模式
@ -0,0 +1,45 @@
import sys, collections
from cppy.cp_util import *
class WordFrequenciesModel:
""" 模型:数据 """
def __init__(self, path_to_file):
def update(self, path_to_file):
self.freqs = collections.Counter( extract_file_words(path_to_file) )
except IOError:
print("File not found")
self.freqs = {}
class WordFrequenciesView:
""" 视图:数据展现 """
def __init__(self, model):
self._model = model
def render(self):
sorted_freqs = sort_dict(self._model.freqs)
class WordFrequencyController:
""" 控制:操作逻辑 """
def __init__(self, model, view):
self._model, self._view = model, view
def run(self):
while True:
print("Enter the file path (or 'q' to quit): ", file=sys.stderr, flush=True)
filename = sys.stdin.readline().strip()
if filename.lower() == 'q': break
m = WordFrequenciesModel( testfilepath )
v = WordFrequenciesView(m)
c = WordFrequencyController(m, v)
@ -0,0 +1,39 @@
import re, operator
from cppy.cp_util import *
def print_text(word_freqs, func):
def frequencies(word_list, func):
wf = get_frequencies(word_list)
func(wf, print_text)
def scan(str_data, func):
func(str_data.split(), frequencies)
def filter_chars(str_data, func):
pattern = re.compile('[\W_]+')
func(pattern.sub(' ', str_data), scan)
def remove_stop_words(word_list, func):
stop_words = get_stopwords()
func([w for w in word_list if not w in stop_words], sort)
def sort(wf, func):
func(sorted(wf.items(), key=operator.itemgetter(1), reverse=True), no_op)
def no_op(func):
def normalize(str_data, func):
func(str_data.lower(), remove_stop_words)
def read_file(path_to_file, func):
with open(path_to_file,encoding='utf-8') as f:
data =
func(data, normalize)
if __name__ == "__main__":
read_file(testfilepath, filter_chars)
@ -0,0 +1,53 @@
from collections import Counter
from cppy.cp_util import *
class DataStorageManager:
""" 数据模型 """
def __init__(self, path_to_file):
data = read_file(path_to_file)
self._data = re_split( data )
def words(self):
return self._data
class StopWordManager:
""" 停用词模型 """
def __init__(self):
self._stop_words = get_stopwords()
def is_stop_word(self, word):
return word in self._stop_words
class WordFrequencyManager:
""" 词频模型 """
def __init__(self):
self._word_freqs = Counter()
def increment_count(self, word):
self._word_freqs[word] += 1
def sorted(self):
return self._word_freqs.most_common()
class WordFrequencyController:
def __init__(self, path_to_file):
self._storage_manager = DataStorageManager(path_to_file)
self._stop_word_manager = StopWordManager()
self._word_freq_manager = WordFrequencyManager()
def run(self):
for w in self._storage_manager.words():
if not self._stop_word_manager.is_stop_word(w):
word_freqs = self._word_freq_manager.sorted()
if __name__ == '__main__':
@ -0,0 +1,41 @@
from cppy.cp_util import *
def extract_words(obj, path_to_file):
obj['data'] = re_split( read_file(path_to_file) )
def load_stop_words(obj):
obj['stop_words'] = get_stopwords()
def increment_count(obj, w):
obj['freqs'][w] = 1 if w not in obj['freqs'] else obj['freqs'][w]+1
data_storage_obj = {
'data' : [],
'init' : lambda path_to_file : extract_words(data_storage_obj, path_to_file),
'words' : lambda : data_storage_obj['data']
stop_words_obj = {
'stop_words' : [],
'init' : lambda : load_stop_words(stop_words_obj),
'is_stop_word' : lambda word : word in stop_words_obj['stop_words']
word_freqs_obj = {
'freqs' : {},
'increment_count' : lambda w : increment_count(word_freqs_obj, w),
'sorted' : lambda : sort_dict(word_freqs_obj['freqs'])
if __name__ == '__main__':
data_storage_obj['init']( testfilepath )
for w in data_storage_obj['words']():
if not stop_words_obj['is_stop_word'](w):
word_freqs = word_freqs_obj['sorted']()
for (w, c) in word_freqs[0:10]:
print(w, '-', c)
Some files were not shown because too many files have changed in this diff Show More
Reference in new issue