|
|
from django.utils.encoding import force_str
|
|
|
from elasticsearch_dsl import Q
|
|
|
from haystack.backends import BaseEngine, BaseSearchBackend, BaseSearchQuery, log_query
|
|
|
from haystack.forms import ModelSearchForm
|
|
|
from haystack.models import SearchResult
|
|
|
from haystack.utils import log as logging
|
|
|
|
|
|
# Import the project's Elasticsearch document classes and the Article model
|
|
|
from blog.documents import ArticleDocument, ArticleDocumentManager
|
|
|
from blog.models import Article
|
|
|
|
|
|
# Module-level logger
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
class ElasticSearchBackend(BaseSearchBackend):
    """Haystack search backend that stores and queries blog articles in Elasticsearch.

    Index maintenance is delegated to ``ArticleDocumentManager``; searching
    goes through ``ArticleDocument.search()``.
    """

    def __init__(self, connection_alias, **connection_options):
        """Initialise the base backend and create the article document manager."""
        super().__init__(connection_alias, **connection_options)
        self.manager = ArticleDocumentManager()  # builds / updates article documents
        self.include_spelling = True  # spelling suggestions are enabled

    def _get_models(self, iterable):
        """Convert model instances into Elasticsearch documents.

        When ``iterable`` is empty, or its first element is falsy, every
        ``Article`` is used instead (``remove(None)`` relies on this fallback).

        Returns whatever ``self.manager.convert_to_doc`` returns.
        """
        models = iterable if iterable and iterable[0] else Article.objects.all()
        return self.manager.convert_to_doc(models)

    def _create(self, models):
        """Create the index, then rebuild it from ``models`` (or all articles)."""
        self.manager.create_index()
        docs = self._get_models(models)
        self.manager.rebuild(docs)

    def _delete(self, models):
        """Call ``delete()`` on every item and return ``True``.

        Within this class it is called from ``remove()`` with the documents
        produced by ``_get_models``.
        """
        for doc in models:
            doc.delete()
        return True

    def _rebuild(self, models):
        """Re-index ``models``, or every ``Article`` when ``models`` is falsy."""
        models = models if models else Article.objects.all()
        docs = self.manager.convert_to_doc(models)
        self.manager.update_docs(docs)

    def update(self, index, iterable, commit=True):
        """Convert ``iterable`` to documents and push them to the index.

        ``index`` and ``commit`` are accepted for Haystack API compatibility
        and are not used here.
        """
        docs = self._get_models(iterable)
        self.manager.update_docs(docs)

    def remove(self, obj_or_string):
        """Delete the document(s) corresponding to ``obj_or_string``.

        A falsy argument makes ``_get_models`` fall back to all articles, so
        ``remove(None)`` deletes every article document.
        """
        docs = self._get_models([obj_or_string])
        self._delete(docs)

    def clear(self, models=None, commit=True):
        """Remove every article document from the index.

        NOTE(review): ``models`` and ``commit`` are ignored; the whole article
        index is always cleared via ``remove(None)``.
        """
        self.remove(None)

    @staticmethod
    def get_suggestion(query: str) -> str:
        """Return the suggested search phrase for ``query``.

        For each entry of the ``suggest_search`` term suggester the first
        option is used when one exists, otherwise the entry's original text
        is kept. The pieces are joined with single spaces.
        """
        response = (
            ArticleDocument.search()
            .query("match", body=query)
            .suggest('suggest_search', query, term={'field': 'body'})
            .execute()
        )

        keywords = [
            entry["options"][0]["text"] if entry["options"] else entry["text"]
            for entry in response.suggest.suggest_search
        ]
        return ' '.join(keywords)

    @log_query
    def search(self, query_string, **kwargs):
        """Run a search and return a Haystack-style result dictionary.

        Args:
            query_string: The text to search for.
            **kwargs: Only ``start_offset`` and ``end_offset`` are read; they
                slice the Elasticsearch query for pagination.

        Returns:
            dict with keys ``results`` (list of ``SearchResult``), ``hits``
            (total hit count as reported by Elasticsearch), ``facets``
            (always empty) and ``spelling_suggestion`` (the suggested phrase
            when it differs from ``query_string``, otherwise ``None``).
        """
        # Lazy %-formatting: same message text, but no TypeError when the
        # query is not a str.
        logger.info('search query_string:%s', query_string)

        start_offset = kwargs.get('start_offset')
        end_offset = kwargs.get('end_offset')

        # ``is_suggest`` is set on the backend by the search form; when it is
        # missing or falsy the query is used unchanged.
        if getattr(self, "is_suggest", None):
            suggestion = self.get_suggestion(query_string)
        else:
            suggestion = query_string

        # Match against body or title, with minimum_should_match of 70%.
        q = Q(
            'bool',
            should=[Q('match', body=suggestion), Q('match', title=suggestion)],
            minimum_should_match="70%",
        )

        # Restrict to status == 'p' and type == 'a' (published articles, per
        # the original comment). source(False) skips returning document
        # bodies: only ids and scores are needed below.
        search = (
            ArticleDocument.search()
            .query('bool', filter=[q])
            .filter('term', status='p')
            .filter('term', type='a')
            .source(False)[start_offset:end_offset]
        )

        response = search.execute()

        # Elasticsearch 7+ reports the total as an object carrying ``value``
        # and ``relation``; older versions report a plain integer. Normalise
        # to the integer so callers always receive a count.
        total = response['hits'].total
        hits = getattr(total, 'value', total)

        raw_results = [
            SearchResult('blog', 'Article', raw_result['_id'], raw_result['_score'])
            for raw_result in response['hits']['hits']
        ]

        spelling_suggestion = None if query_string == suggestion else suggestion

        return {
            'results': raw_results,
            'hits': hits,
            'facets': {},
            'spelling_suggestion': spelling_suggestion,
        }
|
|
|
|
|
|
|
|
|
class ElasticSearchQuery(BaseSearchQuery):
    """Query builder used together with the Elasticsearch backend."""

    def _convert_datetime(self, date):
        """Format ``date`` as a ``YYYYMMDDHHMMSS`` string.

        Objects without an ``hour`` attribute (plain dates) get ``000000``
        as their time part.
        """
        pattern = '%Y%m%d%H%M%S' if hasattr(date, 'hour') else '%Y%m%d000000'
        return force_str(date.strftime(pattern))

    def clean(self, query_fragment):
        """Sanitise a user-supplied query fragment.

        The fragment is split on whitespace. Words listed in the backend's
        ``RESERVED_WORDS`` are lower-cased, and any word containing one of the
        backend's ``RESERVED_CHARACTERS`` is wrapped in single quotes rather
        than escaped. The words are re-joined with single spaces.
        """
        cleaned_words = []

        for word in query_fragment.split():
            if word in self.backend.RESERVED_WORDS:
                word = word.lower()

            # Quote the whole word once if it holds any reserved character.
            if any(char in word for char in self.backend.RESERVED_CHARACTERS):
                word = "'%s'" % word

            cleaned_words.append(word)

        return ' '.join(cleaned_words)

    def build_query_fragment(self, field, filter_type, value):
        """Return the raw query string of ``value``; ``field`` and ``filter_type`` are unused."""
        return value.query_string

    def get_count(self):
        """Return the number of results from ``get_results()``, or 0 when there are none.

        NOTE(review): this is the length of the fetched results, not a total
        reported by the backend - confirm that is intended.
        """
        results = self.get_results()
        if not results:
            return 0
        return len(results)

    def get_spelling_suggestion(self, preferred_query=None):
        """Return the stored spelling suggestion; ``preferred_query`` is ignored."""
        return self._spelling_suggestion

    def build_params(self, spelling_query=None):
        """Return the parent's search parameters unchanged."""
        return super().build_params(spelling_query=spelling_query)
|
|
|
|
|
|
|
|
|
class ElasticSearchModelSearchForm(ModelSearchForm):
    """Model search form that toggles suggestion-based searching on the backend."""

    def search(self):
        """Set the backend's ``is_suggest`` flag from the form data, then search.

        Suggestions are enabled unless the submitted ``is_suggest`` value is
        exactly ``"no"``. Returns the result of the parent ``search()``.
        """
        use_suggestion = self.data.get("is_suggest") != "no"
        self.searchqueryset.query.backend.is_suggest = use_suggestion
        return super().search()
|
|
|
|
|
|
|
|
|
class ElasticSearchEngine(BaseEngine):
    """Haystack engine that pairs the Elasticsearch backend with its query class."""

    # Backend class used by this engine.
    backend = ElasticSearchBackend
    # Query class used by this engine.
    query = ElasticSearchQuery