阿里云CDN离线日志过滤ip

发布于 2025-07-09

最近有傻逼一直攻击我网站,之前我都是自己看日志然后分析哪些ip然后简单封一下就行了,但是最近有点多,我就需要写个脚本来进行过滤通配了,直接附上代码:

import re
import gzip
import os
from collections import defaultdict
from datetime import datetime

# 配置参数
LOG_FILE = 'easygif.cn_2025_07_07_000000_010000'
THRESHOLD = 0
CHECK_4XX_5XX = True
OUTPUT_FILE = 'abnormal_ips.txt'


log_pattern = re.compile(
    r'^\[(.*?)\]\s+'              # [时间戳]
    r'(\S+)\s+'                   # 客户端IP
    r'-\s+'                       # -
    r'\d+\s+'                     # 日志编号 "-" (忽略)
    r'"(.*?)"\s+'                 # xxx.com
    r'"(.*?)"\s+'                 # 请求行,如 GET xxxxx
    r'(\d{3})\s+\d+\s+\S+\s+'     # 状态码、响应体大小等(忽略部分)
    r'\S+\s+'                     # 是否命中
    r'"(.*?)"\s+'                 # User-Agent
    r'"(.*?)"\s+'                 # Content-Type
    r'(\S+)'                      # Backend IP
)

def extract_gz_files(directory):
    """解压目录下的所有 .gz 文件"""
    extracted_files = []
    for filename in os.listdir(directory):
        if filename.endswith('.gz'):
            gz_path = os.path.join(directory, filename)
            extracted_path = gz_path[:-3]  # 去掉 .gz 后缀
            with gzip.open(gz_path, 'rt', encoding='utf-8') as gz_file:
                with open(extracted_path, 'w', encoding='utf-8') as extracted_file:
                    extracted_file.write(gz_file.read())
            extracted_files.append(extracted_path)
    return extracted_files

def parse_log(file_paths):
    """解析日志文件"""
    ip_requests = defaultdict(list)
    error_ips = set()

    for file_path in file_paths:
        with open(file_path, 'r', encoding='utf-8') as f:
            for line in f:
                match = log_pattern.match(line.strip())
                if not match:
                    continue

                timestamp_str, client_ip, website, request_line, status, user_agent, content_type, backend_ip = match.groups()
                timestamp = datetime.strptime(timestamp_str, "%d/%b/%Y:%H:%M:%S %z")

                # 忽略无效IP
                if client_ip == '-' or client_ip.startswith(('10.', '192.168.')):
                    continue

                # 记录每个IP的请求时间
                ip_requests[client_ip].append(int(timestamp.timestamp()))

                # 判断是否是异常状态码
                if CHECK_4XX_5XX and status.startswith(('4', '5')):
                    error_ips.add(client_ip)

    return ip_requests, error_ips


def detect_high_frequency(ip_requests):
    """检测高频访问IP"""
    top_ip_cnt = defaultdict(int)
    for ip, timestamps in ip_requests.items():
        if len(timestamps) >= THRESHOLD:
            top_ip_cnt[ip] = len(timestamps)
    # 按访问次数排序,返回访问次数超过阈值的IP
    return {ip:count for ip, count in sorted(top_ip_cnt.items(), key=lambda item: item[1], reverse=True) if count >= THRESHOLD}


def main():
    # 解压 .gz 文件
    extracted_files = extract_gz_files(os.getcwd())

    # 解析解压后的日志文件
    ip_requests, error_ips = parse_log(extracted_files)
    high_freq_ips = detect_high_frequency(ip_requests)

    abnormal_ips = dict()
    abnormal_ips.update(high_freq_ips)

    with open(OUTPUT_FILE, 'w', encoding='utf-8') as f:
        for ip, count in abnormal_ips.items():
            f.write(f"{ip} - {count}\n")
            print(f"Abnormal IP detected: {ip}")

    print(f"\n✅ Total abnormal IPs found: {len(abnormal_ips)}")
    print(f"Saved to: {OUTPUT_FILE}")


if __name__ == '__main__':
    main()

如果日志格式不改的话,后面能一直用吧,就在这mark一下