You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
DjangoBlog/djangoblog/elasticsearch_backend.py

193 lines
6.9 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#gq:
from django.utils.encoding import force_str
from elasticsearch_dsl import Q
from haystack.backends import BaseEngine, BaseSearchBackend, BaseSearchQuery, log_query
from haystack.forms import ModelSearchForm
from haystack.models import SearchResult
from haystack.utils import log as logging
from blog.documents import ArticleDocument, ArticleDocumentManager
from blog.models import Article
# Module-level logger; `logging` here is haystack.utils.log (see the imports above).
logger = logging.getLogger(__name__)
class ElasticSearchBackend(BaseSearchBackend):
    """Haystack search backend that indexes and searches articles in Elasticsearch."""

    def __init__(self, connection_alias, **connection_options):
        """Initialise the Haystack base backend and the document manager.

        Args:
            connection_alias: Haystack connection alias, forwarded to the base class.
            **connection_options: Extra connection options, forwarded unchanged.
        """
        super().__init__(connection_alias, **connection_options)
        self.manager = ArticleDocumentManager()  # document/index management helper
        self.include_spelling = True  # enable spelling suggestions

    def _get_models(self, iterable):
        """Convert the given models into Elasticsearch document objects.

        When ``iterable`` is empty or its first element is falsy, every
        ``Article`` is used instead.

        Args:
            iterable: Indexable sequence of models (it is accessed with ``[0]``).

        Returns:
            Whatever ``ArticleDocumentManager.convert_to_doc`` returns.
        """
        models = iterable if iterable and iterable[0] else Article.objects.all()
        return self.manager.convert_to_doc(models)

    def _create(self, models):
        """Create the index, then rebuild it from the documents for ``models``."""
        self.manager.create_index()
        docs = self._get_models(models)
        self.manager.rebuild(docs)

    def _delete(self, models):
        """Call ``delete()`` on every item of ``models`` and return ``True``."""
        for m in models:
            m.delete()
        return True

    def _rebuild(self, models):
        """Update the indexed documents for ``models`` (all articles when falsy)."""
        models = models if models else Article.objects.all()
        docs = self._get_models(models)
        self.manager.update_docs(docs)

    def update(self, index, iterable, commit=True):
        """Push the documents for ``iterable`` to the index.

        ``index`` and ``commit`` are accepted for the Haystack backend
        signature but are not used by this implementation.
        """
        docs = self._get_models(iterable)
        self.manager.update_docs(docs)

    def remove(self, obj_or_string):
        """Delete the document(s) resolved from a single object."""
        docs = self._get_models([obj_or_string])
        self._delete(docs)

    def clear(self, models=None, commit=True):
        """Clear the index.

        Delegates to ``remove(None)``; because ``_get_models`` falls back to
        all articles for a falsy first element, this deletes the documents of
        every article. ``models`` and ``commit`` are not used.
        """
        self.remove(None)

    @staticmethod
    def get_suggestion(query: str) -> str:
        """Return the suggested search terms for ``query``.

        For each entry of the term suggester on the ``body`` field, the first
        option is used when there is one; otherwise the original text of that
        entry is kept. The pieces are joined with single spaces.
        """
        search = ArticleDocument.search() \
            .query("match", body=query) \
            .suggest('suggest_search', query, term={'field': 'body'}) \
            .execute()

        keywords = [
            suggest["options"][0]["text"] if suggest["options"] else suggest["text"]
            for suggest in search.suggest.suggest_search
        ]
        return ' '.join(keywords)

    @log_query
    def search(self, query_string, **kwargs):
        """Search articles by body or title and return a Haystack result dict.

        Only documents with ``status='p'`` (published) and ``type='a'``
        (article) are returned. Pagination uses the ``start_offset`` and
        ``end_offset`` keyword arguments as a slice.

        Args:
            query_string: The text to search for.
            **kwargs: May contain ``start_offset`` and ``end_offset``.

        Returns:
            dict with ``results`` (list of ``SearchResult``), ``hits`` (total
            hit count as reported by Elasticsearch), ``facets`` (always empty)
            and ``spelling_suggestion`` (``None`` when the suggestion equals
            the original query).
        """
        # Lazy %-formatting: same message as before for str input, and it no
        # longer raises TypeError when query_string is not a str.
        logger.info('search query_string:%s', query_string)

        start_offset = kwargs.get('start_offset')  # slice start
        end_offset = kwargs.get('end_offset')  # slice end

        # ``is_suggest`` is an optional attribute set from outside the class
        # (e.g. by the search form); absent or falsy means "search as typed".
        if getattr(self, "is_suggest", None):
            suggestion = self.get_suggestion(query_string)
        else:
            suggestion = query_string

        # Match body or title, with minimum_should_match of 70%.
        q = Q('bool',
              should=[Q('match', body=suggestion), Q('match', title=suggestion)],
              minimum_should_match="70%")

        # NOTE(review): the match clause is placed in filter context, so the
        # returned ``_score`` may not reflect relevance - confirm this is intended.
        # ``source(False)`` asks Elasticsearch not to return the document source.
        search = ArticleDocument.search() \
            .query('bool', filter=[q]) \
            .filter('term', status='p') \
            .filter('term', type='a') \
            .source(False)[start_offset: end_offset]
        results = search.execute()

        # Elasticsearch 7+ reports hits.total as an object with a ``value``
        # field, while older versions report a plain integer; Haystack needs
        # the integer in both cases.
        total = results['hits'].total
        hits = getattr(total, 'value', total)

        # Wrap every raw hit in a Haystack SearchResult (id + relevance score).
        raw_results = [
            SearchResult('blog', 'Article', raw_result['_id'], raw_result['_score'])
            for raw_result in results['hits']['hits']
        ]

        # Only report a spelling suggestion when it differs from the input.
        spelling_suggestion = None if query_string == suggestion else suggestion

        return {
            'results': raw_results,
            'hits': hits,
            'facets': {},
            'spelling_suggestion': spelling_suggestion,
        }
class ElasticSearchQuery(BaseSearchQuery):
    """Haystack query class paired with the Elasticsearch backend."""

    def _convert_datetime(self, date):
        """Format a date or datetime as a compact 14-digit string.

        Values without an ``hour`` attribute get a zero-filled time part.
        """
        pattern = '%Y%m%d%H%M%S' if hasattr(date, 'hour') else '%Y%m%d000000'
        return force_str(date.strftime(pattern))

    def clean(self, query_fragment):
        """Sanitise a query fragment word by word.

        Words listed in the backend's ``RESERVED_WORDS`` are lower-cased, and
        any word containing one of the backend's ``RESERVED_CHARACTERS`` is
        wrapped in single quotes. The words are re-joined with single spaces.
        """
        def _sanitise(token):
            if token in self.backend.RESERVED_WORDS:
                token = token.lower()
            if any(ch in token for ch in self.backend.RESERVED_CHARACTERS):
                token = "'%s'" % token
            return token

        return ' '.join(_sanitise(token) for token in query_fragment.split())

    def build_query_fragment(self, field, filter_type, value):
        """Return the raw query string of ``value``; field and filter type are ignored."""
        return value.query_string

    def get_count(self):
        """Return the number of fetched results, or 0 when there are none."""
        fetched = self.get_results()
        if not fetched:
            return 0
        return len(fetched)

    def get_spelling_suggestion(self, preferred_query=None):
        """Return the stored spelling suggestion; ``preferred_query`` is unused."""
        return self._spelling_suggestion

    def build_params(self, spelling_query=None):
        """Build the search parameters by delegating to the parent class."""
        return super().build_params(spelling_query=spelling_query)
class ElasticSearchModelSearchForm(ModelSearchForm):
    """Search form that toggles suggestion-based searching on the backend."""

    def search(self):
        """Run the search after setting the backend's ``is_suggest`` flag.

        Suggestions are enabled unless the submitted ``is_suggest`` value is
        exactly ``"no"``.
        """
        backend = self.searchqueryset.query.backend
        backend.is_suggest = self.data.get("is_suggest") != "no"
        return super().search()
class ElasticSearchEngine(BaseEngine):
    """Haystack engine that pairs the Elasticsearch backend with its query class."""

    # Backend class used for indexing and searching.
    backend = ElasticSearchBackend
    # Query class used to build and run searches.
    query = ElasticSearchQuery