SecGPT-14B实战教程:用Python requests封装SecGPT-14B API构建自动化巡检工具
1. 引言
在网络安全领域,自动化巡检工具已经成为企业安全防护的重要组成部分。SecGPT-14B作为一款专注于网络安全问答与分析的AI模型,能够帮助我们快速识别潜在威胁、分析安全事件并提供专业建议。本文将带你从零开始,使用Python的requests库封装SecGPT-14B API,构建一个实用的自动化安全巡检工具。
通过本教程,你将学会:
- 如何调用SecGPT-14B的API接口
- 使用Python封装API调用的最佳实践
- 构建一个能够自动分析安全日志的巡检工具
- 处理API返回结果并生成可读性强的报告
2. 环境准备与API基础
2.1 安装必要依赖
在开始之前,请确保你的Python环境已安装以下库:
pip install requests python-dotenv2.2 获取API访问信息
SecGPT-14B提供了标准的OpenAI兼容API接口,我们需要先了解基本的API调用方式:
import requests API_URL = "http://127.0.0.1:8000/v1/chat/completions" HEADERS = {"Content-Type": "application/json"} def ask_secgpt(question): data = { "model": "SecGPT-14B", "messages": [{"role": "user", "content": question}], "temperature": 0.3, "max_tokens": 256 } response = requests.post(API_URL, headers=HEADERS, json=data) return response.json()3. 封装SecGPT-14B API
3.1 基础API封装类
让我们创建一个更健壮的API封装类,包含错误处理和重试机制:
import time from typing import Optional, Dict, Any class SecGPTClient: def __init__(self, base_url: str = "http://127.0.0.1:8000"): self.base_url = base_url self.chat_url = f"{base_url}/v1/chat/completions" self.headers = {"Content-Type": "application/json"} self.timeout = 30 self.max_retries = 3 self.retry_delay = 2 def ask(self, question: str, temperature: float = 0.3, max_tokens: int = 256) -> Optional[Dict[str, Any]]: payload = { "model": "SecGPT-14B", "messages": [{"role": "user", "content": question}], "temperature": temperature, "max_tokens": max_tokens } for attempt in range(self.max_retries): try: response = requests.post( self.chat_url, headers=self.headers, json=payload, timeout=self.timeout ) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: if attempt == self.max_retries - 1: print(f"API请求失败: {str(e)}") return None time.sleep(self.retry_delay)3.2 添加实用功能方法
扩展我们的客户端类,添加一些针对安全分析的特殊方法:
class SecGPTClient: # ... 之前的代码 ... def analyze_log(self, log_content: str) -> Optional[str]: """分析安全日志""" prompt = f"请分析以下安全日志,指出其中的可疑行为:\n{log_content}" response = self.ask(prompt, max_tokens=512) return response["choices"][0]["message"]["content"] if response else None def check_vulnerability(self, vulnerability_name: str) -> Optional[str]: """查询特定漏洞信息""" prompt = f"请解释什么是{vulnerability_name}漏洞,并提供防护建议" response = self.ask(prompt) return response["choices"][0]["message"]["content"] if response else None def generate_security_report(self, findings: list) -> Optional[str]: """生成安全报告""" prompt = ( "请根据以下安全发现生成一份专业的安全报告:\n" + "\n".join(f"- {item}" for item in findings) + "\n报告应包括风险等级评估和修复建议" ) response = self.ask(prompt, max_tokens=1024) return response["choices"][0]["message"]["content"] if response else None4. 构建自动化巡检工具
4.1 设计巡检流程
现在我们将使用封装好的API客户端构建一个完整的自动化巡检工具:
import os from datetime import datetime class SecurityScanner: def __init__(self, client: SecGPTClient): self.client = client self.scan_results = [] def scan_log_file(self, file_path: str) -> bool: """扫描日志文件并分析可疑行为""" if not os.path.exists(file_path): print(f"文件不存在: {file_path}") return False try: with open(file_path, 'r') as f: log_content = f.read() analysis = self.client.analyze_log(log_content) if analysis: self.scan_results.append({ "file": file_path, "timestamp": datetime.now().isoformat(), "analysis": analysis }) return True except Exception as e: print(f"分析日志文件时出错: {str(e)}") return False def generate_final_report(self, output_file: str) -> bool: """生成最终巡检报告""" if not self.scan_results: print("没有可报告的结果") return False findings = [] for result in self.scan_results: findings.append( f"文件: {result['file']}\n" f"时间: {result['timestamp']}\n" f"分析结果:\n{result['analysis']}\n" "="*50 ) full_report = self.client.generate_security_report(findings) if not full_report: return False try: with open(output_file, 'w') as f: f.write(full_report) return True except Exception as e: print(f"写入报告文件时出错: {str(e)}") return False4.2 完整示例:使用巡检工具
下面是一个完整的示例,展示如何使用我们构建的工具:
def main(): # 初始化客户端 secgpt = SecGPTClient() # 创建扫描器实例 scanner = SecurityScanner(secgpt) # 扫描日志文件 log_files = [ "/var/log/auth.log", "/var/log/syslog", "/var/log/nginx/access.log" ] for log_file in log_files: print(f"正在扫描: {log_file}") if scanner.scan_log_file(log_file): print("扫描完成") else: print("扫描失败") # 生成报告 report_file = "security_report.txt" if scanner.generate_final_report(report_file): print(f"安全报告已生成: {report_file}") else: print("生成报告失败") if __name__ == "__main__": main()5. 进阶功能与优化
5.1 添加并发处理
为了提高巡检效率,我们可以使用多线程并发处理多个日志文件:
import concurrent.futures class SecurityScanner: # ... 之前的代码 ... def scan_log_files_concurrently(self, file_paths: list, max_workers: int = 4) -> int: """并发扫描多个日志文件""" success_count = 0 with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: future_to_file = { executor.submit(self.scan_log_file, file_path): file_path for file_path in file_paths } for future in concurrent.futures.as_completed(future_to_file): file_path = future_to_file[future] try: if future.result(): success_count += 1 except Exception as e: print(f"处理文件{file_path}时出错: {str(e)}") return success_count5.2 添加配置管理
使用配置文件管理API参数和扫描设置:
import json from typing import Optional class ConfigManager: def __init__(self, config_file: str = "config.json"): self.config_file = config_file self.config = self._load_config() def _load_config(self) -> dict: default_config = { "api": { "base_url": "http://127.0.0.1:8000", "timeout": 30, "max_retries": 3, "retry_delay": 2 }, "scanner": { "default_log_paths": [ "/var/log/auth.log", "/var/log/syslog" ], "max_workers": 4 } } try: with open(self.config_file, 'r') as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError): return default_config def save_config(self) -> bool: try: with open(self.config_file, 'w') as f: json.dump(self.config, f, indent=2) return True except Exception: return False def get_api_config(self) -> dict: return self.config.get("api", {}) def get_scanner_config(self) -> dict: return self.config.get("scanner", {})5.3 添加日志记录
为工具添加日志记录功能,便于调试和审计:
import logging from logging.handlers import RotatingFileHandler def setup_logging(log_file: str = "security_scanner.log"): logger = logging.getLogger("SecurityScanner") logger.setLevel(logging.INFO) # 文件日志处理器 file_handler = RotatingFileHandler( log_file, maxBytes=5*1024*1024, backupCount=3 ) file_handler.setFormatter(logging.Formatter( "%(asctime)s - %(levelname)s - %(message)s" )) # 控制台日志处理器 console_handler = logging.StreamHandler() console_handler.setFormatter(logging.Formatter( "%(levelname)s - %(message)s" )) logger.addHandler(file_handler) logger.addHandler(console_handler) return logger6. 总结
通过本教程,我们完成了一个基于SecGPT-14B API的自动化安全巡检工具的构建。这个工具能够:
- 封装SecGPT-14B API,提供更友好的编程接口
- 自动扫描和分析安全日志
- 生成专业的安全报告
- 支持并发处理提高效率
- 包含完善的配置管理和日志记录
你可以进一步扩展这个工具,例如:
- 添加定时任务功能,定期执行安全巡检
- 集成更多安全数据源,如网络设备日志、IDS/IPS告警等
- 开发Web界面,提供更友好的交互体验
- 添加告警功能,当发现高危威胁时自动通知安全团队
获取更多AI镜像
想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。