137
01_init.sh
Executable file
137
01_init.sh
Executable file
@@ -0,0 +1,137 @@
|
||||
#!/bin/bash
# 01_init.sh - Initialize the RPM security-scan pipeline: validate config,
# check tool dependencies, create the workspace and SQLite schema, and
# fetch the package list for the configured architecture.
set -euo pipefail

# Shared configuration (WORK_DIR, DB_FILE, REPO_ARCH, PARALLEL_JOBS, ...).
source "$(dirname "$0")/config.sh"
# Log a timestamped message to stdout and append it to $LOG_FILE.
log() {
    local timestamp
    timestamp=$(date '+%Y-%m-%d %H:%M:%S')
    # printf (not echo) so message content cannot be parsed as options.
    printf '[%s] %s\n' "$timestamp" "$*" | tee -a "$LOG_FILE"
}
# Write an error line to stderr and append it to $ERROR_LOG.
error() {
    local message="$*"
    printf '[ERROR] %s\n' "$message" | tee -a "$ERROR_LOG" >&2
}
# Validate the configuration loaded from config.sh.
# Exits 1 after reporting ALL problems found (not just the first one).
# Reads: WORK_DIR, DB_FILE, REPO_ARCH, PARALLEL_JOBS, DOWNLOAD_TIMEOUT.
validate_config() {
    local errors=0
    local var

    # These variables must be set and non-empty.
    for var in WORK_DIR DB_FILE REPO_ARCH; do
        if [ -z "${!var:-}" ]; then
            error "${var} 未设置"
            errors=$((errors + 1))
        fi
    done

    # Numeric settings must be positive integers.
    for var in PARALLEL_JOBS DOWNLOAD_TIMEOUT; do
        if [[ ! "${!var:-}" =~ ^[0-9]+$ ]] || [ "${!var:-0}" -le 0 ]; then
            error "${var} 必须是大于 0 的数字"
            errors=$((errors + 1))
        fi
    done

    if [ "$errors" -gt 0 ]; then
        error "发现 $errors 个配置错误,请检查 config.sh"
        exit 1
    fi

    log "配置验证通过"
}
# Create the directories the scan pipeline writes into.
init_workspace() {
    log "初始化工作空间: $WORK_DIR"
    mkdir -p -- "$RPM_CACHE_DIR" "$EXTRACT_DIR" "$RESULTS_DIR"
}
# Initialize the SQLite schema used to store scan results.
# Tables: packages (one row per scanned NVRA), binaries (ELF files found in
# a package), security_checks (checksec hardening flags per binary).
init_database() {
    log "初始化数据库: $DB_FILE"
    # Quoted 'EOF' delimiter: the SQL below is passed to sqlite3 verbatim,
    # with no shell expansion.
    sqlite3 "$DB_FILE" <<'EOF'
CREATE TABLE IF NOT EXISTS packages (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    version TEXT,
    release TEXT,
    arch TEXT,
    scan_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    status TEXT,
    UNIQUE(name, version, release, arch)
);

CREATE TABLE IF NOT EXISTS binaries (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    package_id INTEGER,
    file_path TEXT,
    file_type TEXT,
    inode INTEGER,
    FOREIGN KEY (package_id) REFERENCES packages(id)
);

CREATE TABLE IF NOT EXISTS security_checks (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    binary_id INTEGER,
    pie TEXT,
    nx TEXT,
    canary TEXT,
    fortify TEXT,
    relro TEXT,
    bind_now TEXT,
    FOREIGN KEY (binary_id) REFERENCES binaries(id)
);

CREATE INDEX IF NOT EXISTS idx_package_name ON packages(name);
CREATE INDEX IF NOT EXISTS idx_binary_path ON binaries(file_path);
CREATE INDEX IF NOT EXISTS idx_package_status ON packages(status);
EOF
}
# Fetch the list of packages for the configured architecture (pre-filter).
# Writes one NVRA per line to ${WORK_DIR}/packages.list and echoes its path.
# NOTE(review): log() also writes to stdout via tee, so callers must not
# capture this function's output expecting only the path.
fetch_package_list() {
    local output_file="${WORK_DIR}/packages.list"
    log "获取 ${REPO_ARCH} 架构的包列表..."

    dnf repoquery --arch "$REPO_ARCH" --qf '%{name}-%{version}-%{release}.%{arch}\n' \
        > "$output_file"

    # Split declaration from assignment so a wc failure is not masked
    # by 'local' always returning 0.
    local count
    count=$(wc -l < "$output_file")
    log "找到 $count 个包"
    echo "$output_file"
}
# Verify every external tool the pipeline needs is on PATH.
# Reports the full list of missing tools and exits 1 if any are absent.
check_dependencies() {
    local required=(dnf sqlite3 parallel checksec rpm2cpio file stat)
    local missing=()
    local tool

    for tool in "${required[@]}"; do
        command -v "$tool" &>/dev/null || missing+=("$tool")
    done

    if [ "${#missing[@]}" -gt 0 ]; then
        error "缺少依赖: ${missing[*]}"
        exit 1
    fi
}
# Entry point: run all initialization steps in dependency order.
# Config must be valid and tools present before any filesystem/DB work.
main() {
    log "=== RPM 安全扫描系统初始化 ==="
    validate_config
    check_dependencies
    init_workspace
    init_database
    fetch_package_list
    log "初始化完成"
}

main "$@"
471
02_scan.sh
Executable file
471
02_scan.sh
Executable file
@@ -0,0 +1,471 @@
|
||||
#!/bin/bash
# 02_scan.sh - Download, extract and checksec-scan RPM packages in parallel.
# Supports a batch mode (reads packages.list) and a single-package mode
# ("02_scan.sh scan_package <name>") used by the parallel workers.
set -euo pipefail

# Shared configuration (WORK_DIR, RESULTS_DIR, timeouts, target paths, ...).
source "$(dirname "$0")/config.sh"
# Shared helpers (validate_package_name, log_failure, ...).
source "$(dirname "$0")/common.sh"
# Timestamped logger: echoes to stdout and appends to $LOG_FILE.
log() {
    local now
    now="$(date '+%Y-%m-%d %H:%M:%S')"
    # printf keeps untrusted message text out of the format string.
    printf '[%s] %s\n' "$now" "$*" | tee -a "$LOG_FILE"
}
error() {
|
||||
# 使用 printf 防止日志注入
|
||||
printf '[ERROR] %s\n' "$*" | tee -a "$ERROR_LOG" >&2
|
||||
}
|
||||
|
||||
# Scan a single package (no database writes).
#   $1 - package NVRA, e.g. name-version-release.arch
# Downloads the RPM, extracts it, finds ELF files under TARGET_PATHS,
# runs checksec on each and writes ${RESULTS_DIR}/<pkg>.json plus one
# line in the shared success/no_binary status files.
scan_package() {
    local pkg_name="$1"

    # Validate the package name (guards against path traversal / injection).
    if ! validate_package_name "$pkg_name"; then
        error "非法包名格式: $pkg_name"
        log_failure "$pkg_name" "invalid_name"
        return 1
    fi

    local pkg_dir="${EXTRACT_DIR}/${pkg_name}"
    local result_file="${RESULTS_DIR}/${pkg_name}.json"
    local status_file="${RESULTS_DIR}/scanned.txt"
    local rpm_file=""

    # Cleanup helper defined in-function so it can see pkg_dir.
    cleanup_pkg_dir() {
        if [ -n "${pkg_dir:-}" ] && [ -d "$pkg_dir" ]; then
            rm -rf "$pkg_dir"
        fi
    }
    # Ensure the extracted tree is removed on every exit path.
    trap cleanup_pkg_dir EXIT ERR INT TERM

    # Check-and-mark in ONE flock region (no TOCTOU window between the
    # "already scanned?" test and the in-progress marker).
    # BUGFIX: the previous 'exit 0' only terminated the subshell, so an
    # already-scanned package was still downloaded and scanned again.
    # Use a sentinel exit code and act on it in the function itself.
    local lock_status=0
    (
        flock -x 200
        if grep -qx "$pkg_name" "$status_file" 2>/dev/null; then
            exit 10
        fi
        # Mark as in-progress immediately so no other worker picks it up.
        echo "$pkg_name" >> "$status_file"
    ) 200>"${status_file}.lock" || lock_status=$?
    if [ "$lock_status" -eq 10 ]; then
        log "跳过已扫描: $pkg_name"
        return 0
    fi

    # Download the RPM into the shared cache.
    mkdir -p "$pkg_dir"
    if ! timeout "$DOWNLOAD_TIMEOUT" dnf download --destdir "$RPM_CACHE_DIR" "$pkg_name" &>/dev/null; then
        error "下载失败: $pkg_name"
        log_failure "$pkg_name" "download_failed"
        return 1
    fi

    # Locate the downloaded file (assignment split from 'local' so a find
    # failure is not masked; also fixes the duplicate 'local' declaration).
    rpm_file=$(find "$RPM_CACHE_DIR" -name "${pkg_name}.rpm" | head -1)
    if [ -z "$rpm_file" ]; then
        error "找不到 RPM 文件: $pkg_name"
        log_failure "$pkg_name" "not_found"
        return 1
    fi

    # Extract the payload into pkg_dir.
    cd "$pkg_dir"
    if ! rpm2cpio "$rpm_file" | cpio -idm 2>/dev/null; then
        error "解压失败: $pkg_name"
        log_failure "$pkg_name" "extract_failed"
        return 1
    fi

    # Return 0 when the path matches one of the EXCLUDE_PATHS glob patterns.
    should_exclude_path() {
        local file_path="$1"
        local pattern
        for pattern in "${EXCLUDE_PATHS[@]}"; do
            # Match against the path relative to the extraction root.
            local rel_path="${file_path#$pkg_dir/}"
            case "$rel_path" in
                $pattern)
                    return 0
                    ;;
            esac
        done
        return 1
    }

    # Collect candidate files (deduplicated).
    local elf_list="${pkg_dir}/elf_files.txt"
    > "$elf_list"

    local target_path
    for target_path in "${TARGET_PATHS[@]}"; do
        local search_path="${pkg_dir}/${target_path}"
        if [ -d "$search_path" ]; then
            # NUL-delimited find handles file names with special characters.
            find "$search_path" -type f ! -name '*.ko' ! -name '*.ko.xz' -print0 2>/dev/null | \
                while IFS= read -r -d '' file; do
                    if ! should_exclude_path "$file"; then
                        echo "$file"
                    fi
                done | sort -u >> "$elf_list"
        fi
    done

    local total_files
    total_files=$(wc -l < "$elf_list" 2>/dev/null || echo 0)

    # Keep only genuine ELF executables / shared objects.
    local elf_filtered="${pkg_dir}/elf_filtered.txt"
    > "$elf_filtered"

    if [ "$total_files" -gt 0 ]; then
        while IFS= read -r file; do
            if [ -f "$file" ]; then
                if file "$file" 2>/dev/null | grep -qE 'ELF.*(executable|shared object)'; then
                    echo "$file" >> "$elf_filtered"
                fi
            fi
        done < "$elf_list"
    fi

    # Run checksec over the filtered list and assemble one JSON array.
    if [ -s "$elf_filtered" ]; then
        local file_count
        file_count=$(wc -l < "$elf_filtered")

        echo "[" > "$result_file"
        local first_file=true

        while IFS= read -r file; do
            if [ -f "$file" ]; then
                # New checksec syntax: checksec file --output=json <file>.
                # Strip warning/error lines so they cannot pollute the JSON.
                local json_output
                json_output=$(checksec file --output=json "$file" 2>&1 | grep -v '^Warning:' | grep -v '^Error reading' || echo "")

                if [ -n "$json_output" ]; then
                    # Strip the outer [ ] so objects can be re-joined below.
                    local json_object
                    json_object=$(echo "$json_output" | sed 's/Warning:.*$//g' | sed 's/^Error.*$//g' | sed 's/^\s*\[\s*//g' | sed 's/\s*\]\s*$//g' | grep -v '^$')

                    if [ -n "$json_object" ]; then
                        # Rewrite the extraction-dir prefix so the recorded
                        # name is the installed path, e.g.
                        # /…/extracted/pkg/usr/bin/x -> /usr/bin/x.
                        local relative_path
                        relative_path=$(echo "$file" | sed "s|^${pkg_dir}/|/|")
                        json_object=$(echo "$json_object" | sed "s|\"name\": *\"[^\"]*\"|\"name\": \"${relative_path}\"|")

                        if [ "$first_file" = true ]; then
                            echo "  $json_object" >> "$result_file"
                            first_file=false
                        else
                            echo "  ,$json_object" >> "$result_file"
                        fi
                    fi
                fi
            fi
        done < "$elf_filtered"
        echo "]" >> "$result_file"

        # Record success stats under their own lock (the package name was
        # already marked in scanned.txt at the top).
        (
            flock -x 200
            echo "$pkg_name:$file_count" >> "${RESULTS_DIR}/success.txt"
        ) 200>"${RESULTS_DIR}/success.txt.lock"

        log "扫描完成: $pkg_name ($file_count 个文件)"
    else
        # No ELF files: record that fact under its own lock.
        (
            flock -x 200
            echo "$pkg_name:no_binaries" >> "${RESULTS_DIR}/no_binary.txt"
        ) 200>"${RESULTS_DIR}/no_binary.txt.lock"
        log "无 ELF 文件: $pkg_name"
    fi

    # The EXIT trap removes pkg_dir.
    return 0
}
# Entry point.
# Usage: 02_scan.sh                      - batch-scan every package in packages.list
#        02_scan.sh scan_package <name>  - scan a single package (worker mode)
main() {
    # Single-package mode (invoked once per package by GNU parallel below).
    if [ $# -gt 0 ] && [ "$1" == "scan_package" ] && [ -n "${2:-}" ]; then
        local pkg_name="$2"
        log "=== 单包扫描模式: $pkg_name ==="

        # Make sure the shared status files exist before locking on them.
        mkdir -p "$RESULTS_DIR"
        touch "${RESULTS_DIR}/scanned.txt" \
              "${RESULTS_DIR}/success.txt" \
              "${RESULTS_DIR}/failed.txt" \
              "${RESULTS_DIR}/no_binary.txt"

        scan_package "$pkg_name"
        exit $?
    fi

    # Batch mode.
    local package_list="${WORK_DIR}/packages.list"
    if [ ! -f "$package_list" ]; then
        echo "错误: 包列表不存在,请先运行 01_init.sh"
        exit 1
    fi

    touch "${RESULTS_DIR}/scanned.txt" \
          "${RESULTS_DIR}/success.txt" \
          "${RESULTS_DIR}/failed.txt" \
          "${RESULTS_DIR}/no_binary.txt"

    log "=== 开始并行扫描 (并发数: $PARALLEL_JOBS) ==="

    # Re-invoke this very script in single-package mode for each package.
    # BUGFIX/CLEANUP: the previous generated wrapper script duplicated the
    # entire scan_package body (guaranteed to drift out of sync), and the
    # accompanying 'export TARGET_PATHS EXCLUDE_PATTERNS' was ineffective:
    # bash cannot export arrays, and the code reads EXCLUDE_PATHS anyway.
    # Each worker sources config.sh/common.sh itself, so no exports needed.
    local self
    self="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)/$(basename "${BASH_SOURCE[0]}")"

    parallel -j "$PARALLEL_JOBS" --joblog "${WORK_DIR}/parallel.log" \
        --retries 2 --bar "$self" scan_package {} < "$package_list"

    log "=== 扫描完成 ==="

    # Summary statistics (status files may be absent -> default 0).
    local total success failed no_binary
    total=$(wc -l < "$package_list")
    success=$(wc -l < "${RESULTS_DIR}/success.txt" 2>/dev/null || echo 0)
    failed=$(wc -l < "${RESULTS_DIR}/failed.txt" 2>/dev/null || echo 0)
    no_binary=$(wc -l < "${RESULTS_DIR}/no_binary.txt" 2>/dev/null || echo 0)

    echo ""
    echo "========== 扫描统计 =========="
    echo "总包数: $total"
    echo "扫描成功: $success"
    echo "扫描失败: $failed"
    echo "无二进制: $no_binary"
    echo "=============================="

    # Point the operator at the import step.
    echo ""
    echo "下一步:运行以下命令导入数据库"
    echo "python3 04_import_results.py ${DB_FILE} ${RESULTS_DIR}"
}

# Run main only when this file is executed directly (not when sourced).
if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then
    main "$@"
fi
264
03_report.py
Normal file
264
03_report.py
Normal file
@@ -0,0 +1,264 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
RPM 安全扫描结果解析与报告生成
|
||||
"""
|
||||
import sqlite3
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from collections import defaultdict
|
||||
|
||||
class ScanReporter:
    """Parses checksec scan results into SQLite and renders text reports."""

    def __init__(self, db_path):
        # Open (creating if absent) the results database; Row factory lets
        # callers access columns by name. No schema is created here.
        self.db = sqlite3.connect(db_path)
        self.db.row_factory = sqlite3.Row

    def parse_checksec_results(self, results_dir):
        """Parse checksec JSON result files and insert them into the DB."""
        results_dir = Path(results_dir)

        for json_file in results_dir.glob("*.json"):
            try:
                with open(json_file) as f:
                    data = json.load(f)

                pkg_name = json_file.stem

                # Resolve the package row id.
                # NOTE(review): splitting on '-' truncates package names that
                # themselves contain hyphens (e.g. 'python3-foo') — confirm.
                cur = self.db.execute(
                    "SELECT id FROM packages WHERE name=? AND status='success'",
                    (pkg_name.split('-')[0],)
                )
                row = cur.fetchone()
                if not row:
                    continue

                pkg_id = row[0]

                # Walk the per-file results.
                # NOTE(review): this assumes a {path: checks} dict, but
                # 02_scan.sh writes a JSON *array* of objects; for those
                # files .items() raises and the whole file is skipped by the
                # except below — verify which producer feeds this method.
                for file_path, checks in data.items():
                    # Insert the binary row.
                    # NOTE(review): RETURNING requires SQLite >= 3.35.
                    cur = self.db.execute(
                        "INSERT INTO binaries (package_id, file_path, file_type) VALUES (?, ?, ?) RETURNING id",
                        (pkg_id, file_path, checks.get('type', 'unknown'))
                    )
                    binary_id = cur.fetchone()[0]

                    # Insert the hardening flags for this binary.
                    self.db.execute(
                        """INSERT INTO security_checks
                        (binary_id, pie, nx, canary, fortify, relro, bind_now)
                        VALUES (?, ?, ?, ?, ?, ?, ?)""",
                        (
                            binary_id,
                            checks.get('pie', 'unknown'),
                            checks.get('nx', 'unknown'),
                            checks.get('canary', 'unknown'),
                            checks.get('fortify', 'unknown'),
                            checks.get('relro', 'unknown'),
                            checks.get('bind_now', 'unknown')
                        )
                    )

                self.db.commit()
            except Exception as e:
                # Best-effort: a malformed file is reported, not fatal.
                print(f"解析失败 {json_file}: {e}", file=sys.stderr)

    def generate_statistics(self):
        """Build aggregate coverage statistics for each hardening feature."""
        stats = {}

        # Overall totals.
        cur = self.db.execute("SELECT COUNT(*) FROM packages WHERE status='success'")
        stats['total_packages'] = cur.fetchone()[0]

        cur = self.db.execute("SELECT COUNT(*) FROM binaries")
        stats['total_binaries'] = cur.fetchone()[0]

        # Per-feature rules deciding which stored values count as "enabled".
        feature_rules = {
            'pie': {
                'enabled': ['PIE Enabled'],
                'disabled': ['PIE Disabled'],
                'label': 'PIE'
            },
            'nx': {
                # NOTE(review): duplicated identical entries look like a
                # leftover from case variants ('NX enabled' vs 'NX Enabled')
                # — verify against actual checksec output.
                'enabled': ['NX enabled', 'NX enabled'],
                'disabled': ['NX disabled', 'NX disabled'],
                'label': 'NX'
            },
            'canary': {
                'enabled': ['Canary Found'],
                'disabled': ['No Canary Found', 'No canary found'],
                'label': 'CANARY'
            },
            'fortify': {
                # Fortify counts as enabled for any non-zero, known value.
                'enabled_values': lambda x: x and x != '0' and x != 'unknown',
                'label': 'FORTIFY'
            },
            'relro': {
                'enabled': ['Full RELRO'],
                'partial': ['Partial RELRO'],
                'disabled': ['No RELRO'],
                'label': 'RELRO'
            }
        }

        stats['coverage'] = {}

        for feature, rules in feature_rules.items():
            # Feature names come from the trusted dict above, so this
            # f-string interpolation into SQL is not an injection risk.
            cur = self.db.execute(f"""
                SELECT
                    {feature},
                    COUNT(*) as count
                FROM security_checks
                GROUP BY {feature}
            """)

            feature_stats = {}
            total = 0
            enabled = 0

            for row in cur:
                value = row[0]
                count = row[1]
                feature_stats[value] = count
                total += count

                # Decide "enabled" via the explicit list or the predicate.
                if 'enabled' in rules and value in rules['enabled']:
                    enabled += count
                elif 'enabled_values' in rules and rules['enabled_values'](value):
                    enabled += count

            coverage_rate = f"{enabled/total*100:.2f}%" if total > 0 else "0.00%"

            stats['coverage'][feature] = {
                'label': rules['label'],
                'details': feature_stats,
                'rate': coverage_rate,
                'total': total,
                'enabled': enabled
            }

        return stats

    def export_csv(self, output_file):
        """Export one CSV row per (package, binary) with all check flags."""
        cur = self.db.execute("""
            SELECT
                p.name,
                b.file_path,
                s.pie, s.nx, s.canary, s.fortify, s.relro, s.bind_now
            FROM packages p
            JOIN binaries b ON p.id = b.package_id
            JOIN security_checks s ON b.id = s.binary_id
            ORDER BY p.name, b.file_path
        """)

        # NOTE(review): values are not quoted/escaped; a comma inside a
        # file path would break the CSV — consider the csv module.
        with open(output_file, 'w') as f:
            f.write("Package,File,PIE,NX,Canary,Fortify,RELRO,BIND_NOW\n")
            for row in cur:
                f.write(','.join(str(x) for x in row) + '\n')

    def print_report(self):
        """Print the statistics report to stdout in Markdown."""
        stats = self.generate_statistics()

        print("# RPM 安全扫描统计报告")
        print()
        print(f"**扫描包数量**: {stats['total_packages']}")
        print(f"**二进制文件数**: {stats['total_binaries']}")
        print()
        print("## 安全特性覆盖率")
        print()

        # Fixed display order: PIE, NX, CANARY, FORTIFY, RELRO.
        feature_order = ['pie', 'nx', 'canary', 'fortify', 'relro']

        for feature in feature_order:
            if feature not in stats['coverage']:
                continue

            data = stats['coverage'][feature]
            label = data['label']
            rate = data['rate']
            details = data['details']

            # Section heading and coverage rate.
            print(f"### {label}")
            print()
            print(f"**覆盖率**: {rate}")
            print()
            print("| 状态 | 数量 | 百分比 |")
            print("|------|------|--------|")

            # Detail rows, most frequent value first.
            sorted_details = sorted(details.items(), key=lambda x: x[1], reverse=True)
            for value, count in sorted_details:
                percentage = count * 100 / data['total'] if data['total'] > 0 else 0
                print(f"| {value} | {count} | {percentage:.2f}% |")

            print()

        print("## 综合安全评分")
        print()

        # Overall score: binaries with every protection enabled.
        total_binaries = stats['total_binaries']
        if total_binaries > 0:
            pie_enabled = stats['coverage']['pie']['enabled']
            nx_enabled = stats['coverage']['nx']['enabled']
            canary_enabled = stats['coverage']['canary']['enabled']
            relro_full = stats['coverage']['relro']['enabled']

            # NOTE(review): min() of per-feature counts is an upper bound on
            # fully-hardened binaries, not an exact per-file intersection.
            fully_protected = min(pie_enabled, nx_enabled, canary_enabled, relro_full)
            security_score = fully_protected * 100 / total_binaries

            print(f"- **综合安全评分**: {security_score:.2f}%")
            print(f"- **完全加固的文件**: {fully_protected}/{total_binaries}")
            print(f"- **标准**: PIE + NX + CANARY + Full RELRO")
            print()
def main():
    """CLI entry point.

    Usage::

        03_report.py [DB_PATH] [--parse-results DIR] [--export-csv FILE]

    The first non-option argument, if present, is the database path;
    otherwise ``scan_results.db`` is used.
    """
    # Default database path (previously stored in an unused default_db /
    # args_start pair; the dead args_start variable has been removed).
    db_path = "scan_results.db"

    # First non-option argument, if any, is the database path.
    if len(sys.argv) > 1 and not sys.argv[1].startswith('--'):
        db_path = sys.argv[1]

    reporter = ScanReporter(db_path)

    def _option_value(flag, description):
        """Return the value following *flag*, or exit(1) with an error."""
        idx = sys.argv.index(flag)
        # Guard against the flag being the last argument.
        if idx + 1 >= len(sys.argv):
            print(f"错误: {flag} 需要指定{description}", file=sys.stderr)
            sys.exit(1)
        return sys.argv[idx + 1]

    if '--parse-results' in sys.argv:
        results_dir = _option_value('--parse-results', '结果目录路径')
        print(f"解析结果目录: {results_dir}")
        reporter.parse_checksec_results(results_dir)

    if '--export-csv' in sys.argv:
        output_file = _option_value('--export-csv', '输出 CSV 文件路径')
        print(f"导出 CSV: {output_file}")
        reporter.export_csv(output_file)

    reporter.print_report()


if __name__ == '__main__':
    main()
354
04_import_results.py
Normal file
354
04_import_results.py
Normal file
@@ -0,0 +1,354 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
将扫描结果批量导入到 SQLite 数据库
|
||||
"""
|
||||
import sqlite3
|
||||
import json
|
||||
import sys
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
def validate_package_name(name: str) -> bool:
    """Return True when *name* is a safe RPM package name.

    Accepts letters, digits, dot, underscore, plus, hyphen and tilde
    (tilde appears in RPM pre-release versions such as ``2.14~rc1``).
    Mirrors the ``^[a-zA-Z0-9._~+-]+$`` check used by the shell scripts,
    guarding against path traversal and SQL injection.

    Args:
        name: candidate package name.

    Returns:
        bool: True if legal, False otherwise.
    """
    if not name:
        return False
    # Anchored pattern identical to the shell-side regex.
    return bool(re.match(r'^[a-zA-Z0-9._~+-]+$', name))
def create_tables(cursor):
    """Create the result schema (tables and indexes) if it does not exist.

    Args:
        cursor: an open sqlite3 cursor; statements are executed on it
            directly and no commit is performed here.
    """
    ddl_statements = (
        # One row per imported package.
        """
        CREATE TABLE IF NOT EXISTS packages (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL UNIQUE,
            status TEXT NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
        # ELF binaries discovered inside each package.
        """
        CREATE TABLE IF NOT EXISTS binaries (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            package_id INTEGER NOT NULL,
            file_path TEXT NOT NULL,
            file_type TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (package_id) REFERENCES packages(id)
        )
        """,
        # checksec hardening flags, one row per binary.
        """
        CREATE TABLE IF NOT EXISTS security_checks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            binary_id INTEGER NOT NULL,
            pie TEXT,
            nx TEXT,
            canary TEXT,
            fortify TEXT,
            relro TEXT,
            bind_now TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (binary_id) REFERENCES binaries(id)
        )
        """,
        # Indexes for the common lookup patterns.
        """
        CREATE INDEX IF NOT EXISTS idx_packages_status
        ON packages(status)
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_binaries_package
        ON binaries(package_id)
        """,
    )
    for ddl in ddl_statements:
        cursor.execute(ddl)
def parse_checksec_json(json_file):
    """Parse new-style checksec JSON output (array format).

    Expected shape: ``[{"name": "/path", "checks": {...}}, ...]``.

    Args:
        json_file: path to the JSON file to read.

    Returns:
        list: the parsed array, or ``[]`` on any read/parse problem
        (a message is printed to stderr instead of raising).
    """
    try:
        with open(json_file) as handle:
            data = json.loads(handle.read())
    except json.JSONDecodeError as exc:
        print(f"JSON 解析失败 {json_file}: {exc}", file=sys.stderr)
        return []
    except Exception as exc:
        print(f"读取失败 {json_file}: {exc}", file=sys.stderr)
        return []

    # Anything but a list is considered malformed for this format.
    if not isinstance(data, list):
        print(f"警告: {json_file} 不是数组格式", file=sys.stderr)
        return []
    return data
def parse_checksec_text(json_file):
    """Parse checksec output, tolerating both JSON and plain-text formats.

    ``checksec --output=json`` sometimes emits text; when JSON decoding
    fails we fall back to a line parser that groups ``KEY: value`` lines
    under the most recent file-name line (a line starting with ``*`` or
    ``/``, or containing ``.rpm``).

    Args:
        json_file: path to the file to read.

    Returns:
        The decoded JSON object when the content is JSON; otherwise a
        ``{file: {check: value}}`` dict; ``{}`` on read errors.
    """
    try:
        with open(json_file) as handle:
            raw = handle.read()

        try:
            # Fast path: the content is already valid JSON.
            return json.loads(raw)
        except json.JSONDecodeError:
            pass

        parsed = {}
        current = None
        for raw_line in raw.split('\n'):
            text = raw_line.strip()
            if not text:
                continue

            if text.startswith('*') or text.startswith('/') or '.rpm' in text:
                # A file-name header line opens a new section.
                current = text.lstrip('*').strip()
                parsed[current] = {}
            elif current and ':' in text:
                # A "KEY: value" check line under the current file.
                key, _, value = text.partition(':')
                parsed[current][key.strip().lower()] = value.strip()

        return parsed
    except Exception as exc:
        print(f"解析失败 {json_file}: {exc}", file=sys.stderr)
        return {}
def import_results(db_path, results_dir, clean=False):
    """
    Bulk-import scan results into the SQLite database.
    A single explicit transaction keeps the imported data consistent.

    Args:
        db_path: path to the SQLite database file
        results_dir: directory containing the status files and *.json results
        clean: wipe existing rows before importing (default False)
    """
    results_dir = Path(results_dir)

    # Open (and create if needed) the database.
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    # Ensure the schema exists before any insert.
    create_tables(cursor)

    # Optional wipe, children first so foreign keys stay satisfied.
    # NOTE(review): these DELETEs run before the explicit transaction below,
    # so they are NOT rolled back if the import later fails — confirm intended.
    if clean:
        print("清理旧数据...")
        cursor.execute("DELETE FROM security_checks")
        cursor.execute("DELETE FROM binaries")
        cursor.execute("DELETE FROM packages")
        print("清理完成")

    # Import everything inside one transaction.
    # NOTE(review): the sqlite3 module auto-begins implicit transactions on
    # DML; an explicit "BEGIN TRANSACTION" can raise "cannot start a
    # transaction within a transaction" after the writes above — verify the
    # connection's isolation_level handling.
    try:
        cursor.execute("BEGIN TRANSACTION")

        # Status files written by 02_scan.sh.
        # NOTE(review): scanned_file is never read below — dead assignment?
        scanned_file = results_dir / "scanned.txt"
        success_file = results_dir / "success.txt"
        failed_file = results_dir / "failed.txt"
        no_binary_file = results_dir / "no_binary.txt"

        # Failed packages: "name:reason" lines.
        failed_count = 0
        if failed_file.exists():
            with open(failed_file) as f:
                for line in f:
                    line = line.strip()
                    if ':' in line:
                        pkg_name, status = line.split(':', 1)
                        # Reject unsafe names (path traversal / injection).
                        if not validate_package_name(pkg_name):
                            print(f"警告: 跳过非法包名: {pkg_name}", file=sys.stderr)
                            continue
                        cursor.execute(
                            "INSERT OR REPLACE INTO packages (name, status) VALUES (?, ?)",
                            (pkg_name, status)
                        )
                        failed_count += 1
        print(f"导入失败包: {failed_count} 个")

        # Packages that contained no ELF binaries.
        no_binary_count = 0
        if no_binary_file.exists():
            with open(no_binary_file) as f:
                for line in f:
                    line = line.strip()
                    if ':' in line:
                        pkg_name, status = line.split(':', 1)
                        # Reject unsafe names.
                        if not validate_package_name(pkg_name):
                            print(f"警告: 跳过非法包名: {pkg_name}", file=sys.stderr)
                            continue
                        cursor.execute(
                            "INSERT OR REPLACE INTO packages (name, status) VALUES (?, ?)",
                            (pkg_name, 'no_binaries')
                        )
                        no_binary_count += 1
        print(f"导入无二进制包: {no_binary_count} 个")

        # Successfully scanned packages: "name:file_count" lines.
        success_count = 0
        binary_count = 0
        if success_file.exists():
            with open(success_file) as f:
                for line in f:
                    line = line.strip()
                    if ':' in line:
                        pkg_name, file_count = line.split(':', 1)

                        # Reject unsafe names.
                        if not validate_package_name(pkg_name):
                            print(f"警告: 跳过非法包名: {pkg_name}", file=sys.stderr)
                            continue

                        # REPLACE so the import can be re-run idempotently.
                        cursor.execute(
                            "INSERT OR REPLACE INTO packages (name, status) VALUES (?, ?)",
                            (pkg_name, 'success')
                        )
                        pkg_id = cursor.lastrowid
                        success_count += 1

                        # Per-package checksec JSON (may be absent).
                        json_file = results_dir / f"{pkg_name}.json"
                        if not json_file.exists():
                            continue

                        checksec_results = parse_checksec_json(json_file)

                        # One binaries row + one security_checks row per item.
                        for item in checksec_results:
                            if not isinstance(item, dict):
                                continue

                            file_path = item.get('name', 'unknown')
                            checks = item.get('checks', {})

                            if not checks:
                                continue

                            # Binary row for this file.
                            cursor.execute(
                                "INSERT INTO binaries (package_id, file_path, file_type) VALUES (?, ?, ?)",
                                (pkg_id, file_path, 'elf')
                            )
                            binary_id = cursor.lastrowid
                            binary_count += 1

                            # Hardening flags. str() guards against
                            # non-string JSON values.
                            # NOTE(review): the source key is 'fortified'
                            # while the column is 'fortify' — confirm the
                            # actual checksec JSON field name.
                            cursor.execute(
                                """INSERT INTO security_checks
                                (binary_id, pie, nx, canary, fortify, relro, bind_now)
                                VALUES (?, ?, ?, ?, ?, ?, ?)""",
                                (
                                    binary_id,
                                    str(checks.get('pie', 'unknown')),
                                    str(checks.get('nx', 'unknown')),
                                    str(checks.get('canary', 'unknown')),
                                    str(checks.get('fortified', 'unknown')),
                                    str(checks.get('relro', 'unknown')),
                                    str(checks.get('bind_now', 'unknown'))
                                )
                            )

        print(f"导入成功包: {success_count} 个")

        # Commit the whole import atomically.
        cursor.execute("COMMIT")
        print("事务提交成功")

    except Exception as e:
        # Roll back everything done inside the transaction.
        cursor.execute("ROLLBACK")
        print(f"导入失败,已回滚: {e}", file=sys.stderr)
        raise

    # Final summary counters.
    cursor.execute("SELECT COUNT(*) FROM packages")
    total = cursor.fetchone()[0]

    cursor.execute("SELECT COUNT(*) FROM packages WHERE status='success'")
    success = cursor.fetchone()[0]

    cursor.execute("SELECT COUNT(*) FROM binaries")
    binaries = cursor.fetchone()[0]

    print(f"\n数据库统计:")
    print(f" 总包数: {total}")
    print(f" 成功: {success}")
    print(f" 二进制文件: {binaries}")

    conn.close()
def main():
    """CLI entry point: import checksec scan results into the SQLite database.

    Usage: 04_import_results.py [--clean] [DB_PATH [RESULTS_DIR]]

    Positional arguments (both optional):
      DB_PATH     target SQLite database (default: scan_results.db)
      RESULTS_DIR directory with per-package JSON results
                  (default: scan_workspace/results)
    Options:
      --clean     wipe previously imported rows before re-importing
    """
    # Defaults used when the corresponding positional argument is absent.
    db_path = "scan_results.db"
    results_dir = "scan_workspace/results"

    # Option flag: anything starting with '--' is treated as an option,
    # everything else as a positional argument.
    clean = '--clean' in sys.argv
    positional = [arg for arg in sys.argv[1:] if not arg.startswith('--')]

    if positional:
        db_path = positional[0]
    if len(positional) > 1:
        results_dir = positional[1]

    print("导入扫描结果...")
    print(f" 数据库: {db_path}")
    print(f" 结果目录: {results_dir}")
    if clean:
        print(" 模式: 清理旧数据后重新导入")
    print()

    import_results(db_path, results_dir, clean=clean)

    print("\n导入完成!")
    print("\n下一步:生成报告")
    print(f"python3 03_report.py {db_path}")


if __name__ == '__main__':
    main()
|
||||
275
README.md
Normal file
275
README.md
Normal file
@@ -0,0 +1,275 @@
|
||||
# RPM 包安全特性大规模扫描系统
|
||||
|
||||
## 架构概览
|
||||
|
||||
```
|
||||
[包列表获取] → [并行下载] → [ELF提取] → [安全扫描] → [结果入库] → [报告生成]
|
||||
```
|
||||
|
||||
## 快速开始
|
||||
|
||||
### 1. 安装依赖
|
||||
|
||||
```bash
|
||||
# RHEL/Fedora/OpenEuler
|
||||
sudo dnf install -y parallel checksec sqlite
|
||||
|
||||
# 验证
|
||||
checksec --version
|
||||
parallel --version
|
||||
```
|
||||
|
||||
### 2. 初始化
|
||||
|
||||
```bash
|
||||
chmod +x *.sh
|
||||
./01_init.sh
|
||||
```
|
||||
|
||||
### 3. 执行扫描(无锁设计)
|
||||
|
||||
```bash
|
||||
# 全量扫描(8 并发)
|
||||
./02_scan.sh
|
||||
|
||||
# 自定义并发数
|
||||
PARALLEL_JOBS=16 ./02_scan.sh
|
||||
```
|
||||
|
||||
**输出文件**:
|
||||
- `results/scanned.txt` - 已扫描包列表
|
||||
- `results/success.txt` - 扫描成功的包
|
||||
- `results/failed.txt` - 扫描失败的包
|
||||
- `results/no_binary.txt` - 无二进制文件的包
|
||||
- `results/*.json` - checksec 原始结果
|
||||
|
||||
### 4. 导入数据库
|
||||
|
||||
```bash
|
||||
python3 04_import_results.py scan_workspace/results/scan_results.db scan_workspace/results
|
||||
```
|
||||
|
||||
**为什么分两步?**
|
||||
- 避免多进程并发写入 SQLite 导致的数据库锁定问题
|
||||
- 扫描阶段只写文件,完全无锁
|
||||
- 导入阶段串行处理,安全可靠
|
||||
|
||||
### 5. 生成报告
|
||||
|
||||
```bash
|
||||
# 生成统计报告和 CSV
|
||||
python3 03_report.py scan_workspace/results/scan_results.db --export-csv report.csv
|
||||
```
|
||||
|
||||
## 核心设计
|
||||
|
||||
### 1. 预筛选策略
|
||||
|
||||
**问题**: `dnf repoquery` 默认包含 noarch 包,浪费资源
|
||||
|
||||
**解决方案**:
|
||||
```bash
|
||||
# 只查询 x86_64 架构(注意格式字符串末尾的 \n)
|
||||
dnf repoquery --arch x86_64 --qf '%{name}-%{version}-%{release}.%{arch}\n'
|
||||
|
||||
# 进一步过滤:检查是否包含目标路径
|
||||
dnf repoquery --arch x86_64 --list PACKAGE | grep -E '^/(usr/)?(bin|sbin|lib64)'
|
||||
```
|
||||
|
||||
### 2. 软链接去重
|
||||
|
||||
**问题**: `/bin` → `/usr/bin` 导致重复扫描
|
||||
|
||||
**解决方案**:
|
||||
```bash
|
||||
# 使用 inode 去重
|
||||
find . -type f -exec stat -c '%i %n' {} + | sort -u -k1,1 | cut -d' ' -f2-
|
||||
```
|
||||
|
||||
### 3. 并行优化
|
||||
|
||||
**三级并行**:
|
||||
- **包级**: GNU Parallel 并行处理包 (`-j 8`)
|
||||
- **下载级**: DNF 并行下载配置
|
||||
- **扫描级**: checksec 批量调用
|
||||
|
||||
### 4. 无锁设计(重要)
|
||||
|
||||
**问题**: SQLite 不支持多进程并发写入
|
||||
|
||||
**解决方案**: 扫描与入库分离
|
||||
```bash
|
||||
# 扫描阶段:多进程并发写文件(无锁)
|
||||
./02_scan.sh
|
||||
→ results/scanned.txt
|
||||
→ results/success.txt
|
||||
→ results/failed.txt
|
||||
→ results/*.json
|
||||
|
||||
# 导入阶段:单进程串行入库(安全)
|
||||
python3 04_import_results.py scan_results.db results/
|
||||
```
|
||||
|
||||
**关键改进**:
|
||||
- ✅ 使用 `flock` 文件锁代替数据库查询
|
||||
- ✅ 状态文件代替数据库状态查询
|
||||
- ✅ JSON 文件存储中间结果
|
||||
- ✅ 批量导入代替逐条写入
|
||||
|
||||
### 5. 增量扫描
|
||||
|
||||
文件记录扫描状态,跳过已扫描的包:
|
||||
```bash
|
||||
# 使用 flock 检查是否已扫描
|
||||
if grep -qx "$pkg_name" "$status_file"; then
|
||||
echo "跳过已扫描: $pkg_name"
|
||||
exit 0
|
||||
fi
|
||||
```
|
||||
|
||||
### 6. 错误处理
|
||||
|
||||
- 下载超时: `timeout 300s`
|
||||
- 解压失败: 记录状态到数据库
|
||||
- checksec 超时: 单包超时 60s
|
||||
- 重试机制: `parallel --retries 2`
|
||||
|
||||
## 数据库 Schema
|
||||
|
||||
```sql
|
||||
packages (id, name, version, status, scan_time)
|
||||
↓
|
||||
binaries (id, package_id, file_path, inode)
|
||||
↓
|
||||
security_checks (id, binary_id, pie, nx, canary, fortify, relro)
|
||||
```
|
||||
|
||||
## 配置文件
|
||||
|
||||
编辑 `config.sh`:
|
||||
```bash
|
||||
PARALLEL_JOBS=8 # 并发数
|
||||
REPO_ARCH="x86_64" # 架构
|
||||
DOWNLOAD_TIMEOUT=300 # 下载超时
|
||||
CHECKSEC_TIMEOUT=60 # 扫描超时
|
||||
```
|
||||
|
||||
## 输出文件
|
||||
|
||||
```
|
||||
scan_workspace/
|
||||
├── rpm_cache/ # RPM 缓存
|
||||
├── extracted/ # 临时解压目录
|
||||
├── results/
|
||||
│ ├── scan_results.db # SQLite 数据库
|
||||
│ ├── *.json # checksec 原始结果
|
||||
│ └── report.csv # CSV 报告
|
||||
├── scan.log # 运行日志
|
||||
├── error.log # 错误日志
|
||||
└── parallel.log # GNU Parallel 日志
|
||||
```
|
||||
|
||||
## 性能估算
|
||||
|
||||
- 单包平均耗时: 10-30s (下载 + 解压 + 扫描)
|
||||
- 8 并发处理 1000 个包: ~30-60 分钟
|
||||
- 磁盘空间: 每包 10-100MB (临时)
|
||||
|
||||
## 故障排查
|
||||
|
||||
### 1. checksec 找不到
|
||||
|
||||
```bash
|
||||
# 从源码安装
|
||||
git clone https://github.com/slimm609/checksec.sh
|
||||
sudo cp checksec /usr/local/bin/
|
||||
```
|
||||
|
||||
### 2. 并行任务失败
|
||||
|
||||
查看日志:
|
||||
```bash
|
||||
tail -f scan_workspace/error.log
|
||||
grep -v "^0" scan_workspace/parallel.log
|
||||
```
|
||||
|
||||
### 3. 数据库锁定(已解决)
|
||||
|
||||
**旧版本问题**:
|
||||
```
|
||||
Error: in prepare, database is locked (5)
|
||||
```
|
||||
|
||||
**新版本解决方案**:
|
||||
- ✅ 采用无锁设计,扫描阶段不写数据库
|
||||
- ✅ 使用文件锁 `flock` 代替数据库锁
|
||||
- ✅ 扫描完成后批量导入
|
||||
|
||||
如果仍然遇到数据库锁定:
|
||||
```bash
|
||||
# 删除锁文件
|
||||
rm scan_workspace/results/scanned.txt.lock
|
||||
|
||||
# 或使用新的工作目录重新扫描
|
||||
WORK_DIR="./scan_workspace_new" ./02_scan.sh
|
||||
```
|
||||
|
||||
## 扩展功能
|
||||
|
||||
### 1. 分布式扫描
|
||||
|
||||
```bash
|
||||
# 机器 A: 扫描前 500 个包
|
||||
head -500 packages.list | parallel -j 8 ./scan_package.sh
|
||||
|
||||
# 机器 B: 扫描后 500 个包
|
||||
tail -500 packages.list | parallel -j 8 ./scan_package.sh
|
||||
|
||||
# 合并结果文件(不需要合并数据库)
|
||||
cat machine_a/results/*.json machine_b/results/*.json > all_results.json
|
||||
cat machine_a/results/success.txt machine_b/results/success.txt > all_success.txt
|
||||
|
||||
# 导入到统一数据库
|
||||
python3 04_import_results.py merged.db results/
|
||||
```
|
||||
|
||||
**优势**:
|
||||
- 每台机器独立工作,无数据库冲突
|
||||
- 只需合并文本文件和 JSON
|
||||
- 最后统一导入数据库
|
||||
|
||||
### 2. 定时增量扫描
|
||||
|
||||
```bash
|
||||
# crontab
|
||||
0 2 * * * cd /path/to/scan && ./01_init.sh && ./02_scan.sh && python3 04_import_results.py scan_results.db results/
|
||||
```
|
||||
|
||||
### 3. Web Dashboard
|
||||
|
||||
使用 Grafana + SQLite 数据源可视化结果
|
||||
|
||||
## 潜在问题与解决方案
|
||||
|
||||
| 问题 | 影响 | 解决方案 |
|
||||
|------|------|----------|
|
||||
| noarch 包误扫描 | 浪费资源 | `--arch x86_64` 预筛选 |
|
||||
| 软链接重复 | 结果重复 | inode 去重 |
|
||||
| 内核模块误报 | PIE 检查不适用 | 排除 `*.ko` |
|
||||
| 网络不稳定 | 下载失败 | 超时 + 重试 |
|
||||
| 磁盘空间不足 | 解压失败 | 及时清理临时文件 |
|
||||
| checksec 慢 | 扫描耗时长 | 批量调用 + 并行 |
|
||||
|
||||
## 最佳实践
|
||||
|
||||
1. **首次运行**: 先用小样本测试 (`head -10 packages.list`)
|
||||
2. **并发调优**: 根据 CPU 核心数和网络带宽调整 `PARALLEL_JOBS`
|
||||
3. **磁盘管理**: 定期清理 `rpm_cache/` 和 `extracted/`
|
||||
4. **日志监控**: 实时查看 `tail -f scan.log`
|
||||
5. **增量更新**: 定期运行获取新包
|
||||
|
||||
## 参考资料
|
||||
|
||||
- [checksec 文档](https://github.com/slimm609/checksec.sh)
|
||||
- [GNU Parallel 教程](https://www.gnu.org/software/parallel/parallel_tutorial.html)
|
||||
- [RPM 包管理](https://rpm.org/)
|
||||
55
common.sh
Normal file
55
common.sh
Normal file
@@ -0,0 +1,55 @@
|
||||
#!/bin/bash
|
||||
# 公共函数库 - 输入验证和安全工具函数
|
||||
|
||||
# Validate a package name (defense against path traversal and command
# injection). Only letters, digits, dot, underscore, plus, minus and tilde
# are allowed; tilde appears in RPM pre-release versions (e.g. 2.14~rc1).
#
# Security note: the character whitelist alone would accept the pure-dot
# names "." and "..", which resolve to the current/parent directory when
# used as a path component (e.g. rm -rf "$EXTRACT_DIR/$pkg") — so those
# are rejected explicitly.
#
# Arguments: $1 - package name
# Returns:   0 = valid, 1 = invalid
validate_package_name() {
    local name="$1"
    # "." and ".." pass the whitelist but escape the intended directory.
    if [[ "$name" == "." || "$name" == ".." ]]; then
        return 1
    fi
    if [[ ! "$name" =~ ^[a-zA-Z0-9._~+-]+$ ]]; then
        return 1
    fi
    return 0
}
|
||||
|
||||
# Validate a file-name component (defense against path traversal).
# Rejects anything containing ".." or a slash, so only bare file names
# are accepted — no directory separators of any kind.
# Arguments: $1 - file name to check
# Returns:   0 = safe, 1 = contains traversal characters
validate_path() {
    local candidate="$1"
    case "$candidate" in
        *..*|*/*)
            return 1
            ;;
    esac
    return 0
}
|
||||
|
||||
# Append one line to a file under an exclusive flock, so concurrent scan
# jobs never interleave partial lines.
#
# Uses printf instead of echo: echo silently swallows content that begins
# with "-n"/"-e" and may interpret backslash escapes, corrupting entries.
#
# Arguments: $1 - target file, $2 - content (written as a single line)
safe_append() {
    local file="$1"
    local content="$2"
    # FD 200 holds the lock on a sidecar "<file>.lock"; the subshell
    # releases it automatically on exit.
    (
        flock -x 200
        printf '%s\n' "$content" >> "$file"
    ) 200>"${file}.lock"
}
|
||||
|
||||
# Record a package failure as "name:reason" in failed.txt, serialized
# through an exclusive flock so parallel scan jobs cannot interleave.
# Globals:   RESULTS_DIR (read) - directory holding failed.txt
# Arguments: $1 - package name, $2 - failure reason
log_failure() {
    local pkg="$1"
    local why="$2"
    local out="${RESULTS_DIR}/failed.txt"
    # FD 201 (distinct from safe_append's 200) locks a sidecar lock file;
    # the lock is released when the subshell exits.
    (
        flock -x 201
        echo "${pkg}:${why}" >> "$out"
    ) 201>"${out}.lock"
}
|
||||
|
||||
# Export the helpers so they are visible in child bash processes —
# GNU Parallel runs each job in a fresh shell, which only sees
# exported functions.
for _fn in validate_package_name validate_path safe_append log_failure; do
    export -f "$_fn"
done
unset _fn
|
||||
57
config.sh
Executable file
57
config.sh
Executable file
@@ -0,0 +1,57 @@
|
||||
#!/bin/bash
# Scanner configuration file.
# Sourced by the other scripts (01_init.sh validates several of these
# values at startup) — keep variable names stable.

# Repository settings
REPO_NAME="openruyi"
# NOTE(review): README examples use x86_64 throughout — confirm the
# intended target architecture.
REPO_ARCH="riscv64"

# Parallelism
PARALLEL_JOBS=8        # GNU Parallel job slots (must be an integer > 0)
DOWNLOAD_PARALLEL=4    # concurrent downloads

# Paths (all rooted under the current working directory)
WORK_DIR="$(pwd)/scan_workspace"
RPM_CACHE_DIR="${WORK_DIR}/rpm_cache"
EXTRACT_DIR="${WORK_DIR}/extracted"
RESULTS_DIR="${WORK_DIR}/results"
DB_FILE="${RESULTS_DIR}/scan_results.db"

# Paths (relative to the extracted RPM root) scanned for ELF binaries
TARGET_PATHS=(
    "usr/bin"
    "usr/sbin"
    "bin"
    "sbin"
    "usr/lib64"
    "usr/lib"
)

# Exclusion patterns (matched against file names)
EXCLUDE_PATTERNS=(
    "*.ko"      # kernel modules (PIE check not applicable)
    "*.ko.xz"
    "*.a"       # static libraries
    "*.o"       # object files
    "*.go"      # Guile bytecode files (not Go source code)
)

# Exclusion patterns (matched against full paths)
EXCLUDE_PATHS=(
    "*/testdata/*"        # test data
    "*/test/*"            # test directories
    "*/tests/*"           # test suites
    "*/examples/*"        # example code
    "*/src/*/testdata/*"  # test data inside source trees
    "*/test_*"            # test tools
    "*/ccache/*"          # compile caches (Guile .go files etc.)
    "*/.cache/*"          # miscellaneous cache directories
    "*/__pycache__/*"     # Python bytecode caches
)

# Timeouts (seconds); both validated as positive integers by 01_init.sh
DOWNLOAD_TIMEOUT=300
CHECKSEC_TIMEOUT=60

# Log files
LOG_FILE="${WORK_DIR}/scan.log"
ERROR_LOG="${WORK_DIR}/error.log"
|
||||
120
requirements.md
Normal file
120
requirements.md
Normal file
@@ -0,0 +1,120 @@
|
||||
# RPM 包安全特性大规模扫描系统 - 需求文档
|
||||
|
||||
## 1. 项目背景
|
||||
|
||||
需要对 RPM 软件源中的所有二进制文件进行安全特性扫描,检查编译时开启的安全保护机制(如 PIE、NX、Stack Canary、Fortify Source 等),以评估软件源的整体安全状况。
|
||||
|
||||
## 2. 核心需求
|
||||
|
||||
### 2.1 基本功能
|
||||
1. **RPM 包获取**
|
||||
- 从 RPM 软件源获取所有 RPM 包列表
|
||||
- 支持配置软件源地址
|
||||
|
||||
2. **二进制文件提取**
|
||||
- 从 RPM 包中提取二进制文件
|
||||
- 关注路径:`/usr/bin`, `/bin`, `/usr/sbin`, `/sbin`, `/usr/lib64`, `/usr/lib`
|
||||
- 识别 ELF 格式文件(可执行文件、动态库)
|
||||
|
||||
3. **安全检查**
|
||||
- 使用 `checksec` 工具检查 ELF 文件
|
||||
- 检查项包括:
|
||||
- PIE (Position Independent Executable)
|
||||
- NX (No-Execute)
|
||||
- Stack Canary (Stack Protection)
|
||||
- Fortify Source
|
||||
- RELRO (Relocation Read-Only)
|
||||
- BIND_NOW
|
||||
- 溢出保护
|
||||
|
||||
4. **结果输出**
|
||||
- 支持 CSV 格式导出
|
||||
- 支持生成统计报告
|
||||
|
||||
### 2.2 大规模扫描需求
|
||||
|
||||
#### 2.2.1 性能要求
|
||||
- 支持扫描数千个 RPM 包
|
||||
- 并行处理提升效率
|
||||
- 预计单次扫描时间:合理范围(待定)
|
||||
|
||||
#### 2.2.2 预筛选优化
|
||||
- 排除 `noarch` 架构的包(主要是 Python/Perl/文档)
|
||||
- 使用 `dnf repoquery` 提前识别包含二进制文件的包
|
||||
- 只处理 x86_64 架构的包
|
||||
|
||||
#### 2.2.3 并行处理
|
||||
- 使用 GNU Parallel 进行并发
|
||||
- 可配置并发数(如 `-j 8`)
|
||||
|
||||
### 2.3 异常处理需求
|
||||
|
||||
1. **软链接处理**
|
||||
- 避免重复扫描同一个文件(如 `/bin` → `/usr/bin`)
|
||||
- 使用 `find -type f` 只处理真实文件
|
||||
- 基于 inode 去重
|
||||
|
||||
2. **动态库处理**
|
||||
- 正确识别 `.so` 文件
|
||||
- 处理插件式动态库(`/usr/lib64` 下的插件)
|
||||
|
||||
3. **内核模块排除**
|
||||
- 排除 `.ko` 内核模块文件
|
||||
- 理由:checksec 的某些检查项(如 PIE)对内核模块不适用
|
||||
|
||||
4. **错误处理**
|
||||
- 网络下载失败重试
|
||||
- 损坏的 RPM 包处理
|
||||
- 解压失败处理
|
||||
- checksec 超时处理
|
||||
|
||||
## 3. 数据输出需求
|
||||
|
||||
### 3.1 统计指标
|
||||
- PIE 覆盖率(开启 PIE 的文件占比)
|
||||
- NX 覆盖率
|
||||
- Stack Canary 覆盖率
|
||||
- 危急项统计(如 NX 为 Disabled 的文件)
|
||||
|
||||
### 3.2 数据存储
|
||||
- 初期:CSV 格式
|
||||
- 进阶:数据库支持(SQLite/MySQL)
|
||||
- 可视化:Dashboard 展示
|
||||
|
||||
## 4. 技术约束
|
||||
|
||||
1. **操作系统**:Linux(支持 DNF/YUM 包管理器)
|
||||
2. **依赖工具**:
|
||||
- `dnf` 或 `yum`(RPM 包查询)
|
||||
- `checksec`(安全特性检查)
|
||||
- `GNU Parallel`(并行处理)
|
||||
- `rpm2cpio`(RPM 包解压)
|
||||
- `file`(文件类型识别)
|
||||
|
||||
## 5. 扩展需求
|
||||
|
||||
### 5.1 增量扫描
|
||||
- 记录已扫描的包
|
||||
- 支持只扫描新增或更新的包
|
||||
|
||||
### 5.2 断点续传
|
||||
- 扫描中断后可继续
|
||||
- 避免重复扫描已完成的部分
|
||||
|
||||
### 5.3 分布式扫描(未来)
|
||||
- 支持多机并行扫描
|
||||
- 结果汇总
|
||||
|
||||
## 6. 交付物
|
||||
|
||||
1. 扫描脚本(Shell/Python)
|
||||
2. 配置文件
|
||||
3. 使用文档
|
||||
4. 示例报告
|
||||
|
||||
## 7. 非功能性需求
|
||||
|
||||
1. **可维护性**:代码清晰,注释完整
|
||||
2. **可扩展性**:易于添加新的检查项
|
||||
3. **可靠性**:异常处理完善
|
||||
4. **性能**:支持大规模扫描
|
||||
Reference in New Issue
Block a user