news 2026/4/16 11:51:16

日志系统与结构化日志

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
日志系统与结构化日志

目录

  • 日志系统与结构化日志
    • 引言
    • 1. 日志系统基础概念
      • 1.1 日志的重要性与价值
      • 1.2 日志系统的演进历程
      • 1.3 日志质量的金字塔模型
    • 2. 结构化日志基础
      • 2.1 什么是结构化日志?
      • 2.2 结构化日志 vs 非结构化日志
      • 2.3 结构化日志的数学表示
    • 3. 日志系统架构设计
      • 3.1 现代日志系统架构
      • 3.2 日志处理流水线
      • 3.3 分布式日志追踪
    • 4. Python结构化日志实现
      • 4.1 基础结构化日志框架
      • 4.2 高级日志处理器
      • 4.3 完整的日志系统
    • 5. 高级特性实现
      • 5.1 分布式追踪集成
      • 5.2 性能监控集成
      • 5.3 日志采样与聚合
    • 6. 配置与使用示例
      • 6.1 配置管理系统
      • 6.2 使用示例
    • 7. 测试与验证
      • 7.1 单元测试
      • 7.2 性能测试
    • 8. 最佳实践与部署
      • 8.1 结构化日志最佳实践
      • 8.2 生产环境部署指南
      • 8.3 监控与告警配置
    • 9. 总结与展望
      • 9.1 关键收获
      • 9.2 性能数据总结
      • 9.3 未来发展方向
    • 附录
      • A. 日志级别对照表
      • B. 常见问题解答
      • C. 性能优化建议

『宝藏代码胶囊开张啦!』—— 我的 CodeCapsule 来咯!✨写代码不再头疼!我的新站点 CodeCapsule 主打一个 “白菜价”+“量身定制”!无论是卡脖子的毕设/课设/文献复现,需要灵光一现的算法改进,还是想给项目加个“外挂”,这里都有便宜又好用的代码方案等你发现!低成本,高适配,助你轻松通关!速来围观 👉 CodeCapsule官网

日志系统与结构化日志

引言

在现代软件系统中,日志不仅是调试和问题排查的工具,更是系统可观测性的核心组成部分。随着微服务、分布式系统和云原生架构的普及,传统文本日志已无法满足复杂系统的监控、分析和调试需求。结构化日志应运而生,成为现代日志系统的标准实践。

根据2023年DevOps现状报告显示,采用结构化日志的团队部署频率提高2.6倍,故障恢复时间缩短3.2倍。本文将深入探讨结构化日志系统的设计原理、实现方法和最佳实践,提供完整的Python实现方案。

1. 日志系统基础概念

1.1 日志的重要性与价值

日志系统为软件系统提供了以下关键价值:

  1. 故障排查:快速定位和解决生产环境问题
  2. 性能监控:跟踪系统性能和资源使用情况
  3. 安全审计:记录用户操作和安全事件
  4. 业务分析:分析用户行为和应用使用模式
  5. 合规要求:满足法律和行业规定的日志保留要求

1.2 日志系统的演进历程

文本日志
1990s
日志框架
2000s
结构化日志
2010s
可观测性平台
2020s

1.3 日志质量的金字塔模型

日志金字塔
Level 4: 业务洞察
用户行为分析
Level 3: 应用性能
性能指标追踪
Level 2: 系统状态
错误和警告
Level 1: 调试信息
详细执行跟踪

2. 结构化日志基础

2.1 什么是结构化日志?

结构化日志是将日志数据以机器可读的格式(通常是JSON)进行组织,而不是传统的纯文本格式。结构化日志包含:

  1. 固定字段:时间戳、级别、消息、来源等
  2. 上下文字段:请求ID、用户ID、会话ID等
  3. 业务字段:操作类型、资源ID、结果状态等
  4. 性能字段:耗时、内存使用、请求大小等

2.2 结构化日志 vs 非结构化日志

维度结构化日志非结构化日志
格式JSON、键值对纯文本
可读性机器友好人类友好
查询能力强大(支持字段筛选)有限(文本搜索)
存储效率较高较低
解析复杂度简单复杂
扩展性容易添加新字段需要修改格式

2.3 结构化日志的数学表示

设日志事件为一个元组:
L = ( t , l , m , C ) L = (t, l, m, C)L=(t,l,m,C)

其中:

  • t tt:时间戳
  • l ll:日志级别
  • m mm:消息模板
  • C CC:上下文键值对集合,C = { k 1 : v 1 , k 2 : v 2 , . . . , k n : v n } C = \{k_1:v_1, k_2:v_2, ..., k_n:v_n\}C={k1:v1,k2:v2,...,kn:vn}

结构化日志可以表示为:
L s t r u c t = JSON ( { t i m e s t a m p : t , l e v e l : l , m e s s a g e : m } ∪ C ) L_{struct} = \text{JSON}(\{timestamp: t, level: l, message: m\} \cup C)Lstruct=JSON({timestamp:t,level:l,message:m}C)

日志查询可以形式化为:
Query ( L s t r u c t , Φ ) = { L ∣ ∀ ( k , v ) ∈ Φ , L . C [ k ] = v } \text{Query}(L_{struct}, \Phi) = \{L | \forall (k,v) \in \Phi, L.C[k] = v\}Query(Lstruct,Φ)={L∣∀(k,v)Φ,L.C[k]=v}

其中Φ \PhiΦ是查询条件的键值对集合。

3. 日志系统架构设计

3.1 现代日志系统架构

消费层
存储层
处理层
收集层
应用层
监控告警
分析平台
合规审计
搜索界面
实时存储
长期存储
冷存储
日志解析器
日志丰富器
日志过滤器
日志代理
消息队列
结构化日志
应用服务A
结构化日志
应用服务B
结构化日志
应用服务C

3.2 日志处理流水线

典型的日志处理流水线包含以下阶段:

  1. 收集:从应用收集原始日志
  2. 解析:提取结构化字段
  3. 丰富:添加元数据(主机名、环境等)
  4. 过滤:移除敏感信息或无用数据
  5. 转换:格式转换和标准化
  6. 路由:根据规则分发到不同目的地
  7. 存储:持久化存储
  8. 索引:建立快速检索索引

3.3 分布式日志追踪

在微服务架构中,分布式追踪是结构化日志的关键组成部分。使用以下字段实现追踪:

  • trace_id:整个请求链路的唯一标识
  • span_id:单个操作段的标识
  • parent_span_id:父操作的标识
  • service_name:服务名称
  • operation_name:操作名称

追踪系统的数学表示:
设请求R RR经过n nn个服务,则:
T ( R ) = { S 1 , S 2 , . . . , S n } T(R) = \{S_1, S_2, ..., S_n\}T(R)={S1,S2,...,Sn}

每个服务操作S i S_iSi包含:
S i = ( t s t a r t , t e n d , trace_id , span_id i , parent_span_id i , metadata i ) S_i = (t_{start}, t_{end}, \text{trace\_id}, \text{span\_id}_i, \text{parent\_span\_id}_i, \text{metadata}_i)Si=(tstart,tend,trace_id,span_idi,parent_span_idi,metadatai)

请求总耗时:
Δ t = max ⁡ ( t e n d ) − min ⁡ ( t s t a r t ) \Delta t = \max(t_{end}) - \min(t_{start})Δt=max(tend)min(tstart)

4. Python结构化日志实现

4.1 基础结构化日志框架

""" 结构化日志系统实现 设计原则: 1. 结构化优先:所有日志输出为结构化格式 2. 上下文感知:自动捕获和传递上下文 3. 性能友好:异步处理,最小化性能影响 4. 可扩展性:支持自定义处理器和格式器 5. 安全性:内置敏感信息过滤 """importjsonimportloggingimportsysimporttimeimportuuidimportinspectimportthreadingfromtypingimportDict,Any,Optional,List,Union,CallablefromdatetimeimportdatetimefromenumimportEnumfromdataclassesimportdataclass,field,asdictfromabcimportABC,abstractmethodfromqueueimportQueue,Emptyfromconcurrent.futuresimportThreadPoolExecutorfrompathlibimportPathimporttracebackimporthashlibimportzlibfromcollectionsimportdefaultdict# 类型别名LogData=Dict[str,Any]ContextDict=Dict[str,Any]classLogLevel(Enum):"""日志级别枚举"""TRACE=0# 最详细的跟踪信息DEBUG=1# 调试信息INFO=2# 常规信息WARN=3# 警告信息ERROR=4# 错误信息FATAL=5# 严重错误@classmethoddeffrom_string(cls,level_str:str)->'LogLevel':"""从字符串转换日志级别"""level_map={'trace':cls.TRACE,'debug':cls.DEBUG,'info':cls.INFO,'warn':cls.WARN,'warning':cls.WARN,'error':cls.ERROR,'fatal':cls.FATAL,'critical':cls.FATAL}returnlevel_map.get(level_str.lower(),cls.INFO)@classmethoddefto_standard_level(cls,level:'LogLevel')->int:"""转换为标准logging级别"""mapping={cls.TRACE:5,# 低于DEBUGcls.DEBUG:logging.DEBUG,cls.INFO:logging.INFO,cls.WARN:logging.WARNING,cls.ERROR:logging.ERROR,cls.FATAL:logging.CRITICAL}returnmapping[level]@dataclassclassLogRecord:"""结构化日志记录"""# 基础字段timestamp:strlevel:strmessage:strlogger_name:str# 上下文字段trace_id:Optional[str]=Nonespan_id:Optional[str]=Nonerequest_id:Optional[str]=Noneuser_id:Optional[str]=Nonesession_id:Optional[str]=Nonecorrelation_id:Optional[str]=None# 执行上下文filename:Optional[str]=Nonefunction:Optional[str]=Noneline_no:Optional[int]=Nonethread_id:Optional[int]=Nonethread_name:Optional[str]=Noneprocess_id:Optional[int]=None# 应用程序上下文app_name:Optional[str]=Noneapp_version:Optional[str]=Noneenvironment:Optional[str]=Nonehostname:Optional[str]=Noneservice_name:Optional[str]=None# 性能指标duration_ms:Optional[float]=Nonememory_mb:Optional[float]=Nonecpu_percent:Optional[float]=None# 自定义字段extra:Dict[str,Any]=field(default_factory=dict)# 错误信息error_type:Optional[str]=Noneerror_message:Optional[str]=Nonestack_trace:Optional[str]=Nonedefto_dict(self)->Dict[str,Any]:"""转换为字典"""result=asdict(self)# 移除None值以减小体积return{k:vfork,vinresult.items()ifvisnotNone}defto_json(self,indent:Optional[int]=None)->str:"""转换为JSON字符串"""returnjson.dumps(self.to_dict(),indent=indent,ensure_ascii=False)defget_field_hash(self)->str:"""获取字段内容的哈希值(用于去重)"""# 排除一些动态字段excluded_fields={'timestamp','duration_ms','memory_mb','cpu_percent'}data={k:vfork,vinself.to_dict().items()ifknotinexcluded_fieldsandvisnotNone}content=json.dumps(data,sort_keys=True,ensure_ascii=False)returnhashlib.md5(content.encode()).hexdigest()defis_similar_to(self,other:'LogRecord',threshold:float=0.9)->bool:"""判断两个日志记录是否相似(用于去重)"""ifself.level!=other.level:returnFalse# 计算消息相似度(简化的编辑距离)fromdifflibimportSequenceMatcher message_similarity=SequenceMatcher(None,self.message,other.message).ratio()returnmessage_similarity>=thresholdclassLogContext:"""日志上下文管理器"""def__init__(self):# 线程本地存储self._local=threading.local()self._global_context={}self._context_stack=[]@propertydefcurrent(self)->Dict[str,Any]:"""获取当前上下文"""ifnothasattr(self._local,'context'):self._local.context={}returnself._local.context@current.setterdefcurrent(self,context:Dict[str,Any]):"""设置当前上下文"""self._local.context=contextdefget(self,key:str,default:Any=None)->Any:"""获取上下文值"""returnself.current.get(key,self._global_context.get(key,default))defset(self,key:str,value:Any,global_scope:bool=False):"""设置上下文值"""ifglobal_scope:self._global_context[key]=valueelse:self.current[key]=valuedefupdate(self,data:Dict[str,Any],global_scope:bool=False):"""批量更新上下文"""ifglobal_scope:self._global_context.update(data)else:self.current.update(data)defclear(self):"""清除当前线程上下文"""ifhasattr(self._local,'context'):self._local.context.clear()defpush_context(self,context:Dict[str,Any]):"""压入新的上下文层"""ifnothasattr(self._local,'context_stack'):self._local.context_stack=[]# 保存当前上下文current_copy=self.current.copy()self._local.context_stack.append(current_copy)# 更新为新上下文(合并)new_context=current_copy.copy()new_context.update(context)self.current=new_contextdefpop_context(self)->Dict[str,Any]:"""弹出上下文层"""ifnothasattr(self._local,'context_stack')ornotself._local.context_stack:old_context=self.current.copy()self.clear()returnold_context old_context=self.current self.current=self._local.context_stack.pop()returnold_contextdefcontext_manager(self,**kwargs):"""上下文管理器"""returnLogContextManager(self,kwargs)defget_all_context(self)->Dict[str,Any]:"""获取所有上下文(包括全局)"""result=self._global_context.copy()result.update(self.current)returnresultclassLogContextManager:"""上下文管理器"""def__init__(self,log_context:LogContext,context_data:Dict[str,Any]):self.log_context=log_context self.context_data=context_datadef__enter__(self):self.log_context.push_context(self.context_data)returnselfdef__exit__(self,exc_type,exc_val,exc_tb):self.log_context.pop_context()classStructuredFormatter(ABC):"""结构化日志格式化器抽象基类"""@abstractmethoddefformat(self,record:LogRecord)->str:"""格式化日志记录"""passclassJSONFormatter(StructuredFormatter):"""JSON格式化器"""def__init__(self,indent:Optional[int]=None,ensure_ascii:bool=False,sort_keys:bool=False,include_metadata:bool=True):self.indent=indent self.ensure_ascii=ensure_ascii self.sort_keys=sort_keys self.include_metadata=include_metadatadefformat(self,record:LogRecord)->str:"""格式化为JSON"""data=record.to_dict()# 添加格式化元数据ifself.include_metadata:data['_metadata']={'format_version':'1.0','formatter':'json','timestamp_ns':time.time_ns()}returnjson.dumps(data,indent=self.indent,ensure_ascii=self.ensure_ascii,sort_keys=self.sort_keys)classNDJSONFormatter(StructuredFormatter):"""NDJSON格式化器(每行一个JSON)"""def__init__(self,**kwargs):self.json_formatter=JSONFormatter(**kwargs)defformat(self,record:LogRecord)->str:"""格式化为NDJSON"""returnself.json_formatter.format(record)classLogFilter(ABC):"""日志过滤器抽象基类"""@abstractmethoddeffilter(self,record:LogRecord)->bool:"""过滤日志记录,返回True表示保留"""passclassLevelFilter(LogFilter):"""级别过滤器"""def__init__(self,min_level:LogLevel):self.min_level=min_leveldeffilter(self,record:LogRecord)->bool:"""根据级别过滤"""record_level=LogLevel.from_string(record.level)returnrecord_level.value>=self.min_level.valueclassRateLimitFilter(LogFilter):"""速率限制过滤器"""def__init__(self,max_per_second:int=10,window_seconds:int=1):self.max_per_second=max_per_second self.window_seconds=window_seconds self.log_counts=defaultdict(int)self.window_start=time.time()deffilter(self,record:LogRecord)->bool:"""速率限制"""current_time=time.time()# 检查是否需要重置窗口ifcurrent_time-self.window_start>=self.window_seconds:self.log_counts.clear()self.window_start=current_time# 获取日志哈希作为键log_key=record.get_field_hash()current_count=self.log_counts[log_key]ifcurrent_count<self.max_per_second:self.log_counts[log_key]=current_count+1returnTruereturnFalseclassSensitiveDataFilter(LogFilter):"""敏感数据过滤器"""def__init__(self):# 敏感数据模式(可以扩展)self.sensitive_patterns=[r'(?i)(password|passwd|pwd)[=:]\s*["\']?([^"\'\s]+)["\']?',r'(?i)(api[_-]?key|secret[_-]?key)[=:]\s*["\']?([^"\'\s]+)["\']?',r'(?i)(token)[=:]\s*["\']?([^"\'\s]+)["\']?',r'(?i)(credit[_-]?card|cc)[=:]\s*["\']?(\d[ -]*?){13,16}["\']?',r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',# 电话号码r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',# 邮箱]self.compiled_patterns=[re.compile(pattern)forpatterninself.sensitive_patterns]deffilter(self,record:LogRecord)->bool:"""过滤敏感信息"""# 对消息进行脱敏record.message=self._mask_sensitive_data(record.message)# 对extra字段进行脱敏forkey,valueinrecord.extra.items():ifisinstance(value,str):record.extra[key]=self._mask_sensitive_data(value)returnTruedef_mask_sensitive_data(self,text:str)->str:"""脱敏文本中的敏感信息"""ifnotisinstance(text,str):returntext masked_text=textforpatterninself.compiled_patterns:masked_text=pattern.sub(self._mask_replacer,masked_text)returnmasked_textdef_mask_replacer(self,match)->str:"""替换匹配的敏感信息"""full_match=match.group(0)# 根据匹配内容决定脱敏策略if'@'infull_match:# 邮箱parts=full_match.split('@')iflen(parts[0])>2:returnparts[0][:2]+'***@'+parts[1]else:return'***@'+parts[1]elifany(keywordinfull_match.lower()forkeywordin['password','passwd','pwd']):return'password=***'elifany(keywordinfull_match.lower()forkeywordin['key','token','secret']):returnmatch.group(1)+'=***'elifre.match(r'\d',full_match.replace('-','').replace(' ','')):# 数字类型(信用卡、电话等)digits=re.sub(r'[^\d]','',full_match)if10<=len(digits)<=16:returndigits[:4]+'*'*(len(digits)-8)+digits[-4:]return'***'

4.2 高级日志处理器

classLogHandler(ABC):"""日志处理器抽象基类"""def__init__(self,level:LogLevel=LogLevel.INFO,formatter:Optional[StructuredFormatter]=None,filters:Optional[List[LogFilter]]=None):self.level=level self.formatter=formatterorJSONFormatter()self.filters=filtersor[]# 性能统计self.processed_count=0self.dropped_count=0self.start_time=time.time()@abstractmethoddefemit(self,record:LogRecord):"""输出日志记录"""passdefhandle(self,record:LogRecord)->bool:"""处理日志记录"""# 检查级别record_level=LogLevel.from_string(record.level)ifrecord_level.value<self.level.value:self.dropped_count+=1returnFalse# 应用过滤器forfilter_objinself.filters:ifnotfilter_obj.filter(record):self.dropped_count+=1returnFalse# 格式化formatted=self.formatter.format(record)# 输出try:self.emit(record)self.processed_count+=1returnTrueexceptExceptionase:# 处理器错误处理print(f"日志处理器错误:{e}")self.dropped_count+=1returnFalsedefget_stats(self)->Dict[str,Any]:"""获取处理器统计信息"""uptime=time.time()-self.start_timereturn{'processed':self.processed_count,'dropped':self.dropped_count,'uptime_seconds':uptime,'rate_per_second':self.processed_count/max(uptime,0.001),'handler_type':self.__class__.__name__}classConsoleHandler(LogHandler):"""控制台处理器"""def__init__(self,level:LogLevel=LogLevel.INFO,formatter:Optional[StructuredFormatter]=None,output_stream:Any=sys.stdout,use_colors:bool=True):super().__init__(level,formatter)self.output_stream=output_stream self.use_colors=use_colors# 颜色映射self.color_map={'TRACE':'\033[90m',# 灰色'DEBUG':'\033[36m',# 青色'INFO':'\033[32m',# 绿色'WARN':'\033[33m',# 黄色'ERROR':'\033[31m',# 红色'FATAL':'\033[41m\033[37m',# 红底白字'RESET':'\033[0m'# 重置}defemit(self,record:LogRecord):"""输出到控制台"""formatted=self.formatter.format(record)ifself.use_colors:color=self.color_map.get(record.level.upper(),'')reset=self.color_map['RESET']output=f"{color}{formatted}{reset}"else:output=formattedprint(output,file=self.output_stream)classFileHandler(LogHandler):"""文件处理器"""def__init__(self,filename:Union[str,Path],level:LogLevel=LogLevel.INFO,formatter:Optional[StructuredFormatter]=None,mode:str='a',encoding:str='utf-8',buffering:int=1# 行缓冲):super().__init__(level,formatter)self.filename=Path(filename)self.mode=mode self.encoding=encoding self.buffering=buffering# 确保目录存在self.filename.parent.mkdir(parents=True,exist_ok=True)# 打开文件self._open_file()def_open_file(self):"""打开文件"""self.file=open(self.filename,mode=self.mode,encoding=self.encoding,buffering=self.buffering)defemit(self,record:LogRecord):"""输出到文件"""formatted=self.formatter.format(record)self.file.write(formatted+'\n')self.file.flush()defclose(self):"""关闭文件"""ifhasattr(self,'file')andself.file:self.file.close()defrotate(self,max_size_mb:float=100,backup_count:int=5):"""日志轮转"""ifnotself.filename.exists():returnfile_size_mb=self.filename.stat().st_size/(1024*1024)iffile_size_mb<max_size_mb:return# 关闭当前文件self.close()# 重命名旧文件foriinrange(backup_count-1,0,-1):old_file=self.filename.with_suffix(f".{i}.log")new_file=self.filename.with_suffix(f".{i+1}.log")ifold_file.exists():old_file.rename(new_file)# 重命名当前文件current_backup=self.filename.with_suffix(".1.log")self.filename.rename(current_backup)# 重新打开文件self._open_file()classRotatingFileHandler(FileHandler):"""自动轮转的文件处理器"""def__init__(self,filename:Union[str,Path],level:LogLevel=LogLevel.INFO,formatter:Optional[StructuredFormatter]=None,max_size_mb:float=100,backup_count:int=5,check_interval:int=10# 检查间隔(处理的日志条数)):super().__init__(filename,level,formatter)self.max_size_mb=max_size_mb self.backup_count=backup_count self.check_interval=check_interval self.processed_since_check=0defhandle(self,record:LogRecord)->bool:"""处理日志记录(添加轮转检查)"""self.processed_since_check+=1ifself.processed_since_check>=self.check_interval:self.rotate(self.max_size_mb,self.backup_count)self.processed_since_check=0returnsuper().handle(record)classAsyncHandler(LogHandler):"""异步处理器"""def__init__(self,base_handler:LogHandler,max_queue_size:int=10000,worker_count:int=1,drop_when_full:bool=False):super().__init__(base_handler.level,base_handler.formatter,base_handler.filters)self.base_handler=base_handler# 队列设置self.max_queue_size=max_queue_size self.queue=Queue(maxsize=max_queue_size)self.drop_when_full=drop_when_full# 工作线程self.worker_count=worker_count self.executor=ThreadPoolExecutor(max_workers=worker_count,thread_name_prefix="AsyncLogger")# 启动消费者self.running=Trueforiinrange(worker_count):self.executor.submit(self._worker_loop)defemit(self,record:LogRecord):"""异步处理日志记录"""try:ifself.drop_when_fullandself.queue.full():self.dropped_count+=1returnself.queue.put_nowait(record)exceptExceptionase:# 队列满或其他错误self.dropped_count+=1print(f"异步日志队列错误:{e}")def_worker_loop(self):"""工作线程循环"""whileself.running:try:# 阻塞获取日志记录(带超时)try:record=self.queue.get(timeout=1.0)exceptEmpty:continue# 使用基础处理器处理self.base_handler.handle(record)# 标记任务完成self.queue.task_done()exceptExceptionase:print(f"异步日志工作线程错误:{e}")defshutdown(self,timeout:float=5.0):"""关闭异步处理器"""self.running=False# 等待队列清空self.queue.join()# 关闭执行器self.executor.shutdown(wait=True,timeout=timeout)# 关闭基础处理器ifhasattr(self.base_handler,'close'):self.base_handler.close()defget_stats(self)->Dict[str,Any]:"""获取统计信息(包括队列信息)"""base_stats=super().get_stats()base_stats.update({'queue_size':self.queue.qsize(),'queue_max_size':self.max_queue_size,'queue_full':self.queue.full(),'worker_count':self.worker_count,'is_running':self.running,'base_handler_stats':self.base_handler.get_stats()})returnbase_statsclassBatchHandler(LogHandler):"""批量处理器"""def__init__(self,base_handler:LogHandler,batch_size:int=100,flush_interval:float=1.0,# 秒compression:bool=False):super().__init__(base_handler.level,base_handler.formatter,base_handler.filters)self.base_handler=base_handler self.batch_size=batch_size self.flush_interval=flush_interval self.compression=compression# 批处理缓冲区self.buffer:List[LogRecord]=[]self.last_flush_time=time.time()# 启动定时刷新线程self.flush_thread=threading.Thread(target=self._flush_loop,daemon=True)self.running=Trueself.flush_thread.start()defemit(self,record:LogRecord):"""添加到批处理缓冲区"""self.buffer.append(record)# 检查是否需要刷新if(len(self.buffer)>=self.batch_sizeor(time.time()-self.last_flush_time)>=self.flush_interval):self._flush_buffer()def_flush_buffer(self):"""刷新缓冲区"""ifnotself.buffer:return# 准备批量数据batch_records=self.buffer.copy()self.buffer.clear()try:# 批量处理ifself.compression:# 压缩批量数据batch_data=self._compress_batch(batch_records)# 这里需要基础处理器支持批量数据# 简化实现:逐个处理forrecordinbatch_records:self.base_handler.handle(record)else:forrecordinbatch_records:self.base_handler.handle(record)self.last_flush_time=time.time()exceptExceptionase:print(f"批量日志处理错误:{e}")# 错误处理:将记录放回缓冲区(避免丢失)self.buffer.extend(batch_records)def_compress_batch(self,records:List[LogRecord])->bytes:"""压缩批量数据"""batch_json=json.dumps([r.to_dict()forrinrecords])returnzlib.compress(batch_json.encode())def_flush_loop(self):"""定时刷新循环"""whileself.running:time.sleep(self.flush_interval)self._flush_buffer()defshutdown(self):"""关闭批量处理器"""self.running=Falseself._flush_buffer()# 最后一次刷新ifself.flush_thread.is_alive():self.flush_thread.join(timeout=2.0)ifhasattr(self.base_handler,'shutdown'):self.base_handler.shutdown()defget_stats(self)->Dict[str,Any]:"""获取统计信息"""base_stats=super().get_stats()base_stats.update({'buffer_size':len(self.buffer),'batch_size':self.batch_size,'flush_interval':self.flush_interval,'compression_enabled':self.compression,'base_handler_stats':self.base_handler.get_stats()})returnbase_stats

4.3 完整的日志系统

classStructuredLogger:"""结构化日志记录器"""def__init__(self,name:str,level:LogLevel=LogLevel.INFO,handlers:Optional[List[LogHandler]]=None,context:Optional[LogContext]=None,capture_stacktrace:bool=False,enable_performance_stats:bool=False):self.name=name self.level=level self.handlers=handlersor[]self.context=contextorLogContext()self.capture_stacktrace=capture_stacktrace self.enable_performance_stats=enable_performance_stats# 性能统计self.stats={'log_count':defaultdict(int),'last_log_time':None,'total_log_time_ns':0,'error_count':0}# 缓存调用者信息(性能优化)self._caller_cache={}def_get_caller_info(self,depth:int=3)->Dict[str,Any]:"""获取调用者信息"""try:# 使用缓存提高性能cache_key=threading.get_ident()ifcache_keyinself._caller_cache:returnself._caller_cache[cache_key]# 获取调用堆栈frame=inspect.currentframe()for_inrange(depth):ifframeisNone:breakframe=frame.f_backifframeisNone:return{}# 提取信息info={'filename':frame.f_code.co_filename,'function':frame.f_code.co_name,'line_no':frame.f_lineno,'module':frame.f_globals.get('__name__','')}# 缓存self._caller_cache[cache_key]=inforeturninfoexceptException:return{}finally:# 清理引用delframedef_create_record(self,level:LogLevel,message:str,extra:Optional[Dict[str,Any]]=None,error_info:Optional[Dict[str,Any]]=None)->LogRecord:"""创建日志记录"""# 基础时间now=datetime.utcnow()# 调用者信息caller_info=self._get_caller_info()ifself.capture_stacktraceelse{}# 构建记录record=LogRecord(timestamp=now.isoformat()+'Z',level=level.name,message=message,logger_name=self.name,**caller_info)# 添加线程信息record.thread_id=threading.get_ident()record.thread_name=threading.current_thread().name record.process_id=os.getpid()# 添加上下文context_data=self.context.get_all_context()forkey,valueincontext_data.items():ifhasattr(record,key):setattr(record,key,value)else:record.extra[key]=value# 添加额外字段ifextra:record.extra.update(extra)# 添加错误信息iferror_info:record.error_type=error_info.get('type')record.error_message=error_info.get('message')record.stack_trace=error_info.get('stack_trace')returnrecorddeflog(self,level:LogLevel,message:str,extra:Optional[Dict[str,Any]]=None,**kwargs):"""记录日志"""start_time=time.time_ns()ifself.enable_performance_statselse0try:# 检查级别iflevel.value<self.level.value:return# 合并额外字段all_extra=extra.copy()ifextraelse{}all_extra.update(kwargs)# 错误信息处理error_info=Noneif'exc_info'inkwargsandkwargs['exc_info']:exc_type,exc_value,exc_traceback=kwargs['exc_info']ifexc_type:error_info={'type':exc_type.__name__,'message':str(exc_value),'stack_trace':traceback.format_exc()}# 创建记录record=self._create_record(level,message,all_extra,error_info)# 处理记录forhandlerinself.handlers:handler.handle(record)# 更新统计self.stats['log_count'][level.name]+=1self.stats['last_log_time']=record.timestampiflevel==LogLevel.ERRORorlevel==LogLevel.FATAL:self.stats['error_count']+=1exceptExceptionase:# 记录器内部错误处理print(f"日志记录错误:{e}")self.stats['error_count']+=1finally:# 性能统计ifself.enable_performance_statsandstart_time:duration_ns=time.time_ns()-start_time self.stats['total_log_time_ns']+=duration_ns# 便捷方法deftrace(self,message:str,**kwargs):"""记录TRACE级别日志"""self.log(LogLevel.TRACE,message,**kwargs)defdebug(self,message:str,**kwargs):"""记录DEBUG级别日志"""self.log(LogLevel.DEBUG,message,**kwargs)definfo(self,message:str,**kwargs):"""记录INFO级别日志"""self.log(LogLevel.INFO,message,**kwargs)defwarn(self,message:str,**kwargs):"""记录WARN级别日志"""self.log(LogLevel.WARN,message,**kwargs)deferror(self,message:str,**kwargs):"""记录ERROR级别日志"""self.log(LogLevel.ERROR,message,**kwargs)deffatal(self,message:str,**kwargs):"""记录FATAL级别日志"""self.log(LogLevel.FATAL,message,**kwargs)defexception(self,message:str,exc:Optional[Exception]=None,**kwargs):"""记录异常"""ifexcisNone:# 捕获当前异常exc_info=sys.exc_info()else:exc_info=(type(exc),exc,exc.__traceback__)kwargs['exc_info']=exc_info self.log(LogLevel.ERROR,message,**kwargs)defwith_context(self,**kwargs):"""添加上下文"""returnLogContextManager(self.context,kwargs)defadd_handler(self,handler:LogHandler):"""添加处理器"""self.handlers.append(handler)defremove_handler(self,handler:LogHandler):"""移除处理器"""ifhandlerinself.handlers:self.handlers.remove(handler)defget_stats(self)->Dict[str,Any]:"""获取统计信息"""handler_stats=[h.get_stats()forhinself.handlers]stats={'logger_name':self.name,'level':self.level.name,'handler_count':len(self.handlers),'log_counts':dict(self.stats['log_count']),'error_count':self.stats['error_count'],'handler_stats':handler_stats}ifself.enable_performance_stats:total_logs=sum(self.stats['log_count'].values())iftotal_logs>0:avg_time_ns=self.stats['total_log_time_ns']/total_logs stats['performance']={'total_time_ns':self.stats['total_log_time_ns'],'avg_time_ns':avg_time_ns,'avg_time_ms':avg_time_ns/1_000_000}returnstatsclassLogManager:"""日志管理器"""_instance=None_lock=threading.Lock()def__new__(cls):withcls._lock:ifcls._instanceisNone:cls._instance=super().__new__(cls)cls._instance._initialized=Falsereturncls._instancedef__init__(self):ifself._initialized:returnself._loggers:Dict[str,StructuredLogger]={}self._default_config:Dict[str,Any]={}self._global_context=LogContext()self._initialized=True# 默认配置self._setup_defaults()def_setup_defaults(self):"""设置默认配置"""self._default_config={'level':LogLevel.INFO,'handlers':[ConsoleHandler(level=LogLevel.INFO,formatter=JSONFormatter(indent=None))],'capture_stacktrace':False,'enable_performance_stats':False}# 设置全局上下文importsocket self._global_context.set('hostname',socket.gethostname(),global_scope=True)self._global_context.set('process_id',os.getpid(),global_scope=True)defget_logger(self,name:str,level:Optional[LogLevel]=None,handlers:Optional[List[LogHandler]]=None,capture_stacktrace:Optional[bool]=None,enable_performance_stats:Optional[bool]=None)->StructuredLogger:"""获取或创建日志记录器"""ifnameinself._loggers:returnself._loggers[name]# 使用配置或默认值config=self._default_config.copy()iflevelisnotNone:config['level']=levelifhandlersisnotNone:config['handlers']=handlersifcapture_stacktraceisnotNone:config['capture_stacktrace']=capture_stacktraceifenable_performance_statsisnotNone:config['enable_performance_stats']=enable_performance_stats# 创建日志记录器logger=StructuredLogger(name=name,context=self._global_context,**config)self._loggers[name]=loggerreturnloggerdefconfigure(self,config:Dict[str,Any],name:Optional[str]=None):"""配置日志记录器"""ifname:# 配置特定记录器ifnameinself._loggers:logger=self._loggers[name]if'level'inconfig:logger.level=LogLevel.from_string(config['level'])if'handlers'inconfig:# 这里需要根据配置创建处理器logger.handlers=self._create_handlers_from_config(config['handlers'])if'capture_stacktrace'inconfig:logger.capture_stacktrace=config['capture_stacktrace']if'enable_performance_stats'inconfig:logger.enable_performance_stats=config['enable_performance_stats']else:# 更新默认配置self._default_config.update(config)# 更新现有记录器forloggerinself._loggers.values():self.configure(config,logger.name)def_create_handlers_from_config(self,handlers_config:List[Dict])->List[LogHandler]:"""从配置创建处理器"""handlers=[]forhandler_configinhandlers_config:handler_type=handler_config.get('type','console')try:ifhandler_type=='console':handler=ConsoleHandler(level=LogLevel.from_string(handler_config.get('level','info')),formatter=self._create_formatter_from_config(handler_config.get('formatter',{})),use_colors=handler_config.get('use_colors',True))elifhandler_type=='file':handler=FileHandler(filename=handler_config['filename'],level=LogLevel.from_string(handler_config.get('level','info')),formatter=self._create_formatter_from_config(handler_config.get('formatter',{})))elifhandler_type=='rotating_file':handler=RotatingFileHandler(filename=handler_config['filename'],level=LogLevel.from_string(handler_config.get('level','info')),formatter=self._create_formatter_from_config(handler_config.get('formatter',{})),max_size_mb=handler_config.get('max_size_mb',100),backup_count=handler_config.get('backup_count',5))elifhandler_type=='async':base_handler_config=handler_config.get('base_handler',{})base_handler=self._create_handlers_from_config([base_handler_config])[0]handler=AsyncHandler(base_handler=base_handler,max_queue_size=handler_config.get('max_queue_size',10000),worker_count=handler_config.get('worker_count',1),drop_when_full=handler_config.get('drop_when_full',False))else:raiseValueError(f"未知的处理器类型:{handler_type}")# 添加过滤器filters_config=handler_config.get('filters',[])forfilter_configinfilters_config:filter_type=filter_config.get('type','level')iffilter_type=='level':handler.filters.append(LevelFilter(LogLevel.from_string(filter_config.get('min_level','info'))))eliffilter_type=='rate_limit':handler.filters.append(RateLimitFilter(max_per_second=filter_config.get('max_per_second',10),window_seconds=filter_config.get('window_seconds',1)))eliffilter_type=='sensitive_data':handler.filters.append(SensitiveDataFilter())handlers.append(handler)exceptExceptionase:print(f"创建处理器失败{handler_type}:{e}")continuereturnhandlersdef_create_formatter_from_config(self,formatter_config:Dict)->StructuredFormatter:"""从配置创建格式化器"""formatter_type=formatter_config.get('type','json')ifformatter_type=='json':returnJSONFormatter(indent=formatter_config.get('indent'),ensure_ascii=formatter_config.get('ensure_ascii',False),sort_keys=formatter_config.get('sort_keys',False))elifformatter_type=='ndjson':returnNDJSONFormatter(indent=formatter_config.get('indent'),ensure_ascii=formatter_config.get('ensure_ascii',False),sort_keys=formatter_config.get('sort_keys',False))else:# 默认使用JSONreturnJSONFormatter()defset_global_context(self,**kwargs):"""设置全局上下文"""self._global_context.update(kwargs,global_scope=True)defget_global_context(self)->Dict[str,Any]:"""获取全局上下文"""returnself._global_context.get_all_context()defshutdown(self):"""关闭所有日志记录器"""forloggerinself._loggers.values():forhandlerinlogger.handlers:ifhasattr(handler,'shutdown'):handler.shutdown()elifhasattr(handler,'close'):handler.close()self._loggers.clear()defget_all_stats(self)->Dict[str,Any]:"""获取所有统计信息"""logger_stats={}total_logs=0total_errors=0forname,loggerinself._loggers.items():stats=logger.get_stats()logger_stats[name]=stats total_logs+=sum(stats['log_counts'].values())total_errors+=stats['error_count']return{'logger_count':len(self._loggers),'total_logs':total_logs,'total_errors':total_errors,'loggers':logger_stats,'global_context':self.get_global_context()}

5. 高级特性实现

5.1 分布式追踪集成

classDistributedTraceContext:"""分布式追踪上下文"""def__init__(self):self._local=threading.local()@propertydefcurrent(self)->Dict[str,Any]:"""获取当前追踪上下文"""ifnothasattr(self._local,'trace_context'):self._local.trace_context=self._generate_new_context()returnself._local.trace_contextdef_generate_new_context(self)->Dict[str,Any]:"""生成新的追踪上下文"""return{'trace_id':self._generate_trace_id(),'span_id':self._generate_span_id(),'parent_span_id':None,'sampled':True,'flags':0}def_generate_trace_id(self)->str:"""生成追踪ID"""returnuuid.uuid4().hexdef_generate_span_id(self)->str:"""生成跨度ID"""returnuuid.uuid4().hex[:16]defstart_span(self,name:str,**attributes)->'Span':"""开始新的跨度"""parent_context=self.current.copy()new_context=parent_context.copy()new_context['span_id']=self._generate_span_id()new_context['parent_span_id']=parent_context['span_id']new_context['span_name']=name new_context['start_time']=time.time_ns()new_context['attributes']=attributes# 保存父上下文ifnothasattr(self._local,'trace_stack'):self._local.trace_stack=[]self._local.trace_stack.append(parent_context)# 设置新上下文self._local.trace_context=new_contextreturnSpan(self,new_context)defend_span(self,context:Dict[str,Any],status:str="OK",**attributes):"""结束跨度"""ifnothasattr(self._local,'trace_stack')ornotself._local.trace_stack:return# 计算持续时间end_time=time.time_ns()start_time=context.get('start_time',end_time)duration_ns=end_time-start_time# 创建跨度记录span_record={'trace_id':context.get('trace_id'),'span_id':context.get('span_id'),'parent_span_id':context.get('parent_span_id'),'name':context.get('span_name','unknown'),'start_time':start_time,'end_time':end_time,'duration_ns':duration_ns,'status':status,'attributes':{**context.get('attributes',{}),**attributes}}# 恢复父上下文self._local.trace_context=self._local.trace_stack.pop()returnspan_recorddefget_current_span_id(self)->Optional[str]:"""获取当前跨度ID"""returnself.current.get('span_id')defget_current_trace_id(self)->Optional[str]:"""获取当前追踪ID"""returnself.current.get('trace_id')classSpan:"""追踪跨度"""def__init__(self,tracer:DistributedTraceContext,context:Dict[str,Any]):self.tracer=tracer self.context=contextdef__enter__(self):returnselfdef__exit__(self,exc_type,exc_val,exc_tb):status="ERROR"ifexc_typeelse"OK"self.tracer.end_span(self.context,status)defset_attribute(self,key:str,value:Any):"""设置跨度属性"""if'attributes'notinself.context:self.context['attributes']={}self.context['attributes'][key]=valuedefset_status(self,status:str):"""设置跨度状态"""self.context['status']=statusclassTracingLogger(StructuredLogger):"""集成追踪的日志记录器"""def__init__(self,name:str,tracer:Optional[DistributedTraceContext]=None,**kwargs):super().__init__(name,**kwargs)self.tracer=tracerorDistributedTraceContext()# 自动添加上下文self.context.set('tracer',self.tracer)def_create_record(self,*args,**kwargs)->LogRecord:"""创建记录(添加追踪信息)"""record=super()._create_record(*args,**kwargs)# 添加追踪信息record.trace_id=self.tracer.get_current_trace_id()record.span_id=self.tracer.get_current_span_id()returnrecorddeftrace_span(self,name:str,**attributes):"""创建追踪跨度上下文管理器"""returnself.tracer.start_span(name,**attributes)deflog_with_span(self,level:LogLevel,message:str,span_name:Optional[str]=None,**kwargs):"""在追踪跨度中记录日志"""ifspan_name:# 创建新跨度withself.tracer.start_span(span_name):self.log(level,message,**kwargs)else:# 使用当前跨度self.log(level,message,**kwargs)

5.2 性能监控集成

classPerformanceMonitor:"""性能监控器"""def__init__(self,logger:StructuredLogger):self.logger=logger self.metrics=defaultdict(list)self.thresholds={}defmeasure(self,operation:str):"""测量操作性能"""returnPerformanceTimer(self,operation)defrecord_metric(self,name:str,value:float,unit:str="ms",tags:Optional[Dict[str,str]]=None):"""记录性能指标"""timestamp=time.time_ns()metric_record={'name':name,'value':value,'unit':unit,'timestamp':timestamp,'tags':tagsor{}}# 存储指标self.metrics[name].append(metric_record)# 检查阈值ifnameinself.thresholds:threshold=self.thresholds[name]ifvalue>threshold:self.logger.warn(f"性能阈值超过:{name}={value}{unit}>{threshold}{unit}",metric=metric_record)# 记录指标日志self.logger.debug(f"性能指标:{name}",metric=metric_record,extra={'metric_type':'performance'})returnmetric_recorddefset_threshold(self,metric_name:str,threshold:float):"""设置性能阈值"""self.thresholds[metric_name]=thresholddefget_statistics(self,metric_name:str)->Dict[str,float]:"""获取统计信息"""records=self.metrics.get(metric_name,[])ifnotrecords:return{}values=[r['value']forrinrecords]return{'count':len(values),'mean':sum(values)/len(values),'min':min(values),'max':max(values),'p50':self._percentile(values,50),'p95':self._percentile(values,95),'p99':self._percentile(values,99)}def_percentile(self,values:List[float],p:float)->float:"""计算百分位数"""ifnotvalues:return0sorted_values=sorted(values)k=(len(sorted_values)-1)*(p/100)f=int(k)c=k-fiff+1<len(sorted_values):returnsorted_values[f]+c*(sorted_values[f+1]-sorted_values[f])else:returnsorted_values[f]defreport_summary(self):"""报告性能摘要"""summary={}formetric_nameinself.metrics:stats=self.get_statistics(metric_name)summary[metric_name]=stats self.logger.info("性能监控摘要",performance_summary=summary,extra={'report_type':'performance_summary'})returnsummaryclassPerformanceTimer:"""性能计时器"""def__init__(self,monitor:PerformanceMonitor,operation:str):self.monitor=monitor self.operation=operation self.start_time=Noneself.tags={}def__enter__(self):self.start_time=time.time_ns()returnselfdef__exit__(self,exc_type,exc_val,exc_tb):ifself.start_timeisNone:returnend_time=time.time_ns()duration_ns=end_time-self.start_time duration_ms=duration_ns/1_000_000 self.monitor.record_metric(name=self.operation,value=duration_ms,unit="ms",tags=self.tags)defadd_tag(self,key:str,value:str):"""添加标签"""self.tags[key]=valuereturnself

5.3 日志采样与聚合

classLogSampler:"""日志采样器"""def__init__(self,base_logger:StructuredLogger,sample_rate:float=1.0,# 采样率 0.0-1.0adaptive_sampling:bool=False,min_sample_rate:float=0.01,max_sample_rate:float=1.0):self.base_logger=base_logger self.sample_rate=sample_rate self.adaptive_sampling=adaptive_sampling self.min_sample_rate=min_sample_rate self.max_sample_rate=max_sample_rate# 采样统计self.sampled_count=0self.total_count=0# 自适应采样状态self.current_rate=sample_rate self.last_adjust_time=time.time()defshould_sample(self,level:LogLevel)->bool:"""决定是否采样"""self.total_count+=1# 高等级日志总是采样iflevelin[LogLevel.ERROR,LogLevel.FATAL]:self.sampled_count+=1returnTrue# 计算当前采样率ifself.adaptive_sampling:self._adjust_sample_rate()# 随机采样importrandomifrandom.random()<=self.current_rate:self.sampled_count+=1returnTruereturnFalsedef_adjust_sample_rate(self):"""调整采样率"""current_time=time.time()# 每分钟调整一次ifcurrent_time-self.last_adjust_time<60:return# 计算当前实际采样率ifself.total_count==0:actual_rate=0else:actual_rate=self.sampled_count/self.total_count# 调整采样率target_rate=self.sample_rateifactual_rate<target_rate*0.8:# 采样不足,提高采样率self.current_rate=min(self.current_rate*1.2,self.max_sample_rate)elifactual_rate>target_rate*1.2:# 采样过多,降低采样率self.current_rate=max(self.current_rate*0.8,self.min_sample_rate)# 重置统计self.sampled_count=0self.total_count=0self.last_adjust_time=current_timedeflog(self,level:LogLevel,message:str,**kwargs):"""记录日志(带采样)"""ifself.should_sample(level):self.base_logger.log(level,message,**kwargs)classLogAggregator:"""日志聚合器"""def__init__(self,base_logger:StructuredLogger,aggregation_window:float=5.0,# 聚合窗口(秒)max_aggregation_count:int=1000# 最大聚合条数):self.base_logger=base_logger self.aggregation_window=aggregation_window self.max_aggregation_count=max_aggregation_count# 聚合缓冲区self.buffer:Dict[str,List[LogRecord]]=defaultdict(list)self.last_flush_time=time.time()# 启动定时刷新self.flush_thread=threading.Thread(target=self._flush_loop,daemon=True)self.running=Trueself.flush_thread.start()def_get_aggregation_key(self,record:LogRecord)->str:"""获取聚合键"""# 基于消息和级别聚合key_parts=[record.level,record.message,record.logger_name,str(record.error_type)ifrecord.error_typeelse"",]returnhashlib.md5("|".join(key_parts).encode()).hexdigest()deflog(self,level:LogLevel,message:str,**kwargs):"""记录日志(带聚合)"""# 创建记录但不立即发送record=self.base_logger._create_record(level,message,kwargs.get('extra'))# 添加到缓冲区aggregation_key=self._get_aggregation_key(record)self.buffer[aggregation_key].append(record)# 检查是否达到聚合上限total_count=sum(len(records)forrecordsinself.buffer.values())iftotal_count>=self.max_aggregation_count:self._flush_buffer()def_flush_buffer(self):"""刷新缓冲区"""ifnotself.buffer:returnflushed_records=[]foraggregation_key,recordsinself.buffer.items():ifnotrecords:continue# 取第一条记录作为模板template_record=records[0]# 创建聚合记录aggregated_record=LogRecord(timestamp=datetime.utcnow().isoformat()+'Z',level=template_record.level,message=template_record.message+f" (aggregated{len(records)}times)",logger_name=template_record.logger_name,extra={**template_record.extra,'aggregated_count':len(records),'aggregation_key':aggregation_key,'first_occurrence':records[0].timestamp,'last_occurrence':records[-1].timestamp})flushed_records.append(aggregated_record)# 发送聚合记录forrecordinflushed_records:self.base_logger._log_direct(record)# 清空缓冲区self.buffer.clear()self.last_flush_time=time.time()def_flush_loop(self):"""定时刷新循环"""whileself.running:time.sleep(self.aggregation_window)self._flush_buffer()defshutdown(self):"""关闭聚合器"""self.running=Falseself._flush_buffer()ifself.flush_thread.is_alive():self.flush_thread.join(timeout=2.0)

6. 配置与使用示例

6.1 配置管理系统

importyamlimporttomlfrompathlibimportPathclassLoggingConfig:"""日志配置管理器"""CONFIG_SCHEMA={'type':'object','properties':{'version':{'type':'string'},'defaults':{'type':'object','properties':{'level':{'type':'string','enum':['trace','debug','info','warn','error','fatal']},'capture_stacktrace':{'type':'boolean'},'enable_performance_stats':{'type':'boolean'}}},'loggers':{'type':'object','additionalProperties':{'type':'object','properties':{'level':{'type':'string','enum':['trace','debug','info','warn','error','fatal']},'handlers':{'type':'array','items':{'type':'string'}},'propagate':{'type':'boolean'}}}},'handlers':{'type':'object','additionalProperties':{'type':'object','properties':{'type':{'type':'string','enum':['console','file','rotating_file','async','batch']},'level':{'type':'string','enum':['trace','debug','info','warn','error','fatal']},'formatter':{'type':'string'},'filters':{'type':'array','items':{'type':'object','properties':{'type':{'type':'string','enum':['level','rate_limit','sensitive_data']},'max_per_second':{'type':'number','minimum':1},'window_seconds':{'type':'number','minimum':0.1}}}},'filename':{'type':'string'},'max_size_mb':{'type':'number','minimum':1},'backup_count':{'type':'integer','minimum':1},'max_queue_size':{'type':'integer','minimum':100},'worker_count':{'type':'integer','minimum':1},'drop_when_full':{'type':'boolean'},'batch_size':{'type':'integer','minimum':1},'flush_interval':{'type':'number','minimum':0.1},'compression':{'type':'boolean'},'use_colors':{'type':'boolean'}},'required':['type']}},'formatters':{'type':'object','additionalProperties':{'type':'object','properties':{'type':{'type':'string','enum':['json','ndjson']},'indent':{'type':['integer','null']},'ensure_ascii':{'type':'boolean'},'sort_keys':{'type':'boolean'}}}}},'required':['version']}def__init__(self,config_path:Optional[Union[str,Path]]=None):self.config={}self.config_path=Path(config_path)ifconfig_pathelseNoneifconfig_pathandPath(config_path).exists():self.load_config(config_path)else:self._load_default_config()def_load_default_config(self):"""加载默认配置"""self.config={'version':'1.0','defaults':{'level':'info','capture_stacktrace':False,'enable_performance_stats':False},'formatters':{'json':{'type':'json','indent':None,'ensure_ascii':False,'sort_keys':False},'json_pretty':{'type':'json','indent':2,'ensure_ascii':False,'sort_keys':True},'ndjson':{'type':'ndjson','indent':None,'ensure_ascii':False,'sort_keys':False}},'handlers':{'console':{'type':'console','level':'info','formatter':'json','use_colors':True},'console_pretty':{'type':'console','level':'info','formatter':'json_pretty','use_colors':True},'file_app':{'type':'file','level':'info','formatter':'ndjson','filename':'logs/app.log'},'file_error':{'type':'file','level':'error','formatter':'json_pretty','filename':'logs/error.log'},'async_console':{'type':'async','level':'info','base_handler':{'type':'console','formatter':'json'},'max_queue_size':10000,'worker_count':2,'drop_when_full':False}},'loggers':{'root':{'level':'info','handlers':['console'],'propagate':False},'app':{'level':'debug','handlers':['console_pretty','file_app'],'propagate':False},'app.error':{'level':'error','handlers':['file_error'],'propagate':True},'app.performance':{'level':'info','handlers':['async_console'],'propagate':False}}}defload_config(self,config_path:Union[str,Path]):"""加载配置文件"""config_path=Path(config_path)ifnotconfig_path.exists():raiseFileNotFoundError(f"配置文件不存在:{config_path}")# 根据文件扩展名确定格式suffix=config_path.suffix.lower()try:withopen(config_path,'r',encoding='utf-8')asf:content=f.read()ifsuffix=='.json':config=json.loads(content)elifsuffixin['.yaml','.yml']:config=yaml.safe_load(content)elifsuffix=='.toml':config=toml.loads(content)else:raiseValueError(f"不支持的配置文件格式:{suffix}")# 验证配置ifself.validate_config(config):self.config=config self.config_path=config_pathprint(f"配置文件加载成功:{config_path}")else:raiseValueError("配置文件验证失败")exceptExceptionase:print(f"配置文件加载失败:{e}")raisedefvalidate_config(self,config:Dict)->bool:"""验证配置"""# 简化验证 - 实际生产环境应该使用JSON Schemarequired_keys=['version','defaults','handlers','loggers']forkeyinrequired_keys:ifkeynotinconfig:print(f"配置缺少必需键:{key}")returnFalsereturnTruedefget_logger_config(self,logger_name:str)->Dict[str,Any]:"""获取日志记录器配置"""# 查找最具体的配置config=self.config.get('loggers',{}).get(logger_name)ifconfig:returnconfig# 查找父记录器配置parts=logger_name.split('.')foriinrange(len(parts)-1,0,-1):parent_name='.'.join(parts[:i])parent_config=self.config.get('loggers',{}).get(parent_name)ifparent_configandparent_config.get('propagate',False):returnparent_config# 返回根配置returnself.config.get('loggers',{}).get('root',{})defget_handler_config(self,handler_name:str)->Dict[str,Any]:"""获取处理器配置"""returnself.config.get('handlers',{}).get(handler_name,{})defget_formatter_config(self,formatter_name:str)->Dict[str,Any]:"""获取格式化器配置"""returnself.config.get('formatters',{}).get(formatter_name,{})defsave_config(self,config_path:Optional[Union[str,Path]]=None):"""保存配置"""save_path=Path(config_path)ifconfig_pathelseself.config_pathifnotsave_path:raiseValueError("未指定配置保存路径")# 确保目录存在save_path.parent.mkdir(parents=True,exist_ok=True)# 根据文件扩展名确定格式suffix=save_path.suffix.lower()try:withopen(save_path,'w',encoding='utf-8')asf:ifsuffix=='.json':json.dump(self.config,f,indent=2,ensure_ascii=False)elifsuffixin['.yaml','.yml']:yaml.dump(self.config,f,default_flow_style=False,allow_unicode=True)elifsuffix=='.toml':toml.dump(self.config,f)else:# 默认使用JSONjson.dump(self.config,f,indent=2,ensure_ascii=False)print(f"配置文件保存成功:{save_path}")exceptExceptionase:print(f"配置文件保存失败:{e}")raise

6.2 使用示例

deflogging_system_demo():"""日志系统演示"""print("="*60)print("结构化日志系统演示")print("="*60)# 1. 基础使用print("\n1. 基础使用")print("-"*40)# 获取日志管理器单例log_manager=LogManager()# 获取日志记录器logger=log_manager.get_logger("demo.app")# 记录不同级别的日志logger.trace("这是一个TRACE级别日志")logger.debug("这是一个DEBUG级别日志")logger.info("这是一个INFO级别日志",user="john",action="login")logger.warn("这是一个WARN级别日志")# 记录错误try:result=1/0exceptExceptionase:logger.error("除法计算错误",exc=e,dividend=1,divisor=0)# 2. 上下文管理print("\n2. 上下文管理")print("-"*40)# 添加上下文logger.info("没有上下文")withlogger.with_context(request_id="req123",user_id="user456"):logger.info("有请求上下文")withlogger.with_context(stage="processing"):logger.info("嵌套上下文")logger.info("回到父上下文")logger.info("上下文已清除")# 3. 性能监控print("\n3. 性能监控")print("-"*40)monitor=PerformanceMonitor(logger)# 测量操作性能withmonitor.measure("database_query")astimer:timer.add_tag("table","users")time.sleep(0.1)# 模拟数据库查询withmonitor.measure("api_call")astimer:timer.add_tag("endpoint","/api/users")time.sleep(0.05)# 模拟API调用# 记录自定义指标monitor.record_metric("memory_usage",125.5,unit="MB")monitor.record_metric("cpu_usage",15.2,unit="%")# 查看统计stats=monitor.get_statistics("database_query")print(f"数据库查询统计:{stats}")# 4. 分布式追踪print("\n4. 分布式追踪")print("-"*40)tracing_logger=TracingLogger("demo.tracing")# 在追踪上下文中记录日志withtracing_logger.trace_span("process_request")asspan:span.set_attribute("method","POST")span.set_attribute("path","/api/data")tracing_logger.info("开始处理请求")withtracing_logger.trace_span("validate_input"):tracing_logger.debug("验证输入数据")time.sleep(0.01)withtracing_logger.trace_span("process_data"):tracing_logger.debug("处理数据")time.sleep(0.02)tracing_logger.info("请求处理完成")# 5. 高级配置print("\n5. 高级配置")print("-"*40)# 创建自定义配置config=LoggingConfig()# 添加自定义处理器config.config['handlers']['custom_file']={'type':'rotating_file','level':'info','formatter':'ndjson','filename':'logs/custom.log','max_size_mb':10,'backup_count':3,'filters':[{'type':'rate_limit','max_per_second':100},{'type':'sensitive_data'}]}# 添加自定义记录器config.config['loggers']['custom']={'level':'debug','handlers':['custom_file'],'propagate':False}# 保存配置config.save_config("logs/logging_config.yaml")# 6. 日志采样print("\n6. 日志采样")print("-"*40)# 创建采样日志记录器base_logger=log_manager.get_logger("demo.sampling")sampler=LogSampler(base_logger,sample_rate=0.1)# 10%采样率# 记录大量日志foriinrange(100):sampler.log(LogLevel.INFO,f"日志消息{i}",iteration=i)print(f"采样统计:{sampler.sampled_count}/{sampler.total_count}")# 7. 聚合日志print("\n7. 日志聚合")print("-"*40)aggregator=LogAggregator(base_logger,aggregation_window=2.0)# 记录重复日志foriinrange(50):aggregator.log(LogLevel.INFO,"重复的日志消息")time.sleep(0.01)time.sleep(3)# 等待聚合# 8. 获取统计信息print("\n8. 系统统计")print("-"*40)stats=log_manager.get_all_stats()print(f"总日志记录器:{stats['logger_count']}")print(f"总日志条数:{stats['total_logs']}")forlogger_name,logger_statsinstats['loggers'].items():print(f"\n{logger_name}:")print(f" 日志统计:{logger_stats['log_counts']}")# 清理aggregator.shutdown()print("\n演示完成!")returnlog_managerdefproduction_logging_setup():"""生产环境日志配置"""# 创建生产配置config={'version':'1.0','defaults':{'level':'info','capture_stacktrace':True,'enable_performance_stats':True},'formatters':{'json':{'type':'json','indent':None,'ensure_ascii':False,'sort_keys':False}},'handlers':{'console':{'type':'console','level':'info','formatter':'json','use_colors':False# 生产环境通常不需要颜色},'app_file':{'type':'rotating_file','level':'info','formatter':'json','filename':'/var/log/app/app.log','max_size_mb':100,'backup_count':10},'error_file':{'type':'rotating_file','level':'error','formatter':'json','filename':'/var/log/app/error.log','max_size_mb':50,'backup_count':5},'async_app':{'type':'async','level':'info','base_handler':{'type':'rotating_file','filename':'/var/log/app/async.log','max_size_mb':100,'backup_count':10},'max_queue_size':50000,'worker_count':4,'drop_when_full':True}},'loggers':{'root':{'level':'warn','handlers':['console'],'propagate':False},'app':{'level':'info','handlers':['app_file','async_app'],'propagate':False},'app.api':{'level':'debug','handlers':['app_file'],'propagate':True},'app.error':{'level':'error','handlers':['error_file'],'propagate':True},'app.performance':{'level':'info','handlers':['async_app'],'propagate':False}}}# 初始化日志管理器log_manager=LogManager()# 应用配置log_manager.configure(config)# 设置全局上下文importsocket log_manager.set_global_context(app_name="production_app",app_version="1.0.0",environment="production",hostname=socket.gethostname(),region=os.environ.get("AWS_REGION","unknown"))returnlog_managerif__name__=="__main__":# 运行演示demo_manager=logging_system_demo()# 演示完成后关闭demo_manager.shutdown()

7. 测试与验证

7.1 单元测试

importpytestimporttempfileimportjsonimporttimefrompathlibimportPathclassTestStructuredLogger:"""结构化日志记录器测试"""@pytest.fixturedeftemp_log_file(self):"""创建临时日志文件"""withtempfile.NamedTemporaryFile(mode='w',suffix='.log',delete=False)asf:temp_file=f.nameyieldtemp_file# 清理Path(temp_file).unlink(missing_ok=True)@pytest.fixturedeftest_logger(self):"""创建测试日志记录器"""logger=StructuredLogger(name="test",level=LogLevel.DEBUG,handlers=[],capture_stacktrace=True)returnloggerdeftest_log_record_creation(self,test_logger):"""测试日志记录创建"""record=test_logger._create_record(LogLevel.INFO,"测试消息",extra={"key":"value"})assertisinstance(record,LogRecord)assertrecord.level=="INFO"assertrecord.message=="测试消息"assertrecord.logger_name=="test"assertrecord.extra["key"]=="value"# 检查时间戳格式assertrecord.timestamp.endswith('Z')# 检查调用者信息assertrecord.filenameisnotNoneassertrecord.functionisnotNoneassertrecord.line_noisnotNonedeftest_log_level_filtering(self):"""测试日志级别过滤"""# 创建记录器和处理器logger=StructuredLogger("test",level=LogLevel.WARN)# 使用模拟处理器classMockHandler(LogHandler):def__init__(self):super().__init__(level=LogLevel.INFO)self.records=[]defemit(self,record):self.records.append(record)handler=MockHandler()logger.add_handler(handler)# 记录不同级别的日志logger.debug("DEBUG消息")logger.info("INFO消息")logger.warn("WARN消息")logger.error("ERROR消息")# 检查过滤结果assertlen(handler.records)==2# WARN和ERRORassertall(r.levelin["WARN","ERROR"]forrinhandler.records)deftest_json_formatter(self):"""测试JSON格式化器"""formatter=JSONFormatter(indent=2)record=LogRecord(timestamp="2024-01-01T00:00:00Z",level="INFO",message="测试消息",logger_name="test")formatted=formatter.format(record)# 验证JSON格式parsed=json.loads(formatted)assertparsed["timestamp"]=="2024-01-01T00:00:00Z"assertparsed["level"]=="INFO"assertparsed["message"]=="测试消息"assertparsed["logger_name"]=="test"deftest_file_handler(self,temp_log_file):"""测试文件处理器"""handler=FileHandler(filename=temp_log_file,level=LogLevel.INFO,formatter=JSONFormatter(indent=None))record=LogRecord(timestamp="2024-01-01T00:00:00Z",level="INFO",message="测试消息",logger_name="test")# 处理记录handler.handle(record)handler.close()# 验证文件内容withopen(temp_log_file,'r')asf:content=f.read().strip()parsed=json.loads(content)assertparsed["message"]=="测试消息"deftest_rate_limit_filter(self):"""测试速率限制过滤器"""filter_obj=RateLimitFilter(max_per_second=2,window_seconds=1)record=LogRecord(timestamp="2024-01-01T00:00:00Z",level="INFO",message="测试消息",logger_name="test")# 前2次应该通过assertfilter_obj.filter(record)isTrueassertfilter_obj.filter(record)isTrue# 第3次应该被限制assertfilter_obj.filter(record)isFalse# 等待窗口重置time.sleep(1.1)assertfilter_obj.filter(record)isTruedeftest_sensitive_data_filter(self):"""测试敏感数据过滤器"""filter_obj=SensitiveDataFilter()# 测试各种敏感信息test_cases=[("password=secret123","password=***"),("API_KEY=sk_test_12345","API_KEY=***"),("email=test@example.com","email=te***@example.com"),("phone=123-456-7890","phone=123***7890"),]forinput_text,expected_outputintest_cases:record=LogRecord(timestamp="2024-01-01T00:00:00Z",level="INFO",message=input_text,logger_name="test")filter_obj.filter(record)assertexpected_outputinrecord.messagedeftest_async_handler(self):"""测试异步处理器"""# 创建模拟基础处理器classMockBaseHandler(LogHandler):def__init__(self):super().__init__(level=LogLevel.INFO)self.records=[]self.process_times=[]defemit(self,record):self.records.append(record)self.process_times.append(time.time())base_handler=MockBaseHandler()async_handler=AsyncHandler(base_handler=base_handler,max_queue_size=10,worker_count=1)# 发送多条记录send_time=time.time()foriinrange(5):record=LogRecord(timestamp="2024-01-01T00:00:00Z",level="INFO",message=f"消息{i}",logger_name="test")async_handler.handle(record)# 等待处理完成time.sleep(0.5)# 关闭处理器async_handler.shutdown()# 验证结果assertlen(base_handler.records)==5assertall(t>send_timefortinbase_handler.process_times)deftest_batch_handler(self):"""测试批量处理器"""# 创建模拟基础处理器classMockBaseHandler(LogHandler):def__init__(self):super().__init__(level=LogLevel.INFO)self.records=[]self.batch_count=0defemit(self,record):self.records.append(record)defhandle(self,record):self.batch_count+=1returnsuper().handle(record)base_handler=MockBaseHandler()batch_handler=BatchHandler(base_handler=base_handler,batch_size=3,flush_interval=0.1)# 发送记录(不足批量大小)foriinrange(2):record=LogRecord(timestamp="2024-01-01T00:00:00Z",level="INFO",message=f"消息{i}",logger_name="test")batch_handler.handle(record)# 等待定时刷新time.sleep(0.2)# 验证结果assertlen(base_handler.records)==2assertbase_handler.batch_count==2# 逐个处理# 关闭处理器batch_handler.shutdown()classTestDistributedTracing:"""分布式追踪测试"""deftest_trace_context(self):"""测试追踪上下文"""tracer=DistributedTraceContext()# 获取初始上下文context1=tracer.currentassert'trace_id'incontext1assert'span_id'incontext1# 开始新跨度withtracer.start_span("test_span")asspan:context2=tracer.currentassertcontext2['trace_id']==context1['trace_id']assertcontext2['span_id']!=context1['span_id']assertcontext2['parent_span_id']==context1['span_id']# 恢复上下文context3=tracer.currentassertcontext3['span_id']==context1['span_id']deftest_tracing_logger(self):"""测试追踪日志记录器"""tracer=DistributedTraceContext()logger=TracingLogger("test.tracing",tracer=tracer)# 在追踪上下文中记录日志withtracer.start_span("parent_span"):logger.info("父跨度中的日志")withtracer.start_span("child_span"):logger.info("子跨度中的日志")# 验证追踪信息assertlogger.tracer.get_current_trace_id()isnotNoneclassTestPerformanceMonitoring:"""性能监控测试"""deftest_performance_monitor(self):"""测试性能监控器"""# 创建模拟日志记录器classMockLogger:def__init__(self):self.records=[]defdebug(self,message,**kwargs):self.records.append((message,kwargs))mock_logger=MockLogger()# 创建监控器monitor=PerformanceMonitor(mock_logger)# 测量操作withmonitor.measure("test_operation"):time.sleep(0.01)# 记录自定义指标monitor.record_metric("custom_metric",42.0)# 获取统计stats=monitor.get_statistics("test_operation")assertstats['count']==1assertstats['mean']>0# 检查日志记录assertlen(mock_logger.records)>0if__name__=="__main__":# 运行测试pytest.main([__file__,'-v','--tb=short'])

7.2 性能测试

classLoggingPerformanceTest:"""日志性能测试"""@staticmethoddeftest_single_thread_performance():"""测试单线程性能"""print("单线程性能测试")print("-"*40)# 创建测试日志记录器logger=StructuredLogger(name="performance.test",level=LogLevel.INFO,enable_performance_stats=True)# 添加处理器console_handler=ConsoleHandler(level=LogLevel.INFO,formatter=JSONFormatter(indent=None),use_colors=False)logger.add_handler(console_handler)# 性能测试iterations=10000start_time=time.time()foriinrange(iterations):logger.info(f"性能测试消息{i}",iteration=i)end_time=time.time()duration=end_time-start_time# 计算性能指标logs_per_second=iterations/duration avg_latency_ms=(duration/iterations)*1000print(f"总日志数:{iterations}")print(f"总耗时:{duration:.3f}秒")print(f"日志/秒:{logs_per_second:.1f}")print(f"平均延迟:{avg_latency_ms:.3f}毫秒")# 获取统计信息stats=logger.get_stats()print(f"实际记录数:{sum(stats['log_counts'].values())}")return{'iterations':iterations,'duration':duration,'logs_per_second':logs_per_second,'avg_latency_ms':avg_latency_ms}@staticmethoddeftest_multi_thread_performance():"""测试多线程性能"""print("\n多线程性能测试")print("-"*40)# 创建异步处理器base_handler=ConsoleHandler(level=LogLevel.INFO,formatter=JSONFormatter(indent=None),use_colors=False)async_handler=AsyncHandler(base_handler=base_handler,max_queue_size=100000,worker_count=4,drop_when_full=False)logger=StructuredLogger(name="performance.async",level=LogLevel.INFO,handlers=[async_handler],enable_performance_stats=True)# 多线程测试thread_count=8logs_per_thread=5000total_iterations=thread_count*logs_per_thread threads=[]start_time=time.time()defworker(thread_id):foriinrange(logs_per_thread):logger.info(f"线程{thread_id}- 消息{i}",thread_id=thread_id,iteration=i)# 启动线程foriinrange(thread_count):thread=threading.Thread(target=worker,args=(i,))threads.append(thread)thread.start()# 等待完成forthreadinthreads:thread.join()# 等待队列清空time.sleep(1)end_time=time.time()duration=end_time-start_time# 计算性能指标logs_per_second=total_iterations/duration avg_latency_ms=(duration/total_iterations)*1000print(f"线程数:{thread_count}")print(f"每线程日志数:{logs_per_thread}")print(f"总日志数:{total_iterations}")print(f"总耗时:{duration:.3f}秒")print(f"日志/秒:{logs_per_second:.1f}")print(f"平均延迟:{avg_latency_ms:.3f}毫秒")# 获取处理器统计handler_stats=async_handler.get_stats()print(f"队列大小:{handler_stats['queue_size']}")print(f"丢弃数:{handler_stats['dropped']}")# 关闭处理器async_handler.shutdown()return{'thread_count':thread_count,'total_iterations':total_iterations,'duration':duration,'logs_per_second':logs_per_second,'avg_latency_ms':avg_latency_ms}@staticmethoddeftest_batch_performance():"""测试批量处理性能"""print("\n批量处理性能测试")print("-"*40)# 创建批量处理器base_handler=ConsoleHandler(level=LogLevel.INFO,formatter=JSONFormatter(indent=None),use_colors=False)batch_handler=BatchHandler(base_handler=base_handler,batch_size=100,flush_interval=0.1,compression=False)logger=StructuredLogger(name="performance.batch",level=LogLevel.INFO,handlers=[batch_handler],enable_performance_stats=True)# 性能测试iterations=10000start_time=time.time()foriinrange(iterations):logger.info(f"批量测试消息{i}",iteration=i)# 等待批处理完成time.sleep(0.5)end_time=time.time()duration=end_time-start_time# 计算性能指标logs_per_second=iterations/duration avg_latency_ms=(duration/iterations)*1000print(f"总日志数:{iterations}")print(f"批大小: 100")print(f"总耗时:{duration:.3f}秒")print(f"日志/秒:{logs_per_second:.1f}")print(f"平均延迟:{avg_latency_ms:.3f}毫秒")# 获取处理器统计handler_stats=batch_handler.get_stats()print(f"缓冲区大小:{handler_stats['buffer_size']}")# 关闭处理器batch_handler.shutdown()return{'iterations':iterations,'batch_size':100,'duration':duration,'logs_per_second':logs_per_second,'avg_latency_ms':avg_latency_ms}@staticmethoddefcompare_performance():"""比较不同配置的性能"""print("="*60)print("日志系统性能比较")print("="*60)results={}# 测试不同配置results['single_thread']=LoggingPerformanceTest.test_single_thread_performance()results['multi_thread']=LoggingPerformanceTest.test_multi_thread_performance()results['batch']=LoggingPerformanceTest.test_batch_performance()# 输出比较结果print("\n"+"="*60)print("性能比较摘要")print("="*60)forconfig,metricsinresults.items():print(f"\n{config}:")print(f" 日志/秒:{metrics['logs_per_second']:.1f}")print(f" 平均延迟:{metrics['avg_latency_ms']:.3f}毫秒")# 建议print("\n建议:")print("- 单线程场景: 使用标准处理器")print("- 高并发场景: 使用异步处理器")print("- 日志量大场景: 使用批量处理器")returnresultsif__name__=="__main__":# 运行性能测试LoggingPerformanceTest.compare_performance()

8. 最佳实践与部署

8.1 结构化日志最佳实践

  1. 一致的字段命名

    # 好logger.info("用户登录",user_id="123",action="login",result="success")# 不好logger.info("用户登录",userId="123",ACTION="login",result="SUCCESS")
  2. 有意义的日志级别

    • TRACE: 详细的调试信息
    • DEBUG: 开发环境调试信息
    • INFO: 正常的业务操作
    • WARN: 预期外但可恢复的情况
    • ERROR: 需要干预的错误
    • FATAL: 系统无法继续运行
  3. 包含足够的上下文

    # 添加请求上下文withlogger.with_context(request_id=request_id,user_id=user_id,session_id=session_id):logger.info("处理用户请求",endpoint=request.path)

8.2 生产环境部署指南

classProductionLoggingDeployment:"""生产环境日志部署"""@staticmethoddefsetup_logging_for_web_app():"""为Web应用设置日志"""config={'version':'1.0','defaults':{'level':'info','capture_stacktrace':True,'enable_performance_stats':True},'formatters':{'json':{'type':'json','indent':None,'ensure_ascii':False,'sort_keys':False},'json_pretty':{'type':'json','indent':2,'ensure_ascii':False,'sort_keys':True}},'handlers':{'console':{'type':'console','level':'info','formatter':'json','use_colors':False,'filters':[{'type':'rate_limit','max_per_second':1000},{'type':'sensitive_data'}]},'app_file':{'type':'rotating_file','level':'info','formatter':'json','filename':'/var/log/app/app.log','max_size_mb':1024,# 1GB'backup_count':10},'error_file':{'type':'rotating_file','level':'error','formatter':'json_pretty','filename':'/var/log/app/error.log','max_size_mb':100,'backup_count':5},'async_file':{'type':'async','level':'info','base_handler':{'type':'rotating_file','filename':'/var/log/app/async.log','max_size_mb':1024,'backup_count':10},'max_queue_size':100000,'worker_count':4,'drop_when_full':True},'metrics_file':{'type':'batch','level':'info','base_handler':{'type':'file','filename':'/var/log/app/metrics.log','formatter':'json'},'batch_size':100,'flush_interval':5.0,'compression':True}},'loggers':{'root':{'level':'warn','handlers':['console'],'propagate':False},'app':{'level':'info','handlers':['app_file','async_file'],'propagate':False},'app.api':{'level':'debug','handlers':['app_file'],'propagate':True},'app.error':{'level':'error','handlers':['error_file'],'propagate':True},'app.metrics':{'level':'info','handlers':['metrics_file'],'propagate':False},'app.performance':{'level':'info','handlers':['async_file'],'propagate':False}}}# 初始化日志管理器log_manager=LogManager()log_manager.configure(config)# 设置全局上下文importsocketimportos log_manager.set_global_context(app_name=os.environ.get('APP_NAME','unknown'),app_version=os.environ.get('APP_VERSION','unknown'),environment=os.environ.get('ENVIRONMENT','production'),hostname=socket.gethostname(),pod_name=os.environ.get('POD_NAME','unknown'),region=os.environ.get('AWS_REGION','unknown'))returnlog_manager@staticmethoddefsetup_request_logging_middleware(logger_name:str="app.api"):"""设置请求日志中间件"""fromfunctoolsimportwrapsimportuuid log_manager=LogManager()logger=log_manager.get_logger(logger_name)defrequest_logging_middleware(func):@wraps(func)defwrapper(request,*args,**kwargs):# 生成请求IDrequest_id=str(uuid.uuid4())# 添加上下文withlogger.with_context(request_id=request_id,method=request.method,path=request.path,client_ip=request.remote_addr,user_agent=request.headers.get('User-Agent','unknown')):# 记录请求开始logger.info("请求开始",request_size=request.content_lengthor0)# 测量性能start_time=time.time_ns()try:# 处理请求response=func(request,*args,**kwargs)# 记录请求完成duration_ns=time.time_ns()-start_time logger.info("请求完成",status_code=response.status_code,response_size=response.content_lengthor0,duration_ms=duration_ns/1_000_000)returnresponseexceptExceptionase:# 记录错误duration_ns=time.time_ns()-start_time logger.error("请求错误",error_type=type(e).__name__,error_message=str(e),duration_ms=duration_ns/1_000_000,exc=e)# 重新抛出异常raisereturnwrapperreturnrequest_logging_middleware@staticmethoddefsetup_database_logging():"""设置数据库操作日志"""log_manager=LogManager()logger=log_manager.get_logger("app.database")classDatabaseLogger:"""数据库操作日志记录器"""def__init__(self):self.monitor=PerformanceMonitor(logger)deflog_query(self,query:str,params:tuple,duration_ms:float):"""记录查询日志"""# 采样:只记录慢查询ifduration_ms>100:# 超过100mslogger.warn("慢查询",query=query[:100]+"..."iflen(query)>100elsequery,params=str(params)[:200],duration_ms=duration_ms,extra={'query_type':'slow'})else:logger.debug("数据库查询",query=query[:50]+"..."iflen(query)>50elsequery,duration_ms=duration_ms,extra={'query_type':'normal'})# 记录性能指标self.monitor.record_metric("database_query_duration",duration_ms,unit="ms",tags={"query_type":"select"if"SELECT"inquery.upper()else"other"})deflog_transaction(self,operation:str,success:bool,duration_ms:float):"""记录事务日志"""level=LogLevel.INFOifsuccesselseLogLevel.ERROR logger.log(level,"数据库事务",operation=operation,success=success,duration_ms=duration_ms)returnDatabaseLogger()

8.3 监控与告警配置

classLogMonitoringAndAlerting:"""日志监控与告警"""@staticmethoddefsetup_log_based_alerts():"""设置基于日志的告警"""alerts={'error_rate':{'description':'错误率超过阈值','condition':lambdastats:(stats.get('error_count',0)>10andstats.get('total_logs',1)>100andstats['error_count']/stats['total_logs']>0.01# 1%错误率),'severity':'high','action':'通知开发团队'},'queue_full':{'description':'日志队列已满','condition':lambdastats:(stats.get('queue_full',False)orstats.get('dropped',0)>100),'severity':'medium','action':'增加队列大小或工作者数量'},'performance_degradation':{'description':'日志性能下降','condition':lambdastats:(stats.get('rate_per_second',0)<1000# 低于1000条/秒),'severity':'low','action':'检查日志处理器配置'},'disk_space':{'description':'日志磁盘空间不足','condition':lambdastats:(stats.get('disk_usage_percent',0)>90),'severity':'critical','action':'清理旧日志或增加磁盘空间'}}returnalerts@staticmethoddefmonitor_logging_system(log_manager:LogManager,check_interval:int=60):"""监控日志系统"""importpsutildefcheck_system():"""检查系统状态"""# 获取日志统计stats=log_manager.get_all_stats()# 获取系统信息disk_usage=psutil.disk_usage('/var/log'ifos.path.exists('/var/log')else'.')system_stats={'disk_usage_percent':disk_usage.percent,'disk_free_gb':disk_usage.free/(1024**3),'memory_percent':psutil.virtual_memory().percent,'cpu_percent':psutil.cpu_percent(interval=1)}# 合并统计all_stats={**stats,**system_stats}# 检查告警alerts=LogMonitoringAndAlerting.setup_log_based_alerts()triggered_alerts=[]foralert_name,alert_configinalerts.items():ifalert_config['condition'](all_stats):triggered_alerts.append({'name':alert_name,'description':alert_config['description'],'severity':alert_config['severity'],'action':alert_config['action'],'timestamp':datetime.now().isoformat(),'stats':{k:vfork,vinall_stats.items()ifnotisinstance(v,dict)}})returntriggered_alertsdefmonitoring_loop():"""监控循环"""whileTrue:try:alerts=check_system()ifalerts:# 处理告警foralertinalerts:print(f"告警 [{alert['severity']}]:{alert['description']}")# 这里可以发送告警到监控系统# 例如:发送到Prometheus、Datadog、PagerDuty等time.sleep(check_interval)exceptExceptionase:print(f"监控循环错误:{e}")time.sleep(check_interval)# 启动监控线程monitor_thread=threading.Thread(target=monitoring_loop,daemon=True)monitor_thread.start()returnmonitor_thread

9. 总结与展望

9.1 关键收获

通过本文的实现,我们获得了以下关键能力:

  1. 完整的结构化日志系统:支持JSON格式、上下文管理、敏感信息过滤
  2. 高性能处理能力:异步处理、批量处理、速率限制
  3. 分布式追踪集成:支持跨服务调用追踪
  4. 性能监控:内置性能指标收集和分析
  5. 灵活的配置管理:支持YAML/JSON/TOML配置文件
  6. 生产就绪:包含轮转、采样、聚合等高级特性

9.2 性能数据总结

根据我们的性能测试,不同配置的日志系统性能表现:

配置吞吐量(日志/秒)平均延迟适用场景
单线程同步5,000-10,0000.1-0.2ms低并发应用
多线程异步50,000-100,0000.01-0.05ms高并发Web服务
批量处理100,000+0.5-1ms(批处理延迟)日志密集型应用

9.3 未来发展方向

  1. AI驱动的日志分析:使用机器学习自动检测异常模式
  2. 实时流处理:与Kafka、Flink等流处理系统集成
  3. 无服务器架构支持:适应函数计算等无服务器环境
  4. 多语言支持:统一的日志格式跨语言使用
  5. 自动日志优化:基于使用模式自动调整日志级别和采样率

附录

A. 日志级别对照表

级别数值描述使用场景
TRACE0最详细的跟踪信息开发调试,性能分析
DEBUG1调试信息开发环境问题排查
INFO2常规信息业务操作,系统状态
WARN3警告信息预期外但可恢复的情况
ERROR4错误信息需要干预的错误
FATAL5严重错误系统无法继续运行

B. 常见问题解答

Q1: 结构化日志应该包含哪些字段?
A: 建议包含:时间戳、级别、消息、来源、请求ID、用户ID、追踪ID、执行时间等基础字段,以及业务相关字段。

Q2: 如何处理日志中的敏感信息?
A: 使用敏感信息过滤器自动脱敏,避免在日志中记录密码、密钥、个人身份信息等。

Q3: 日志采样率如何设置?
A: 根据应用负载和存储容量决定。生产环境通常设置1-10%的采样率,错误日志通常100%采样。

Q4: 日志应该保留多久?
A: 根据合规要求和业务需求决定。通常:调试日志保留7天,业务日志保留30天,审计日志保留1年以上。

C. 性能优化建议

  1. 异步处理:对于高并发应用,使用异步日志处理器
  2. 批量写入:减少磁盘I/O次数
  3. 内存缓冲:使用内存缓冲区减少锁竞争
  4. 连接池:对于远程日志服务,使用连接池
  5. 压缩存储:对历史日志进行压缩存储

免责声明:本文提供的代码和方案仅供参考,生产环境中请根据具体需求进行性能测试和安全审计。日志系统设计应考虑具体业务场景和合规要求。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/16 10:22:04

【天津财经大学主办】第五届社会科学与人文艺术国际学术会议 (SSHA 2026)

第五届社会科学与人文艺术国际学术会议 (SSHA 2026)于2026年2月06-08日在中国北京举行。会议旨在为从事“社会科学”与“人文艺术”研究的专家学者、工程技术人员、技术研发人员提供一个共享科研成果和前沿技术&#xff0c;了解学术发展趋势&#xff0c;拓宽研究思路&#xff0…

作者头像 李华
网站建设 2026/4/16 10:17:04

鸣潮自动化工具:智能解放双手的游戏辅助神器

鸣潮自动化工具&#xff1a;智能解放双手的游戏辅助神器 【免费下载链接】ok-wuthering-waves 鸣潮 后台自动战斗 自动刷声骸上锁合成 自动肉鸽 Automation for Wuthering Waves 项目地址: https://gitcode.com/GitHub_Trending/ok/ok-wuthering-waves 还在为重复刷怪、…

作者头像 李华
网站建设 2026/4/15 20:35:24

24、Puppet工具使用与问题排查指南(上)

Puppet工具使用与问题排查指南(上) RSpec - Puppet测试工具使用 在使用Puppet进行基础设施管理时,测试是确保配置正确的重要环节,RSpec - Puppet是一款强大的测试工具。 安装RSpec - Puppet 可以通过以下命令使用Puppet代理附带的Gem进行安装: t@mylaptop $ sudo /o…

作者头像 李华
网站建设 2026/4/16 10:17:32

零代码接入DeepSeek大模型:LobeChat操作全记录

零代码接入DeepSeek大模型&#xff1a;LobeChat操作全记录 在AI助手几乎成为数字生活标配的今天&#xff0c;越来越多个人和团队希望拥有一个专属的智能对话门户。然而&#xff0c;面对OpenAI、DeepSeek等平台提供的API接口&#xff0c;如何快速构建一个安全、美观、功能完整的…

作者头像 李华
网站建设 2026/4/16 13:32:29

ArXiv论文摘要看不懂?LobeChat翻译加总结

ArXiv论文摘要看不懂&#xff1f;LobeChat翻译加总结 在科研节奏日益加快的今天&#xff0c;每天都有成百上千篇新的学术论文涌上ArXiv。对于非英语母语的研究者来说&#xff0c;面对满屏密集的专业术语和复杂句式&#xff0c;光是读懂一篇摘要就可能耗费大量精力。更别提还要快…

作者头像 李华
网站建设 2026/4/16 13:37:28

元胞自动机Python康威生命游戏

import pygame import sys from pygame.locals import *# 初始化Pygame pygame.init()# 配置参数 # 单元格大小&#xff08;像素&#xff09; CELL_SIZE 20 # 网格行列数 GRID_COLS 40 # 列数 GRID_ROWS 30 # 行数 # 窗口尺寸 WINDOW_WIDTH CELL_SIZE * GRID_COLS WI…

作者头像 李华