上传文件至 /

This commit is contained in:
2025-10-15 19:42:27 +08:00
parent 4c142d19e6
commit 5cfee507f0
7 changed files with 660 additions and 0 deletions

177
charge.py Normal file
View File

@@ -0,0 +1,177 @@
__all__ = ["handle_buy_ticket", "handle_wipe_ticket", "handle_query_ticket"]
bind_lock = asyncio.Lock()
async def _ensure_login(userId: int):
ts = generateTimestamp()
loginResult = apiLogin(ts, userId)
return ts, loginResult
async def handle_buy_ticket(bot: Bot, message: UniMsg, session: EventSession):
user_qq = str(session.id1)
async with bind_lock:
if user_qq not in bind_data:
await MessageUtils.build_message("请先绑定 /bind").send(reply_to=True)
return
userId = bind_data[user_qq]
text = message.extract_plain_text().strip().lower()
match = re.match(r'^/发票\s*([2356])$', text)
if not match:
await MessageUtils.build_message("命令格式错误,请使用 /发票2356").send(reply_to=True)
return
ticket_type = int(match.group(1))
ts, loginResult = await _ensure_login(userId)
return_code = loginResult.get("returnCode")
if return_code == 1:
pass
elif return_code == 100:
await MessageUtils.build_message("用户正在上机游玩,请下机后再试,或等待 15 分钟。").send(reply_to=True)
return
elif return_code == 102:
await MessageUtils.build_message("zako~zako~又不获取二维码吗").send(reply_to=True)
return
elif return_code == 103:
await MessageUtils.build_message("登录的账号 UID 无效,请检查账号是否正确。").send(reply_to=True)
return
else:
error_details = loginResult.get("message", str(loginResult))
await MessageUtils.build_message(f"登录失败!这不应该发生,请反馈此问题。\n错误详情:{error_details}").send(reply_to=True)
return
try:
try:
result_str = implBuyTicket(userId, ticket_type)
try:
result_json = json.loads(result_str)
if result_json.get("returnCode") == 1:
msg = f"✅ 发票 {ticket_type}成功"
else:
msg = f"❌ 发票失败: {result_json.get('message', result_str)}"
except:
msg = f"发票 {ticket_type} 返回: {result_str}"
await MessageUtils.build_message(msg).send(reply_to=True)
except Exception as e:
await MessageUtils.build_message(f"发票失败: {e}").send(reply_to=True)
logger.error(f"发票异常: {e}")
finally:
try:
apiLogout(ts, userId)
except Exception as e:
logger.warning(f"登出失败: {e}")
async def handle_wipe_ticket(bot: Bot, session: EventSession):
user_qq = str(session.id1)
async with bind_lock:
if user_qq not in bind_data:
await MessageUtils.build_message("请先绑定 /bind").send(reply_to=True)
return
userId = bind_data[user_qq]
ts, loginResult = await _ensure_login(userId)
return_code = loginResult.get("returnCode")
if return_code == 1:
pass
elif return_code == 100:
await MessageUtils.build_message("用户正在上机游玩,请下机后再试,或等待 15 分钟。").send(reply_to=True)
return
elif return_code == 102:
await MessageUtils.build_message("zako~zako~又不获取二维码吗").send(reply_to=True)
return
elif return_code == 103:
await MessageUtils.build_message("登录的账号 UID 无效,请检查账号是否正确。").send(reply_to=True)
return
else:
error_details = loginResult.get("message", str(loginResult))
await MessageUtils.build_message(f"登录失败!这不应该发生,请反馈此问题。\n错误详情:{error_details}").send(reply_to=True)
return
try:
try:
result = implWipeTickets(userId, ts, loginResult)
if isinstance(result, dict) and result.get("returnCode") == 1:
await MessageUtils.build_message("✅ 清票成功!").send(reply_to=True)
else:
await MessageUtils.build_message(f"❌ 清票失败: {result}").send(reply_to=True)
except Exception as e:
await MessageUtils.build_message(f"❌ 清票失败: {e}").send(reply_to=True)
logger.error(f"清票异常: {e}")
finally:
try:
apiLogout(ts, userId)
except Exception as e:
logger.warning(f"登出失败: {e}")
async def handle_query_ticket(bot: Bot, session: EventSession):
user_qq = str(session.id1)
async with bind_lock:
if user_qq not in bind_data:
await MessageUtils.build_message("请先绑定 /bind").send(reply_to=True)
return
userId = bind_data[user_qq]
ts, loginResult = await _ensure_login(userId)
return_code = loginResult.get("returnCode")
if return_code == 1:
pass
elif return_code == 100:
await MessageUtils.build_message("用户正在上机游玩,请下机后再试,或等待 15 分钟。").send(reply_to=True)
return
elif return_code == 102:
await MessageUtils.build_message("zako~zako~又不获取二维码吗").send(reply_to=True)
return
elif return_code == 103:
await MessageUtils.build_message("登录的账号 UID 无效,请检查账号是否正确。").send(reply_to=True)
return
else:
error_details = loginResult.get("message", str(loginResult))
await MessageUtils.build_message(f"登录失败!这不应该发生,请反馈此问题。\n错误详情:{error_details}").send(reply_to=True)
return
try:
try:
result = apiQueryTicket(userId)
if isinstance(result, str):
import json
try:
result = json.loads(result)
except Exception:
pass
msg_lines = []
userChargeList = result.get("userChargeList", []) if isinstance(result, dict) else []
if not userChargeList:
msg_lines.append("当前没有任何记录。")
else:
charge_name_map = {2: "2倍票", 3: "3倍票", 5: "5倍票", 6: "6倍票"}
for charge in userChargeList:
charge_id = charge.get("chargeId")
stock = charge.get("stock", 0)
valid_date = charge.get("validDate", "未知")
msg_lines.append(f"{charge_name_map.get(charge_id, charge_id)}: 持有 {stock}, 有效期 {valid_date}")
msg_text = "📋 当前票状态:\n" + "\n".join(msg_lines)
await MessageUtils.build_message(msg_text).send(reply_to=True)
except Exception as e:
await MessageUtils.build_message(f"❌ 查票失败: {e}").send(reply_to=True)
logger.error(f"查票异常: {e}")

70
delete.py Normal file
View File

@@ -0,0 +1,70 @@
bind_lock = asyncio.Lock()
async def _ensure_login(userId: int):
ts = generateTimestamp()
loginResult = apiLogin(ts, userId)
return ts, loginResult
async def delete(bot: Bot, event: MessageEvent, session: EventSession, message: UniMsg):
user_qq = str(event.user_id)
async with bind_lock:
if user_qq not in bind_data:
await MessageUtils.build_message("请先绑定 /bind").send(reply_to=True)
return
userId = bind_data[user_qq]
args = event.get_plaintext().strip().split()
if len(args) < 3:
await MessageUtils.build_message("用法: /del musicId levelId").send(reply_to=True)
return
try:
musicId = int(args[1])
levelId = int(args[2])
except Exception:
await MessageUtils.build_message("参数错误: musicId 和 levelId 必须是数字").send(reply_to=True)
return
ts, loginResult = await _ensure_login(userId)
return_code = loginResult.get("returnCode")
if return_code == 1:
pass
elif return_code == 100:
await MessageUtils.build_message("用户正在上机游玩,请下机后再试,或等待 15 分钟。").send(reply_to=True)
return
elif return_code == 102:
await MessageUtils.build_message("zako~zako~又不获取二维码吗").send(reply_to=True)
return
elif return_code == 103:
await MessageUtils.build_message("登录的账号 UID 无效,请检查账号是否正确。").send(reply_to=True)
return
else:
error_details = loginResult.get("message", str(loginResult))
await MessageUtils.build_message(f"登录失败!这不应该发生,请反馈此问题。\n错误详情:{error_details}").send(reply_to=True)
return
try:
result_str = implDeleteMusicRecord(userId, ts, loginResult, musicId, levelId)
msg = None
try:
result_json = json.loads(result_str) if isinstance(result_str, str) else result_str
if isinstance(result_json, dict):
if result_json.get("returnCode") == 1:
msg = f"🗑 成功删除成绩: musicId={musicId}, levelId={levelId}"
else:
msg = f"❌ 删除失败: {result_json.get('message', result_json)}"
except Exception:
pass
if not msg:
msg = f"删除成绩返回: {result_str}"
await MessageUtils.build_message(msg).send(reply_to=True)
except Exception as e:
await MessageUtils.build_message(f"❌ 删除成绩异常: {e}").send(reply_to=True)
logger.error(f"删除成绩异常: {e}")

45
info.py Normal file
View File

@@ -0,0 +1,45 @@
EXCLUDE_KEYS = {"isNetMember", "isInherit", "dispRate", "dailyBonusDate", "headPhoneVolume", "nameplateId", "iconId", "trophyId"}
MAPPING = {
"userName": "玩家昵称",
"isLogin": "是否在线",
"lastGameId": "最后游玩游戏ID",
"lastRomVersion": "最后ROM版本",
"lastDataVersion": "最后数据版本",
"lastLoginDate": "最后登录时间",
"lastPlayDate": "最后游玩时间",
"playerRating": "玩家Rating",
"totalAwake": "觉醒总数",
"banState": "封禁状态",
}
async def handle_info_logic(bot: Bot, message: UniMsg, session: EventSession):
user_qq = str(session.id1)
if user_qq not in bind_data:
await MessageUtils.build_message("❌ zako~又不绑定账号吗,请先使用 /bind 绑定。").send(reply_to=True)
return
user_id = bind_data[user_qq]
try:
raw = apiGetUserPreview(user_id)
data = json.loads(raw)
except Exception as e:
logger.error(f"获取用户预览失败: {e}")
await MessageUtils.build_message("❌ zako~获取用户信息失败了呢").send(reply_to=True)
return
for k in EXCLUDE_KEYS:
data.pop(k, None)
lines = []
for k, v in data.items():
if k == "userId":
continue
zh_key = MAPPING.get(k, k)
lines.append(f"{zh_key}: {v}")
msg = "📋 用户信息:\n" + "\n".join(lines)
await MessageUtils.build_message(msg).send(reply_to=True)

101
lock.py Normal file
View File

@@ -0,0 +1,101 @@
bind_lock = asyncio.Lock()
async def _ensure_login(userId: int):
ts = generateTimestamp()
loginResult = apiLogin(ts, userId)
return ts, loginResult
async def handle_lock(bot: Bot, message: UniMsg, session: EventSession):
user_qq = str(session.id1)
async with bind_lock:
if user_qq not in bind_data:
await MessageUtils.build_message("请先绑定 /bind").send(reply_to=True)
return
userId = bind_data[user_qq]
args = message.extract_plain_text().strip().split()
if len(args) < 3:
await MessageUtils.build_message(
"用法: /lock <类型> <itemId>\n示例: /lock 歌 11734\n示例: /lock 搭档 10"
).send(reply_to=True)
return
itemType = args[1]
itemId = args[2]
ts, loginResult = await _ensure_login(userId)
return_code = loginResult.get("returnCode")
if return_code == 1:
pass
elif return_code == 100:
await MessageUtils.build_message("用户正在上机游玩,请下机后再试,或等待 15 分钟。").send(reply_to=True)
return
elif return_code == 102:
await MessageUtils.build_message("zako~zako~又不获取二维码吗").send(reply_to=True)
return
elif return_code == 103:
await MessageUtils.build_message("登录的账号 UID 无效,请检查账号是否正确。").send(reply_to=True)
return
else:
error_details = loginResult.get("message", str(loginResult))
await MessageUtils.build_message(f"登录失败!这不应该发生,请反馈此问题。\n错误详情:{error_details}").send(reply_to=True)
return
try:
if itemType in ["", "乐曲", "MUSIC"]:
try:
musicId = int(itemId)
except ValueError:
await MessageUtils.build_message("乐曲ID必须是数字").send(reply_to=True)
return
result_str = impllockMusic(musicId, userId, ts, loginResult)
else:
if itemType in itemKindzhCNDict:
itemType = itemKindzhCNDict[itemType]
if itemType not in itemKindDict:
await MessageUtils.build_message(
f"未知类型 {itemType},可用类型: {','.join(itemKindzhCNDict.keys())}"
).send(reply_to=True)
return
try:
itemId = int(itemId)
except ValueError:
await MessageUtils.build_message("itemId 必须是数字").send(reply_to=True)
return
result_str = impllockSingleItem(
itemId, itemKindDict[itemType], userId, ts, loginResult
)
msg = None
try:
result_json = json.loads(result_str) if isinstance(result_str, str) else result_str
if isinstance(result_json, dict):
if result_json.get("returnCode") == 1:
msg = f"✅ 成功: {itemType} {itemId}"
else:
msg = f"❌ 失败: {result_json.get('message', result_json)}"
except Exception:
pass
if not msg:
msg = f"解锁返回: {result_str}"
await MessageUtils.build_message(msg).send(reply_to=True)
except Exception as e:
await MessageUtils.build_message(f"❌ 解锁异常: {e}").send(reply_to=True)
logger.error(f"解锁异常: {e}")
finally:
try:
apiLogout(ts, userId)
except Exception as e:
logger.warning(f"登出失败: {e}")

115
mai_unban.py Normal file
View File

@@ -0,0 +1,115 @@
def isUserLoggedIn(userId):
try:
isLogin = json.loads(apiGetUserPreview(userId, True))['isLogin']
logger.debug(f"用户 {userId} 是否登录: {isLogin}")
return isLogin
except Exception as e:
logger.error(f"检查用户 {userId} 登录状态时出错: {e}")
return False
def convert_hhmm_to_timestamp(time_str: str) -> int:
now = datetime.now()
dt_obj = datetime.strptime(f"{now.year}-{now.month}-{now.day} {time_str}", "%Y-%m-%d %H:%M")
return int(dt_obj.timestamp())
def logOut(userId, Timestamp):
try:
response = apiLogout(Timestamp, userId, True)
if response and response.get('returnCode') == 1:
logger.debug(f"已成功发送登出请求给用户 {userId},时间戳 {Timestamp}")
return True
return False
except Exception as e:
logger.error(f"使用时间戳 {Timestamp} 登出用户 {userId} 时发生错误: {e}")
return False
def isCorrectTimestamp(timestamp, userId):
if not logOut(userId, timestamp):
return False
time.sleep(0.1)
isLoggedOut = not isUserLoggedIn(userId)
logger.debug(f"时间戳 {timestamp} 是否正确: {isLoggedOut}")
return isLoggedOut
def findTimestampInRange(start_ts: int, end_ts: int, userId: int):
logger.info(f"开始在时间范围 [{start_ts}, {end_ts}] 内为用户 {userId} 搜索有效时间戳...")
for ts in range(start_ts, end_ts + 1):
if isCorrectTimestamp(ts, userId):
logger.info(f"找到正确的时间戳: {ts}")
return ts
logger.error(f"在指定范围内未能找到用户 {userId} 的有效时间戳")
return None
async def handle_unban_command(bot: Bot, event: MessageEvent, args: Message = CommandArg()):
arg_list = args.extract_plain_text().strip().split()
if len(arg_list) != 2:
await MessageUtils.build_message(
"命令格式不正确喵~\n"
"正确格式: /黑屋 <开始时间> <结束时间>\n"
"例如: /黑屋 19:00 19:10\n"
"注意时间范围不能超过30分钟。"
).send(reply_to=True)
return
start_time_str, end_time_str = arg_list
user_qq = str(event.user_id)
if user_qq not in bind_data:
await MessageUtils.build_message("zako~又不绑定账号吗").send(reply_to=True)
return
user_id = bind_data[user_qq]
try:
logger.info("开始解析时间参数...")
start_timestamp = convert_hhmm_to_timestamp(start_time_str)
end_timestamp = convert_hhmm_to_timestamp(end_time_str)
logger.info(f"时间参数解析成功: 开始时间戳 {start_timestamp}, 结束时间戳 {end_timestamp}")
if end_timestamp <= start_timestamp:
await MessageUtils.build_message("结束时间必须晚于开始时间!").send(reply_to=True)
return
if (end_timestamp - start_timestamp) > 1800: # 30分钟 * 60秒
await MessageUtils.build_message("时间范围不能超过30分钟哦").send(reply_to=True)
return
except ValueError:
logger.error(f"无法解析时间格式: {start_time_str}{end_time_str}")
await MessageUtils.build_message("时间格式错误,请使用 HH:MM 格式!").send(reply_to=True)
return
except Exception as e:
logger.error(f"解析时间时发生未知错误: {e}")
await MessageUtils.build_message("解析时间时发生未知错误,请检查后台日志。").send(reply_to=True)
return
logger.info(f"正在检查用户 {user_id} 的登录状态...")
if not isUserLoggedIn(user_id):
await MessageUtils.build_message("zako~没进黑屋干什么解").send(reply_to=True)
return
await MessageUtils.build_message(f"收到!将在 {start_time_str}{end_time_str} 的时间范围内尝试解黑屋,请稍候...").send(reply_to=True)
try:
start_process_time = time.time()
result_timestamp = await asyncio.to_thread(findTimestampInRange, start_timestamp, end_timestamp, user_id)
end_process_time = time.time()
duration = end_process_time - start_process_time
if result_timestamp is not None:
human_readable_time = datetime.fromtimestamp(result_timestamp).strftime('%Y-%m-%d %H:%M:%S')
final_message = f"解黑屋成功!\n找到的时间点: {human_readable_time}\n消耗时间: {duration:.2f}"
else:
final_message = "解黑屋失败,在指定时间段内未能找到有效的时间点。"
await MessageUtils.build_message(final_message).send(reply_to=True)
except Exception as e:
logger.error(f"处理 /黑屋 命令时发生意外错误: {e}")
await MessageUtils.build_message("执行过程中发生未知错误,请联系管理员。").send(reply_to=True)
}

101
unlock.py Normal file
View File

@@ -0,0 +1,101 @@
bind_lock = asyncio.Lock()
async def _ensure_login(userId: int):
ts = generateTimestamp()
loginResult = apiLogin(ts, userId)
return ts, loginResult
async def handle_unlock(bot: Bot, message: UniMsg, session: EventSession):
user_qq = str(session.id1)
async with bind_lock:
if user_qq not in bind_data:
await MessageUtils.build_message("请先绑定 /bind").send(reply_to=True)
return
userId = bind_data[user_qq]
args = message.extract_plain_text().strip().split()
if len(args) < 3:
await MessageUtils.build_message(
"用法: /unlock <类型> <itemId>\n示例: /unlock 歌 11734\n示例: /unlock 搭档 10"
).send(reply_to=True)
return
itemType = args[1]
itemId = args[2]
ts, loginResult = await _ensure_login(userId)
return_code = loginResult.get("returnCode")
if return_code == 1:
pass
elif return_code == 100:
await MessageUtils.build_message("用户正在上机游玩,请下机后再试,或等待 15 分钟。").send(reply_to=True)
return
elif return_code == 102:
await MessageUtils.build_message("zako~zako~又不获取二维码吗").send(reply_to=True)
return
elif return_code == 103:
await MessageUtils.build_message("登录的账号 UID 无效,请检查账号是否正确。").send(reply_to=True)
return
else:
error_details = loginResult.get("message", str(loginResult))
await MessageUtils.build_message(f"登录失败!这不应该发生,请反馈此问题。\n错误详情:{error_details}").send(reply_to=True)
return
try:
if itemType in ["", "乐曲", "MUSIC"]:
try:
musicId = int(itemId)
except ValueError:
await MessageUtils.build_message("乐曲ID必须是数字").send(reply_to=True)
return
result_str = implUnlockMusic(musicId, userId, ts, loginResult)
else:
if itemType in itemKindzhCNDict:
itemType = itemKindzhCNDict[itemType]
if itemType not in itemKindDict:
await MessageUtils.build_message(
f"未知类型 {itemType},可用类型: {','.join(itemKindzhCNDict.keys())}"
).send(reply_to=True)
return
try:
itemId = int(itemId)
except ValueError:
await MessageUtils.build_message("itemId 必须是数字").send(reply_to=True)
return
result_str = implUnlockSingleItem(
itemId, itemKindDict[itemType], userId, ts, loginResult
)
msg = None
try:
result_json = json.loads(result_str) if isinstance(result_str, str) else result_str
if isinstance(result_json, dict):
if result_json.get("returnCode") == 1:
msg = f"✅ 解锁成功: {itemType} {itemId}"
else:
msg = f"❌ 解锁失败: {result_json.get('message', result_json)}"
except Exception:
pass
if not msg:
msg = f"解锁返回: {result_str}"
await MessageUtils.build_message(msg).send(reply_to=True)
except Exception as e:
await MessageUtils.build_message(f"❌ 解锁异常: {e}").send(reply_to=True)
logger.error(f"解锁异常: {e}")
finally:
try:
apiLogout(ts, userId)
except Exception as e:
logger.warning(f"登出失败: {e}")

51
user.py Normal file
View File

@@ -0,0 +1,51 @@
__all__ = ["handle_user"]
bind_lock = asyncio.Lock()
async def _ensure_login(userId: int):
ts = generateTimestamp()
loginResult = apiLogin(ts, userId)
return ts, loginResult
async def handle_user(bot: Bot, event: Event):
user_qq = str(event.user_id)
async with bind_lock:
if user_qq not in bind_data:
await MessageUtils.build_message("请先绑定 /bind").send(reply_to=True)
return
userId = bind_data[user_qq]
ts, loginResult = await _ensure_login(userId)
return_code = loginResult.get("returnCode")
if return_code == 1:
pass
elif return_code == 100:
await MessageUtils.build_message("用户正在上机游玩,请下机后再试,或等待 15 分钟。").send(reply_to=True)
return
elif return_code == 102:
await MessageUtils.build_message("zako~zako~又不获取二维码吗").send(reply_to=True)
return
elif return_code == 103:
await MessageUtils.build_message("登录的账号 UID 无效,请检查账号是否正确。").send(reply_to=True)
return
else:
error_details = loginResult.get("message", str(loginResult))
await MessageUtils.build_message(f"登录失败!这不应该发生,请反馈此问题。\n错误详情:{error_details}").send(reply_to=True)
return
try:
friendly_info = getFriendlyUserData(userId)
await MessageUtils.build_message(f"📄 用户信息:\n{friendly_info}").send(reply_to=True)
except Exception as e:
await MessageUtils.build_message(f"❌ 获取用户信息失败: {e}").send(reply_to=True)
logger.error(f"获取用户信息异常: {e}")
finally:
try:
apiLogout(ts, userId)
except Exception as e:
logger.warning(f"登出失败: {e}")