diff --git a/charge.py b/charge.py new file mode 100644 index 0000000..55d0354 --- /dev/null +++ b/charge.py @@ -0,0 +1,177 @@ + +__all__ = ["handle_buy_ticket", "handle_wipe_ticket", "handle_query_ticket"] + +bind_lock = asyncio.Lock() + +async def _ensure_login(userId: int): + ts = generateTimestamp() + loginResult = apiLogin(ts, userId) + return ts, loginResult + +async def handle_buy_ticket(bot: Bot, message: UniMsg, session: EventSession): + + user_qq = str(session.id1) + + async with bind_lock: + if user_qq not in bind_data: + await MessageUtils.build_message("请先绑定 /bind").send(reply_to=True) + return + userId = bind_data[user_qq] + + text = message.extract_plain_text().strip().lower() + match = re.match(r'^/发票\s*([2356])$', text) + if not match: + await MessageUtils.build_message("命令格式错误,请使用 /发票2356").send(reply_to=True) + return + + ticket_type = int(match.group(1)) + + ts, loginResult = await _ensure_login(userId) + return_code = loginResult.get("returnCode") + + if return_code == 1: + + pass + elif return_code == 100: + await MessageUtils.build_message("用户正在上机游玩,请下机后再试,或等待 15 分钟。").send(reply_to=True) + return + elif return_code == 102: + await MessageUtils.build_message("zako~zako~又不获取二维码吗").send(reply_to=True) + return + elif return_code == 103: + await MessageUtils.build_message("登录的账号 UID 无效,请检查账号是否正确。").send(reply_to=True) + return + else: + + error_details = loginResult.get("message", str(loginResult)) + await MessageUtils.build_message(f"登录失败!这不应该发生,请反馈此问题。\n错误详情:{error_details}").send(reply_to=True) + return + + try: + try: + result_str = implBuyTicket(userId, ticket_type) + try: + + result_json = json.loads(result_str) + if result_json.get("returnCode") == 1: + msg = f"✅ 发票 {ticket_type}成功" + else: + msg = f"❌ 发票失败: {result_json.get('message', result_str)}" + except: + msg = f"发票 {ticket_type} 返回: {result_str}" + await MessageUtils.build_message(msg).send(reply_to=True) + except Exception as e: + await MessageUtils.build_message(f"发票失败: {e}").send(reply_to=True) + logger.error(f"发票异常: {e}") + finally: + try: + apiLogout(ts, userId) + except Exception as e: + logger.warning(f"登出失败: {e}") + +async def handle_wipe_ticket(bot: Bot, session: EventSession): + user_qq = str(session.id1) + + async with bind_lock: + if user_qq not in bind_data: + await MessageUtils.build_message("请先绑定 /bind").send(reply_to=True) + return + userId = bind_data[user_qq] + + ts, loginResult = await _ensure_login(userId) + return_code = loginResult.get("returnCode") + + if return_code == 1: + pass + elif return_code == 100: + await MessageUtils.build_message("用户正在上机游玩,请下机后再试,或等待 15 分钟。").send(reply_to=True) + return + elif return_code == 102: + await MessageUtils.build_message("zako~zako~又不获取二维码吗").send(reply_to=True) + return + elif return_code == 103: + await MessageUtils.build_message("登录的账号 UID 无效,请检查账号是否正确。").send(reply_to=True) + return + else: + + error_details = loginResult.get("message", str(loginResult)) + await MessageUtils.build_message(f"登录失败!这不应该发生,请反馈此问题。\n错误详情:{error_details}").send(reply_to=True) + return + + try: + try: + result = implWipeTickets(userId, ts, loginResult) + + if isinstance(result, dict) and result.get("returnCode") == 1: + await MessageUtils.build_message("✅ 清票成功!").send(reply_to=True) + else: + await MessageUtils.build_message(f"❌ 清票失败: {result}").send(reply_to=True) + except Exception as e: + await MessageUtils.build_message(f"❌ 清票失败: {e}").send(reply_to=True) + logger.error(f"清票异常: {e}") + finally: + try: + apiLogout(ts, userId) + except Exception as e: + logger.warning(f"登出失败: {e}") + +async def handle_query_ticket(bot: Bot, session: EventSession): + user_qq = str(session.id1) + + async with bind_lock: + if user_qq not in bind_data: + await MessageUtils.build_message("请先绑定 /bind").send(reply_to=True) + return + userId = bind_data[user_qq] + + ts, loginResult = await _ensure_login(userId) + return_code = loginResult.get("returnCode") + + if return_code == 1: + pass + elif return_code == 100: + await MessageUtils.build_message("用户正在上机游玩,请下机后再试,或等待 15 分钟。").send(reply_to=True) + return + elif return_code == 102: + await MessageUtils.build_message("zako~zako~又不获取二维码吗").send(reply_to=True) + return + elif return_code == 103: + await MessageUtils.build_message("登录的账号 UID 无效,请检查账号是否正确。").send(reply_to=True) + return + else: + + error_details = loginResult.get("message", str(loginResult)) + await MessageUtils.build_message(f"登录失败!这不应该发生,请反馈此问题。\n错误详情:{error_details}").send(reply_to=True) + return + + try: + try: + result = apiQueryTicket(userId) + + if isinstance(result, str): + import json + try: + result = json.loads(result) + except Exception: + pass + + msg_lines = [] + userChargeList = result.get("userChargeList", []) if isinstance(result, dict) else [] + + if not userChargeList: + msg_lines.append("当前没有任何记录。") + else: + + charge_name_map = {2: "2倍票", 3: "3倍票", 5: "5倍票", 6: "6倍票"} + for charge in userChargeList: + charge_id = charge.get("chargeId") + stock = charge.get("stock", 0) + valid_date = charge.get("validDate", "未知") + msg_lines.append(f"{charge_name_map.get(charge_id, charge_id)}: 持有 {stock}, 有效期 {valid_date}") + + msg_text = "📋 当前票状态:\n" + "\n".join(msg_lines) + await MessageUtils.build_message(msg_text).send(reply_to=True) + + except Exception as e: + await MessageUtils.build_message(f"❌ 查票失败: {e}").send(reply_to=True) + logger.error(f"查票异常: {e}") diff --git a/delete.py b/delete.py new file mode 100644 index 0000000..8c0bd89 --- /dev/null +++ b/delete.py @@ -0,0 +1,70 @@ + +bind_lock = asyncio.Lock() + +async def _ensure_login(userId: int): + ts = generateTimestamp() + loginResult = apiLogin(ts, userId) + return ts, loginResult + +async def delete(bot: Bot, event: MessageEvent, session: EventSession, message: UniMsg): + user_qq = str(event.user_id) + + async with bind_lock: + if user_qq not in bind_data: + await MessageUtils.build_message("请先绑定 /bind").send(reply_to=True) + return + userId = bind_data[user_qq] + + args = event.get_plaintext().strip().split() + if len(args) < 3: + await MessageUtils.build_message("用法: /del musicId levelId").send(reply_to=True) + return + + try: + musicId = int(args[1]) + levelId = int(args[2]) + except Exception: + await MessageUtils.build_message("参数错误: musicId 和 levelId 必须是数字").send(reply_to=True) + return + + ts, loginResult = await _ensure_login(userId) + return_code = loginResult.get("returnCode") + + if return_code == 1: + pass + elif return_code == 100: + await MessageUtils.build_message("用户正在上机游玩,请下机后再试,或等待 15 分钟。").send(reply_to=True) + return + elif return_code == 102: + await MessageUtils.build_message("zako~zako~又不获取二维码吗").send(reply_to=True) + return + elif return_code == 103: + await MessageUtils.build_message("登录的账号 UID 无效,请检查账号是否正确。").send(reply_to=True) + return + else: + error_details = loginResult.get("message", str(loginResult)) + await MessageUtils.build_message(f"登录失败!这不应该发生,请反馈此问题。\n错误详情:{error_details}").send(reply_to=True) + return + + try: + result_str = implDeleteMusicRecord(userId, ts, loginResult, musicId, levelId) + + msg = None + try: + result_json = json.loads(result_str) if isinstance(result_str, str) else result_str + if isinstance(result_json, dict): + if result_json.get("returnCode") == 1: + msg = f"🗑 成功删除成绩: musicId={musicId}, levelId={levelId}" + else: + msg = f"❌ 删除失败: {result_json.get('message', result_json)}" + except Exception: + pass + + if not msg: + msg = f"删除成绩返回: {result_str}" + + await MessageUtils.build_message(msg).send(reply_to=True) + + except Exception as e: + await MessageUtils.build_message(f"❌ 删除成绩异常: {e}").send(reply_to=True) + logger.error(f"删除成绩异常: {e}") diff --git a/info.py b/info.py new file mode 100644 index 0000000..0023808 --- /dev/null +++ b/info.py @@ -0,0 +1,45 @@ + +EXCLUDE_KEYS = {"isNetMember", "isInherit", "dispRate", "dailyBonusDate", "headPhoneVolume", "nameplateId", "iconId", "trophyId"} + +MAPPING = { + "userName": "玩家昵称", + "isLogin": "是否在线", + "lastGameId": "最后游玩游戏ID", + "lastRomVersion": "最后ROM版本", + "lastDataVersion": "最后数据版本", + "lastLoginDate": "最后登录时间", + "lastPlayDate": "最后游玩时间", + "playerRating": "玩家Rating", + "totalAwake": "觉醒总数", + "banState": "封禁状态", +} + +async def handle_info_logic(bot: Bot, message: UniMsg, session: EventSession): + user_qq = str(session.id1) + + if user_qq not in bind_data: + await MessageUtils.build_message("❌ zako~又不绑定账号吗,请先使用 /bind 绑定。").send(reply_to=True) + return + + user_id = bind_data[user_qq] + + try: + raw = apiGetUserPreview(user_id) + data = json.loads(raw) + except Exception as e: + logger.error(f"获取用户预览失败: {e}") + await MessageUtils.build_message("❌ zako~获取用户信息失败了呢").send(reply_to=True) + return + + for k in EXCLUDE_KEYS: + data.pop(k, None) + + lines = [] + for k, v in data.items(): + if k == "userId": + continue + zh_key = MAPPING.get(k, k) + lines.append(f"{zh_key}: {v}") + + msg = "📋 用户信息:\n" + "\n".join(lines) + await MessageUtils.build_message(msg).send(reply_to=True) diff --git a/lock.py b/lock.py new file mode 100644 index 0000000..a78ac32 --- /dev/null +++ b/lock.py @@ -0,0 +1,101 @@ + +bind_lock = asyncio.Lock() + + +async def _ensure_login(userId: int): + ts = generateTimestamp() + loginResult = apiLogin(ts, userId) + return ts, loginResult + + +async def handle_lock(bot: Bot, message: UniMsg, session: EventSession): + user_qq = str(session.id1) + + async with bind_lock: + if user_qq not in bind_data: + await MessageUtils.build_message("请先绑定 /bind").send(reply_to=True) + return + userId = bind_data[user_qq] + + args = message.extract_plain_text().strip().split() + if len(args) < 3: + await MessageUtils.build_message( + "用法: /lock <类型> \n示例: /lock 歌 11734\n示例: /lock 搭档 10" + ).send(reply_to=True) + return + + itemType = args[1] + itemId = args[2] + + ts, loginResult = await _ensure_login(userId) + return_code = loginResult.get("returnCode") + + if return_code == 1: + pass + elif return_code == 100: + await MessageUtils.build_message("用户正在上机游玩,请下机后再试,或等待 15 分钟。").send(reply_to=True) + return + elif return_code == 102: + await MessageUtils.build_message("zako~zako~又不获取二维码吗").send(reply_to=True) + return + elif return_code == 103: + await MessageUtils.build_message("登录的账号 UID 无效,请检查账号是否正确。").send(reply_to=True) + return + else: + error_details = loginResult.get("message", str(loginResult)) + await MessageUtils.build_message(f"登录失败!这不应该发生,请反馈此问题。\n错误详情:{error_details}").send(reply_to=True) + return + + try: + if itemType in ["歌", "乐曲", "MUSIC"]: + try: + musicId = int(itemId) + except ValueError: + await MessageUtils.build_message("乐曲ID必须是数字").send(reply_to=True) + return + result_str = impllockMusic(musicId, userId, ts, loginResult) + + else: + if itemType in itemKindzhCNDict: + itemType = itemKindzhCNDict[itemType] + + if itemType not in itemKindDict: + await MessageUtils.build_message( + f"未知类型 {itemType},可用类型: {','.join(itemKindzhCNDict.keys())}" + ).send(reply_to=True) + return + + try: + itemId = int(itemId) + except ValueError: + await MessageUtils.build_message("itemId 必须是数字").send(reply_to=True) + return + + result_str = impllockSingleItem( + itemId, itemKindDict[itemType], userId, ts, loginResult + ) + + msg = None + try: + result_json = json.loads(result_str) if isinstance(result_str, str) else result_str + if isinstance(result_json, dict): + if result_json.get("returnCode") == 1: + msg = f"✅ 成功: {itemType} {itemId}" + else: + msg = f"❌ 失败: {result_json.get('message', result_json)}" + except Exception: + pass + + if not msg: + msg = f"解锁返回: {result_str}" + + await MessageUtils.build_message(msg).send(reply_to=True) + + except Exception as e: + await MessageUtils.build_message(f"❌ 解锁异常: {e}").send(reply_to=True) + logger.error(f"解锁异常: {e}") + finally: + try: + apiLogout(ts, userId) + except Exception as e: + logger.warning(f"登出失败: {e}") diff --git a/mai_unban.py b/mai_unban.py new file mode 100644 index 0000000..d92cf55 --- /dev/null +++ b/mai_unban.py @@ -0,0 +1,115 @@ + + +def isUserLoggedIn(userId): + try: + isLogin = json.loads(apiGetUserPreview(userId, True))['isLogin'] + logger.debug(f"用户 {userId} 是否登录: {isLogin}") + return isLogin + except Exception as e: + logger.error(f"检查用户 {userId} 登录状态时出错: {e}") + return False + +def convert_hhmm_to_timestamp(time_str: str) -> int: + now = datetime.now() + dt_obj = datetime.strptime(f"{now.year}-{now.month}-{now.day} {time_str}", "%Y-%m-%d %H:%M") + return int(dt_obj.timestamp()) + +def logOut(userId, Timestamp): + try: + response = apiLogout(Timestamp, userId, True) + if response and response.get('returnCode') == 1: + logger.debug(f"已成功发送登出请求给用户 {userId},时间戳 {Timestamp}") + return True + return False + except Exception as e: + logger.error(f"使用时间戳 {Timestamp} 登出用户 {userId} 时发生错误: {e}") + return False + +def isCorrectTimestamp(timestamp, userId): + if not logOut(userId, timestamp): + return False + time.sleep(0.1) + isLoggedOut = not isUserLoggedIn(userId) + logger.debug(f"时间戳 {timestamp} 是否正确: {isLoggedOut}") + return isLoggedOut + +def findTimestampInRange(start_ts: int, end_ts: int, userId: int): + logger.info(f"开始在时间范围 [{start_ts}, {end_ts}] 内为用户 {userId} 搜索有效时间戳...") + for ts in range(start_ts, end_ts + 1): + if isCorrectTimestamp(ts, userId): + logger.info(f"找到正确的时间戳: {ts}") + return ts + + logger.error(f"在指定范围内未能找到用户 {userId} 的有效时间戳") + return None + +async def handle_unban_command(bot: Bot, event: MessageEvent, args: Message = CommandArg()): + arg_list = args.extract_plain_text().strip().split() + + if len(arg_list) != 2: + await MessageUtils.build_message( + "命令格式不正确喵~\n" + "正确格式: /黑屋 <开始时间> <结束时间>\n" + "例如: /黑屋 19:00 19:10\n" + "注意:时间范围不能超过30分钟。" + ).send(reply_to=True) + return + + start_time_str, end_time_str = arg_list + user_qq = str(event.user_id) + + if user_qq not in bind_data: + await MessageUtils.build_message("zako~又不绑定账号吗").send(reply_to=True) + return + user_id = bind_data[user_qq] + + try: + logger.info("开始解析时间参数...") + start_timestamp = convert_hhmm_to_timestamp(start_time_str) + end_timestamp = convert_hhmm_to_timestamp(end_time_str) + logger.info(f"时间参数解析成功: 开始时间戳 {start_timestamp}, 结束时间戳 {end_timestamp}") + + if end_timestamp <= start_timestamp: + await MessageUtils.build_message("结束时间必须晚于开始时间!").send(reply_to=True) + return + + if (end_timestamp - start_timestamp) > 1800: # 30分钟 * 60秒 + await MessageUtils.build_message("时间范围不能超过30分钟哦!").send(reply_to=True) + return + + except ValueError: + logger.error(f"无法解析时间格式: {start_time_str} 或 {end_time_str}") + await MessageUtils.build_message("时间格式错误,请使用 HH:MM 格式!").send(reply_to=True) + return + except Exception as e: + logger.error(f"解析时间时发生未知错误: {e}") + await MessageUtils.build_message("解析时间时发生未知错误,请检查后台日志。").send(reply_to=True) + return + + logger.info(f"正在检查用户 {user_id} 的登录状态...") + if not isUserLoggedIn(user_id): + await MessageUtils.build_message("zako~没进黑屋干什么解").send(reply_to=True) + return + + await MessageUtils.build_message(f"收到!将在 {start_time_str} 到 {end_time_str} 的时间范围内尝试解黑屋,请稍候...").send(reply_to=True) + + try: + start_process_time = time.time() + + result_timestamp = await asyncio.to_thread(findTimestampInRange, start_timestamp, end_timestamp, user_id) + + end_process_time = time.time() + duration = end_process_time - start_process_time + + if result_timestamp is not None: + human_readable_time = datetime.fromtimestamp(result_timestamp).strftime('%Y-%m-%d %H:%M:%S') + final_message = f"解黑屋成功!\n找到的时间点: {human_readable_time}\n消耗时间: {duration:.2f}秒" + else: + final_message = "解黑屋失败,在指定时间段内未能找到有效的时间点。" + + await MessageUtils.build_message(final_message).send(reply_to=True) + + except Exception as e: + logger.error(f"处理 /黑屋 命令时发生意外错误: {e}") + await MessageUtils.build_message("执行过程中发生未知错误,请联系管理员。").send(reply_to=True) +} \ No newline at end of file diff --git a/unlock.py b/unlock.py new file mode 100644 index 0000000..360511a --- /dev/null +++ b/unlock.py @@ -0,0 +1,101 @@ + +bind_lock = asyncio.Lock() + + +async def _ensure_login(userId: int): + ts = generateTimestamp() + loginResult = apiLogin(ts, userId) + return ts, loginResult + + +async def handle_unlock(bot: Bot, message: UniMsg, session: EventSession): + user_qq = str(session.id1) + + async with bind_lock: + if user_qq not in bind_data: + await MessageUtils.build_message("请先绑定 /bind").send(reply_to=True) + return + userId = bind_data[user_qq] + + args = message.extract_plain_text().strip().split() + if len(args) < 3: + await MessageUtils.build_message( + "用法: /unlock <类型> \n示例: /unlock 歌 11734\n示例: /unlock 搭档 10" + ).send(reply_to=True) + return + + itemType = args[1] + itemId = args[2] + + ts, loginResult = await _ensure_login(userId) + return_code = loginResult.get("returnCode") + + if return_code == 1: + pass + elif return_code == 100: + await MessageUtils.build_message("用户正在上机游玩,请下机后再试,或等待 15 分钟。").send(reply_to=True) + return + elif return_code == 102: + await MessageUtils.build_message("zako~zako~又不获取二维码吗").send(reply_to=True) + return + elif return_code == 103: + await MessageUtils.build_message("登录的账号 UID 无效,请检查账号是否正确。").send(reply_to=True) + return + else: + error_details = loginResult.get("message", str(loginResult)) + await MessageUtils.build_message(f"登录失败!这不应该发生,请反馈此问题。\n错误详情:{error_details}").send(reply_to=True) + return + + try: + if itemType in ["歌", "乐曲", "MUSIC"]: + try: + musicId = int(itemId) + except ValueError: + await MessageUtils.build_message("乐曲ID必须是数字").send(reply_to=True) + return + result_str = implUnlockMusic(musicId, userId, ts, loginResult) + + else: + if itemType in itemKindzhCNDict: + itemType = itemKindzhCNDict[itemType] + + if itemType not in itemKindDict: + await MessageUtils.build_message( + f"未知类型 {itemType},可用类型: {','.join(itemKindzhCNDict.keys())}" + ).send(reply_to=True) + return + + try: + itemId = int(itemId) + except ValueError: + await MessageUtils.build_message("itemId 必须是数字").send(reply_to=True) + return + + result_str = implUnlockSingleItem( + itemId, itemKindDict[itemType], userId, ts, loginResult + ) + + msg = None + try: + result_json = json.loads(result_str) if isinstance(result_str, str) else result_str + if isinstance(result_json, dict): + if result_json.get("returnCode") == 1: + msg = f"✅ 解锁成功: {itemType} {itemId}" + else: + msg = f"❌ 解锁失败: {result_json.get('message', result_json)}" + except Exception: + pass + + if not msg: + msg = f"解锁返回: {result_str}" + + await MessageUtils.build_message(msg).send(reply_to=True) + + except Exception as e: + await MessageUtils.build_message(f"❌ 解锁异常: {e}").send(reply_to=True) + logger.error(f"解锁异常: {e}") + finally: + try: + apiLogout(ts, userId) + except Exception as e: + logger.warning(f"登出失败: {e}") diff --git a/user.py b/user.py new file mode 100644 index 0000000..883f974 --- /dev/null +++ b/user.py @@ -0,0 +1,51 @@ + +__all__ = ["handle_user"] + +bind_lock = asyncio.Lock() + + +async def _ensure_login(userId: int): + ts = generateTimestamp() + loginResult = apiLogin(ts, userId) + return ts, loginResult + + +async def handle_user(bot: Bot, event: Event): + user_qq = str(event.user_id) + + async with bind_lock: + if user_qq not in bind_data: + await MessageUtils.build_message("请先绑定 /bind").send(reply_to=True) + return + userId = bind_data[user_qq] + + ts, loginResult = await _ensure_login(userId) + return_code = loginResult.get("returnCode") + + if return_code == 1: + pass + elif return_code == 100: + await MessageUtils.build_message("用户正在上机游玩,请下机后再试,或等待 15 分钟。").send(reply_to=True) + return + elif return_code == 102: + await MessageUtils.build_message("zako~zako~又不获取二维码吗").send(reply_to=True) + return + elif return_code == 103: + await MessageUtils.build_message("登录的账号 UID 无效,请检查账号是否正确。").send(reply_to=True) + return + else: + error_details = loginResult.get("message", str(loginResult)) + await MessageUtils.build_message(f"登录失败!这不应该发生,请反馈此问题。\n错误详情:{error_details}").send(reply_to=True) + return + + try: + friendly_info = getFriendlyUserData(userId) + await MessageUtils.build_message(f"📄 用户信息:\n{friendly_info}").send(reply_to=True) + except Exception as e: + await MessageUtils.build_message(f"❌ 获取用户信息失败: {e}").send(reply_to=True) + logger.error(f"获取用户信息异常: {e}") + finally: + try: + apiLogout(ts, userId) + except Exception as e: + logger.warning(f"登出失败: {e}")