Files
Human-verification/__init__.py
2025-11-09 08:45:52 +08:00

143 lines
4.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import json
import os
import random
import secrets
from datetime import datetime, timedelta

from nonebot import get_bot, on_message, on_notice
from nonebot.adapters.onebot.v11 import (
    Bot,
    GroupIncreaseNoticeEvent,
    Message,
    MessageEvent,
)
from nonebot.log import logger
from nonebot.permission import SUPERUSER
# JSON files that persist the ban list and the list of enabled groups.
BAN_FILE = "zhenxun/plugins/nb_new/ban.json"
GROUP_FILE = "zhenxun/plugins/nb_new/group.json"

# Seconds a new member has to answer the verification code (5 minutes).
VERIFY_TIMEOUT = 5 * 60
# Minimum QQ level; joiners below this level are kicked.
LEVEL_LIMIT = 20

# In-memory cache of pending verifications:
# user_id -> {"code": str, "group_id": int, "expire": datetime}
verify_data: dict = {}
# Helpers for the JSON-backed configuration files.
def load_json(path, default):
    """Return the JSON content of *path*, creating the file if necessary.

    If the file is missing, unreadable or does not contain valid JSON, it is
    (re)written with *default* and *default* is returned.

    Args:
        path: Location of the JSON file.
        default: Value to store and return when the file cannot be loaded.

    Returns:
        The decoded JSON document, or *default* when the fallback was taken.

    Raises:
        OSError: If the fallback file cannot be written.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError):
        # OSError: missing/unreadable file.  ValueError: malformed JSON or
        # undecodable bytes (JSONDecodeError and UnicodeDecodeError are both
        # subclasses).  Anything else is a programming error and propagates.
        parent = os.path.dirname(path)
        if parent:
            # A fresh install may not have the plugin data directory yet.
            os.makedirs(parent, exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(default, f, indent=2, ensure_ascii=False)
        return default
def save_json(path, data):
    """Overwrite *path* with *data* encoded as indented UTF-8 JSON.

    Non-ASCII characters are written as-is rather than as ``\\uXXXX`` escapes.
    """
    with open(path, mode="w", encoding="utf-8") as handle:
        json.dump(data, fp=handle, ensure_ascii=False, indent=2)
# Persisted state, loaded once when the plugin is imported.
# ban_list: user IDs that are kicked as soon as they join.
ban_list = load_json(BAN_FILE, [])
# group_list: IDs of the groups in which verification is enabled.
# The default is an empty list so the module imports cleanly; put the numeric
# group IDs here (e.g. [123456789]) or edit group.json after the first run.
group_list = load_json(GROUP_FILE, [])
# New-member join event.
new_member = on_notice(priority=5, block=False)


@new_member.handle()
async def _(bot: Bot, event: GroupIncreaseNoticeEvent):
    """Screen a user who has just joined a group.

    Groups that are not in ``group_list`` are ignored.  Otherwise the user is
    kicked if already in ``ban_list`` or if their QQ level is below
    ``LEVEL_LIMIT``; everyone else is muted for an hour, given a six-digit
    code (stored in ``verify_data``) and asked to reply with it in private.
    """
    user_id = event.user_id
    group_id = event.group_id
    if group_id not in group_list:
        return
    # The bot joining a group produces the same notice; never screen ourselves.
    if user_id == event.self_id:
        return

    # Users who failed verification before are removed immediately.
    if user_id in ban_list:
        await bot.call_api("set_group_kick", group_id=group_id, user_id=user_id)
        return

    # QQ level check.
    # NOTE(review): a backend that omits "level" yields 0 here, which kicks the
    # user — confirm that this is the intended policy.
    try:
        profile = await bot.get_stranger_info(user_id=user_id, no_cache=True)
        level = profile.get("level", 0)
        if level < LEVEL_LIMIT:
            await bot.call_api("set_group_kick", group_id=group_id, user_id=user_id)
            return
    except Exception as exc:
        # Deliberately best-effort: if the lookup (or the kick) fails, the user
        # still has to pass the code verification below.  ``Exception`` rather
        # than a bare ``except`` so task cancellation is not swallowed.
        logger.warning(f"QQ level check failed for user {user_id}: {exc!r}")

    # Mute for one hour; a correct reply lifts the mute early.
    await bot.call_api("set_group_ban", group_id=group_id, user_id=user_id, duration=3600)

    # Six-digit verification code (100000-999999) from the OS CSPRNG, so it
    # cannot be predicted from earlier codes.
    code = str(secrets.randbelow(900000) + 100000)
    verify_data[user_id] = {
        "code": code,
        "group_id": group_id,
        "expire": datetime.now() + timedelta(seconds=VERIFY_TIMEOUT),
    }
    try:
        # The advertised time limit is derived from VERIFY_TIMEOUT so the text
        # cannot drift from the real deadline.
        await bot.send_private_msg(
            user_id=user_id,
            message=f"请在{VERIFY_TIMEOUT // 60}分钟内回复验证码{code}\n回复错误或超时将被踢出群聊。",
        )
    except Exception:
        # Private message could not be delivered: point the user to the
        # private chat with an @-mention in the group instead.
        # NOTE(review): this fallback does not contain the code, so the user
        # has no way to learn it — confirm whether that is intended.
        await bot.send_group_msg(group_id=group_id, message=f"[CQ:at,qq={user_id}] 请在私聊完成验证")
# Private-message listener for verification replies.
verify_reply = on_message(priority=5, block=False)


@verify_reply.handle()
async def _(bot: Bot, event: MessageEvent):
    """Check a pending user's private reply against their verification code.

    A correct reply before the deadline lifts the mute and posts a welcome
    message in the group.  A wrong or late reply gets the user kicked and
    added to the persisted ban list.  Either way the pending entry is removed,
    so each user gets exactly one attempt.
    """
    # The handler is typed for every MessageEvent, so group messages reach it
    # as well.  Only private replies count; otherwise a pending user chatting
    # in some other group would be treated as having sent a wrong code.
    if event.message_type != "private":
        return

    user_id = event.user_id
    data = verify_data.get(user_id)
    if data is None:
        return  # no verification pending for this user

    group_id = data["group_id"]
    msg = str(event.get_message()).strip()
    verified = datetime.now() <= data["expire"] and msg == data["code"]

    if verified:
        # duration=0 lifts the mute applied when the user joined.
        await bot.call_api("set_group_ban", group_id=group_id, user_id=user_id, duration=0)
        await bot.send_group_msg(group_id=group_id, message=f"[CQ:at,qq={user_id}] 验证成功,欢迎加入群聊")
    else:
        # Timed out or wrong code: remove the user and remember the ban.
        await bot.call_api("set_group_kick", group_id=group_id, user_id=user_id)
        if user_id not in ban_list:  # avoid duplicate entries in ban.json
            ban_list.append(user_id)
            save_json(BAN_FILE, ban_list)
    verify_data.pop(user_id, None)
# Background task: expire pending verifications.
async def check_timeout(bot: Bot):
    """Kick and ban users whose verification deadline has passed.

    Runs forever, scanning ``verify_data`` once every 60 seconds.

    Args:
        bot: Bot instance used to issue the ``set_group_kick`` calls.
    """
    while True:
        now = datetime.now()
        # Collect the expired IDs first.  Awaiting inside a loop over
        # verify_data.items() would let the message/notice handlers mutate the
        # dict mid-iteration, raising RuntimeError and killing this task.
        expired = [uid for uid, data in verify_data.items() if now > data["expire"]]
        for uid in expired:
            # Re-check: while an earlier kick was awaited, the entry may have
            # been resolved by the reply handler or replaced by a fresh join.
            data = verify_data.get(uid)
            if data is None or now <= data["expire"]:
                continue
            del verify_data[uid]
            try:
                await bot.call_api("set_group_kick", group_id=data["group_id"], user_id=uid)
            except Exception as exc:
                # Best effort: the user may already have left the group.  The
                # ban is still recorded below.  ``Exception`` (not a bare
                # ``except``) lets task cancellation propagate.
                logger.warning(f"Failed to kick timed-out user {uid}: {exc!r}")
            if uid not in ban_list:  # avoid duplicate entries in ban.json
                ban_list.append(uid)
                save_json(BAN_FILE, ban_list)
        await asyncio.sleep(60)
# Handle of the single background timeout checker; None until the first join.
# Holding the reference also keeps the task from being garbage-collected.
_timeout_task = None


@new_member.handle()
async def start_check(bot: Bot, event: GroupIncreaseNoticeEvent):
    """Start the ``check_timeout`` background loop if it is not running.

    This handler fires on every group-join notice, so it must not spawn a
    new never-ending loop each time; one checker is enough.
    """
    global _timeout_task
    if _timeout_task is None or _timeout_task.done():
        _timeout_task = asyncio.create_task(check_timeout(bot))