|
|
# 导入Python内置time模块,用于生成唯一ID(时间戳毫秒级)
|
|
|
import time
|
|
|
|
|
|
# 导入Elasticsearch客户端模块,用于直接操作Elasticsearch服务(如创建管道、删除索引)
|
|
|
import elasticsearch.client
|
|
|
# 导入Django配置模块,用于读取项目中的Elasticsearch配置(settings.py中)
|
|
|
from django.conf import settings
|
|
|
# 从elasticsearch-dsl库导入核心组件:
|
|
|
# Document:Elasticsearch文档模型基类(类似Django的Model)
|
|
|
# InnerDoc:嵌套文档基类(用于存储结构化子数据,如地理位置、用户代理信息)
|
|
|
# 字段类型:Date(日期)、Integer(整数)、Long(长整数)、Text(可分词文本)、Object(对象类型)、GeoPoint(地理坐标)、Keyword(不可分词文本)、Boolean(布尔值)
|
|
|
from elasticsearch_dsl import Document, InnerDoc, Date, Integer, Long, Text, Object, GeoPoint, Keyword, Boolean
|
|
|
# 导入elasticsearch-dsl的连接管理模块,用于建立与Elasticsearch服务的连接
|
|
|
from elasticsearch_dsl.connections import connections
|
|
|
|
|
|
# 从当前应用(blog)的models.py导入Article模型,用于将文章数据同步到Elasticsearch
|
|
|
from blog.models import Article
|
|
|
|
|
|
# 判断项目是否启用Elasticsearch:检查settings.py中是否配置了ELASTICSEARCH_DSL
|
|
|
ELASTICSEARCH_ENABLED = hasattr(settings, 'ELASTICSEARCH_DSL')
|
|
|
|
|
|
# 如果启用了Elasticsearch,执行以下初始化操作
|
|
|
if ELASTICSEARCH_ENABLED:
|
|
|
# 建立与Elasticsearch服务的连接:从settings中读取配置的主机地址(如['http://localhost:9200'])
|
|
|
connections.create_connection(
|
|
|
hosts=[settings.ELASTICSEARCH_DSL['default']['hosts']])
|
|
|
# 导入Elasticsearch原生客户端,用于执行更底层的操作(如创建索引、删除索引)
|
|
|
from elasticsearch import Elasticsearch
|
|
|
|
|
|
# 初始化Elasticsearch原生客户端,传入服务地址
|
|
|
es = Elasticsearch(settings.ELASTICSEARCH_DSL['default']['hosts'])
|
|
|
# 导入Elasticsearch的IngestClient(数据处理管道客户端),用于创建数据预处理管道
|
|
|
from elasticsearch.client import IngestClient
|
|
|
|
|
|
# 初始化IngestClient,绑定到上面创建的Elasticsearch客户端
|
|
|
c = IngestClient(es)
|
|
|
try:
|
|
|
# 尝试获取名为'geoip'的数据处理管道(用于解析IP地址对应的地理位置)
|
|
|
c.get_pipeline('geoip')
|
|
|
# 如果管道不存在(捕获NotFoundError异常),则创建该管道
|
|
|
except elasticsearch.exceptions.NotFoundError:
|
|
|
# 创建'geoip'管道:定义数据处理逻辑,通过geoip处理器解析IP地址
|
|
|
c.put_pipeline('geoip', body='''{
|
|
|
"description" : "Add geoip info", // 管道描述:添加地理位置信息
|
|
|
"processors" : [ // 处理器列表:定义数据处理步骤
|
|
|
{
|
|
|
"geoip" : { // geoip处理器:Elasticsearch内置,用于解析IP
|
|
|
"field" : "ip" // 待解析的字段:文档中的'ip'字段
|
|
|
}
|
|
|
}
|
|
|
]
|
|
|
}''')
|
|
|
|
|
|
|
|
|
# 定义GeoIp嵌套文档类(InnerDoc):存储IP地址解析后的地理位置信息
|
|
|
class GeoIp(InnerDoc):
|
|
|
continent_name = Keyword() # 洲名(Keyword类型:不可分词,适合精确查询/排序)
|
|
|
country_iso_code = Keyword() # 国家ISO代码(如CN、US,Keyword类型)
|
|
|
country_name = Keyword() # 国家名称(Keyword类型)
|
|
|
location = GeoPoint() # 地理坐标(经纬度,GeoPoint类型:支持地理位置查询)
|
|
|
|
|
|
|
|
|
# 定义UserAgentBrowser嵌套文档类:存储用户代理(UA)中的浏览器信息
|
|
|
class UserAgentBrowser(InnerDoc):
|
|
|
Family = Keyword() # 浏览器家族(如Chrome、Firefox,Keyword类型)
|
|
|
Version = Keyword() # 浏览器版本(如120.0,Keyword类型)
|
|
|
|
|
|
|
|
|
# 定义UserAgentOS嵌套文档类:存储用户代理中的操作系统信息,继承自UserAgentBrowser(结构一致)
|
|
|
class UserAgentOS(UserAgentBrowser):
|
|
|
pass # 直接继承父类字段,无需额外定义
|
|
|
|
|
|
|
|
|
# 定义UserAgentDevice嵌套文档类:存储用户代理中的设备信息
|
|
|
class UserAgentDevice(InnerDoc):
|
|
|
Family = Keyword() # 设备家族(如iPhone、Windows,Keyword类型)
|
|
|
Brand = Keyword() # 设备品牌(如Apple、Huawei,Keyword类型)
|
|
|
Model = Keyword() # 设备型号(如iPhone 15,Keyword类型)
|
|
|
|
|
|
|
|
|
# 定义UserAgent嵌套文档类:存储完整的用户代理信息(包含浏览器、OS、设备)
|
|
|
class UserAgent(InnerDoc):
|
|
|
browser = Object(UserAgentBrowser, required=False) # 浏览器信息(Object类型:关联UserAgentBrowser)
|
|
|
os = Object(UserAgentOS, required=False) # 操作系统信息(Object类型:关联UserAgentOS)
|
|
|
device = Object(UserAgentDevice, required=False) # 设备信息(Object类型:关联UserAgentDevice)
|
|
|
string = Text() # 完整UA字符串(Text类型:可分词,支持模糊查询)
|
|
|
is_bot = Boolean() # 是否为爬虫(Boolean类型:true/false)
|
|
|
|
|
|
|
|
|
# 定义ElapsedTimeDocument文档类:Elasticsearch中的"性能监控"文档模型(记录请求耗时、访问信息)
|
|
|
class ElapsedTimeDocument(Document):
|
|
|
url = Keyword() # 访问URL(Keyword类型:精确匹配,不分词)
|
|
|
time_taken = Long() # 请求耗时(毫秒,Long类型:支持大范围数值存储)
|
|
|
log_datetime = Date() # 日志记录时间(Date类型:支持时间范围查询)
|
|
|
ip = Keyword() # 访问IP地址(Keyword类型:精确匹配)
|
|
|
geoip = Object(GeoIp, required=False) # 地理位置信息(Object类型:关联GeoIp嵌套文档,非必填)
|
|
|
useragent = Object(UserAgent, required=False) # 用户代理信息(Object类型:关联UserAgent嵌套文档,非必填)
|
|
|
|
|
|
# 定义文档对应的Elasticsearch索引配置
|
|
|
class Index:
|
|
|
name = 'performance' # 索引名称:Elasticsearch中存储性能数据的索引名
|
|
|
settings = { # 索引设置
|
|
|
"number_of_shards": 1, # 分片数:1个(小型索引无需多分片)
|
|
|
"number_of_replicas": 0 # 副本数:0个(开发/小型场景无需副本,节省资源)
|
|
|
}
|
|
|
|
|
|
# 定义文档元数据(兼容Elasticsearch旧版本,doc_type在7.x后已废弃,此处保留兼容)
|
|
|
class Meta:
|
|
|
doc_type = 'ElapsedTime' # 文档类型:标识索引中的文档类别
|
|
|
|
|
|
|
|
|
# 定义ElaspedTimeDocumentManager类:ElapsedTimeDocument的管理类(封装索引创建、数据插入等操作)
|
|
|
class ElaspedTimeDocumentManager:
|
|
|
# 静态方法:创建性能监控索引(如果不存在)
|
|
|
@staticmethod
|
|
|
def build_index():
|
|
|
# 导入Elasticsearch原生客户端
|
|
|
from elasticsearch import Elasticsearch
|
|
|
# 初始化客户端,读取settings中的Elasticsearch地址
|
|
|
client = Elasticsearch(settings.ELASTICSEARCH_DSL['default']['hosts'])
|
|
|
# 检查名为'performance'的索引是否已存在
|
|
|
res = client.indices.exists(index="performance")
|
|
|
# 如果索引不存在,初始化ElapsedTimeDocument(创建索引及映射)
|
|
|
if not res:
|
|
|
ElapsedTimeDocument.init()
|
|
|
|
|
|
# 静态方法:删除性能监控索引
|
|
|
@staticmethod
|
|
|
def delete_index():
|
|
|
from elasticsearch import Elasticsearch
|
|
|
es = Elasticsearch(settings.ELASTICSEARCH_DSL['default']['hosts'])
|
|
|
# 删除'performance'索引,忽略400(请求错误)和404(索引不存在)异常
|
|
|
es.indices.delete(index='performance', ignore=[400, 404])
|
|
|
|
|
|
# 静态方法:创建性能监控文档(插入一条访问耗时记录)
|
|
|
@staticmethod
|
|
|
def create(url, time_taken, log_datetime, useragent, ip):
|
|
|
# 确保索引已创建(调用build_index方法)
|
|
|
ElaspedTimeDocumentManager.build_index()
|
|
|
# 初始化UserAgent嵌套文档对象
|
|
|
ua = UserAgent()
|
|
|
# 赋值浏览器信息:从传入的useragent对象中提取浏览器家族和版本
|
|
|
ua.browser = UserAgentBrowser()
|
|
|
ua.browser.Family = useragent.browser.family
|
|
|
ua.browser.Version = useragent.browser.version_string
|
|
|
|
|
|
# 赋值操作系统信息:从传入的useragent对象中提取OS家族和版本
|
|
|
ua.os = UserAgentOS()
|
|
|
ua.os.Family = useragent.os.family
|
|
|
ua.os.Version = useragent.os.version_string
|
|
|
|
|
|
# 赋值设备信息:从传入的useragent对象中提取设备家族、品牌、型号
|
|
|
ua.device = UserAgentDevice()
|
|
|
ua.device.Family = useragent.device.family
|
|
|
ua.device.Brand = useragent.device.brand
|
|
|
ua.device.Model = useragent.device.model
|
|
|
# 赋值完整UA字符串和是否为爬虫的标识
|
|
|
ua.string = useragent.ua_string
|
|
|
ua.is_bot = useragent.is_bot
|
|
|
|
|
|
# 初始化ElapsedTimeDocument文档对象,设置字段值
|
|
|
doc = ElapsedTimeDocument(
|
|
|
meta={
|
|
|
'id': int(round(time.time() * 1000)) # 文档ID:毫秒级时间戳(确保唯一)
|
|
|
},
|
|
|
url=url, # 访问URL
|
|
|
time_taken=time_taken, # 请求耗时(毫秒)
|
|
|
log_datetime=log_datetime,# 记录时间
|
|
|
useragent=ua, # 用户代理信息(嵌套文档)
|
|
|
ip=ip) # 访问IP
|
|
|
# 保存文档到Elasticsearch,并指定使用'geoip'管道预处理(解析IP地址)
|
|
|
doc.save(pipeline="geoip")
|
|
|
|
|
|
|
|
|
# 定义ArticleDocument文档类:Elasticsearch中的"文章"文档模型(用于文章搜索)
|
|
|
class ArticleDocument(Document):
|
|
|
# 文章内容:Text类型,使用ik_max_word分词器(分词更细,适合全文搜索),搜索时用ik_smart(分词更粗,提升效率)
|
|
|
body = Text(analyzer='ik_max_word', search_analyzer='ik_smart')
|
|
|
# 文章标题:同上,支持中文分词搜索
|
|
|
title = Text(analyzer='ik_max_word', search_analyzer='ik_smart')
|
|
|
# 作者信息:Object类型,包含昵称(可分词)和ID(整数)
|
|
|
author = Object(properties={
|
|
|
'nickname': Text(analyzer='ik_max_word', search_analyzer='ik_smart'),
|
|
|
'id': Integer()
|
|
|
})
|
|
|
# 分类信息:Object类型,包含分类名称(可分词)和ID(整数)
|
|
|
category = Object(properties={
|
|
|
'name': Text(analyzer='ik_max_word', search_analyzer='ik_smart'),
|
|
|
'id': Integer()
|
|
|
})
|
|
|
# 标签信息:Object类型(数组),每个标签包含名称(可分词)和ID(整数)
|
|
|
tags = Object(properties={
|
|
|
'name': Text(analyzer='ik_max_word', search_analyzer='ik_smart'),
|
|
|
'id': Integer()
|
|
|
})
|
|
|
|
|
|
pub_time = Date() # 发布时间(Date类型:支持按时间排序/筛选)
|
|
|
status = Text() # 文章状态(如'p'=发布,'d'=草稿,Text类型)
|
|
|
comment_status = Text() # 评论状态(如'o'=开启,'c'=关闭,Text类型)
|
|
|
type = Text() # 文章类型(如'p'=页面,'a'=普通文章,Text类型)
|
|
|
views = Integer() # 浏览量(Integer类型:支持数值排序)
|
|
|
article_order = Integer() # 排序权重(Integer类型:用于自定义文章排序)
|
|
|
|
|
|
# 定义文档对应的Elasticsearch索引配置
|
|
|
class Index:
|
|
|
name = 'blog' # 索引名称:存储文章数据的索引名
|
|
|
settings = { # 索引设置
|
|
|
"number_of_shards": 1, # 分片数:1个(小型博客无需多分片)
|
|
|
"number_of_replicas": 0 # 副本数:0个(开发/小型场景节省资源)
|
|
|
}
|
|
|
|
|
|
# 文档元数据(兼容旧版本Elasticsearch的doc_type)
|
|
|
class Meta:
|
|
|
doc_type = 'Article' # 文档类型:标识为文章类文档
|
|
|
|
|
|
|
|
|
# 定义ArticleDocumentManager类:ArticleDocument的管理类(封装文章索引的创建、重建、更新等操作)
|
|
|
class ArticleDocumentManager():
|
|
|
# 构造方法:实例化管理类时自动创建文章索引(如果不存在)
|
|
|
def __init__(self):
|
|
|
self.create_index()
|
|
|
|
|
|
# 实例方法:创建文章索引(调用ArticleDocument的init方法,生成索引和字段映射)
|
|
|
def create_index(self):
|
|
|
ArticleDocument.init()
|
|
|
|
|
|
# 实例方法:删除文章索引
|
|
|
def delete_index(self):
|
|
|
from elasticsearch import Elasticsearch
|
|
|
es = Elasticsearch(settings.ELASTICSEARCH_DSL['default']['hosts'])
|
|
|
# 删除'blog'索引,忽略400和404异常
|
|
|
es.indices.delete(index='blog', ignore=[400, 404])
|
|
|
|
|
|
# 实例方法:将Django的Article模型对象列表转换为Elasticsearch的ArticleDocument列表
|
|
|
def convert_to_doc(self, articles):
|
|
|
# 列表推导式:遍历每篇文章,构建对应的ArticleDocument
|
|
|
return [
|
|
|
ArticleDocument(
|
|
|
meta={'id': article.id}, # 文档ID与Django Article模型ID一致(便于关联)
|
|
|
body=article.body, # 文章内容
|
|
|
title=article.title, # 文章标题
|
|
|
author={ # 作者信息:从Article模型的author字段提取
|
|
|
'nickname': article.author.username,
|
|
|
'id': article.author.id
|
|
|
},
|
|
|
category={ # 分类信息:从Article模型的category字段提取
|
|
|
'name': article.category.name,
|
|
|
'id': article.category.id
|
|
|
},
|
|
|
tags=[ # 标签信息:遍历Article模型的tags多对多字段,提取每个标签的名称和ID
|
|
|
{'name': t.name, 'id': t.id} for t in article.tags.all()
|
|
|
],
|
|
|
pub_time=article.pub_time, # 发布时间
|
|
|
status=article.status, # 文章状态
|
|
|
comment_status=article.comment_status, # 评论状态
|
|
|
type=article.type, # 文章类型
|
|
|
views=article.views, # 浏览量
|
|
|
article_order=article.article_order # 排序权重
|
|
|
) for article in articles]
|
|
|
|
|
|
# 实例方法:重建文章索引(全量同步文章数据到Elasticsearch)
|
|
|
def rebuild(self, articles=None):
|
|
|
# 确保索引已创建(初始化索引和映射)
|
|
|
ArticleDocument.init()
|
|
|
# 如果传入了articles参数,则同步指定文章;否则同步所有文章(Article.objects.all())
|
|
|
articles = articles if articles else Article.objects.all()
|
|
|
# 将Django Article对象转换为Elasticsearch文档列表
|
|
|
docs = self.convert_to_doc(articles)
|
|
|
# 遍历文档列表,逐个保存到Elasticsearch
|
|
|
for doc in docs:
|
|
|
doc.save()
|
|
|
|
|
|
# 实例方法:批量更新Elasticsearch中的文章文档
|
|
|
def update_docs(self, docs):
|
|
|
# 遍历文档列表,逐个保存(已存在的文档会执行更新操作)
|
|
|
for doc in docs:
|
|
|
doc.save() |