forked from p46318075/CodePattern
Merge pull request '修改map-reduce模式' (#13) from pbr4nzfkh/CodePattern:dev into dev
commit
f131c63ff4
@ -1,27 +1,42 @@
|
||||
from functools import reduce
|
||||
from cppy.cp_util import *
|
||||
# -*- coding: utf-8 -*-
|
||||
from collections import Counter
|
||||
from cppy.cp_util import *
|
||||
from functools import reduce
|
||||
|
||||
# map - reduce
|
||||
def process_chunk(chunk):
|
||||
# 过滤停用词
|
||||
stop_words = get_stopwords()
|
||||
words = [ w for w in chunk if ( not w in stop_words ) and len(w) >= 3 ]
|
||||
return Counter(words)
|
||||
|
||||
|
||||
def merge_counts(count1,count2):
|
||||
sum_counts = count1 + count2
|
||||
return sum_counts
|
||||
|
||||
|
||||
@timing_decorator
|
||||
def main():
|
||||
# 读取文件内容
|
||||
content = re_split(read_file(testfilepath))
|
||||
|
||||
# 分割文件内容为多个块,每个块由一个进程处理
|
||||
chunk_size = 1000 # 可以根据实际情况调整块大小
|
||||
chunks = [content[i:i + chunk_size] for i in range(0, len(content), chunk_size)]
|
||||
|
||||
# 使用 map 方法和 process_chunk 函数处理每个分区
|
||||
counts_list = list(map(process_chunk, chunks))
|
||||
|
||||
# 使用 reduce 和 merge_counts 函数统计所有分区的词频
|
||||
total_counts = (reduce(merge_counts,counts_list))
|
||||
|
||||
# 输出最高频的n个词
|
||||
print_word_freqs(total_counts.most_common(10))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
|
||||
|
||||
def partition(data_str, nlines):
|
||||
lines = data_str.split('\n')
|
||||
for i in range(0, len(lines), nlines):
|
||||
yield '\n'.join(lines[i:i+nlines])
|
||||
|
||||
def split_words(data_str):
|
||||
word_list = extract_str_words(data_str)
|
||||
return Counter( word_list )
|
||||
|
||||
def count_words(pairs_list_1, pairs_list_2):
|
||||
return pairs_list_1 + pairs_list_2
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
data = read_file(testfilepath)
|
||||
|
||||
# 使用 map 方法和 split_words 函数处理每个分区
|
||||
splits = map(split_words, partition(data, 200))
|
||||
splits_list = list(splits)
|
||||
|
||||
# 使用 reduce 和 count_words 函数统计所有分区的词频
|
||||
word_freqs = sort_dict(reduce(count_words, splits_list, Counter()) )
|
||||
print_word_freqs(word_freqs)
|
@ -1,37 +0,0 @@
|
||||
from functools import reduce
|
||||
from cppy.cp_util import *
|
||||
|
||||
#################################################
|
||||
# Functions for map reduce
|
||||
#################################################
|
||||
def partition(data_str, nlines):
|
||||
lines = data_str.split('\n')
|
||||
for i in range(0, len(lines), nlines):
|
||||
yield '\n'.join(lines[i:i+nlines])
|
||||
|
||||
def split_words(data_str):
|
||||
words = extract_str_words(data_str)
|
||||
return [ (w, 1) for w in words ]
|
||||
|
||||
def regroup(pairs_list):
|
||||
mapping = {}
|
||||
for pairs in pairs_list:
|
||||
for p in pairs:
|
||||
mapping[p[0]] = mapping.get(p[0], []) + [p]
|
||||
return mapping
|
||||
|
||||
def count_words(mapping):
|
||||
def add(x, y): return x+y
|
||||
return ( mapping[0],
|
||||
reduce(add, (pair[1] for pair in mapping[1]))
|
||||
)
|
||||
|
||||
def sort (word_freq):
|
||||
return sorted(word_freq, key=operator.itemgetter(1), reverse=True)
|
||||
|
||||
if __name__ == '__main__':
|
||||
data = read_file(testfilepath)
|
||||
splits = map(split_words, partition(data, 200))
|
||||
splits_per_word = regroup(splits)
|
||||
word_freqs = sort(map(count_words, splits_per_word.items()))
|
||||
print_word_freqs(word_freqs)
|
@ -0,0 +1,56 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from collections import Counter
|
||||
from cppy.cp_util import *
|
||||
from multiprocessing.pool import ThreadPool
|
||||
|
||||
|
||||
#
|
||||
# 多线程
|
||||
#
|
||||
def process_chunk(chunk):
|
||||
# 过滤停用词
|
||||
stop_words = get_stopwords()
|
||||
words = [ w for w in chunk if ( not w in stop_words ) and len(w) >= 3 ]
|
||||
return Counter(words)
|
||||
|
||||
|
||||
def merge_counts(counts_list):
|
||||
# 合并多个Counter对象
|
||||
total_counts = Counter()
|
||||
for counts in counts_list:
|
||||
total_counts += counts
|
||||
return total_counts
|
||||
|
||||
|
||||
def thread_function(chunk, counts_list):
|
||||
word_count = process_chunk(chunk)
|
||||
counts_list.append(word_count)
|
||||
|
||||
|
||||
@timing_decorator
|
||||
def main():
|
||||
# 读取文件内容
|
||||
content = re_split(read_file(testfilepath))
|
||||
chunk_size = 1000 # 可以根据实际情况调整块大小
|
||||
chunks = [content[i:i + chunk_size] for i in range(0, len(content), chunk_size)]
|
||||
|
||||
# 使用多线程池,每个线程处理一个块
|
||||
pool = ThreadPool(len(content)//chunk_size+1)
|
||||
counts_list = pool.map(process_chunk, chunks)
|
||||
pool.close()
|
||||
pool.join()
|
||||
|
||||
# 合并计数
|
||||
total_counts = merge_counts(counts_list)
|
||||
|
||||
# 输出最高频的n个词
|
||||
print_word_freqs(total_counts.most_common(10))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
|
||||
|
||||
|
||||
|
@ -0,0 +1,49 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import multiprocessing
|
||||
from collections import Counter
|
||||
from cppy.cp_util import *
|
||||
|
||||
|
||||
#
|
||||
# 多进程
|
||||
#
|
||||
def process_chunk(chunk):
|
||||
# 过滤停用词
|
||||
stop_words = get_stopwords()
|
||||
words = [ w for w in chunk if ( not w in stop_words ) and len(w) >= 3 ]
|
||||
return Counter(words)
|
||||
|
||||
|
||||
def merge_counts(counts_list):
|
||||
# 合并多个Counter对象
|
||||
total_counts = Counter()
|
||||
for counts in counts_list:
|
||||
total_counts += counts
|
||||
return total_counts
|
||||
|
||||
|
||||
@timing_decorator
|
||||
def main():
|
||||
# 读取文件内容
|
||||
content = re_split(read_file(testfilepath))
|
||||
|
||||
# 分割文件内容为多个块,每个块由一个进程处理
|
||||
chunk_size = 1000 # 可以根据实际情况调整块大小
|
||||
chunks = [content[i:i + chunk_size] for i in range(0, len(content), chunk_size)]
|
||||
|
||||
# 使用多进程处理每个块
|
||||
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
|
||||
counts_list = pool.map(process_chunk, chunks)
|
||||
pool.close()
|
||||
pool.join()
|
||||
|
||||
# 合并计数
|
||||
total_counts = merge_counts(counts_list)
|
||||
|
||||
# 输出最高频的n个词
|
||||
print_word_freqs(total_counts.most_common(10))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
@ -1,54 +0,0 @@
|
||||
import threading
|
||||
from collections import Counter
|
||||
from cppy.cp_util import *
|
||||
|
||||
#
|
||||
# 多线程
|
||||
#
|
||||
def process_chunk(start, end, text, result_index, results):
|
||||
# 切词并过滤停用词
|
||||
words = extract_str_words( text[start:end] )
|
||||
results[result_index] = Counter(words)
|
||||
|
||||
def merge_counts(counts_list):
|
||||
# 合并多个Counter对象
|
||||
total_counts = Counter()
|
||||
for counts in counts_list:
|
||||
total_counts += counts
|
||||
return total_counts
|
||||
|
||||
@timing_decorator
|
||||
def main():
|
||||
# 读取文件内容
|
||||
text = read_file(testfilepath)
|
||||
|
||||
# 确定线程数量
|
||||
num_threads = 4
|
||||
text_length = len(text)
|
||||
chunk_size = text_length // num_threads
|
||||
|
||||
# 存储每个线程的结果
|
||||
results = [None] * num_threads
|
||||
threads = []
|
||||
|
||||
# 创建并启动线程
|
||||
for i in range(num_threads):
|
||||
start = i * chunk_size
|
||||
# 确保最后一个线程能够读取文件的末尾
|
||||
end = text_length if i == num_threads - 1 else (i + 1) * chunk_size
|
||||
t = threading.Thread(target=process_chunk, args=(start, end, text, i, results))
|
||||
threads.append(t)
|
||||
t.start()
|
||||
|
||||
# 等待所有线程完成
|
||||
for t in threads: t.join()
|
||||
|
||||
# 合并计数
|
||||
total_counts = merge_counts(results)
|
||||
|
||||
# 输出最高频的n个词
|
||||
print_word_freqs( total_counts.most_common(10) )
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
@ -1,44 +0,0 @@
|
||||
import multiprocessing
|
||||
from collections import Counter
|
||||
from cppy.cp_util import *
|
||||
|
||||
#
|
||||
# 多进程
|
||||
#
|
||||
def process_chunk(chunk):
|
||||
# 切词并过滤停用词
|
||||
words = extract_str_words( chunk.lower() )
|
||||
return Counter(words)
|
||||
|
||||
def merge_counts(counts_list):
|
||||
# 合并多个Counter对象
|
||||
total_counts = Counter()
|
||||
for counts in counts_list:
|
||||
total_counts += counts
|
||||
return total_counts
|
||||
|
||||
@timing_decorator
|
||||
def main():
|
||||
# 读取文件内容
|
||||
content = read_file(testfilepath)
|
||||
|
||||
# 分割文件内容为多个块,每个块由一个进程处理
|
||||
chunk_size = 1000 # 可以根据实际情况调整块大小
|
||||
chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]
|
||||
|
||||
# 使用多进程处理每个块
|
||||
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
|
||||
counts_list = pool.map(process_chunk, chunks)
|
||||
pool.close()
|
||||
pool.join()
|
||||
|
||||
# 合并计数
|
||||
total_counts = merge_counts(counts_list)
|
||||
|
||||
# 输出最高频的n个词
|
||||
print_word_freqs( total_counts.most_common(10) )
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
Loading…
Reference in new issue