zj3D 9 months ago
commit e5dc492333

@ -0,0 +1,46 @@
import string
from cppy.cp_util import *

# Prepare the word-frequency list and the stop-word list
word_freqs = []
with open(stopwordfilepath, encoding='utf-8') as f:
    stop_words = f.read().split(',')
stop_words.extend(list(string.ascii_lowercase))

for line in open(testfilepath, encoding='utf-8'):
    start_char = None
    i = 0
    for c in line:
        if start_char is None:
            if c.isalnum():
                # a word begins
                start_char = i
        else:
            if not c.isalnum():
                # a word ends
                found = False
                word = line[start_char:i].lower()
                # skip stop words
                if word not in stop_words:
                    pair_index = 0
                    # is this the word's first appearance?
                    for pair in word_freqs:
                        if word == pair[0]:
                            pair[1] += 1
                            found = True
                            break
                        pair_index += 1
                    if not found:
                        word_freqs.append([word, 1])
                    elif len(word_freqs) > 1:
                        # bubble the updated entry up to keep the list sorted
                        for n in reversed(range(pair_index)):
                            if word_freqs[pair_index][1] > word_freqs[n][1]:
                                # swap
                                word_freqs[n], word_freqs[pair_index] = word_freqs[pair_index], word_freqs[n]
                                pair_index = n
                # reset the start marker
                start_char = None
        i += 1

for tf in word_freqs[0:10]:
    print(tf[0], '-', tf[1])

@ -0,0 +1,31 @@
import string
from collections import Counter
from cppy.cp_util import *

# Prepare the stop-word list
stop_words = set(open(stopwordfilepath, encoding='utf-8').read().split(','))
stop_words.update(list(string.ascii_lowercase))

# Read the file and count word frequencies
word_freqs = Counter()
with open(testfilepath, encoding='utf8') as f:
    for line_num, line in enumerate(f, 1):
        start_char = None
        for i, c in enumerate(line):
            if start_char is None and c.isalnum():
                start_char = i
            elif start_char is not None and not c.isalnum():
                word = line[start_char:i].lower()
                if word not in stop_words:
                    word_freqs[word] += 1
                start_char = None

# Print the 10 most common words
for word, freq in word_freqs.most_common(10):
    print(f"{word}-{freq}")

'''
Compared with A01:
- collections.Counter does the counting, which simplifies the code and improves efficiency.
- enumerate yields the line number along with the line, and a set gives fast stop-word lookups.
- most_common keeps the output code concise.
'''
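# A minimal equivalence sketch (an assumption, not part of the original file):
# the character-scanning loop above can be collapsed into a regex plus Counter.
# Defined for comparison only; nothing calls it.
def count_words_regex(path):
    import re
    text = open(path, encoding='utf8').read().lower()
    return Counter(w for w in re.findall(r'[a-z0-9]+', text) if w not in stop_words)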

@ -0,0 +1,48 @@
import re, operator, string
from cppy.cp_util import *

#
# The functions
#
def extract_words(path_to_file):
    try:
        with open(path_to_file, 'r', encoding='utf-8') as f:
            str_data = f.read()
    except IOError as e:
        print(f"I/O error({e.errno}) when opening {path_to_file}: {e.strerror}")
        return []
    word_list = re.findall(r'\w+', str_data.lower())
    return word_list

def remove_stop_words(word_list):
    try:
        stop_words = set(get_stopwords())
    except IOError as e:
        print(f"I/O error({e.errno}) when opening stops_words.txt: {e.strerror}")
        return word_list
    stop_words.update(string.ascii_lowercase)
    return [w for w in word_list if w not in stop_words]

def frequencies(word_list):
    if type(word_list) is not list or word_list == []: return {}
    word_freqs = {}
    for w in word_list:
        if w in word_freqs:
            word_freqs[w] += 1
        else:
            word_freqs[w] = 1
    return word_freqs

def sort(word_freq):
    if type(word_freq) is not dict or word_freq == {}: return []
    return sorted(word_freq.items(), key=operator.itemgetter(1), reverse=True)

if __name__ == '__main__':
    word_freqs = sort(frequencies(remove_stop_words(extract_words(testfilepath))))
    print_word_freqs(word_freqs)

@ -0,0 +1,39 @@
from collections import Counter
from cppy.cp_util import *

def extract_words(path_to_file):
    assert(type(path_to_file) is str), "I need a string!"
    assert(path_to_file), "I need a non-empty string!"
    try:
        with open(path_to_file, encoding='utf-8') as f:
            str_data = f.read()
    except IOError as e:
        print("I/O error({0}) when opening {1}: {2}! I quit!".format(e.errno, path_to_file, e.strerror))
        raise e
    return re_split(str_data)

def remove_stop_words(word_list):
    assert(type(word_list) is list), "I need a list!"
    try:
        stop_words = get_stopwords()
    except IOError as e:
        print("I/O error({0}) opening stops_words.txt: {1}! I quit!".format(e.errno, e.strerror))
        raise e
    return [w for w in word_list if w not in stop_words]

def frequencies(word_list):
    return Counter(word_list)

def sort(word_freq):
    return word_freq.most_common()

if __name__ == '__main__':
    word_freqs = sort(frequencies(remove_stop_words(extract_words(testfilepath))))
    print_word_freqs(word_freqs)

@ -0,0 +1,25 @@
from cppy.cp_util import *

def extractWords(path_to_file):
    assert(type(path_to_file) is str), "I need a string! I quit!"
    assert(path_to_file), "I need a non-empty string! I quit!"
    return extract_file_words(path_to_file)

def frequencies(word_list):
    assert(type(word_list) is list), "I need a list! I quit!"
    assert(word_list != []), "I need a non-empty list! I quit!"
    return get_frequencies(word_list)

def sort(word_freqs):
    assert(type(word_freqs) is dict), "I need a dictionary! I quit!"
    assert(word_freqs != {}), "I need a non-empty dictionary! I quit!"
    return sort_dict(word_freqs)

if __name__ == '__main__':
    try:
        word_freqs = sort(frequencies(extractWords(testfilepath)))
        print_word_freqs(word_freqs)
    except Exception as e:
        print("Something wrong: {0}".format(e))

@ -0,0 +1,30 @@
from collections import Counter
from cppy.cp_util import *

class AcceptTypes:
    def __init__(self, *args):
        self._args = args

    def __call__(self, f):
        def wrapped_f(*args, **kwargs):
            for i, arg_type in enumerate(self._args):
                if not isinstance(args[i], arg_type):
                    raise TypeError(f"Argument {i} expected {arg_type}, got {type(args[i])}")
            return f(*args, **kwargs)
        return wrapped_f

@AcceptTypes(str)
def extract_words_(path_to_file):
    return extract_file_words(path_to_file)

@AcceptTypes(list)
def frequencies_(word_list):
    return Counter(word_list)

@AcceptTypes(Counter)
def sort_(word_freq):
    return word_freq.most_common()

if __name__ == '__main__':
    word_freqs = sort_(frequencies_(extract_words_(testfilepath)))
    print_word_freqs(word_freqs)
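# Hedged usage note (not in the original): AcceptTypes checks positional
# argument types at call time, so e.g. frequencies_("not a list") raises
# TypeError before the wrapped function ever runs.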

@ -0,0 +1,27 @@
from functools import reduce
from collections import Counter
from cppy.cp_util import *

def partition(data_str, nlines):
    lines = data_str.split('\n')
    for i in range(0, len(lines), nlines):
        yield '\n'.join(lines[i:i+nlines])

def split_words(data_str):
    word_list = extract_str_words(data_str)
    return Counter(word_list)

def count_words(pairs_list_1, pairs_list_2):
    return pairs_list_1 + pairs_list_2

if __name__ == '__main__':
    data = read_file(testfilepath)
    # map split_words over each partition
    splits = map(split_words, partition(data, 200))
    splits_list = list(splits)
    # reduce with count_words to merge the per-partition frequencies
    word_freqs = sort_dict(reduce(count_words, splits_list, Counter()))
    print_word_freqs(word_freqs)
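# Hedged note (not in the original): Counter overloads "+", so count_words is
# equivalent to operator.add over two Counters; reduce folds it across all
# partitions to produce the combined frequency table.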

@ -0,0 +1,37 @@
import operator
from functools import reduce
from cppy.cp_util import *

#################################################
# Functions for map reduce
#################################################
def partition(data_str, nlines):
    lines = data_str.split('\n')
    for i in range(0, len(lines), nlines):
        yield '\n'.join(lines[i:i+nlines])

def split_words(data_str):
    words = extract_str_words(data_str)
    return [(w, 1) for w in words]

def regroup(pairs_list):
    mapping = {}
    for pairs in pairs_list:
        for p in pairs:
            mapping[p[0]] = mapping.get(p[0], []) + [p]
    return mapping

def count_words(mapping):
    def add(x, y): return x + y
    return (mapping[0],
            reduce(add, (pair[1] for pair in mapping[1])))

def sort(word_freq):
    return sorted(word_freq, key=operator.itemgetter(1), reverse=True)

if __name__ == '__main__':
    data = read_file(testfilepath)
    splits = map(split_words, partition(data, 200))
    splits_per_word = regroup(splits)
    word_freqs = sort(map(count_words, splits_per_word.items()))
    print_word_freqs(word_freqs)

@ -0,0 +1,40 @@
import re
import multiprocessing
from collections import Counter
from cppy.cp_util import *

stopwords = get_stopwords()

def process_chunk(chunk):
    # tokenize and filter out stop words
    words = re.findall(r'\w+', chunk.lower())
    words = [word for word in words if word not in stopwords and len(word) > 2]
    return Counter(words)

def merge_counts(counts_list):
    # merge several Counter objects
    total_counts = Counter()
    for counts in counts_list:
        total_counts += counts
    return total_counts

if __name__ == '__main__':
    # read the file content
    content = read_file(testfilepath)

    # split the content into chunks, one per worker task
    chunk_size = 1000  # tune the chunk size as needed
    chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]

    # process the chunks with a process pool
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    counts_list = pool.map(process_chunk, chunks)
    pool.close()
    pool.join()

    # merge the per-chunk counts
    total_counts = merge_counts(counts_list)

    # print the n most frequent words
    for word, count in total_counts.most_common(10):
        print(f"{word}-- {count}")

@ -0,0 +1,50 @@
import threading
import queue
from cppy.cp_util import *

# process words pulled from the queue
def process_words(word_space, freq_space, stopwords):
    word_freqs = {}
    while True:
        try:
            word = word_space.get(timeout=1)
        except queue.Empty:
            break
        count_word(word, word_freqs, stopwords)
    freq_space.put(word_freqs)

# create and start the worker threads
def start_threads(word_space, freq_space, stopwords):
    workers = []
    for i in range(5):
        worker = threading.Thread(target=process_words,
                                  args=(word_space, freq_space, stopwords))
        worker.start()
        workers.append(worker)
    return workers

if __name__ == "__main__":
    stopwords = get_stopwords()
    word_space = queue.Queue()
    freq_space = queue.Queue()

    # push the words into word_space
    for word in extract_file_words(testfilepath):
        word_space.put(word)

    # create and start the threads
    workers = start_threads(word_space, freq_space, stopwords)

    # wait for all threads to finish
    for worker in workers: worker.join()

    # merge the partial results
    word_freqs = {}
    while not freq_space.empty():
        freqs = freq_space.get()
        for (k, v) in freqs.items():
            word_freqs[k] = word_freqs.get(k, 0) + v

    # print
    print_word_freqs(sort_dict(word_freqs))

@ -0,0 +1,37 @@
import threading, queue
from cppy.cp_util import *

class WordFrequencyCounter:
    def __init__(self, input_file):
        self.word_space = queue.Queue()
        self.freq_space = queue.Queue()
        for word in extract_file_words(input_file):
            self.word_space.put(word)

    def process_words(self):
        word_freqs = {}
        while not self.word_space.empty():
            try:
                word = self.word_space.get(timeout=1)
                word_freqs[word] = word_freqs.get(word, 0) + 1
            except queue.Empty:
                break
        self.freq_space.put(word_freqs)

    def run(self):
        workers = [threading.Thread(target=self.process_words) for _ in range(5)]
        for worker in workers: worker.start()
        for worker in workers: worker.join()

        word_freqs = {}
        while not self.freq_space.empty():
            freqs = self.freq_space.get()
            for word, count in freqs.items():
                word_freqs[word] = word_freqs.get(word, 0) + count
        print_word_freqs(sort_dict(word_freqs))

if __name__ == '__main__':
    counter = WordFrequencyCounter(testfilepath)
    counter.run()

@ -0,0 +1,62 @@
import sqlite3, os.path
from cppy.cp_util import *

# database schema
# note: words needs its own id column so the characters foreign key has a target
TABLES = {
    'documents': '''CREATE TABLE IF NOT EXISTS documents (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL
    )''',
    'words': '''CREATE TABLE IF NOT EXISTS words (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        doc_id INTEGER NOT NULL,
        value TEXT NOT NULL,
        FOREIGN KEY (doc_id) REFERENCES documents (id)
    )''',
    'characters': '''CREATE TABLE IF NOT EXISTS characters (
        word_id INTEGER NOT NULL,
        value TEXT NOT NULL,
        FOREIGN KEY (word_id) REFERENCES words (id)
    )'''
}

# create the tables
def create_db_schema(connection):
    for table, sql in TABLES.items():
        c = connection.cursor()
        c.execute(sql)
        connection.commit()
        c.close()

def load_file_into_database(path_to_file, connection):
    words = extract_file_words(path_to_file)
    c = connection.cursor()
    c.execute("INSERT INTO documents (name) VALUES (?)", (path_to_file,))
    doc_id = c.lastrowid
    for w in words:
        c.execute("INSERT INTO words (doc_id, value) VALUES (?, ?)", (doc_id, w))
        word_id = c.lastrowid
        for char in w:
            c.execute("INSERT INTO characters (word_id, value) VALUES (?, ?)", (word_id, char))
    connection.commit()
    c.close()

# create the database and load the data
db_path = 'tfdb'
if not os.path.isfile(db_path):
    with sqlite3.connect(db_path) as connection:
        create_db_schema(connection)
        load_file_into_database(testfilepath, connection)

# query and print
with sqlite3.connect(db_path) as connection:
    c = connection.cursor()
    c.execute("SELECT value, COUNT(*) as C FROM words GROUP BY value ORDER BY C DESC LIMIT 10")
    for row in c.fetchall():
        print(row[0], '-', row[1])
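# Hedged example (not in the original): the characters table populated above
# supports letter-frequency queries of the same shape, e.g.:
#   SELECT value, COUNT(*) AS c FROM characters GROUP BY value ORDER BY c DESC LIMIT 10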

Binary file not shown.

@ -0,0 +1,109 @@
import os
from cppy.cp_util import *

def touchopen(filename, *args, **kwargs):
    try:
        os.remove(filename)
    except OSError:
        pass
    open(filename, "a", encoding='utf-8').close()  # "touch" file
    return open(filename, *args, **kwargs)

# The constrained memory should have no more than 1024*n cells
data = []
n = 10
f = open(stopwordfilepath, encoding='utf-8')
data = [f.read(1024*n).split(',')]  # data[0] holds the stop words
f.close()
data.append([])     # data[1] is line (max 80 characters)
data.append(None)   # data[2] is index of the start_char of word
data.append(0)      # data[3] is index on characters, i = 0
data.append(False)  # data[4] is flag indicating if word was found
data.append('')     # data[5] is the word
data.append('')     # data[6] is word,NNNN
data.append(0)      # data[7] is frequency

# Open the secondary memory
word_freqs = touchopen('word_freqs', 'rb+')
# Open the input file
f = open(testfilepath, 'r', encoding='utf-8')
# Loop over input file's lines
while True:
    print('.', end='', flush=True)
    data[1] = [f.readline()]
    if data[1] == ['']:  # end of input file
        break
    if data[1][0][len(data[1][0])-1] != '\n':  # if it does not end with \n
        data[1][0] = data[1][0] + '\n'         # add \n
    data[2] = None
    data[3] = 0
    # Loop over characters in the line
    for c in data[1][0]:  # elimination of symbol c is exercise
        if data[2] is None:
            if c.isalnum():
                # We found the start of a word
                data[2] = data[3]
        else:
            if not c.isalnum():
                # We found the end of a word. Process it
                data[4] = False
                data[5] = data[1][0][data[2]:data[3]].lower()
                # Ignore words with len < 2, and stop words
                if len(data[5]) >= 2 and data[5] not in data[0]:
                    # Let's see if it already exists
                    while True:
                        data[6] = str(word_freqs.readline().strip(), 'utf-8')
                        if data[6] == '':
                            break
                        data[7] = int(data[6].split(',')[1])
                        # word, no white space
                        data[6] = data[6].split(',')[0].strip()
                        if data[5] == data[6]:
                            data[7] += 1
                            data[4] = True
                            break
                    if not data[4]:
                        word_freqs.seek(0, 1)  # needed in Windows
                        word_freqs.write(bytes("%20s,%04d\n" % (data[5], 1), 'utf-8'))
                    else:
                        word_freqs.seek(-26, 1)
                        word_freqs.write(bytes("%20s,%04d\n" % (data[5], data[7]), 'utf-8'))
                    word_freqs.seek(0, 0)
                # Let's reset
                data[2] = None
        data[3] += 1

# We're done with the input file
f.close()
word_freqs.flush()

# PART 2
# Now we need to find the 25 most frequently occurring words.
# We don't need anything from the previous values in memory
del data[:]

# Let's use the first 25 entries for the top 25 words
data = data + [[]]*(25 - len(data))
data.append('')  # data[25] is word,freq from file
data.append(0)   # data[26] is freq

# Loop over secondary memory file
while True:
    data[25] = str(word_freqs.readline().strip(), 'utf-8')
    if data[25] == '':  # EOF
        break
    data[26] = int(data[25].split(',')[1])     # read it as integer
    data[25] = data[25].split(',')[0].strip()  # word
    # Check if this word has more counts than the ones in memory
    for i in range(25):  # elimination of symbol i is exercise
        if data[i] == [] or data[i][1] < data[26]:
            data.insert(i, [data[25], data[26]])
            del data[26]  # delete the last element
            break

for tf in data[0:10]:
    if len(tf) == 2:
        print(tf[0], '-', tf[1])
word_freqs.close()

File diff suppressed because it is too large

@ -0,0 +1,85 @@
from collections import defaultdict
from cppy.cp_util import *

#
# Framework
#
class WordFrequencyFramework:
    def __init__(self):
        self._load_event_handlers = []
        self._dowork_event_handlers = []
        self._end_event_handlers = []

    def register_for_load_event(self, handler):
        self._load_event_handlers.append(handler)

    def register_for_dowork_event(self, handler):
        self._dowork_event_handlers.append(handler)

    def register_for_end_event(self, handler):
        self._end_event_handlers.append(handler)

    def run(self, path_to_file):
        for h in self._load_event_handlers: h(path_to_file)
        for h in self._dowork_event_handlers: h()
        for h in self._end_event_handlers: h()

#
# Application components
#
class DataStorage:
    """ Models the contents of the file """
    _data = ''
    _stop_word_filter = None
    _word_event_handlers = []

    def __init__(self, wfapp, stop_word_filter):
        self._stop_word_filter = stop_word_filter
        wfapp.register_for_load_event(self.__load)
        wfapp.register_for_dowork_event(self.__produce_words)

    def __load(self, path_to_file):
        self._data = re_split(read_file(path_to_file))

    def __produce_words(self):
        for w in self._data:
            if not self._stop_word_filter.is_stop_word(w):
                for h in self._word_event_handlers:
                    h(w)

    def register_for_word_event(self, handler):
        self._word_event_handlers.append(handler)

class StopWordFilter:
    """ Models the stop word filter """
    _stop_words = []

    def __init__(self, wfapp):
        wfapp.register_for_load_event(self.__load)

    def __load(self, ignore):
        self._stop_words = get_stopwords()

    def is_stop_word(self, word):
        return word in self._stop_words

class WordFrequencyCounter:
    def __init__(self, wfapp, data_storage):
        self._word_freqs = defaultdict(int)
        data_storage.register_for_word_event(self.__increment_count)
        wfapp.register_for_end_event(self.__print_freqs)

    def __increment_count(self, word):
        self._word_freqs[word] += 1

    def __print_freqs(self):
        print_word_freqs(sort_dict(self._word_freqs))

if __name__ == '__main__':
    wfapp = WordFrequencyFramework()
    stop_word_filter = StopWordFilter(wfapp)
    data_storage = DataStorage(wfapp, stop_word_filter)
    word_freq_counter = WordFrequencyCounter(wfapp, data_storage)
    wfapp.run(testfilepath)

@ -0,0 +1,87 @@
from cppy.cp_util import *

#################################################
# The event management
#################################################
class EventManager:
    def __init__(self):
        self._subscriptions = {}

    def subscribe(self, event_type, handler):
        self._subscriptions.setdefault(event_type, []).append(handler)

    def publish(self, event):
        event_type = event[0]
        for h in self._subscriptions.get(event_type, []):
            h(event)

#################################################
# The application entities
#################################################
class DataStorage:
    """ Models the contents of the file """
    def __init__(self, event_manager):
        self._event_manager = event_manager
        self._event_manager.subscribe('load', self.load)
        self._event_manager.subscribe('start', self.produce_words)

    def load(self, event):
        self._data = extract_file_words(event[1])

    def produce_words(self, event):
        for w in self._data:
            self._event_manager.publish(('word', w))
        self._event_manager.publish(('eof', None))

class StopWordFilter:
    """ Models the stop word filter """
    def __init__(self, event_manager):
        self._stop_words = []
        self._event_manager = event_manager
        self._event_manager.subscribe('load', self.load)
        self._event_manager.subscribe('word', self.is_stop_word)

    def load(self, event):
        self._stop_words = get_stopwords()

    def is_stop_word(self, event):
        word = event[1]
        if word not in self._stop_words:
            self._event_manager.publish(('valid_word', word))

class WordFrequencyCounter:
    """ Keeps the word frequency data """
    def __init__(self, event_manager):
        self._word_freqs = {}
        self._event_manager = event_manager
        self._event_manager.subscribe('valid_word', self.increment_count)
        self._event_manager.subscribe('print', self.print_freqs)

    def increment_count(self, event):
        word = event[1]
        self._word_freqs[word] = self._word_freqs.get(word, 0) + 1

    def print_freqs(self, event):
        print_word_freqs(sort_dict(self._word_freqs))

class WordFrequencyApplication:
    def __init__(self, event_manager):
        self._event_manager = event_manager
        self._event_manager.subscribe('run', self.run)
        self._event_manager.subscribe('eof', self.stop)

    def run(self, event):
        path_to_file = event[1]
        self._event_manager.publish(('load', path_to_file))
        self._event_manager.publish(('start', None))

    def stop(self, event):
        self._event_manager.publish(('print', None))

if __name__ == "__main__":
    em = EventManager()
    DataStorage(em), StopWordFilter(em), WordFrequencyCounter(em)
    WordFrequencyApplication(em)
    em.publish(('run', testfilepath))

@ -0,0 +1,104 @@
import re, operator
from cppy.cp_util import *

class DataStorageManager():
    """ Models the contents of the file """
    def __init__(self):
        self._data = []

    def dispatch(self, message):
        if message[0] == 'init':
            return self._init(message[1])
        elif message[0] == 'words':
            return self._words()
        else:
            raise Exception("Message not understood " + message[0])

    # an introspective alternative:
    '''
    def dispatch(self, message):
        method_name = '_' + message[0]
        if hasattr(self, method_name):
            method = getattr(self, method_name)
            return method(*message[1:])
        else:
            raise ValueError(f"DataStorageManager doesn't understand message {message[0]}")
    '''

    def _init(self, path_to_file):
        self._data = re.findall(r'\w+', read_file(path_to_file).lower())

    def _words(self):
        return self._data

class StopWordManager():
    """ Models the stop word filter """
    _stop_words = []

    def dispatch(self, message):
        if message[0] == 'init':
            return self._init()
        elif message[0] == 'is_stop_word':
            return self._is_stop_word(message[1])
        else:
            raise Exception("Message not understood " + message[0])

    def _init(self):
        self._stop_words = get_stopwords()

    def _is_stop_word(self, word):
        return word in self._stop_words

class WordFrequencyManager():
    """ Keeps the word frequency data """
    _word_freqs = {}

    def dispatch(self, message):
        if message[0] == 'increment_count':
            return self._increment_count(message[1])
        elif message[0] == 'sorted':
            return self._sorted()
        else:
            raise Exception("Message not understood " + message[0])

    def _increment_count(self, word):
        if word in self._word_freqs:
            self._word_freqs[word] += 1
        else:
            self._word_freqs[word] = 1

    def _sorted(self):
        return sorted(self._word_freqs.items(), key=operator.itemgetter(1), reverse=True)

class WordFrequencyController():
    def dispatch(self, message):
        if message[0] == 'init':
            return self._init(message[1])
        elif message[0] == 'run':
            return self._run()
        else:
            raise Exception("Message not understood " + message[0])

    def _init(self, path_to_file):
        self._storage_manager = DataStorageManager()
        self._stop_word_manager = StopWordManager()
        self._word_freq_manager = WordFrequencyManager()
        self._storage_manager.dispatch(['init', path_to_file])
        self._stop_word_manager.dispatch(['init'])

    def _run(self):
        for w in self._storage_manager.dispatch(['words']):
            if not self._stop_word_manager.dispatch(['is_stop_word', w]):
                self._word_freq_manager.dispatch(['increment_count', w])
        word_freqs = self._word_freq_manager.dispatch(['sorted'])
        for (w, c) in word_freqs[0:10]:
            print(w, '-', c)

if __name__ == '__main__':
    wfcontroller = WordFrequencyController()
    wfcontroller.dispatch(['init', testfilepath])
    wfcontroller.dispatch(['run'])

@ -0,0 +1,120 @@
from threading import Thread
from queue import Queue
from cppy.cp_util import *

class ActiveWFObject(Thread):
    def __init__(self):
        super().__init__()
        self.queue = Queue()
        self._stopMe = False
        self.start()

    def run(self):
        while not self._stopMe:
            message = self.queue.get()
            self._dispatch(message)
            if message[0] == 'die':
                self._stopMe = True

def send(receiver, message):
    receiver.queue.put(message)

class DataStorageManager(ActiveWFObject):
    """ Models the contents of the file """
    _data = ''

    def _dispatch(self, message):
        if message[0] == 'init':
            self._init(message[1:])
        elif message[0] == 'send_word_freqs':
            self._process_words(message[1:])
        else:
            # forward
            send(self._stop_word_manager, message)

    def _init(self, message):
        path_to_file = message[0]
        self._stop_word_manager = message[1]
        self._data = extract_file_words(path_to_file)

    def _process_words(self, message):
        recipient = message[0]
        for w in self._data:
            send(self._stop_word_manager, ['filter', w])
        send(self._stop_word_manager, ['top10', recipient])

class StopWordManager(ActiveWFObject):
    """ Models the stop word filter """
    _stop_words = []

    def _dispatch(self, message):
        if message[0] == 'init':
            self._init(message[1:])
        elif message[0] == 'filter':
            return self._filter(message[1:])
        else:
            # forward
            send(self._word_freqs_manager, message)

    def _init(self, message):
        self._stop_words = get_stopwords()
        self._word_freqs_manager = message[0]

    def _filter(self, message):
        word = message[0]
        if word not in self._stop_words:
            send(self._word_freqs_manager, ['word', word])

class WordFrequencyManager(ActiveWFObject):
    """ Keeps the word frequency data """
    _word_freqs = {}

    def _dispatch(self, message):
        if message[0] == 'word':
            self._increment_count(message[1:])
        elif message[0] == 'top10':
            self._top10(message[1:])

    def _increment_count(self, message):
        word, = message
        self._word_freqs[word] = self._word_freqs.get(word, 0) + 1

    def _top10(self, message):
        recipient = message[0]
        freqs_sorted = sort_dict(self._word_freqs)
        send(recipient, ['top10', freqs_sorted])

class WordFrequencyController(ActiveWFObject):
    def _dispatch(self, message):
        if message[0] == 'run':
            self._run(message[1:])
        elif message[0] == 'top10':
            self._display(message[1:])
        else:
            raise Exception("Message not understood " + message[0])

    def _run(self, message):
        self._storage_manager, = message
        send(self._storage_manager, ['send_word_freqs', self])

    def _display(self, message):
        word_freqs, = message
        print_word_freqs(word_freqs)
        send(self._storage_manager, ['die'])
        self._stopMe = True

if __name__ == '__main__':
    word_freq_manager = WordFrequencyManager()
    stop_word_manager = StopWordManager()
    storage_manager = DataStorageManager()
    send(stop_word_manager, ['init', word_freq_manager])
    send(storage_manager, ['init', testfilepath, stop_word_manager])
    wfcontroller = WordFrequencyController()
    send(wfcontroller, ['run', storage_manager])
    # Wait for the active objects to finish
    [t.join() for t in [word_freq_manager, stop_word_manager, storage_manager, wfcontroller]]

@ -0,0 +1,34 @@
from cppy.cp_util import *

class Calculator:
    def frequencies(self, word_list):
        return get_frequencies(word_list)

    def sort(self, word_freq):
        return sort_dict(word_freq)

    def print_all(self, word_freqs):
        print_word_freqs(word_freqs[1:])

##########################################
# Application framework
##########################################
def call_method(obj, method_name, *args):
    # use introspection (getattr) to fetch method_name from obj dynamically
    method = getattr(obj, method_name, None)
    if method:
        return method(*args)  # invoke the method dynamically
    else:
        return "Method not found."

if __name__ == '__main__':
    # the pipeline: a list of method names to apply in order
    method_names = 'frequencies sort print_all'
    data = extract_file_words(testfilepath)
    calc = Calculator()
    for method_name in method_names.split():
        data = call_method(calc, method_name, data)

@ -0,0 +1,37 @@
import cppy.cp_util as util

##########################################
# Task handler classes
##########################################
class wordsTaskHandler:
    def handle(self, path_to_file):
        return util.extract_file_words(path_to_file)

class frequenciesTaskHandler:
    def handle(self, word_list):
        return util.get_frequencies(word_list)

class sortTaskHandler:
    def handle(self, word_freq):
        return util.sort_dict(word_freq)

##########################################
# Application framework
##########################################
def handle_task(task_type, *args):
    handler_class_name = f"{task_type}TaskHandler"  # build the handler class name
    # look the class up in the current global symbol table
    handler_class = globals().get(handler_class_name)
    if handler_class:
        handler = handler_class()     # instantiate the handler class
        return handler.handle(*args)  # invoke its handle method
    else:
        print(f"No handler found for task type: {task_type}")

if __name__ == '__main__':
    word_list = handle_task("words", util.testfilepath)
    word_freq = handle_task("frequencies", word_list)
    word_sort = handle_task("sort", word_freq)
    util.print_word_freqs(word_sort)

@ -0,0 +1,18 @@
import cppy.cp_util as util
import operator

# utility functions
def extract_words(path_to_file):
    return util.extract_file_words(path_to_file)

def frequencies(word_list):
    return util.get_frequencies(word_list)

# the sort function arrives as source text; s could just as well be read
# from a file or be code sent over the network
s = ' lambda word_freq: ' + \
    ' sorted(word_freq.items(), key=operator.itemgetter(1), reverse=True) '
exec('mysort = ' + s)

if __name__ == '__main__':
    word_freq = frequencies(extract_words(util.testfilepath))
    word_freq = locals()['mysort'](word_freq)
    util.print_word_freqs(word_freq)

@ -0,0 +1,27 @@
from cppy.cp_util import *

# framework class
class TFFlowcls:
    def __init__(self, v):
        self._value = v

    def bind(self, func):
        self._value = func(self._value)
        return self

    def over(self):
        print(self._value)

def top10_freqs(word_freqs):
    top10 = "\n".join(f"{word} - {count}" for word, count in word_freqs[:10])
    return top10

if __name__ == "__main__":
    TFFlowcls(testfilepath)\
        .bind(extract_file_words)\
        .bind(get_frequencies)\
        .bind(sort_dict)\
        .bind(top10_freqs)\
        .over()

@ -0,0 +1,64 @@
from cppy.cp_util import *

#
# Framework class
#
class TFFlowAll:
    def __init__(self, func):
        self._funcs = [func]

    def bind(self, func):
        self._funcs.append(func)
        return self

    def execute(self):
        def is_callable(obj):
            """Check if an object is callable."""
            return hasattr(obj, '__call__')

        def call_if_possible(obj):
            """Call the object if it's callable, otherwise return it as is."""
            return obj() if is_callable(obj) else obj

        # Initialize the value to a no-op lambda function
        value = lambda: None
        for func in self._funcs:
            value = call_if_possible(func(value))
        print(call_if_possible(value))

#
# Utility functions: each returns a thunk, so evaluation is deferred
#
def get_input(arg):
    def _f():
        return testfilepath
    return _f

def extractwords(path_to_file):
    def _f():
        return extract_file_words(path_to_file)
    return _f

def frequencies(word_list):
    def _f():
        return get_frequencies(word_list)
    return _f

def sort(word_freq):
    def _f():
        return sort_dict(word_freq)
    return _f

def top10_freqs(word_freqs):
    def _f():
        return '\n'.join(f"{word} - {freq}" for word, freq in word_freqs[:10])
    return _f

if __name__ == "__main__":
    TFFlowAll(get_input)\
        .bind(extractwords)\
        .bind(frequencies)\
        .bind(sort)\
        .bind(top10_freqs)\
        .execute()

@ -0,0 +1,49 @@
import threading
from collections import Counter
from cppy.cp_util import *

"""
Instead of handing slices of one text to several threads, try handing several
files to several threads; once real I/O is involved the benefit becomes visible.
"""

stop_words = get_stopwords()

# compute the word frequencies for one thread's slice
def count_words(start, end, text, result_index, results):
    words = re_split(text[start:end])
    words = [w for w in words if w not in stop_words]
    results[result_index] = Counter(words)

if __name__ == '__main__':
    # read the file content
    text = read_file(testfilepath)

    # decide the number of threads
    num_threads = 4
    text_length = len(text)
    chunk_size = text_length // num_threads

    # one result slot per thread
    results = [None] * num_threads
    threads = []

    # create and start the threads
    for i in range(num_threads):
        start = i * chunk_size
        # make sure the last thread reads to the end of the file
        end = text_length if i == num_threads - 1 else (i + 1) * chunk_size
        t = threading.Thread(target=count_words, args=(start, end, text, i, results))
        threads.append(t)
        t.start()

    # wait for all threads to finish
    for t in threads: t.join()

    # merge the results
    total_count = Counter()
    for result in results: total_count += result

    # print the 10 most frequent words
    for w, c in total_count.most_common(10):
        print(w, '--', c)
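# Hedged sketch (an assumption, not in the original): the variant the module
# docstring suggests -- one thread per input file, so each thread does its own
# I/O. The helper is illustrative only and is not called anywhere.
def count_file(path, result_index, results):
    words = [w for w in re_split(read_file(path)) if w not in stop_words]
    results[result_index] = Counter(words)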

@ -0,0 +1,45 @@
from cppy.cp_util import *

###########################################
# Generators
###########################################
def characters(filename):
    # emit one character at a time, line by line
    for line in open(filename, encoding='utf-8'):
        for c in line:
            yield c

def all_words(filename):
    # emit one word at a time
    start_char = True
    for c in characters(filename):
        if start_char == True:
            word = ""
            if c.isalnum():
                # start of a word
                word = c.lower()
                start_char = False
            else:
                pass
        else:
            if c.isalnum():
                word += c.lower()
            else:
                # end of the word, emit it
                start_char = True
                yield word

def non_stop_words(filename, stopwords):
    for w in all_words(filename):
        if w not in stopwords:
            yield w  # emit a vetted word

if __name__ == "__main__":
    stopwords = get_stopwords()
    freqs = {}
    for word in non_stop_words(testfilepath, stopwords):
        freqs[word] = freqs.get(word, 0) + 1
    data = sort_dict(freqs)
    print_word_freqs(data)

@ -0,0 +1,33 @@
import time
import cppy.cp_util as util

# utility functions
def extract_words(path_to_file):
    return util.extract_file_words(path_to_file)

def frequencies(word_list):
    return util.get_frequencies(word_list)

def sort(word_freq):
    return util.sort_dict(word_freq)

# a closure that wraps f with timing
def profile(f):
    def profilewrapper(*arg, **kw):
        start_time = time.time()
        ret_value = f(*arg, **kw)
        elapsed = time.time() - start_time
        print(f"{f.__name__} took {elapsed} s")
        return ret_value
    return profilewrapper

# decorate: rebind each tracked name to its wrapped version via introspection
tracked_functions = [extract_words, frequencies, sort]
for func in tracked_functions:
    globals()[func.__name__] = profile(func)

if __name__ == "__main__":
    word_freqs = sort(frequencies(extract_words(util.testfilepath)))
    util.print_word_freqs(word_freqs)

@ -0,0 +1,2 @@
" my Some sure acquaintance or other, my dear, sure,other I suppose; I am sure I do not
know. sure "

@ -0,0 +1,27 @@
import sys, os
from cppy.cp_util import *

script_dir = os.path.dirname(os.path.abspath(__file__))
testfile = os.path.join(script_dir, 'test.txt')
stop_words = get_stopwords()

# if this crashes, adjust the 5000
RECURSION_LIMIT = 5000
sys.setrecursionlimit(RECURSION_LIMIT)

def count(i, chunks, stopwords, wordfreqs):
    if i < 0: return
    for word in chunks[i]:
        if word not in stopwords:
            wordfreqs[word] = wordfreqs.get(word, 0) + 1
    count(i - 1, chunks, stopwords, wordfreqs)

word_list = re_split(open(testfile, encoding='utf-8').read())
filesize = len(word_list)
# chunk_count chunks of RECURSION_LIMIT words each; one recursion frame per chunk
chunk_count = (filesize // RECURSION_LIMIT) + 1
chunks = [word_list[x*RECURSION_LIMIT:(x+1)*RECURSION_LIMIT]
          for x in range(chunk_count)]
word_freqs = {}
count(chunk_count - 1, chunks, stop_words, word_freqs)
print_word_freqs(sort_dict(word_freqs))
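# Hedged note (not in the original): each recursive call to count() consumes
# one Python stack frame, so the recursion depth equals the number of chunks
# (filesize // RECURSION_LIMIT + 1), comfortably inside the limit raised above.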