519 lines
22 KiB
Python
519 lines
22 KiB
Python
#!/usr/bin/env python3
|
||
import argparse
import html
import json
import logging
import os
import subprocess
import sys
import tempfile
from datetime import datetime
from pathlib import Path

import yaml
|
||
|
||
class BaselineChecker:
    """Run YAML-defined security baseline checks and write log/HTML/JSON reports.

    Each check item is a mapping with at least ``id``, ``check_script`` (a bash
    snippet whose exit status 0 means "pass") and ``tags`` (a list containing
    at least one of ``low``/``medium``/``high``).  An optional ``fix_script``
    is executed in fix mode when the check fails.
    """

    # Kept as a list because its repr is embedded in a validation message.
    _VALID_LEVELS = ['low', 'medium', 'high']

    def __init__(self, config_path, security_level="high", fix_mode=False, output_dir=None):
        """Prepare the per-run output directory and configure logging.

        Args:
            config_path: YAML rule file, or a directory of ``*.yaml``/``*.yml`` files.
            security_level: Only checks tagged with this level are executed.
            fix_mode: When true, failed checks with a ``fix_script`` are auto-fixed.
            output_dir: Base directory for the ``log/`` tree (defaults to the CWD).
        """
        self.config_path = Path(config_path)
        self.security_level = security_level
        self.fix_mode = fix_mode

        # Create the log directory structure.
        base_dir = Path(output_dir) if output_dir else Path.cwd()
        self.log_dir = base_dir / "log"
        self.log_dir.mkdir(parents=True, exist_ok=True)

        # One sub-directory per run, named after the start timestamp.
        self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.run_dir = self.log_dir / self.timestamp
        self.run_dir.mkdir(parents=True, exist_ok=True)

        # All artefacts of this run live inside the run directory.
        self.output_file = self.run_dir / f"baseline_check_{self.timestamp}.log"
        self.html_report = self.run_dir / f"baseline_check_{self.timestamp}.html"
        self.json_report = self.run_dir / f"baseline_check_{self.timestamp}.json"

        self.results = []

        # Remember whether we run with root privileges.
        self.is_root = (os.geteuid() == 0)

        self.setup_logging()

    def setup_logging(self):
        """Configure root logging to the run log file and stdout."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                # Messages are Chinese; do not depend on the locale encoding.
                logging.FileHandler(self.output_file, encoding='utf-8'),
                logging.StreamHandler(sys.stdout),
            ],
        )
        self.logger = logging.getLogger("BaselineChecker")

    def _read_checks(self, yaml_path):
        """Return the ``checks`` list of one YAML file (empty list if absent)."""
        with open(yaml_path, 'r', encoding='utf-8') as f:
            config = yaml.safe_load(f)

        # safe_load() yields None for an empty file; that is not an error.
        if config is None:
            return []
        if not isinstance(config, dict):
            self.logger.warning(f"配置文件 '{yaml_path}' 顶层不是字典,已忽略")
            return []

        checks = config.get('checks') or []
        if not isinstance(checks, list):
            self.logger.warning(f"配置文件 '{yaml_path}' 中的 checks 不是列表,已忽略")
            return []
        return checks

    def load_configs(self):
        """Load and validate check items from the configured file or directory.

        Returns:
            The list of check items that passed ``_validate_check_item``.

        Exits the process with status 1 when the path does not exist, a
        directory holds no YAML files, or a file cannot be read/parsed.
        """
        all_checks = []

        try:
            if self.config_path.is_dir():
                # Directory: load every YAML file it contains.
                yaml_files = sorted(self.config_path.glob("*.yaml")) + sorted(self.config_path.glob("*.yml"))
                if not yaml_files:
                    self.logger.error(f"配置目录 '{self.config_path}' 中未找到YAML文件")
                    sys.exit(1)

                self.logger.info(f"从配置目录加载 {len(yaml_files)} 个配置文件")
                for yaml_file in yaml_files:
                    self.logger.debug(f"加载配置文件: {yaml_file}")
                    checks = self._read_checks(yaml_file)
                    all_checks.extend(checks)
                    self.logger.debug(f" 从 {yaml_file.name} 加载了 {len(checks)} 个检查项")

            elif self.config_path.is_file():
                # Single file: load it directly.
                all_checks = self._read_checks(self.config_path)

            else:
                self.logger.error(f"配置路径 '{self.config_path}' 不存在")
                sys.exit(1)

            # Keep only well-formed items; report the rest.
            validated_checks = []
            for check in all_checks:
                errors = self._validate_check_item(check)
                if errors:
                    check_id = check.get('id', 'unknown') if isinstance(check, dict) else 'unknown'
                    self.logger.warning(f"检查项 {check_id} 验证失败: {', '.join(errors)}")
                else:
                    validated_checks.append(check)

            self.logger.info(f"共加载 {len(validated_checks)} 个有效检查项")
            return validated_checks

        except Exception as e:
            # sys.exit() above raises SystemExit, which is not caught here.
            self.logger.error(f"加载配置文件失败: {e}")
            sys.exit(1)

    def _validate_check_item(self, check_item):
        """Return a list of validation error messages (empty when valid)."""
        if not isinstance(check_item, dict):
            # A scalar or list item would otherwise crash the field lookups.
            return ["检查项必须是字典"]

        errors = []

        # Required fields.
        if 'id' not in check_item:
            errors.append("缺少必需字段: id")
        if 'check_script' not in check_item:
            errors.append("缺少必需字段: check_script")

        # Tag validation.
        if 'tags' not in check_item:
            errors.append("缺少必需字段: tags")
            return errors

        tags = check_item['tags']
        if not isinstance(tags, list):
            errors.append("tags 必须是列表")
        elif not tags:
            errors.append("tags 不能为空")
        elif not any(tag in self._VALID_LEVELS for tag in tags):
            # At least one tag must name a security level.
            errors.append(f"tags 必须包含至少一个安全级别标签: {self._VALID_LEVELS}")

        return errors

    def execute_script(self, script_content, timeout=30):
        """Run a bash snippet and return its exit status and output.

        The snippet is written (prefixed with ``set -e``) to a private,
        unpredictably named temporary file which is always removed afterwards.

        Returns:
            Dict with ``returncode``, ``stdout`` and ``stderr``.  ``returncode``
            is -1 on timeout and -2 on any other execution error.
        """
        script_path = None
        try:
            # mkstemp creates the file atomically with mode 0600, which avoids
            # symlink/race attacks on a guessable /tmp path when run as root.
            fd, script_path = tempfile.mkstemp(prefix="baseline_check_", suffix=".sh")
            with os.fdopen(fd, 'w', encoding='utf-8') as f:
                f.write("#!/bin/bash\nset -e\n")
                f.write(script_content)

            # Invoke the interpreter explicitly so the file needs no exec bit
            # and a noexec temp directory does not break the run.
            result = subprocess.run(
                ["/bin/bash", script_path],
                capture_output=True,
                text=True,
                timeout=timeout,
            )

            return {
                'returncode': result.returncode,
                'stdout': result.stdout.strip(),
                'stderr': result.stderr.strip(),
            }

        except subprocess.TimeoutExpired:
            return {
                'returncode': -1,
                'stdout': '',
                'stderr': f'执行超时 ({timeout}秒)',
            }
        except Exception as e:
            return {
                'returncode': -2,
                'stdout': '',
                'stderr': f'执行错误: {str(e)}',
            }
        finally:
            # Clean up on every path, including timeout and errors.
            if script_path is not None:
                try:
                    os.remove(script_path)
                except OSError:
                    pass

    def _should_execute_check(self, check_item):
        """Return True when the item's tags contain the current security level."""
        tags = check_item.get('tags', [])
        if not tags:
            self.logger.warning(f"检查项 {check_item.get('id', 'unknown')} 没有tags,将被跳过")
            return False

        return self.security_level in tags

    @staticmethod
    def _make_result(check_item, status, message, applied_fix):
        """Build the result record stored in ``self.results`` for one check."""
        return {
            'id': check_item.get('id', 'unknown'),
            'title': check_item.get('title', '未命名检查'),
            'category': check_item.get('category', '未分类'),
            'status': status,
            'message': message,
            'applied_fix': applied_fix,
        }

    def execute_check(self, check_item):
        """Execute one check item (and its fix script in fix mode).

        Returns:
            A result dict with ``id``, ``title``, ``category``, ``status``
            (``PASS``/``FAIL``/``SKIPPED``), ``message`` and ``applied_fix``.
            ``applied_fix`` is True when the fix script ran successfully, even
            if the follow-up re-check still fails.
        """
        check_id = check_item.get('id', 'unknown')
        title = check_item.get('title', '未命名检查')
        description = check_item.get('description', '')
        script = check_item.get('check_script', '')
        tags = check_item.get('tags', [])

        # Skip items whose tags do not match the current level.
        if not self._should_execute_check(check_item):
            self.logger.info(f"跳过检查 [{check_id}] - 标签不匹配 (标签: {tags}, 当前级别: {self.security_level})")
            return self._make_result(
                check_item,
                'SKIPPED',
                f'安全级别不匹配 (标签: {tags}, 当前: {self.security_level})',
                False,
            )

        self.logger.info(f"执行检查: {title}")
        self.logger.debug(f"检查描述: {description}")
        self.logger.debug(f"检查标签: {tags}")

        result = self.execute_script(script)

        if result['returncode'] == 0:
            status = "PASS"
            message = result['stdout'] or "检查通过"
        else:
            status = "FAIL"
            message = result['stdout'] or result['stderr'] or "检查失败"

        # In fix mode, try the fix script and then re-run the check.
        if self.fix_mode and status == "FAIL":
            fix_script = check_item.get('fix_script')
            if fix_script:
                self.logger.warning(f"检查失败,尝试自动修复: {title}")
                fix_result = self.execute_script(fix_script, timeout=60)

                if fix_result['returncode'] == 0:
                    self.logger.info(f"修复成功,重新检查: {title}")
                    recheck_result = self.execute_script(script)
                    if recheck_result['returncode'] == 0:
                        status = "PASS"
                        message = f"[已自动修复] {recheck_result['stdout'] or '修复后检查通过'}"
                    else:
                        status = "FAIL"
                        message = f"[修复后仍失败] {recheck_result['stdout'] or recheck_result['stderr'] or '修复后检查仍失败'}"
                    return self._make_result(check_item, status, message, True)

                self.logger.error(f"自动修复失败: {fix_result['stderr'] or fix_result['stdout']}")

        return self._make_result(check_item, status, message, False)

    def _summary_counts(self):
        """Count results by status; ``fixed`` means fix applied and now passing."""
        def count(status):
            return sum(1 for r in self.results if r['status'] == status)

        return {
            'total': len(self.results),
            'passed': count('PASS'),
            'failed': count('FAIL'),
            'skipped': count('SKIPPED'),
            'fixed': sum(1 for r in self.results if r.get('applied_fix') and r['status'] == 'PASS'),
        }

    def generate_html_report(self):
        """Write the HTML report and return its path as a string.

        All dynamic text (titles, categories, script output, host name) is
        HTML-escaped so that script output cannot break or inject markup.
        """
        counts = self._summary_counts()
        total = counts['total']
        passed = counts['passed']
        failed = counts['failed']
        skipped = counts['skipped']
        fixed = counts['fixed']

        # Failure rate over the checks that actually ran (guards against /0).
        executed = total - skipped
        failed_percent = (failed / executed * 100) if executed > 0 else 0.0

        # Group results by category, preserving first-seen order.
        categories = {}
        for result in self.results:
            categories.setdefault(result['category'], []).append(result)

        generated_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        uname = os.uname()
        level_text = html.escape(str(self.security_level))
        fix_mode_text = "启用" if self.fix_mode else "禁用"
        host_text = html.escape(uname.nodename)
        os_text = html.escape(f"{uname.sysname} {uname.release}")

        parts = [f"""
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>系统安全基线检查报告</title>
    <style>
        body {{ font-family: Arial, sans-serif; margin: 20px; }}
        .header {{ text-align: center; margin-bottom: 30px; }}
        .summary {{ display: flex; justify-content: space-around; margin: 20px 0; }}
        .summary-box {{ text-align: center; padding: 15px; border-radius: 5px; width: 150px; }}
        .pass {{ background-color: #d4edda; border: 1px solid #c3e6cb; color: #155724; }}
        .fail {{ background-color: #f8d7da; border: 1px solid #f5c6cb; color: #721c24; }}
        .skip {{ background-color: #fff3cd; border: 1px solid #ffeeba; color: #856404; }}
        .fixed {{ background-color: #d1ecf1; border: 1px solid #bee5eb; color: #0c5460; }}
        .category {{ margin-bottom: 30px; }}
        .category-title {{ background-color: #f1f1f1; padding: 10px; border-radius: 5px; margin-bottom: 10px; }}
        .check-item {{ margin-left: 20px; margin-bottom: 10px; }}
        .status-pass {{ color: green; font-weight: bold; }}
        .status-fail {{ color: red; font-weight: bold; }}
        .status-skip {{ color: orange; font-weight: bold; }}
        .fixed-tag {{ background-color: #17a2b8; color: white; padding: 2px 5px; border-radius: 3px; font-size: 0.8em; }}
        .message {{ margin-top: 5px; margin-left: 20px; font-family: monospace; white-space: pre-wrap; }}
        .footer {{ margin-top: 30px; text-align: center; color: #666; }}
    </style>
</head>
<body>
    <div class="header">
        <h1>系统安全基线检查报告</h1>
        <p>生成时间: {generated_at}</p>
        <p>安全等级: <strong>{level_text}</strong> | 修复模式: <strong>{fix_mode_text}</strong></p>
        <p>检查主机: <strong>{host_text}</strong> | 系统版本: <strong>{os_text}</strong></p>
    </div>

    <div class="summary">
        <div class="summary-box pass">
            <h3>通过</h3>
            <p style="font-size: 24px; margin: 5px 0;">{passed}</p>
            <p>总数: {total}</p>
        </div>
        <div class="summary-box fail">
            <h3>失败</h3>
            <p style="font-size: 24px; margin: 5px 0;">{failed}</p>
            <p>{failed_percent:.1f}%</p>
        </div>
        <div class="summary-box skip">
            <h3>跳过</h3>
            <p style="font-size: 24px; margin: 5px 0;">{skipped}</p>
        </div>
        <div class="summary-box fixed">
            <h3>已修复</h3>
            <p style="font-size: 24px; margin: 5px 0;">{fixed}</p>
        </div>
    </div>
"""]

        status_classes = {
            'PASS': 'status-pass',
            'FAIL': 'status-fail',
            'SKIPPED': 'status-skip',
        }
        status_texts = {
            'PASS': '✓ 通过',
            'FAIL': '✗ 失败',
            'SKIPPED': '» 跳过',
        }

        # One section per category.
        for category, items in categories.items():
            parts.append(f"""
    <div class="category">
        <div class="category-title">
            <h2>{html.escape(str(category))}</h2>
        </div>
""")

            for item in items:
                status = item['status']
                status_class = status_classes.get(status, '')
                status_text = html.escape(str(status_texts.get(status, status)))

                # Only badge items that really pass after the fix was applied.
                is_fixed = item.get('applied_fix') and status == 'PASS'
                fixed_tag = '<span class="fixed-tag">已修复</span>' if is_fixed else ''

                # Escape first, then turn newlines into explicit line breaks.
                message_formatted = html.escape(str(item['message'])).replace('\n', '<br>')
                title_text = html.escape(str(item['title']))

                parts.append(f"""
        <div class="check-item">
            <h4>{title_text} {fixed_tag}</h4>
            <p><strong>状态:</strong> <span class="{status_class}">{status_text}</span></p>
            <div class="message">{message_formatted}</div>
        </div>
""")

            parts.append("</div>")

        parts.append(f"""
    <div class="footer">
        <p>报告生成时间: {generated_at} | OpenRuyi安全基线检查工具 v1.0</p>
    </div>
</body>
</html>
""")

        with open(self.html_report, 'w', encoding='utf-8') as f:
            f.write("".join(parts))

        return str(self.html_report)

    def generate_json_report(self):
        """Write the machine-readable JSON report and return its path as a string."""
        uname = os.uname()
        report = {
            'metadata': {
                'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                'hostname': uname.nodename,
                'os': f"{uname.sysname} {uname.release}",
                'security_level': self.security_level,
                'fix_mode': self.fix_mode,
            },
            'summary': self._summary_counts(),
            'results': self.results,
        }

        with open(self.json_report, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)

        return str(self.json_report)

    def run(self):
        """Load, execute and report all checks.

        Checks whose tags do not match the security level are not executed but
        are recorded as ``SKIPPED`` so the reports show the real skip count.

        Returns:
            Process exit code: 0 when no check failed, otherwise 1.
        """
        if not self.is_root:
            self.logger.warning("=" * 60)
            self.logger.warning("警告: 当前不是root用户,部分检查可能无法执行或结果不准确")
            self.logger.warning("建议使用sudo运行此程序以获得完整检查结果")
            self.logger.warning("=" * 60)

        checks = self.load_configs()

        # Decide once per item whether it matches the current level.
        plan = [(check, self._should_execute_check(check)) for check in checks]
        total_checks = sum(1 for _, runnable in plan if runnable)
        skipped_count = len(plan) - total_checks

        self.logger.info(f"开始执行安全基线检查,共加载{len(checks)}项,筛选后{total_checks}项,跳过{skipped_count}项")
        self.logger.info(f"安全等级: {self.security_level} | 修复模式: {'启用' if self.fix_mode else '禁用'}")

        position = 0
        for check_item, runnable in plan:
            if runnable:
                position += 1
                self.logger.info(f"[{position}/{total_checks}] {check_item.get('title', '未命名检查')}")
            # execute_check() returns a SKIPPED record for non-matching items.
            self.results.append(self.execute_check(check_item))

        html_report = self.generate_html_report()
        json_report = self.generate_json_report()

        self.logger.info("\n" + "=" * 80)
        self.logger.info("检查完成,报告已生成:")
        self.logger.info(f"- 报告目录: {self.run_dir}")
        self.logger.info(f"- 详细日志: {self.output_file}")
        self.logger.info(f"- HTML报告: {html_report}")
        self.logger.info(f"- JSON报告: {json_report}")
        self.logger.info("=" * 80 + "\n")

        counts = self._summary_counts()
        passed = counts['passed']
        failed = counts['failed']
        skipped = counts['skipped']

        self.logger.info(f"检查总结: 总计{counts['total']}项,通过{passed}项,失败{failed}项,跳过{skipped}项")

        return 0 if failed == 0 else 1
|
||
|
||
def _locate_default_config():
    """Return the first existing default rules path, or exit with status 1."""
    current_rules = Path('rules')
    etc_rules = Path('/etc/baseline_checker/rules')
    # Legacy location kept for compatibility with older installations.
    etc_config = Path('/etc/baseline_checker')

    if current_rules.is_dir():
        return str(current_rules)
    if etc_rules.is_dir():
        return str(etc_rules)
    if etc_config.exists():
        return str(etc_config)

    print("错误: 未找到配置文件目录")
    print(" 请确保以下路径之一存在:")
    print(f" - {current_rules.absolute()}")
    print(f" - {etc_rules}")
    print(f" - {etc_config}")
    print(" 或使用 -c 参数指定配置文件路径")
    sys.exit(1)


def main():
    """Parse command-line arguments, run the baseline checker and exit.

    The process exit status is the value returned by ``BaselineChecker.run()``,
    or 1 when the configuration cannot be found or fix mode lacks root.
    """
    parser = argparse.ArgumentParser(description='OpenRuyi系统安全基线检查工具')
    parser.add_argument('-c', '--config', default=None,
                        help='YAML规则配置文件路径或配置目录 (默认: 先查找当前目录rules,再查找/etc/baseline_checker/rules)')
    parser.add_argument('-l', '--level', choices=['low', 'medium', 'high'], default='high',
                        help='安全等级 (默认: high)')
    parser.add_argument('-f', '--fix', action='store_true',
                        help='启用自动修复模式 (需要root权限)')
    parser.add_argument('-o', '--output', default='.',
                        help='输出目录 (默认: 当前目录)')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='详细输出')

    args = parser.parse_args()

    # Without -c, fall back to the well-known default locations.
    if args.config is None:
        args.config = _locate_default_config()

    # The configuration file or directory must exist.
    config_path = Path(args.config)
    if not config_path.exists():
        print(f"错误: 配置文件或目录 '{args.config}' 不存在")
        sys.exit(1)

    # Make sure the output directory exists.
    output_dir = Path(args.output)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Fix mode modifies the system and therefore requires root.
    if args.fix and os.geteuid() != 0:
        print("错误: 修复模式需要root权限,请使用sudo运行")
        sys.exit(1)

    checker = BaselineChecker(
        config_path=args.config,
        security_level=args.level,
        fix_mode=args.fix,
        output_dir=args.output
    )

    # Must happen after construction: the checker's logging setup calls
    # basicConfig(level=INFO), which would otherwise reset the root level
    # and silently disable --verbose.
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    exit_code = checker.run()
    sys.exit(exit_code)
|
||
|
||
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()
|