|
|
import site
|
|
|
import os, re, time
|
|
|
import string, operator
|
|
|
|
|
|
################################################################################
|
|
|
# 变量
|
|
|
################################################################################
|
|
|
testfilename = 'test.txt'
|
|
|
testfilename = 'pride-and-prejudice.txt'
|
|
|
testfilename = 'Prey.txt'
|
|
|
|
|
|
db_filename = "tf.db"
|
|
|
|
|
|
site_packages = site.getsitepackages()
|
|
|
for package in site_packages:
|
|
|
if 'package' in package:
|
|
|
basePath = package
|
|
|
stopwordfilepath = os.path.join(basePath, 'cppy', 'data', 'stop_words.txt')
|
|
|
testfilepath = os.path.join(basePath, 'cppy', 'data', testfilename)
|
|
|
|
|
|
|
|
|
################################################################################
|
|
|
# 项目函数
|
|
|
################################################################################
|
|
|
def read_file(path_to_file):
|
|
|
"""
|
|
|
读取指定文件的内容。
|
|
|
|
|
|
Args:
|
|
|
path_to_file (str): 文件路径。
|
|
|
|
|
|
Returns:
|
|
|
str: 文件内容。
|
|
|
"""
|
|
|
with open(path_to_file, encoding='utf-8') as f:
|
|
|
data = f.read()
|
|
|
return data
|
|
|
|
|
|
|
|
|
def re_split(data):
|
|
|
"""
|
|
|
使用正则表达式分割字符串,将非字母字符替换为空格,并将所有字符转换为小写。
|
|
|
|
|
|
Args:
|
|
|
data (str): 输入字符串。
|
|
|
|
|
|
Returns:
|
|
|
list: 分割后的单词列表。
|
|
|
"""
|
|
|
pattern = re.compile('[\W_]+')
|
|
|
data = pattern.sub(' ', data).lower()
|
|
|
return data.split()
|
|
|
|
|
|
|
|
|
def get_stopwords(path_to_file=stopwordfilepath):
|
|
|
"""
|
|
|
获取停用词列表。
|
|
|
|
|
|
Args:
|
|
|
path_to_file (str): 停用词文件路径,默认为 stopwordfilepath。
|
|
|
|
|
|
Returns:
|
|
|
list: 停用词列表。
|
|
|
"""
|
|
|
with open(path_to_file, encoding='utf-8') as f:
|
|
|
data = f.read().split(',')
|
|
|
data.extend(list(string.ascii_lowercase))
|
|
|
return data
|
|
|
|
|
|
|
|
|
def get_chunks(file_path=testfilepath, chunk_size=1000):
|
|
|
"""
|
|
|
将文件内容分割成多个块。
|
|
|
|
|
|
Args:
|
|
|
file_path (str): 文件路径,默认为 testfilepath。
|
|
|
chunk_size (int): 每个块的大小,默认为 1000。
|
|
|
|
|
|
Returns:
|
|
|
list: 分割后的块列表。
|
|
|
"""
|
|
|
content = re_split(read_file(file_path))
|
|
|
chunks = [
|
|
|
content[i:i + chunk_size] for i in range(0, len(content), chunk_size)
|
|
|
]
|
|
|
return chunks
|
|
|
|
|
|
|
|
|
def extract_file_words(path_to_file):
|
|
|
"""
|
|
|
提取文件中的单词,去除停用词和长度小于3的单词。
|
|
|
|
|
|
Args:
|
|
|
path_to_file (str): 文件路径。
|
|
|
|
|
|
Returns:
|
|
|
list: 提取后的单词列表。
|
|
|
"""
|
|
|
word_list = re_split(read_file(path_to_file))
|
|
|
stop_words = get_stopwords()
|
|
|
return [w for w in word_list if (w not in stop_words) and len(w) >= 3]
|
|
|
|
|
|
|
|
|
def extract_str_words(data_str):
|
|
|
"""
|
|
|
提取字符串中的单词,去除停用词和长度小于3的单词。
|
|
|
|
|
|
Args:
|
|
|
data_str (str): 输入字符串。
|
|
|
|
|
|
Returns:
|
|
|
list: 提取后的单词列表。
|
|
|
"""
|
|
|
word_list = re_split(data_str)
|
|
|
stop_words = get_stopwords()
|
|
|
return [w for w in word_list if (w not in stop_words) and len(w) >= 3]
|
|
|
|
|
|
|
|
|
def count_word(word, word_freqs, stopwords):
|
|
|
"""
|
|
|
统计单词频率。
|
|
|
|
|
|
Args:
|
|
|
word (str): 单词。
|
|
|
word_freqs (dict): 单词频率字典。
|
|
|
stopwords (list): 停用词列表。
|
|
|
"""
|
|
|
if word not in stopwords:
|
|
|
word_freqs[word] = word_freqs.get(word, 0) + 1
|
|
|
|
|
|
|
|
|
def get_frequencies(word_list):
|
|
|
"""
|
|
|
获取单词频率。
|
|
|
|
|
|
Args:
|
|
|
word_list (list): 单词列表。
|
|
|
|
|
|
Returns:
|
|
|
dict: 单词频率字典。
|
|
|
"""
|
|
|
word_freqs = {}
|
|
|
for word in word_list:
|
|
|
word_freqs[word] = word_freqs.get(word, 0) + 1
|
|
|
return word_freqs
|
|
|
|
|
|
|
|
|
def sort_dict(word_freq):
|
|
|
"""
|
|
|
对字典进行排序。
|
|
|
|
|
|
Args:
|
|
|
word_freq (dict): 单词频率字典。
|
|
|
|
|
|
Returns:
|
|
|
list: 排序后的单词频率列表。
|
|
|
"""
|
|
|
return sorted(word_freq.items(), key=operator.itemgetter(1), reverse=True)
|
|
|
|
|
|
|
|
|
def print_word_freqs(word_freqs, n=10):
|
|
|
"""
|
|
|
打印单词频率。
|
|
|
|
|
|
Args:
|
|
|
word_freqs (list): 单词频率列表。
|
|
|
n (int): 打印的单词数量,默认为 10。
|
|
|
"""
|
|
|
for (w, c) in word_freqs[:n]:
|
|
|
print(w, '-', c)
|
|
|
|
|
|
|
|
|
################################################################################
|
|
|
# 通用工具
|
|
|
################################################################################
|
|
|
|
|
|
|
|
|
def timing_decorator(func):
|
|
|
|
|
|
def wrapper(*args, **kwargs):
|
|
|
start_time = time.time() # 记录开始时间
|
|
|
result = func(*args, **kwargs) # 调用原始函数
|
|
|
end_time = time.time() # 记录结束时间
|
|
|
run_time = end_time - start_time # 计算运行时间
|
|
|
print(f"{func.__name__} 运行时间: {run_time*1000:.2f} 秒")
|
|
|
return result
|
|
|
|
|
|
return wrapper
|
|
|
|
|
|
|
|
|
def test():
|
|
|
print('cppy welcome')
|