You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
Django/doc/djangoblog/elasticsearch_backend.py

216 lines
7.5 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from django.utils.encoding import force_str
from elasticsearch_dsl import Q
from haystack.backends import BaseEngine, BaseSearchBackend, BaseSearchQuery, log_query
from haystack.forms import ModelSearchForm
from haystack.models import SearchResult
from haystack.utils import log as logging
# Import the project's own Elasticsearch document classes and the Article model
from blog.documents import ArticleDocument, ArticleDocumentManager
from blog.models import Article
# Module-level logger named after this module; `logging` here is the alias
# given to haystack.utils.log in the imports above.
logger = logging.getLogger(__name__)
class ElasticSearchBackend(BaseSearchBackend):
    """Haystack search backend that indexes and queries blog articles in Elasticsearch.

    Index maintenance is delegated to ``ArticleDocumentManager``; queries are
    issued through ``ArticleDocument.search()``.
    """

    def __init__(self, connection_alias, **connection_options):
        """Initialise the backend for a Haystack connection.

        Args:
            connection_alias: Alias of the Haystack connection, forwarded to
                ``BaseSearchBackend``.
            **connection_options: Extra connection settings, forwarded
                unchanged to ``BaseSearchBackend``.
        """
        super().__init__(connection_alias, **connection_options)
        self.manager = ArticleDocumentManager()  # performs all index writes
        self.include_spelling = True  # spelling suggestions are enabled

    def _get_models(self, iterable):
        """Convert model instances into Elasticsearch documents.

        Args:
            iterable: Indexable collection of model instances. When it is
                empty, or its first element is falsy (e.g. ``[None]``), all
                ``Article`` objects are used instead.

        Returns:
            The result of ``ArticleDocumentManager.convert_to_doc`` for the
            selected models.
        """
        models = iterable if iterable and iterable[0] else Article.objects.all()
        return self.manager.convert_to_doc(models)

    def _create(self, models):
        """Create the index, then rebuild it from ``models``.

        Args:
            models: Model instances to index; falls back to all articles as
                described in ``_get_models``.
        """
        self.manager.create_index()
        docs = self._get_models(models)
        self.manager.rebuild(docs)

    def _delete(self, models):
        """Call ``delete()`` on every item of ``models``.

        Args:
            models: Iterable of objects exposing a ``delete()`` method.

        Returns:
            Always ``True``.
        """
        for item in models:
            item.delete()
        return True

    def _rebuild(self, models):
        """Convert ``models`` to documents and push them as updates.

        Args:
            models: Model instances to re-index; when falsy, all ``Article``
                objects are used.
        """
        models = models if models else Article.objects.all()
        docs = self.manager.convert_to_doc(models)
        self.manager.update_docs(docs)

    def update(self, index, iterable, commit=True):
        """Update the documents that correspond to ``iterable``.

        Args:
            index: Accepted for interface compatibility; not used here.
            iterable: Model instances to update (see ``_get_models``).
            commit: Accepted for interface compatibility; not used here.
        """
        docs = self._get_models(iterable)
        self.manager.update_docs(docs)

    def remove(self, obj_or_string):
        """Delete the document(s) for ``obj_or_string``.

        Args:
            obj_or_string: Object to remove. A falsy value makes
                ``_get_models`` select every article, so all article
                documents are deleted.
        """
        docs = self._get_models([obj_or_string])
        self._delete(docs)

    def clear(self, models=None, commit=True):
        """Delete the documents of all articles.

        Args:
            models: Accepted for interface compatibility; not used here.
            commit: Accepted for interface compatibility; not used here.
        """
        self.remove(None)

    @staticmethod
    def get_suggestion(query: str) -> str:
        """Return the suggested search terms for ``query``.

        Runs a term suggester against the ``body`` field. For each entry the
        suggester returns, its first option is used; an entry without options
        keeps its original text.

        Args:
            query: The text entered by the user.

        Returns:
            The chosen terms joined by single spaces.
        """
        response = (
            ArticleDocument.search()
            .query("match", body=query)
            .suggest('suggest_search', query, term={'field': 'body'})
            .execute()
        )
        keywords = [
            entry["options"][0]["text"] if entry["options"] else entry["text"]
            for entry in response.suggest.suggest_search
        ]
        return ' '.join(keywords)

    @log_query
    def search(self, query_string, **kwargs):
        """Run a search and return the result in Haystack's dictionary format.

        Args:
            query_string: The text to search for.
            **kwargs: Only ``start_offset`` and ``end_offset`` are read; they
                bound the slice of results requested.

        Returns:
            A dict with the keys ``results`` (list of ``SearchResult``),
            ``hits`` (total hit count), ``facets`` (always empty) and
            ``spelling_suggestion`` (the suggested text, or ``None`` when it
            equals ``query_string``).
        """
        # Lazy %-formatting: same message, and no TypeError for non-str input.
        logger.info('search query_string:%s', query_string)

        start_offset = kwargs.get('start_offset')
        end_offset = kwargs.get('end_offset')

        # ``is_suggest`` is set from outside (the search form in this file
        # assigns it); when absent, the query is used as typed.
        if getattr(self, "is_suggest", None):
            suggestion = self.get_suggestion(query_string)
        else:
            suggestion = query_string

        # Match against body or title with minimum_should_match of 70%.
        # NOTE(review): this query is passed as a bool *filter*, so it
        # presumably does not contribute to ``_score`` - confirm intended.
        q = Q(
            'bool',
            should=[Q('match', body=suggestion), Q('match', title=suggestion)],
            minimum_should_match="70%",
        )

        # Restrict to status 'p' and type 'a'; ``source(False)`` skips the
        # document bodies because only ``_id`` and ``_score`` are needed.
        search = (
            ArticleDocument.search()
            .query('bool', filter=[q])
            .filter('term', status='p')
            .filter('term', type='a')
            .source(False)[start_offset:end_offset]
        )
        results = search.execute()

        # Elasticsearch 7+ reports the total as an object with a ``value``
        # field, older versions as a plain integer; accept both.
        total = results['hits'].total
        hits = getattr(total, 'value', total)

        raw_results = [
            SearchResult('blog', 'Article', hit['_id'], hit['_score'])
            for hit in results['hits']['hits']
        ]

        spelling_suggestion = None if query_string == suggestion else suggestion

        return {
            'results': raw_results,
            'hits': hits,
            'facets': {},
            'spelling_suggestion': spelling_suggestion,
        }
class ElasticSearchQuery(BaseSearchQuery):
    """Query builder used together with the Elasticsearch backend."""

    def _convert_datetime(self, date):
        """Format ``date`` as a compact timestamp string.

        Values exposing an ``hour`` attribute are rendered with their time
        part; anything else gets ``000000`` as the time part.
        """
        pattern = '%Y%m%d%H%M%S' if hasattr(date, 'hour') else '%Y%m%d000000'
        return force_str(date.strftime(pattern))

    def clean(self, query_fragment):
        """Sanitise a user-supplied query fragment.

        Reserved words are lower-cased, and any word containing a reserved
        character is wrapped in single quotes. (Original note: Whoosh 1.X
        differs here - reserved characters are no longer escaped with a
        backslash; the whole word should be quoted instead.)
        """
        sanitized = []
        for token in query_fragment.split():
            # Reserved words are neutralised by lower-casing them.
            if token in self.backend.RESERVED_WORDS:
                token = token.lower()

            # Quote the word once if it contains any reserved character.
            if any(ch in token for ch in self.backend.RESERVED_CHARACTERS):
                token = "'%s'" % token

            sanitized.append(token)

        return ' '.join(sanitized)

    def build_query_fragment(self, field, filter_type, value):
        """Return the ``query_string`` attribute of ``value`` unchanged."""
        return value.query_string

    def get_count(self):
        """Return the number of results from ``get_results()``, or 0 if empty."""
        found = self.get_results()
        if not found:
            return 0
        return len(found)

    def get_spelling_suggestion(self, preferred_query=None):
        """Return the stored spelling suggestion; ``preferred_query`` is unused."""
        return self._spelling_suggestion

    def build_params(self, spelling_query=None):
        """Return the parent class's search parameters without modification."""
        return super().build_params(spelling_query=spelling_query)
class ElasticSearchModelSearchForm(ModelSearchForm):
    """Model search form that toggles suggestion-based searching on the backend."""

    def search(self):
        """Set the backend's ``is_suggest`` flag from the form data, then search.

        Suggestions stay enabled unless the submitted ``is_suggest`` value is
        exactly ``"no"``.

        Returns:
            Whatever the parent form's ``search()`` returns.
        """
        suggest_enabled = self.data.get("is_suggest") != "no"
        backend = self.searchqueryset.query.backend
        backend.is_suggest = suggest_enabled
        return super().search()
class ElasticSearchEngine(BaseEngine):
    """Haystack engine that pairs the Elasticsearch query and backend classes."""

    # Query class used by this engine.
    query = ElasticSearchQuery
    # Backend class used by this engine.
    backend = ElasticSearchBackend