dev
zj3D 8 months ago
parent fe94d8ed1b
commit b86f626e94

@ -1,6 +1,8 @@
import sys
from cppy.cp_util import *
## 切分任务这个工作,可以统一为一个通用函数。做成一个生成器
script_dir = os.path.dirname(os.path.abspath(__file__))
testfile = os.path.join(script_dir, 'test.txt')
stop_words = get_stopwords()

@ -2,6 +2,7 @@ import concurrent.futures
from collections import Counter
import cppy.cp_util as util
# 价值不大,就是多线程的一个表现,说明松耦合不如消息驱动的组件
class WordFrequencyAgent:
def __init__(self, words):

@ -1,109 +0,0 @@
import sys, os, string
from cppy.cp_util import *
def touchopen(filename, *args, **kwargs):
try:
os.remove(filename)
except OSError:
pass
open(filename, "a",encoding='utf-8').close() # "touch" file
return open(filename, *args, **kwargs)
# The constrained memory should have no more than 1024*n cells
data = []
n = 10
f = open( stopwordfilepath,encoding='utf-8' )
data = [f.read(1024*n).split(',')] # data[0] holds the stop words
f.close()
data.append([]) # data[1] is line (max 80 characters)
data.append(None) # data[2] is index of the start_char of word
data.append(0) # data[3] is index on characters, i = 0
data.append(False) # data[4] is flag indicating if word was found
data.append('') # data[5] is the word
data.append('') # data[6] is word,NNNN
data.append(0) # data[7] is frequency
# Open the secondary memory
word_freqs = touchopen('word_freqs', 'rb+')
# Open the input file
f = open( testfilepath , 'r',encoding='utf-8')
# Loop over input file's lines
while True:
print('.',end='',flush = True)
data[1] = [f.readline()]
if data[1] == ['']: # end of input file
break
if data[1][0][len(data[1][0])-1] != '\n': # If it does not end with \n
data[1][0] = data[1][0] + '\n' # Add \n
data[2] = None
data[3] = 0
# Loop over characters in the line
for c in data[1][0]: # elimination of symbol c is exercise
if data[2] == None:
if c.isalnum():
# We found the start of a word
data[2] = data[3]
else:
if not c.isalnum():
# We found the end of a word. Process it
data[4] = False
data[5] = data[1][0][data[2]:data[3]].lower()
# Ignore words with len < 2, and stop words
if len(data[5]) >= 2 and data[5] not in data[0]:
# Let's see if it already exists
while True:
data[6] = str(word_freqs.readline().strip(), 'utf-8')
if data[6] == '':
break;
data[7] = int(data[6].split(',')[1])
# word, no white space
data[6] = data[6].split(',')[0].strip()
if data[5] == data[6]:
data[7] += 1
data[4] = True
break
if not data[4]:
word_freqs.seek(0, 1) # Needed in Windows
word_freqs.write(bytes("%20s,%04d\n" % (data[5], 1), 'utf-8'))
else:
word_freqs.seek(-26, 1)
word_freqs.write(bytes("%20s,%04d\n" % (data[5], data[7]), 'utf-8'))
word_freqs.seek(0,0)
# Let's reset
data[2] = None
data[3] += 1
# We're done with the input file
f.close()
word_freqs.flush()
# PART 2
# Now we need to find the 25 most frequently occurring words.
# We don't need anything from the previous values in memory
del data[:]
# Let's use the first 25 entries for the top 25 words
data = data + [[]]*(25 - len(data))
data.append('') # data[25] is word,freq from file
data.append(0) # data[26] is freq
# Loop over secondary memory file
while True:
data[25] = str(word_freqs.readline().strip(), 'utf-8')
if data[25] == '': # EOF
break
data[26] = int(data[25].split(',')[1]) # Read it as integer
data[25] = data[25].split(',')[0].strip() # word
# Check if this word has more counts than the ones in memory
for i in range(25): # elimination of symbol i is exercise
if data[i] == [] or data[i][1] < data[26]:
data.insert(i, [data[25], data[26]])
del data[26] # delete the last element
break
for tf in data[0:10]:
if len(tf) == 2:
print(tf[0], '-', tf[1])
word_freqs.close()

@ -1,3 +1,10 @@
'''
每个组件提供注册消息接口和注册消息动作
把顺序的流程分解到各个组件内部实现
这样避免回传到中心控制器
这是一个示例性质的原型具体分布式环境下需要调整
'''
from collections import defaultdict
from cppy.cp_util import *
@ -81,4 +88,5 @@ if __name__ == '__main__':
stop_word_filter = StopWordFilter(wfapp)
data_storage = DataStorage(wfapp, stop_word_filter)
word_freq_counter = WordFrequencyCounter(wfapp, data_storage)
wfapp.run(testfilepath)

@ -1,4 +1,10 @@
from cppy.cp_util import *
'''
注册回调的一个变体
提供一个中心消息管理器统一管理消息的订阅和通知
这是一个示例性质的原型具体分布式环境下需要调整
'''
#################################################
# The event management

@ -1,4 +1,7 @@
import sys, re, operator, string
'''
依靠给各个组件的 dispatch 调用接口发指令来驱动所有工作
这是一个示例性质的原型具体环境下需要调整
'''
from cppy.cp_util import *
class DataStorageManager():
@ -13,19 +16,9 @@ class DataStorageManager():
return self._words()
else:
raise Exception("Message not understood " + message[0])
# 使用内省的写法
'''
def dispatch(self, message):
method_name = '_' + message[0]
if hasattr(self, method_name):
method = getattr(self, method_name)
return method(*message[1:])
else:
raise ValueError(f"DataStorageManager doesn't understand message {message[0]}")
'''
def _init(self, path_to_file):
self._data = re.findall('\w+', read_file(path_to_file).lower())
self._data = re_split( read_file(path_to_file) )
def _words(self):
return self._data
@ -63,13 +56,11 @@ class WordFrequencyManager():
raise Exception("Message not understood " + message[0])
def _increment_count(self, word):
if word in self._word_freqs:
self._word_freqs[word] += 1
else:
self._word_freqs[word] = 1
self._word_freqs[word] = self._word_freqs.get(word,0) + 1
def _sorted(self):
return sorted(self._word_freqs.items(), key=operator.itemgetter(1), reverse=True)
return sort_dict(self._word_freqs)
class WordFrequencyController():
@ -94,8 +85,7 @@ class WordFrequencyController():
self._word_freq_manager.dispatch(['increment_count', w])
word_freqs = self._word_freq_manager.dispatch(['sorted'])
for (w, c) in word_freqs[0:10]:
print(w, '-', c)
print_word_freqs(word_freqs)
if __name__ == '__main__':

@ -1,3 +1,9 @@
'''
依靠给各个不同线程组件的队列发指令来驱动所有工作比较繁琐
比较 01.py 的实现各个组件完全不能互操作仅依靠队列发消息进行协作
这是一个示例性质的原型具体分布式环境下需要调整
'''
from threading import Thread
from queue import Queue
from cppy.cp_util import *
@ -13,8 +19,8 @@ class ActiveWFObject(Thread):
while not self._stopMe:
message = self.queue.get()
self._dispatch(message)
if message[0] == 'die':
self._stopMe = True
if message[0] == 'over':
break
def send(receiver, message):
receiver.queue.put(message)
@ -33,15 +39,14 @@ class DataStorageManager(ActiveWFObject):
send(self._stop_word_manager, message)
def _init(self, message):
path_to_file = message[0]
self._data = extract_file_words(message[0])
self._stop_word_manager = message[1]
self._data = extract_file_words(path_to_file)
def _process_words(self, message):
recipient = message[0]
for w in self._data:
send(self._stop_word_manager, ['filter', w])
send(self._stop_word_manager, ['top10', recipient])
send(self._stop_word_manager, ['topWord', recipient])
class StopWordManager(ActiveWFObject):
""" Models the stop word filter """
@ -72,24 +77,24 @@ class WordFrequencyManager(ActiveWFObject):
def _dispatch(self, message):
if message[0] == 'word':
self._increment_count(message[1:])
elif message[0] == 'top10':
self._top10(message[1:])
elif message[0] == 'topWord':
self._topWord(message[1:])
def _increment_count(self, message):
word, = message
self._word_freqs[word] = self._word_freqs.get(word, 0) + 1
def _top10(self, message):
def _topWord(self, message):
recipient = message[0]
freqs_sorted = sort_dict ( self._word_freqs )
send(recipient, ['top10', freqs_sorted])
send(recipient, ['topWord', freqs_sorted])
class WordFrequencyController(ActiveWFObject):
class MyController(ActiveWFObject):
def _dispatch(self, message):
if message[0] == 'run':
self._run(message[1:])
elif message[0] == 'top10':
elif message[0] == 'topWord':
self._display(message[1:])
else:
raise Exception("Message not understood " + message[0])
@ -101,7 +106,7 @@ class WordFrequencyController(ActiveWFObject):
def _display(self, message):
word_freqs, = message
print_word_freqs( word_freqs)
send(self._storage_manager, ['die'])
send(self._storage_manager, ['over'])
self._stopMe = True
@ -109,12 +114,12 @@ if __name__ == '__main__':
word_freq_manager = WordFrequencyManager()
stop_word_manager = StopWordManager()
storage_manager = DataStorageManager()
wfcontroller = MyController()
send(stop_word_manager, ['init', word_freq_manager])
send(storage_manager, ['init', testfilepath, stop_word_manager])
wfcontroller = WordFrequencyController()
send(stop_word_manager, ['init', word_freq_manager])
send(wfcontroller, ['run', storage_manager])
# Wait for the active objects to finish
[t.join() for t in [word_freq_manager, stop_word_manager, storage_manager, wfcontroller]]
# 等待所有管理器完成工作
threads = [word_freq_manager, stop_word_manager, storage_manager, wfcontroller]
for thread in threads: thread.join()
Loading…
Cancel
Save