@ -1,27 +1,42 @@
# -*- coding: utf-8 -*-
from collections import Counter
from functools import reduce
from cppy.cp_util import *


# map - reduce
def process_chunk(chunk):
    # Filter out stop words and very short tokens
    stop_words = get_stopwords()
    words = [w for w in chunk if w not in stop_words and len(w) >= 3]
    return Counter(words)


def merge_counts(count1, count2):
    # Counter supports "+", which adds the two counts key by key
    return count1 + count2


@timing_decorator
def main():
    # Read the file content
    content = re_split(read_file(testfilepath))

    # Split the content into chunks; tune the chunk size to the input
    chunk_size = 1000
    chunks = [content[i:i + chunk_size] for i in range(0, len(content), chunk_size)]

    # Map: count the words of each chunk with process_chunk
    counts_list = list(map(process_chunk, chunks))

    # Reduce: fold the per-chunk counts into one total with merge_counts
    total_counts = reduce(merge_counts, counts_list)

    # Print the n most frequent words
    print_word_freqs(total_counts.most_common(10))


if __name__ == '__main__':
    main()
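
The fold in reduce(merge_counts, counts_list) leans entirely on Counter addition; a minimal, self-contained sketch (the per-chunk results here are made up for illustration) shows what it computes:

from collections import Counter
from functools import reduce

# Hypothetical per-chunk results, standing in for counts_list
chunk_counts = [Counter(['map', 'reduce', 'map']), Counter(['reduce', 'fold'])]

total = reduce(lambda a, b: a + b, chunk_counts, Counter())
print(total.most_common(2))  # [('map', 2), ('reduce', 2)]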


def partition(data_str, nlines):
    lines = data_str.split('\n')
    for i in range(0, len(lines), nlines):
        yield '\n'.join(lines[i:i + nlines])


def split_words(data_str):
    word_list = extract_str_words(data_str)
    return Counter(word_list)


def count_words(pairs_list_1, pairs_list_2):
    return pairs_list_1 + pairs_list_2


if __name__ == '__main__':
    data = read_file(testfilepath)

    # Map: count the words of each partition with split_words
    splits = map(split_words, partition(data, 200))
    splits_list = list(splits)

    # Reduce: merge the per-partition counts with count_words
    word_freqs = sort_dict(reduce(count_words, splits_list, Counter()))
    print_word_freqs(word_freqs)

@ -1,37 +0,0 @@
import operator
from functools import reduce
from cppy.cp_util import *

#################################################
# Functions for map reduce
#################################################
def partition(data_str, nlines):
    lines = data_str.split('\n')
    for i in range(0, len(lines), nlines):
        yield '\n'.join(lines[i:i + nlines])

def split_words(data_str):
    # Map: emit a (word, 1) pair for every word
    words = extract_str_words(data_str)
    return [(w, 1) for w in words]

def regroup(pairs_list):
    # Shuffle: group the (word, 1) pairs by word
    mapping = {}
    for pairs in pairs_list:
        for p in pairs:
            mapping[p[0]] = mapping.get(p[0], []) + [p]
    return mapping

def count_words(mapping):
    # Reduce: sum the counts collected for a single word
    def add(x, y): return x + y
    return (mapping[0],
            reduce(add, (pair[1] for pair in mapping[1])))

def sort(word_freq):
    return sorted(word_freq, key=operator.itemgetter(1), reverse=True)

if __name__ == '__main__':
    data = read_file(testfilepath)
    splits = map(split_words, partition(data, 200))
    splits_per_word = regroup(splits)
    word_freqs = sort(map(count_words, splits_per_word.items()))
    print_word_freqs(word_freqs)
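
The shuffle step in regroup is the easiest part to misread; a tiny walk-through with made-up pairs shows the mapping it builds and what count_words then does with one entry:

pairs_list = [[('a', 1), ('b', 1)], [('a', 1)]]  # made-up mapper output
mapping = {}
for pairs in pairs_list:
    for p in pairs:
        mapping[p[0]] = mapping.get(p[0], []) + [p]
# mapping == {'a': [('a', 1), ('a', 1)], 'b': [('b', 1)]}
# count_words(('a', [('a', 1), ('a', 1)])) returns ('a', 2)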
@ -0,0 +1,56 @@
# -*- coding: utf-8 -*-
from collections import Counter
from cppy.cp_util import *
from multiprocessing.pool import ThreadPool


#
# Multithreading
#
def process_chunk(chunk):
    # Filter out stop words and very short tokens
    stop_words = get_stopwords()
    words = [w for w in chunk if w not in stop_words and len(w) >= 3]
    return Counter(words)


def merge_counts(counts_list):
    # Merge a list of Counter objects into one
    total_counts = Counter()
    for counts in counts_list:
        total_counts += counts
    return total_counts


def thread_function(chunk, counts_list):
    # Helper for explicit threads; main() below uses pool.map instead
    word_count = process_chunk(chunk)
    counts_list.append(word_count)


@timing_decorator
def main():
    # Read the file content
    content = re_split(read_file(testfilepath))
    chunk_size = 1000  # tune the chunk size to the input
    chunks = [content[i:i + chunk_size] for i in range(0, len(content), chunk_size)]

    # Use a thread pool; each thread processes one chunk
    pool = ThreadPool(len(content) // chunk_size + 1)
    counts_list = pool.map(process_chunk, chunks)
    pool.close()
    pool.join()

    # Merge the per-chunk counts
    total_counts = merge_counts(counts_list)

    # Print the n most frequent words
    print_word_freqs(total_counts.most_common(10))


if __name__ == '__main__':
    main()
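
Since the counting here is CPU-bound, CPython's GIL usually keeps the threads from running chunks truly in parallel, so this variant mostly measures pool overhead. The same map also reads naturally with the standard-library concurrent.futures API; a sketch, assuming chunks and process_chunk as defined above:

from concurrent.futures import ThreadPoolExecutor

# chunks and process_chunk are assumed from the listing above
with ThreadPoolExecutor(max_workers=4) as executor:
    counts_list = list(executor.map(process_chunk, chunks))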
@ -0,0 +1,49 @@
# -*- coding: utf-8 -*-
import multiprocessing
from collections import Counter
from cppy.cp_util import *


#
# Multiprocessing
#
def process_chunk(chunk):
    # Filter out stop words and very short tokens
    stop_words = get_stopwords()
    words = [w for w in chunk if w not in stop_words and len(w) >= 3]
    return Counter(words)


def merge_counts(counts_list):
    # Merge a list of Counter objects into one
    total_counts = Counter()
    for counts in counts_list:
        total_counts += counts
    return total_counts


@timing_decorator
def main():
    # Read the file content
    content = re_split(read_file(testfilepath))

    # Split the content into chunks; each chunk is handled by one process
    chunk_size = 1000  # tune the chunk size to the input
    chunks = [content[i:i + chunk_size] for i in range(0, len(content), chunk_size)]

    # Process the chunks with a pool of worker processes
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    counts_list = pool.map(process_chunk, chunks)
    pool.close()
    pool.join()

    # Merge the per-chunk counts
    total_counts = merge_counts(counts_list)

    # Print the n most frequent words
    print_word_freqs(total_counts.most_common(10))


if __name__ == '__main__':
    main()
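
The close/join pair can also be folded into a with statement, since Pool has been a context manager since Python 3.3; an equivalent sketch under the same assumptions:

# pool.map blocks until all results are in, so tearing the pool down on
# exit is safe here; process_chunk and chunks as in the listing above
with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
    counts_list = pool.map(process_chunk, chunks)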
@ -1,54 +0,0 @@
import threading
from collections import Counter
from cppy.cp_util import *

#
# Multithreading
#
def process_chunk(start, end, text, result_index, results):
    # Tokenize the slice and filter out stop words
    words = extract_str_words(text[start:end])
    results[result_index] = Counter(words)

def merge_counts(counts_list):
    # Merge a list of Counter objects into one
    total_counts = Counter()
    for counts in counts_list:
        total_counts += counts
    return total_counts

@timing_decorator
def main():
    # Read the file content
    text = read_file(testfilepath)

    # Decide on the number of threads
    num_threads = 4
    text_length = len(text)
    chunk_size = text_length // num_threads

    # One result slot per thread, so no locking is needed
    results = [None] * num_threads
    threads = []

    # Create and start the threads
    for i in range(num_threads):
        start = i * chunk_size
        # Make sure the last thread reads to the end of the file
        end = text_length if i == num_threads - 1 else (i + 1) * chunk_size
        t = threading.Thread(target=process_chunk, args=(start, end, text, i, results))
        threads.append(t)
        t.start()

    # Wait for all threads to finish
    for t in threads:
        t.join()

    # Merge the counts
    total_counts = merge_counts(results)

    # Print the n most frequent words
    print_word_freqs(total_counts.most_common(10))


if __name__ == '__main__':
    main()
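
One caveat in this variant: it slices the raw text at fixed character offsets, so a word that straddles a chunk boundary gets tokenized as two fragments. A tiny illustration with made-up text:

text = "hello world"  # made-up input
chunks = [text[0:8], text[8:]]  # ['hello wo', 'rld']
# "world" is split across the chunks, so per-chunk tokenizing miscounts it;
# the variants that chunk a token list rather than the raw string avoid this.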
@ -1,44 +0,0 @@
import multiprocessing
from collections import Counter
from cppy.cp_util import *

#
# Multiprocessing
#
def process_chunk(chunk):
    # Tokenize the chunk and filter out stop words
    words = extract_str_words(chunk.lower())
    return Counter(words)

def merge_counts(counts_list):
    # Merge a list of Counter objects into one
    total_counts = Counter()
    for counts in counts_list:
        total_counts += counts
    return total_counts

@timing_decorator
def main():
    # Read the file content
    content = read_file(testfilepath)

    # Split the content into chunks; each chunk is handled by one process
    chunk_size = 1000  # tune the chunk size to the input
    chunks = [content[i:i + chunk_size] for i in range(0, len(content), chunk_size)]

    # Process the chunks with a pool of worker processes
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    counts_list = pool.map(process_chunk, chunks)
    pool.close()
    pool.join()

    # Merge the counts
    total_counts = merge_counts(counts_list)

    # Print the n most frequent words
    print_word_freqs(total_counts.most_common(10))


if __name__ == '__main__':
    main()
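
When there are many chunks, the merge can also run incrementally as workers finish instead of waiting for the whole list; a sketch with imap_unordered, again assuming process_chunk and chunks as above:

# process_chunk and chunks as in the listing above
total_counts = Counter()
with multiprocessing.Pool() as pool:
    for counts in pool.imap_unordered(process_chunk, chunks):
        total_counts += counts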