Files
security_check/baseline_checker.py
Fanjun Kong a93846f56f rename checks to rules
Signed-off-by: Fanjun Kong <kongfanjun@iscas.ac.cn>
2025-12-30 15:31:20 +08:00

519 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
import yaml
import subprocess
import sys
import os
import logging
import argparse
import json
from datetime import datetime
from pathlib import Path
class BaselineChecker:
def __init__(self, config_path, security_level="high", fix_mode=False, output_dir=None):
self.config_path = Path(config_path)
self.security_level = security_level
self.fix_mode = fix_mode
# 创建log目录结构
base_dir = Path(output_dir) if output_dir else Path.cwd()
self.log_dir = base_dir / "log"
self.log_dir.mkdir(parents=True, exist_ok=True)
# 创建本次运行的子目录(使用时间戳)
self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
self.run_dir = self.log_dir / self.timestamp
self.run_dir.mkdir(parents=True, exist_ok=True)
# 设置输出文件路径(都在本次运行的子目录中)
self.output_file = self.run_dir / f"baseline_check_{self.timestamp}.log"
self.html_report = self.run_dir / f"baseline_check_{self.timestamp}.html"
self.json_report = self.run_dir / f"baseline_check_{self.timestamp}.json"
self.results = []
# 检查root权限
self.is_root = (os.geteuid() == 0)
self.setup_logging()
def setup_logging(self):
"""配置日志系统"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(self.output_file),
logging.StreamHandler(sys.stdout)
]
)
self.logger = logging.getLogger("BaselineChecker")
def load_configs(self):
"""加载YAML配置文件或配置目录"""
all_checks = []
try:
# 如果是目录加载目录下所有YAML文件
if self.config_path.is_dir():
yaml_files = sorted(self.config_path.glob("*.yaml")) + sorted(self.config_path.glob("*.yml"))
if not yaml_files:
self.logger.error(f"配置目录 '{self.config_path}' 中未找到YAML文件")
sys.exit(1)
self.logger.info(f"从配置目录加载 {len(yaml_files)} 个配置文件")
for yaml_file in yaml_files:
self.logger.debug(f"加载配置文件: {yaml_file}")
with open(yaml_file, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
checks = config.get('checks', [])
all_checks.extend(checks)
self.logger.debug(f"{yaml_file.name} 加载了 {len(checks)} 个检查项")
# 如果是文件,直接加载
elif self.config_path.is_file():
with open(self.config_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
all_checks = config.get('checks', [])
else:
self.logger.error(f"配置路径 '{self.config_path}' 不存在")
sys.exit(1)
# 验证检查项
validated_checks = []
for check in all_checks:
errors = self._validate_check_item(check)
if errors:
self.logger.warning(f"检查项 {check.get('id', 'unknown')} 验证失败: {', '.join(errors)}")
else:
validated_checks.append(check)
self.logger.info(f"共加载 {len(validated_checks)} 个有效检查项")
return validated_checks
except Exception as e:
self.logger.error(f"加载配置文件失败: {e}")
sys.exit(1)
def _validate_check_item(self, check_item):
"""验证检查项配置"""
errors = []
# 必需字段
if 'id' not in check_item:
errors.append("缺少必需字段: id")
if 'check_script' not in check_item:
errors.append("缺少必需字段: check_script")
# 标签验证
if 'tags' not in check_item:
errors.append("缺少必需字段: tags")
else:
tags = check_item['tags']
if not isinstance(tags, list):
errors.append("tags 必须是列表")
elif len(tags) == 0:
errors.append("tags 不能为空")
else:
# 验证是否包含安全级别标签
valid_levels = ['low', 'medium', 'high']
has_level_tag = any(tag in valid_levels for tag in tags)
if not has_level_tag:
errors.append(f"tags 必须包含至少一个安全级别标签: {valid_levels}")
return errors
def execute_script(self, script_content, timeout=30):
"""执行shell脚本并返回结果"""
try:
# 创建临时脚本文件
script_file = f"/tmp/baseline_check_{os.getpid()}.sh"
with open(script_file, 'w') as f:
f.write("#!/bin/bash\nset -e\n")
f.write(script_content)
os.chmod(script_file, 0o755)
# 执行脚本
result = subprocess.run(
[script_file],
capture_output=True,
text=True,
timeout=timeout
)
# 删除临时脚本
os.remove(script_file)
return {
'returncode': result.returncode,
'stdout': result.stdout.strip(),
'stderr': result.stderr.strip()
}
except subprocess.TimeoutExpired:
return {
'returncode': -1,
'stdout': '',
'stderr': f'执行超时 ({timeout}秒)'
}
except Exception as e:
return {
'returncode': -2,
'stdout': '',
'stderr': f'执行错误: {str(e)}'
}
def _should_execute_check(self, check_item):
"""判断是否应该执行检查项基于tags"""
tags = check_item.get('tags', [])
if not tags:
self.logger.warning(f"检查项 {check_item.get('id', 'unknown')} 没有tags将被跳过")
return False
# 检查当前安全级别是否在tags中
return self.security_level in tags
def execute_check(self, check_item):
"""执行单个检查项"""
check_id = check_item.get('id', 'unknown')
title = check_item.get('title', '未命名检查')
description = check_item.get('description', '')
script = check_item.get('check_script', '')
tags = check_item.get('tags', [])
# 检查是否应该执行基于tags
if not self._should_execute_check(check_item):
self.logger.info(f"跳过检查 [{check_id}] - 标签不匹配 (标签: {tags}, 当前级别: {self.security_level})")
return {
'id': check_id,
'title': title,
'category': check_item.get('category', '未分类'),
'status': 'SKIPPED',
'message': f'安全级别不匹配 (标签: {tags}, 当前: {self.security_level})',
'applied_fix': False
}
self.logger.info(f"执行检查: {title}")
self.logger.debug(f"检查描述: {description}")
self.logger.debug(f"检查标签: {tags}")
# 执行检查脚本
result = self.execute_script(script)
if result['returncode'] == 0:
status = "PASS"
message = result['stdout'] or "检查通过"
else:
status = "FAIL"
message = result['stdout'] or result['stderr'] or "检查失败"
# 如果在修复模式下且检查失败,尝试执行修复脚本
if self.fix_mode and status == "FAIL":
fix_script = check_item.get('fix_script')
if fix_script:
self.logger.warning(f"检查失败,尝试自动修复: {title}")
fix_result = self.execute_script(fix_script, timeout=60)
if fix_result['returncode'] == 0:
# 修复后重新检查
self.logger.info(f"修复成功,重新检查: {title}")
recheck_result = self.execute_script(script)
if recheck_result['returncode'] == 0:
status = "PASS"
message = f"[已自动修复] {recheck_result['stdout'] or '修复后检查通过'}"
else:
status = "FAIL"
message = f"[修复后仍失败] {recheck_result['stdout'] or recheck_result['stderr'] or '修复后检查仍失败'}"
return {
'id': check_id,
'title': title,
'category': check_item.get('category', '未分类'),
'status': status,
'message': message,
'applied_fix': True
}
else:
self.logger.error(f"自动修复失败: {fix_result['stderr'] or fix_result['stdout']}")
return {
'id': check_id,
'title': title,
'category': check_item.get('category', '未分类'),
'status': status,
'message': message,
'applied_fix': False
}
def generate_html_report(self):
"""生成HTML格式的报告"""
# 统计结果
total = len(self.results)
passed = len([r for r in self.results if r['status'] == 'PASS'])
failed = len([r for r in self.results if r['status'] == 'FAIL'])
errors = len([r for r in self.results if r['status'] == 'ERROR'])
skipped = len([r for r in self.results if r['status'] == 'SKIPPED'])
fixed = len([r for r in self.results if r.get('applied_fix') and r['status'] == 'PASS'])
# 计算失败百分比(避免除零错误)
failed_percent = (failed/total*100) if total > 0 else 0.0
# 按类别分组
categories = {}
for result in self.results:
category = result['category']
if category not in categories:
categories[category] = []
categories[category].append(result)
# 生成HTML
html_content = f"""
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>系统安全基线检查报告</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
.header {{ text-align: center; margin-bottom: 30px; }}
.summary {{ display: flex; justify-content: space-around; margin: 20px 0; }}
.summary-box {{ text-align: center; padding: 15px; border-radius: 5px; width: 150px; }}
.pass {{ background-color: #d4edda; border: 1px solid #c3e6cb; color: #155724; }}
.fail {{ background-color: #f8d7da; border: 1px solid #f5c6cb; color: #721c24; }}
.skip {{ background-color: #fff3cd; border: 1px solid #ffeeba; color: #856404; }}
.fixed {{ background-color: #d1ecf1; border: 1px solid #bee5eb; color: #0c5460; }}
.category {{ margin-bottom: 30px; }}
.category-title {{ background-color: #f1f1f1; padding: 10px; border-radius: 5px; margin-bottom: 10px; }}
.check-item {{ margin-left: 20px; margin-bottom: 10px; }}
.status-pass {{ color: green; font-weight: bold; }}
.status-fail {{ color: red; font-weight: bold; }}
.status-skip {{ color: orange; font-weight: bold; }}
.fixed-tag {{ background-color: #17a2b8; color: white; padding: 2px 5px; border-radius: 3px; font-size: 0.8em; }}
.message {{ margin-top: 5px; margin-left: 20px; font-family: monospace; white-space: pre-wrap; }}
.footer {{ margin-top: 30px; text-align: center; color: #666; }}
</style>
</head>
<body>
<div class="header">
<h1>系统安全基线检查报告</h1>
<p>生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
<p>安全等级: <strong>{self.security_level}</strong> | 修复模式: <strong>{"启用" if self.fix_mode else "禁用"}</strong></p>
<p>检查主机: <strong>{os.uname().nodename}</strong> | 系统版本: <strong>{os.uname().sysname} {os.uname().release}</strong></p>
</div>
<div class="summary">
<div class="summary-box pass">
<h3>通过</h3>
<p style="font-size: 24px; margin: 5px 0;">{passed}</p>
<p>总数: {total}</p>
</div>
<div class="summary-box fail">
<h3>失败</h3>
<p style="font-size: 24px; margin: 5px 0;">{failed}</p>
<p>{failed_percent:.1f}%</p>
</div>
<div class="summary-box skip">
<h3>跳过</h3>
<p style="font-size: 24px; margin: 5px 0;">{skipped}</p>
</div>
<div class="summary-box fixed">
<h3>已修复</h3>
<p style="font-size: 24px; margin: 5px 0;">{fixed}</p>
</div>
</div>
"""
# 按类别生成检查结果
for category, items in categories.items():
html_content += f"""
<div class="category">
<div class="category-title">
<h2>{category}</h2>
</div>
"""
for item in items:
status_class = {
'PASS': 'status-pass',
'FAIL': 'status-fail',
'SKIPPED': 'status-skip'
}.get(item['status'], '')
status_text = {
'PASS': '✓ 通过',
'FAIL': '✗ 失败',
'SKIPPED': '» 跳过'
}.get(item['status'], item['status'])
fixed_tag = '<span class="fixed-tag">已修复</span>' if item.get('applied_fix') else ''
message_formatted = item['message'].replace('\n', '<br>')
html_content += f"""
<div class="check-item">
<h4>{item['title']} {fixed_tag}</h4>
<p><strong>状态:</strong> <span class="{status_class}">{status_text}</span></p>
<div class="message">{message_formatted}</div>
</div>
"""
html_content += "</div>"
html_content += f"""
<div class="footer">
<p>报告生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | OpenRuyi安全基线检查工具 v1.0</p>
</div>
</body>
</html>
"""
# 保存HTML报告
with open(self.html_report, 'w', encoding='utf-8') as f:
f.write(html_content)
return str(self.html_report)
def generate_json_report(self):
"""生成JSON格式的报告便于自动化处理"""
report = {
'metadata': {
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'hostname': os.uname().nodename,
'os': f"{os.uname().sysname} {os.uname().release}",
'security_level': self.security_level,
'fix_mode': self.fix_mode
},
'summary': {
'total': len(self.results),
'passed': len([r for r in self.results if r['status'] == 'PASS']),
'failed': len([r for r in self.results if r['status'] == 'FAIL']),
'skipped': len([r for r in self.results if r['status'] == 'SKIPPED']),
'fixed': len([r for r in self.results if r.get('applied_fix') and r['status'] == 'PASS'])
},
'results': self.results
}
with open(self.json_report, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False)
return str(self.json_report)
def run(self):
"""运行所有检查"""
if not self.is_root:
self.logger.warning("=" * 60)
self.logger.warning("警告: 当前不是root用户部分检查可能无法执行或结果不准确")
self.logger.warning("建议使用sudo运行此程序以获得完整检查结果")
self.logger.warning("=" * 60)
# 加载所有检查项
checks = self.load_configs()
# 根据tags筛选检查项
filtered_checks = [check for check in checks if self._should_execute_check(check)]
total_checks = len(filtered_checks)
skipped_count = len(checks) - total_checks
self.logger.info(f"开始执行安全基线检查,共加载{len(checks)}项,筛选后{total_checks}项,跳过{skipped_count}")
self.logger.info(f"安全等级: {self.security_level} | 修复模式: {'启用' if self.fix_mode else '禁用'}")
# 执行筛选后的检查
for i, check_item in enumerate(filtered_checks, 1):
self.logger.info(f"[{i}/{total_checks}] {check_item.get('title', '未命名检查')}")
result = self.execute_check(check_item)
self.results.append(result)
# 生成报告
html_report = self.generate_html_report()
json_report = self.generate_json_report()
self.logger.info("\n" + "=" * 80)
self.logger.info("检查完成,报告已生成:")
self.logger.info(f"- 报告目录: {self.run_dir}")
self.logger.info(f"- 详细日志: {self.output_file}")
self.logger.info(f"- HTML报告: {html_report}")
self.logger.info(f"- JSON报告: {json_report}")
self.logger.info("=" * 80 + "\n")
# 打印总结
passed = len([r for r in self.results if r['status'] == 'PASS'])
failed = len([r for r in self.results if r['status'] == 'FAIL'])
skipped = len([r for r in self.results if r['status'] == 'SKIPPED'])
self.logger.info(f"检查总结: 总计{total_checks}项,通过{passed}项,失败{failed}项,跳过{skipped}")
# 返回退出码
return 0 if failed == 0 else 1
def main():
parser = argparse.ArgumentParser(description='OpenRuyi系统安全基线检查工具')
parser.add_argument('-c', '--config', default=None,
help='YAML规则配置文件路径或配置目录 (默认: 先查找当前目录rules再查找/etc/baseline_checker/rules)')
parser.add_argument('-l', '--level', choices=['low', 'medium', 'high'], default='high',
help='安全等级 (默认: high)')
parser.add_argument('-f', '--fix', action='store_true',
help='启用自动修复模式 (需要root权限)')
parser.add_argument('-o', '--output', default='.',
help='输出目录 (默认: 当前目录)')
parser.add_argument('-v', '--verbose', action='store_true',
help='详细输出')
args = parser.parse_args()
# 如果没有指定config参数自动查找默认路径
if args.config is None:
# 先查找当前目录下的rules目录
current_rules = Path('rules')
if current_rules.exists() and current_rules.is_dir():
args.config = str(current_rules)
else:
# 如果不存在,查找/etc/baseline_checker/rules
etc_rules = Path('/etc/baseline_checker/rules')
if etc_rules.exists() and etc_rules.is_dir():
args.config = str(etc_rules)
else:
# 如果都不存在,尝试/etc/baseline_checker兼容旧版本
etc_config = Path('/etc/baseline_checker')
if etc_config.exists():
args.config = str(etc_config)
else:
print("错误: 未找到配置文件目录")
print(" 请确保以下路径之一存在:")
print(f" - {current_rules.absolute()}")
print(f" - {etc_rules}")
print(f" - {etc_config}")
print(" 或使用 -c 参数指定配置文件路径")
sys.exit(1)
# 检查配置文件或目录是否存在
config_path = Path(args.config)
if not config_path.exists():
print(f"错误: 配置文件或目录 '{args.config}' 不存在")
sys.exit(1)
# 创建输出目录
output_dir = Path(args.output)
output_dir.mkdir(parents=True, exist_ok=True)
# 设置日志级别
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
# 检查修复模式是否需要root权限
if args.fix and os.geteuid() != 0:
print("错误: 修复模式需要root权限请使用sudo运行")
sys.exit(1)
checker = BaselineChecker(
config_path=args.config,
security_level=args.level,
fix_mode=args.fix,
output_dir=args.output
)
exit_code = checker.run()
sys.exit(exit_code)
if __name__ == "__main__":
main()