import json import random import asyncio from datetime import datetime, timedelta from nonebot import on_notice, on_message, get_bot from nonebot.adapters.onebot.v11 import ( GroupIncreaseNoticeEvent, MessageEvent, Message, Bot, ) from nonebot.permission import SUPERUSER BAN_FILE = "zhenxun/plugins/nb_new/ban.json" GROUP_FILE = "zhenxun/plugins/nb_new/group.json" VERIFY_TIMEOUT = 300 # 5分钟 LEVEL_LIMIT = 20 # QQ等级限制 # 内存缓存 verify_data = {} # user_id -> {"code": xxx, "group_id": xxx, "expire": time} # 初始化配置文件 def load_json(path, default): try: with open(path, "r", encoding="utf-8") as f: return json.load(f) except: with open(path, "w", encoding="utf-8") as f: json.dump(default, f, indent=2, ensure_ascii=False) return default def save_json(path, data): with open(path, "w", encoding="utf-8") as f: json.dump(data, f, indent=2, ensure_ascii=False) ban_list = load_json(BAN_FILE, []) group_list = load_json(GROUP_FILE, [记得改成要启用的群聊]) # 默认群 # 新人进群事件 new_member = on_notice(priority=5, block=False) @new_member.handle() async def _(bot: Bot, event: GroupIncreaseNoticeEvent): user_id = event.user_id group_id = event.group_id if group_id not in group_list: return # 如果在ban名单中 if user_id in ban_list: await bot.call_api("set_group_kick", group_id=group_id, user_id=user_id) return # 检查QQ等级 try: profile = await bot.get_stranger_info(user_id=user_id, no_cache=True) level = profile.get("level", 0) if level < LEVEL_LIMIT: await bot.call_api("set_group_kick", group_id=group_id, user_id=user_id) return except: pass # 禁言 await bot.call_api("set_group_ban", group_id=group_id, user_id=user_id, duration=3600) # 生成验证码 code = str(random.randint(100000, 999999)) verify_data[user_id] = { "code": code, "group_id": group_id, "expire": datetime.now() + timedelta(seconds=VERIFY_TIMEOUT), } try: await bot.send_private_msg(user_id=user_id, message=f"请在5分钟内回复验证码:{code}\n回复错误或超时将被踢出群聊。") except: await bot.send_group_msg(group_id=group_id, message=f"[CQ:at,qq={user_id}] 请在私聊完成验证") # 私聊消息监听 verify_reply = on_message(priority=5, block=False) @verify_reply.handle() async def _(bot: Bot, event: MessageEvent): user_id = event.user_id msg = str(event.get_message()).strip() if user_id not in verify_data: return data = verify_data[user_id] group_id = data["group_id"] correct_code = data["code"] # 超时 if datetime.now() > data["expire"]: await bot.call_api("set_group_kick", group_id=group_id, user_id=user_id) ban_list.append(user_id) save_json(BAN_FILE, ban_list) verify_data.pop(user_id, None) return # 验证 if msg == correct_code: await bot.call_api("set_group_ban", group_id=group_id, user_id=user_id, duration=0) await bot.send_group_msg(group_id=group_id, message=f"[CQ:at,qq={user_id}] 验证成功,欢迎加入群聊") verify_data.pop(user_id, None) else: await bot.call_api("set_group_kick", group_id=group_id, user_id=user_id) ban_list.append(user_id) save_json(BAN_FILE, ban_list) verify_data.pop(user_id, None) # 定时任务:清理超时 async def check_timeout(bot: Bot): while True: now = datetime.now() to_remove = [] for uid, data in verify_data.items(): if now > data["expire"]: try: await bot.call_api("set_group_kick", group_id=data["group_id"], user_id=uid) except: pass ban_list.append(uid) save_json(BAN_FILE, ban_list) to_remove.append(uid) for uid in to_remove: verify_data.pop(uid, None) await asyncio.sleep(60) @new_member.handle() async def start_check(bot: Bot, event: GroupIncreaseNoticeEvent): asyncio.create_task(check_timeout(bot))