|
|
|
|
|
|
|
|
|
r"""
|
|
|
|
|
弹幕统计.xlsx(原句/排名/TopN三个工作表)
|
|
|
|
|
前8条弹幕.txt(TopN文本)
|
|
|
|
|
词云图.png
|
|
|
|
|
"""
|
|
|
|
|
import argparse
import re
from collections import Counter
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import pandas as pd
|
|
|
|
# 词云依赖
|
|
|
|
|
try:
|
|
|
|
|
from wordcloud import WordCloud # type: ignore
|
|
|
|
|
except ImportError:
|
|
|
|
|
WordCloud = None #跳过词云生成
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def read_lines_from_file(f: Path) -> List[str]:
|
|
|
|
|
try:
|
|
|
|
|
return [line.strip() for line in f.read_text(encoding='utf-8', errors='ignore').splitlines()]
|
|
|
|
|
except Exception:
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def collect_texts(input_path: Path) -> List[str]:
|
|
|
|
|
texts: List[str] = []
|
|
|
|
|
if input_path.is_dir():
|
|
|
|
|
for f in input_path.rglob('*.txt'):
|
|
|
|
|
texts.extend(read_lines_from_file(f))
|
|
|
|
|
else:
|
|
|
|
|
texts.extend(read_lines_from_file(input_path))
|
|
|
|
|
texts = [t for t in texts if t]
|
|
|
|
|
return texts
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def rank_exact_sentences(texts: List[str]) -> pd.DataFrame:
|
|
|
|
|
# 统计一样的弹幕的数量
|
|
|
|
|
if not texts:
|
|
|
|
|
return pd.DataFrame(columns=['原句', '数量', '排名'])
|
|
|
|
|
s = pd.Series(texts, name='原句')
|
|
|
|
|
vc = s.value_counts(dropna=False)
|
|
|
|
|
df = vc.rename('数量').reset_index().rename(columns={'index': '原句'})
|
|
|
|
|
# 添加到排名
|
|
|
|
|
df['排名'] = df['数量'].rank(method='dense', ascending=False).astype(int)
|
|
|
|
|
return df
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def write_excel(raw_texts: List[str], rank_df: pd.DataFrame, out_dir: Path, topn: int):
|
|
|
|
|
out_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
excel_path = out_dir / '弹幕统计.xlsx'
|
|
|
|
|
# 排名表的排序
|
|
|
|
|
rank_sorted = rank_df.sort_values(by=['数量', '原句'], ascending=[False, True])
|
|
|
|
|
with pd.ExcelWriter(excel_path, engine='openpyxl') as writer:
|
|
|
|
|
pd.DataFrame({'原句': raw_texts}).to_excel(writer, index=False, sheet_name='原句')
|
|
|
|
|
rank_sorted.to_excel(writer, index=False, sheet_name='排名')
|
|
|
|
|
rank_sorted.head(topn).to_excel(writer, index=False, sheet_name='TopN')
|
|
|
|
|
# 同时输出TopN的文本
|
|
|
|
|
top_txt = out_dir / '前8条弹幕.txt'
|
|
|
|
|
with top_txt.open('w', encoding='utf-8') as f:
|
|
|
|
|
for _, row in rank_sorted.head(topn).iterrows():
|
|
|
|
|
f.write(f"{int(row['数量'])}\t{row['原句']}\n")
|
|
|
|
|
return excel_path, top_txt
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _pick_chinese_font() -> str:
|
|
|
|
|
"""确定字体"""
|
|
|
|
|
candidates = [
|
|
|
|
|
r"C:\\Windows\\Fonts\\simhei.ttf", # 黑体
|
|
|
|
|
r"C:\\Windows\\Fonts\\msyh.ttc", # 微软雅黑
|
|
|
|
|
r"C:\\Windows\\Fonts\\simsun.ttc", # 宋体
|
|
|
|
|
]
|
|
|
|
|
for p in candidates:
|
|
|
|
|
if Path(p).exists():
|
|
|
|
|
return p
|
|
|
|
|
return ''
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _simple_tokenize_for_wc(texts: List[str]) -> Counter:
|
|
|
|
|
#词云用的简单切分
|
|
|
|
|
stop = set("""的 了 啊 吗 呢 吧 是 在 我 你 他 她 它 这 那 也 与 和 很 都 就 还 不 没 有 说 啊呀 哦 嗯 呃 啊? 吧? 吗? 呢? ? ! ! , 。 、 . , ~ ~ 哈 哈哈 啊啊 啊啊啊 666 233""".split())
|
|
|
|
|
joined = '\n'.join(texts)
|
|
|
|
|
tokens: List[str] = []
|
|
|
|
|
rough = re.split(r"[^\w\u4e00-\u9fff]+", joined)
|
|
|
|
|
for tk in rough:
|
|
|
|
|
tk = tk.strip()
|
|
|
|
|
if len(tk) < 2 or tk in stop:
|
|
|
|
|
continue
|
|
|
|
|
if re.fullmatch(r"\d+", tk):
|
|
|
|
|
continue
|
|
|
|
|
tokens.append(tk)
|
|
|
|
|
return Counter(tokens)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_wordcloud(raw_texts: List[str], out_dir: Path) -> Path:
|
|
|
|
|
if WordCloud is None:
|
|
|
|
|
print('未安装wordcloud,已跳过词云生成。')
|
|
|
|
|
return Path()
|
|
|
|
|
out_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
freq = _simple_tokenize_for_wc(raw_texts)
|
|
|
|
|
if not freq:
|
|
|
|
|
print('词云:未得到有效分词,已跳过。')
|
|
|
|
|
return Path()
|
|
|
|
|
font_path = _pick_chinese_font() or None
|
|
|
|
|
wc = WordCloud(
|
|
|
|
|
width=1600,
|
|
|
|
|
height=900,
|
|
|
|
|
background_color='white',
|
|
|
|
|
font_path=font_path,
|
|
|
|
|
collocations=False,
|
|
|
|
|
prefer_horizontal=0.9,
|
|
|
|
|
)
|
|
|
|
|
wc.generate_from_frequencies(freq)
|
|
|
|
|
out_path = out_dir / '词云图.png'
|
|
|
|
|
wc.to_file(str(out_path))
|
|
|
|
|
return out_path
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main():
|
|
|
|
|
parser = argparse.ArgumentParser()
|
|
|
|
|
parser.add_argument('-i', '--input', required=True, help='输入:txt 文件或目录(目录下将递归读取 *.txt)')
|
|
|
|
|
parser.add_argument('-o', '--out', required=True, help='输出目录')
|
|
|
|
|
parser.add_argument('-n', '--topn', type=int, default=8, help='TopN 的数量(默认 8)')
|
|
|
|
|
args = parser.parse_args()
|
|
|
|
|
|
|
|
|
|
input_path = Path(args.input)
|
|
|
|
|
out_dir = Path(args.out)
|
|
|
|
|
|
|
|
|
|
texts = collect_texts(input_path)
|
|
|
|
|
if not texts:
|
|
|
|
|
print('没有读取到任何弹幕文本,请检查路径。')
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
rank_df = rank_exact_sentences(texts)
|
|
|
|
|
excel, top_txt = write_excel(texts, rank_df, out_dir, args.topn)
|
|
|
|
|
wc_path = generate_wordcloud(texts, out_dir)
|
|
|
|
|
|
|
|
|
|
print('完成:')
|
|
|
|
|
print(f' 统计结果 Excel: {excel}')
|
|
|
|
|
print(f' 前{args.topn} 文本: {top_txt}')
|
|
|
|
|
if wc_path and wc_path.exists():
|
|
|
|
|
print(f' 词云图: {wc_path}')
|
|
|
|
|
print('TopN预览:')
|
|
|
|
|
print(rank_df.head(args.topn).to_string(index=False))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
|
main()
|