#gq: from django.utils.encoding import force_str from elasticsearch_dsl import Q from haystack.backends import BaseEngine, BaseSearchBackend, BaseSearchQuery, log_query from haystack.forms import ModelSearchForm from haystack.models import SearchResult from haystack.utils import log as logging from blog.documents import ArticleDocument, ArticleDocumentManager from blog.models import Article logger = logging.getLogger(__name__) class ElasticSearchBackend(BaseSearchBackend): """Elasticsearch搜索后端,实现Haystack接口,处理索引与搜索逻辑""" def __init__(self, connection_alias, **connection_options): super( ElasticSearchBackend, self).__init__( connection_alias, **connection_options) self.manager = ArticleDocumentManager() # 文档索引管理工具 self.include_spelling = True # 启用拼写建议 def _get_models(self, iterable): """将模型实例/ID转换为Elasticsearch文档对象""" models = iterable if iterable and iterable[0] else Article.objects.all() docs = self.manager.convert_to_doc(models) return docs def _create(self, models): """创建索引并批量重建文档""" self.manager.create_index() docs = self._get_models(models) self.manager.rebuild(docs) def _delete(self, models): """删除指定文档""" for m in models: m.delete() return True def _rebuild(self, models): """增量更新索引文档""" models = models if models else Article.objects.all() docs = self._get_models(models) self.manager.update_docs(docs) def update(self, index, iterable, commit=True): """更新索引:将模型实例同步到Elasticsearch""" models = self._get_models(iterable) self.manager.update_docs(models) def remove(self, obj_or_string): """从索引中删除单个对象""" models = self._get_models([obj_or_string]) self._delete(models) def clear(self, models=None, commit=True): """清空整个索引""" self.remove(None) @staticmethod def get_suggestion(query: str) -> str: """获取搜索推荐词,无建议则返回原查询词""" search = ArticleDocument.search() \ .query("match", body=query) \ .suggest('suggest_search', query, term={'field': 'body'}) \ .execute() keywords = [] for suggest in search.suggest.suggest_search: # 有建议取第一个,无则用原词 keywords.append(suggest["options"][0]["text"] if suggest["options"] else suggest["text"]) return ' '.join(keywords) @log_query def search(self, query_string, **kwargs): """核心搜索逻辑:匹配文章标题/正文,过滤已发布文章,支持分页和拼写建议""" logger.info('search query_string:' + query_string) start_offset = kwargs.get('start_offset') # 分页起始位置 end_offset = kwargs.get('end_offset') # 分页结束位置 # 启用推荐词搜索 if getattr(self, "is_suggest", None): suggestion = self.get_suggestion(query_string) else: suggestion = query_string # 构建查询:匹配正文或标题,最低70%匹配度 q = Q('bool', should=[Q('match', body=suggestion), Q('match', title=suggestion)], minimum_should_match="70%") # 执行搜索:过滤已发布(status='p')、文章类型(type='a'),不返回原始文档 search = ArticleDocument.search() \ .query('bool', filter=[q]) \ .filter('term', status='p') \ .filter('term', type='a') \ .source(False)[start_offset: end_offset] results = search.execute() hits = results['hits'].total # 总命中数 raw_results = [] # 格式化结果为Haystack兼容的SearchResult对象 for raw_result in results['hits']['hits']: app_label = 'blog' model_name = 'Article' additional_fields = {} result_class = SearchResult result = result_class( app_label, model_name, raw_result['_id'], # 文档ID raw_result['_score'], # 相关性得分 **additional_fields) raw_results.append(result) facets = {} # 若推荐词与原词不同则返回建议 spelling_suggestion = None if query_string == suggestion else suggestion return { 'results': raw_results, 'hits': hits, 'facets': facets, 'spelling_suggestion': spelling_suggestion, } class ElasticSearchQuery(BaseSearchQuery): """Elasticsearch查询构建类,适配Haystack接口""" def _convert_datetime(self, date): """将datetime转换为Elasticsearch兼容的字符串格式""" if hasattr(date, 'hour'): return force_str(date.strftime('%Y%m%d%H%M%S')) else: return force_str(date.strftime('%Y%m%d000000')) def clean(self, query_fragment): """清理查询片段,处理保留字和特殊字符(兼容Whoosh逻辑)""" words = query_fragment.split() cleaned_words = [] for word in words: if word in self.backend.RESERVED_WORDS: word = word.replace(word, word.lower()) for char in self.backend.RESERVED_CHARACTERS: if char in word: word = "'%s'" % word break cleaned_words.append(word) return ' '.join(cleaned_words) def build_query_fragment(self, field, filter_type, value): """构建查询片段,直接返回查询字符串""" return value.query_string def get_count(self): """获取查询结果总数""" results = self.get_results() return len(results) if results else 0 def get_spelling_suggestion(self, preferred_query=None): """返回拼写建议词""" return self._spelling_suggestion def build_params(self, spelling_query=None): """构建查询参数,复用父类逻辑""" kwargs = super(ElasticSearchQuery, self).build_params(spelling_query=spelling_query) return kwargs class ElasticSearchModelSearchForm(ModelSearchForm): """自定义搜索表单,控制是否启用搜索建议""" def search(self): # 根据请求参数决定是否启用建议(is_suggest != "no"时启用) self.searchqueryset.query.backend.is_suggest = self.data.get("is_suggest") != "no" sqs = super().search() return sqs class ElasticSearchEngine(BaseEngine): """Elasticsearch引擎入口,指定后端和查询类""" backend = ElasticSearchBackend query = ElasticSearchQuery