最近总有人攻击我的网站。之前我都是自己看日志、分析出可疑 IP 后简单封一下就行了,但最近攻击有点频繁,于是写了个脚本来自动过滤和匹配,直接附上代码:

import gzip
import ipaddress
import os
import re
import shutil
from collections import defaultdict
from datetime import datetime

# --- Configuration ---
# NOTE(review): LOG_FILE is defined but never referenced in this script —
# the files actually read are whatever extract_gz_files() unpacks.
LOG_FILE = 'easygif.cn_2025_07_07_000000_010000'
# Minimum number of requests before an IP is reported.
# NOTE(review): 0 means EVERY IP is flagged — confirm the intended value.
THRESHOLD = 0
# When True, also track client IPs that triggered 4xx/5xx responses.
CHECK_4XX_5XX = True
# Destination file for the abnormal-IP report.
OUTPUT_FILE = 'abnormal_ips.txt'


# Compiled matcher for one access-log line.  Capture groups, in order:
# timestamp, client IP, website, request line, status code,
# User-Agent, Content-Type, backend IP.
log_pattern = re.compile(
    r'^\[(.*?)\]\s+'            # [timestamp]
    r'(\S+)\s+'                 # client IP
    r'-\s+'                     # literal "-"
    r'\d+\s+'                   # log sequence number (ignored)
    r'"(.*?)"\s+'               # site name, e.g. xxx.com
    r'"(.*?)"\s+'               # request line, e.g. GET /path HTTP/1.1
    r'(\d{3})\s+\d+\s+\S+\s+'   # status code, response size, etc. (partly ignored)
    r'\S+\s+'                   # cache hit/miss flag
    r'"(.*?)"\s+'               # User-Agent
    r'"(.*?)"\s+'               # Content-Type
    r'(\S+)'                    # backend IP
)

def extract_gz_files(directory):
    """Decompress every ``.gz`` file found directly inside *directory*.

    Each archive ``foo.gz`` is written next to it as ``foo`` (the ``.gz``
    suffix stripped), overwriting any existing file of that name.

    Args:
        directory: path whose immediate entries are scanned for ``.gz`` files.

    Returns:
        List of paths of the extracted files.
    """
    extracted_files = []
    for filename in os.listdir(directory):
        if not filename.endswith('.gz'):
            continue
        gz_path = os.path.join(directory, filename)
        extracted_path = gz_path[:-3]  # drop the trailing '.gz'
        # Stream the decompressed bytes chunk by chunk instead of loading the
        # whole archive into memory with .read().  Binary mode also avoids a
        # pointless utf-8 decode/re-encode and any newline translation.
        with gzip.open(gz_path, 'rb') as gz_file, \
                open(extracted_path, 'wb') as extracted_file:
            shutil.copyfileobj(gz_file, extracted_file)
        extracted_files.append(extracted_path)
    return extracted_files

def parse_log(file_paths):
    """Parse access-log files and aggregate per-client-IP request data.

    Args:
        file_paths: iterable of paths to plain-text log files whose lines
            match the module-level ``log_pattern``.

    Returns:
        Tuple ``(ip_requests, error_ips)`` where ``ip_requests`` maps
        client IP -> list of Unix timestamps (one per request) and
        ``error_ips`` is the set of client IPs that produced at least one
        4xx/5xx response (only populated when ``CHECK_4XX_5XX`` is True).
    """
    ip_requests = defaultdict(list)
    error_ips = set()

    for file_path in file_paths:
        with open(file_path, 'r', encoding='utf-8') as f:
            for line in f:
                match = log_pattern.match(line.strip())
                if not match:
                    continue  # line does not follow the expected format

                (timestamp_str, client_ip, website, request_line,
                 status, user_agent, content_type, backend_ip) = match.groups()

                # Skip the "-" placeholder and all private/loopback
                # addresses.  The original prefix check only covered
                # 10.* and 192.168.*, missing 172.16/12 and 127.*.
                if client_ip == '-':
                    continue
                try:
                    if ipaddress.ip_address(client_ip).is_private:
                        continue
                except ValueError:
                    pass  # not a parseable IP literal; keep the entry

                # A single malformed timestamp should skip that line, not
                # abort the whole run with an uncaught ValueError.
                try:
                    timestamp = datetime.strptime(
                        timestamp_str, "%d/%b/%Y:%H:%M:%S %z")
                except ValueError:
                    continue

                # Record the request time for this IP.
                ip_requests[client_ip].append(int(timestamp.timestamp()))

                # Track IPs that triggered error status codes.
                if CHECK_4XX_5XX and status.startswith(('4', '5')):
                    error_ips.add(client_ip)

    return ip_requests, error_ips


def detect_high_frequency(ip_requests, threshold=None):
    """Return high-frequency IPs, busiest first.

    Args:
        ip_requests: mapping of client IP -> list of request timestamps.
        threshold: minimum request count for an IP to be included.
            Defaults to the module-level ``THRESHOLD`` (backward
            compatible with the original zero-argument call).

    Returns:
        Dict of ``ip -> request count`` containing only IPs at or above
        the threshold, ordered by count descending.
    """
    if threshold is None:
        threshold = THRESHOLD
    # The original filtered on the threshold twice (once while counting,
    # once while sorting); a single comprehension is enough.
    counts = {ip: len(ts) for ip, ts in ip_requests.items()
              if len(ts) >= threshold}
    return dict(sorted(counts.items(), key=lambda kv: kv[1], reverse=True))


def main():
    """Entry point: unpack logs, analyse them, and write the IP report."""
    # Unpack any .gz archives sitting in the current working directory.
    log_files = extract_gz_files(os.getcwd())

    # Aggregate requests per client IP, then rank the busiest ones.
    ip_requests, error_ips = parse_log(log_files)
    # NOTE(review): error_ips is collected but never used below —
    # presumably it was meant to be merged into the report as well.
    abnormal_ips = dict(detect_high_frequency(ip_requests))

    # Write one "ip - count" line per abnormal IP and echo it to stdout.
    with open(OUTPUT_FILE, 'w', encoding='utf-8') as report:
        for ip, count in abnormal_ips.items():
            report.write(f"{ip} - {count}\n")
            print(f"Abnormal IP detected: {ip}")

    print(f"\n✅ Total abnormal IPs found: {len(abnormal_ips)}")
    print(f"Saved to: {OUTPUT_FILE}")


# Script entry point: run the analysis only when executed directly,
# not when imported as a module.
if __name__ == '__main__':
    main()

只要日志格式不变,这个脚本以后应该可以一直复用,在这里记录(mark)一下。