# 导入Python内置time模块,用于生成唯一ID(时间戳毫秒级) import time # 导入Elasticsearch客户端模块,用于直接操作Elasticsearch服务(如创建管道、删除索引) import elasticsearch.client # 导入Django配置模块,用于读取项目中的Elasticsearch配置(settings.py中) from django.conf import settings # 从elasticsearch-dsl库导入核心组件: # Document:Elasticsearch文档模型基类(类似Django的Model) # InnerDoc:嵌套文档基类(用于存储结构化子数据,如地理位置、用户代理信息) # 字段类型:Date(日期)、Integer(整数)、Long(长整数)、Text(可分词文本)、Object(对象类型)、GeoPoint(地理坐标)、Keyword(不可分词文本)、Boolean(布尔值) from elasticsearch_dsl import Document, InnerDoc, Date, Integer, Long, Text, Object, GeoPoint, Keyword, Boolean # 导入elasticsearch-dsl的连接管理模块,用于建立与Elasticsearch服务的连接 from elasticsearch_dsl.connections import connections # 从当前应用(blog)的models.py导入Article模型,用于将文章数据同步到Elasticsearch from blog.models import Article # 判断项目是否启用Elasticsearch:检查settings.py中是否配置了ELASTICSEARCH_DSL ELASTICSEARCH_ENABLED = hasattr(settings, 'ELASTICSEARCH_DSL') # 如果启用了Elasticsearch,执行以下初始化操作 if ELASTICSEARCH_ENABLED: # 建立与Elasticsearch服务的连接:从settings中读取配置的主机地址(如['http://localhost:9200']) connections.create_connection( hosts=[settings.ELASTICSEARCH_DSL['default']['hosts']]) # 导入Elasticsearch原生客户端,用于执行更底层的操作(如创建索引、删除索引) from elasticsearch import Elasticsearch # 初始化Elasticsearch原生客户端,传入服务地址 es = Elasticsearch(settings.ELASTICSEARCH_DSL['default']['hosts']) # 导入Elasticsearch的IngestClient(数据处理管道客户端),用于创建数据预处理管道 from elasticsearch.client import IngestClient # 初始化IngestClient,绑定到上面创建的Elasticsearch客户端 c = IngestClient(es) try: # 尝试获取名为'geoip'的数据处理管道(用于解析IP地址对应的地理位置) c.get_pipeline('geoip') # 如果管道不存在(捕获NotFoundError异常),则创建该管道 except elasticsearch.exceptions.NotFoundError: # 创建'geoip'管道:定义数据处理逻辑,通过geoip处理器解析IP地址 c.put_pipeline('geoip', body='''{ "description" : "Add geoip info", // 管道描述:添加地理位置信息 "processors" : [ // 处理器列表:定义数据处理步骤 { "geoip" : { // geoip处理器:Elasticsearch内置,用于解析IP "field" : "ip" // 待解析的字段:文档中的'ip'字段 } } ] }''') # 定义GeoIp嵌套文档类(InnerDoc):存储IP地址解析后的地理位置信息 class GeoIp(InnerDoc): continent_name = Keyword() # 洲名(Keyword类型:不可分词,适合精确查询/排序) country_iso_code = Keyword() # 国家ISO代码(如CN、US,Keyword类型) country_name = Keyword() # 国家名称(Keyword类型) location = GeoPoint() # 地理坐标(经纬度,GeoPoint类型:支持地理位置查询) # 定义UserAgentBrowser嵌套文档类:存储用户代理(UA)中的浏览器信息 class UserAgentBrowser(InnerDoc): Family = Keyword() # 浏览器家族(如Chrome、Firefox,Keyword类型) Version = Keyword() # 浏览器版本(如120.0,Keyword类型) # 定义UserAgentOS嵌套文档类:存储用户代理中的操作系统信息,继承自UserAgentBrowser(结构一致) class UserAgentOS(UserAgentBrowser): pass # 直接继承父类字段,无需额外定义 # 定义UserAgentDevice嵌套文档类:存储用户代理中的设备信息 class UserAgentDevice(InnerDoc): Family = Keyword() # 设备家族(如iPhone、Windows,Keyword类型) Brand = Keyword() # 设备品牌(如Apple、Huawei,Keyword类型) Model = Keyword() # 设备型号(如iPhone 15,Keyword类型) # 定义UserAgent嵌套文档类:存储完整的用户代理信息(包含浏览器、OS、设备) class UserAgent(InnerDoc): browser = Object(UserAgentBrowser, required=False) # 浏览器信息(Object类型:关联UserAgentBrowser) os = Object(UserAgentOS, required=False) # 操作系统信息(Object类型:关联UserAgentOS) device = Object(UserAgentDevice, required=False) # 设备信息(Object类型:关联UserAgentDevice) string = Text() # 完整UA字符串(Text类型:可分词,支持模糊查询) is_bot = Boolean() # 是否为爬虫(Boolean类型:true/false) # 定义ElapsedTimeDocument文档类:Elasticsearch中的"性能监控"文档模型(记录请求耗时、访问信息) class ElapsedTimeDocument(Document): url = Keyword() # 访问URL(Keyword类型:精确匹配,不分词) time_taken = Long() # 请求耗时(毫秒,Long类型:支持大范围数值存储) log_datetime = Date() # 日志记录时间(Date类型:支持时间范围查询) ip = Keyword() # 访问IP地址(Keyword类型:精确匹配) geoip = Object(GeoIp, required=False) # 地理位置信息(Object类型:关联GeoIp嵌套文档,非必填) useragent = Object(UserAgent, required=False) # 用户代理信息(Object类型:关联UserAgent嵌套文档,非必填) # 定义文档对应的Elasticsearch索引配置 class Index: name = 'performance' # 索引名称:Elasticsearch中存储性能数据的索引名 settings = { # 索引设置 "number_of_shards": 1, # 分片数:1个(小型索引无需多分片) "number_of_replicas": 0 # 副本数:0个(开发/小型场景无需副本,节省资源) } # 定义文档元数据(兼容Elasticsearch旧版本,doc_type在7.x后已废弃,此处保留兼容) class Meta: doc_type = 'ElapsedTime' # 文档类型:标识索引中的文档类别 # 定义ElaspedTimeDocumentManager类:ElapsedTimeDocument的管理类(封装索引创建、数据插入等操作) class ElaspedTimeDocumentManager: # 静态方法:创建性能监控索引(如果不存在) @staticmethod def build_index(): # 导入Elasticsearch原生客户端 from elasticsearch import Elasticsearch # 初始化客户端,读取settings中的Elasticsearch地址 client = Elasticsearch(settings.ELASTICSEARCH_DSL['default']['hosts']) # 检查名为'performance'的索引是否已存在 res = client.indices.exists(index="performance") # 如果索引不存在,初始化ElapsedTimeDocument(创建索引及映射) if not res: ElapsedTimeDocument.init() # 静态方法:删除性能监控索引 @staticmethod def delete_index(): from elasticsearch import Elasticsearch es = Elasticsearch(settings.ELASTICSEARCH_DSL['default']['hosts']) # 删除'performance'索引,忽略400(请求错误)和404(索引不存在)异常 es.indices.delete(index='performance', ignore=[400, 404]) # 静态方法:创建性能监控文档(插入一条访问耗时记录) @staticmethod def create(url, time_taken, log_datetime, useragent, ip): # 确保索引已创建(调用build_index方法) ElaspedTimeDocumentManager.build_index() # 初始化UserAgent嵌套文档对象 ua = UserAgent() # 赋值浏览器信息:从传入的useragent对象中提取浏览器家族和版本 ua.browser = UserAgentBrowser() ua.browser.Family = useragent.browser.family ua.browser.Version = useragent.browser.version_string # 赋值操作系统信息:从传入的useragent对象中提取OS家族和版本 ua.os = UserAgentOS() ua.os.Family = useragent.os.family ua.os.Version = useragent.os.version_string # 赋值设备信息:从传入的useragent对象中提取设备家族、品牌、型号 ua.device = UserAgentDevice() ua.device.Family = useragent.device.family ua.device.Brand = useragent.device.brand ua.device.Model = useragent.device.model # 赋值完整UA字符串和是否为爬虫的标识 ua.string = useragent.ua_string ua.is_bot = useragent.is_bot # 初始化ElapsedTimeDocument文档对象,设置字段值 doc = ElapsedTimeDocument( meta={ 'id': int(round(time.time() * 1000)) # 文档ID:毫秒级时间戳(确保唯一) }, url=url, # 访问URL time_taken=time_taken, # 请求耗时(毫秒) log_datetime=log_datetime,# 记录时间 useragent=ua, # 用户代理信息(嵌套文档) ip=ip) # 访问IP # 保存文档到Elasticsearch,并指定使用'geoip'管道预处理(解析IP地址) doc.save(pipeline="geoip") # 定义ArticleDocument文档类:Elasticsearch中的"文章"文档模型(用于文章搜索) class ArticleDocument(Document): # 文章内容:Text类型,使用ik_max_word分词器(分词更细,适合全文搜索),搜索时用ik_smart(分词更粗,提升效率) body = Text(analyzer='ik_max_word', search_analyzer='ik_smart') # 文章标题:同上,支持中文分词搜索 title = Text(analyzer='ik_max_word', search_analyzer='ik_smart') # 作者信息:Object类型,包含昵称(可分词)和ID(整数) author = Object(properties={ 'nickname': Text(analyzer='ik_max_word', search_analyzer='ik_smart'), 'id': Integer() }) # 分类信息:Object类型,包含分类名称(可分词)和ID(整数) category = Object(properties={ 'name': Text(analyzer='ik_max_word', search_analyzer='ik_smart'), 'id': Integer() }) # 标签信息:Object类型(数组),每个标签包含名称(可分词)和ID(整数) tags = Object(properties={ 'name': Text(analyzer='ik_max_word', search_analyzer='ik_smart'), 'id': Integer() }) pub_time = Date() # 发布时间(Date类型:支持按时间排序/筛选) status = Text() # 文章状态(如'p'=发布,'d'=草稿,Text类型) comment_status = Text() # 评论状态(如'o'=开启,'c'=关闭,Text类型) type = Text() # 文章类型(如'p'=页面,'a'=普通文章,Text类型) views = Integer() # 浏览量(Integer类型:支持数值排序) article_order = Integer() # 排序权重(Integer类型:用于自定义文章排序) # 定义文档对应的Elasticsearch索引配置 class Index: name = 'blog' # 索引名称:存储文章数据的索引名 settings = { # 索引设置 "number_of_shards": 1, # 分片数:1个(小型博客无需多分片) "number_of_replicas": 0 # 副本数:0个(开发/小型场景节省资源) } # 文档元数据(兼容旧版本Elasticsearch的doc_type) class Meta: doc_type = 'Article' # 文档类型:标识为文章类文档 # 定义ArticleDocumentManager类:ArticleDocument的管理类(封装文章索引的创建、重建、更新等操作) class ArticleDocumentManager(): # 构造方法:实例化管理类时自动创建文章索引(如果不存在) def __init__(self): self.create_index() # 实例方法:创建文章索引(调用ArticleDocument的init方法,生成索引和字段映射) def create_index(self): ArticleDocument.init() # 实例方法:删除文章索引 def delete_index(self): from elasticsearch import Elasticsearch es = Elasticsearch(settings.ELASTICSEARCH_DSL['default']['hosts']) # 删除'blog'索引,忽略400和404异常 es.indices.delete(index='blog', ignore=[400, 404]) # 实例方法:将Django的Article模型对象列表转换为Elasticsearch的ArticleDocument列表 def convert_to_doc(self, articles): # 列表推导式:遍历每篇文章,构建对应的ArticleDocument return [ ArticleDocument( meta={'id': article.id}, # 文档ID与Django Article模型ID一致(便于关联) body=article.body, # 文章内容 title=article.title, # 文章标题 author={ # 作者信息:从Article模型的author字段提取 'nickname': article.author.username, 'id': article.author.id }, category={ # 分类信息:从Article模型的category字段提取 'name': article.category.name, 'id': article.category.id }, tags=[ # 标签信息:遍历Article模型的tags多对多字段,提取每个标签的名称和ID {'name': t.name, 'id': t.id} for t in article.tags.all() ], pub_time=article.pub_time, # 发布时间 status=article.status, # 文章状态 comment_status=article.comment_status, # 评论状态 type=article.type, # 文章类型 views=article.views, # 浏览量 article_order=article.article_order # 排序权重 ) for article in articles] # 实例方法:重建文章索引(全量同步文章数据到Elasticsearch) def rebuild(self, articles=None): # 确保索引已创建(初始化索引和映射) ArticleDocument.init() # 如果传入了articles参数,则同步指定文章;否则同步所有文章(Article.objects.all()) articles = articles if articles else Article.objects.all() # 将Django Article对象转换为Elasticsearch文档列表 docs = self.convert_to_doc(articles) # 遍历文档列表,逐个保存到Elasticsearch for doc in docs: doc.save() # 实例方法:批量更新Elasticsearch中的文章文档 def update_docs(self, docs): # 遍历文档列表,逐个保存(已存在的文档会执行更新操作) for doc in docs: doc.save()