上传文件至 /

This commit is contained in:
2025-10-13 18:15:05 +08:00
parent 2a198c3da6
commit c4e2150acf
18 changed files with 1178 additions and 1 deletions

82
API_AimeDB.py Normal file
View File

@@ -0,0 +1,82 @@
import hashlib
import time
import requests
import json
import re
from loguru import logger
CHIP_ID = ""
COMMON_KEY = "XcW5FW4cPArBXEk4vzKz3CIrMuA5EVVW"
API_URL = "http://ai.sys-allnet.cn/wc_aime/api/get_data"
def getSHA256(input_str):
return hashlib.sha256(input_str.encode('utf-8')).hexdigest().upper()
def generateSEGATimestamp():
return time.strftime("%y%m%d%H%M%S", time.localtime())
def calcSEGAAimeDBAuthKey(varString:str, timestamp:str, commonKey:str="XcW5FW4cPArBXEk4vzKz3CIrMuA5EVVW") -> str:
return hashlib.sha256((varString + timestamp + commonKey).encode("utf-8")).hexdigest().upper()
def apiAimeDB(qrCode):
timestamp = generateSEGATimestamp()
currentKey = calcSEGAAimeDBAuthKey(CHIP_ID, timestamp, COMMON_KEY)
payload = {
"chipID": CHIP_ID,
"openGameID": "MAID",
"key": currentKey,
"qrCode": qrCode,
"timestamp": timestamp
}
print("Payload:", json.dumps(payload, separators=(',', ':')))
headers = {
"Connection": "Keep-Alive",
"Host": API_URL.split("//")[-1].split("/")[0],
"User-Agent": "WC_AIME_LIB",
"Content-Type": "application/json",
}
response = requests.post(API_URL, data=json.dumps(payload, separators=(',', ':')), headers=headers)
return response
def isSGWCFormat(input_string: str) -> bool:
if (
len(input_string) != 84
or not input_string.startswith("SGWCMAID")
or re.match("^[0-9A-F]+$", input_string[20:]) is None
):
return False
else:
return True
def implAimeDB(qrCode:str, isAlreadyFinal:bool=False) -> str:
if isAlreadyFinal:
qr_code_final = qrCode
else:
qr_code_final = qrCode[20:]
response = apiAimeDB(qr_code_final)
print("implAimeDB: StatusCode is ", response.status_code)
print("implAimeDB: Response Body is:", response.text)
return response.text
def implGetUID(qr_content:str) -> dict:
if not isSGWCFormat(qr_content):
return {'errorID': 60001}
try:
result = json.loads(implAimeDB(qr_content))
logger.info(f"QRScan Got Response {result}")
except:
return {'errorID': 60002}
return result

269
API_TitleServer.py Normal file
View File

@@ -0,0 +1,269 @@
import zlib
import hashlib
import httpx
from loguru import logger
import random
import time
from ctypes import c_int32
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from Config import *
from typing import Optional
import certifi
use2024Api = False
if use2024Api:
AesKey = "n7bx6:@Fg_:2;5E89Phy7AyIcpxEQ:R@"
AesIV = ";;KjR1C3hgB1ovXa"
ObfuscateParam = "BEs2D5vW"
else:
AesKey = "a>32bVP7v<63BVLkY[xM>daZ1s9MBP<R"
AesIV = "d6xHIKq]1J]Dt^ue"
ObfuscateParam = "B44df8yT"
class SDGBApiError(Exception):
pass
class SDGBRequestError(SDGBApiError):
pass
class SDGBResponseError(SDGBApiError):
pass
class aes_pkcs7(object):
def __init__(self, key: str, iv: str):
self.key = key.encode('utf-8')
self.iv = iv.encode('utf-8')
self.mode = AES.MODE_CBC
def encrypt(self, content: bytes) -> bytes:
cipher = AES.new(self.key, self.mode, self.iv)
content_padded = pad(content, AES.block_size)
encrypted_bytes = cipher.encrypt(content_padded)
return encrypted_bytes
def decrypt(self, content):
cipher = AES.new(self.key, self.mode, self.iv)
decrypted_padded = cipher.decrypt(content)
decrypted = unpad(decrypted_padded, AES.block_size)
return decrypted
def pkcs7unpadding(self, text):
length = len(text)
unpadding = ord(text[length - 1])
return text[0:length - unpadding]
def pkcs7padding(self, text):
bs = 16
length = len(text)
bytes_length = len(text.encode('utf-8'))
padding_size = length if (bytes_length == length) else bytes_length
padding = bs - padding_size % bs
padding_text = chr(padding) * padding
return text + padding_text
def getSDGBApiHash(api):
return hashlib.md5((api+"MaimaiChn"+ObfuscateParam).encode()).hexdigest()
def apiSDGB(
data: str,
targetApi: str,
userAgentExtraData: str,
noLog: bool = False,
timeout: int = 5,
maxRetries: int = 3,
) -> str:
agentExtra = str(userAgentExtraData)
aes = aes_pkcs7(AesKey, AesIV)
endpoint = "https://maimai-gm.wahlap.com:42081/Maimai2Servlet/"
requestDataFinal = aes.encrypt(zlib.compress(data.encode('utf-8')))
if not noLog:
logger.debug(f"[Stage 1] 准备开始请求 {targetApi},以 {data}")
retries = 0
while retries < maxRetries:
try:
if useProxy and proxyUrl:
logger.debug("使用代理")
httpClient = httpx.Client(proxy=proxyUrl, verify=False)
else:
httpClient = httpx.Client(verify=False)
response = httpClient.post(
url=endpoint + getSDGBApiHash(targetApi),
headers={
"User-Agent": f"{getSDGBApiHash(targetApi)}#{agentExtra}",
"Content-Type": "application/json",
"Mai-Encoding": "1.50",
"Accept-Encoding": "",
"Charset": "UTF-8",
"Content-Encoding": "deflate",
"Expect": "100-continue"
},
content=requestDataFinal,
timeout=timeout
)
if not noLog:
logger.info(f"[Stage 2] {targetApi} 请求结果: {response.status_code}")
if response.status_code != 200:
errorMessage = f"[Stage 2] 请求失败: {response.status_code}"
logger.error(errorMessage)
raise SDGBRequestError(errorMessage)
responseContentRaw = response.content
try:
responseContentDecrypted = aes.decrypt(responseContentRaw)
if not noLog:
logger.debug("[Stage 3] Decryption SUCCESS.")
except Exception as e:
logger.warning(f"[Stage 3] Decryption FAILED. Raw Content: {responseContentRaw}, Error: {e}")
raise SDGBResponseError("Decryption failed")
try:
if responseContentDecrypted.startswith(b'\x78\x9c'):
logger.debug("[Stage 4] Zlib detected, decompressing...")
responseContentFinal = zlib.decompress(responseContentDecrypted).decode('utf-8')
else:
logger.warning(f"[Stage 4] Not Zlib Format!! using raw content: {responseContentDecrypted}")
responseContentFinal = responseContentDecrypted.decode('utf-8')
if not noLog:
logger.debug(f"[Stage 4] Process OK, Content: {responseContentFinal}")
if responseContentFinal.startswith('{') and responseContentFinal.endswith('}'):
logger.debug("[Stage 5] Response is JSON, returning.")
return responseContentFinal
else:
logger.warning("[Stage 5] Response is not JSON, returning as is, take care!")
return responseContentFinal
except:
logger.warning(f"解压失败,原始响应: {responseContentDecrypted}")
raise SDGBResponseError("解压失败")
except SDGBRequestError as e:
logger.error(f"请求格式错误: {e}")
raise
except SDGBResponseError as e:
logger.warning(f"响应错误,将重试: {e}")
retries += 1
time.sleep(2)
except Exception as e:
logger.warning(f"请求失败,将重试: {e}")
retries += 1
time.sleep(2)
finally:
if 'httpClient' in locals():
httpClient.close()
raise SDGBApiError("重试多次仍然无法成功请求服务器")
def calcPlaySpecial():
rng = random.SystemRandom()
num2 = rng.randint(1, 1037933) * 2069
num2 += 1024 #GameManager.CalcSpecialNum()
num2 = c_int32(num2).value
result = c_int32(0)
for _ in range(32):
result.value <<= 1
result.value += num2 & 1
num2 >>= 1
return c_int32(result.value).value
class AESPKCS7_2024:
def __init__(self, key: str, iv: str):
self.key = key.encode('utf-8')
self.iv = iv.encode('utf-8')
self.mode = AES.MODE_CBC
def encrypt(self, content) -> bytes:
# if content is str, convert to bytes
if isinstance(content, str):
encodedData = content.encode('utf-8')
cipher = AES.new(self.key, self.mode, self.iv)
content_padded = pad(encodedData, AES.block_size)
encrypted_bytes = cipher.encrypt(content_padded)
return encrypted_bytes
def decrypt(self, encrypted_content: bytes) -> str:
cipher = AES.new(self.key, self.mode, self.iv)
decrypted_padded = cipher.decrypt(encrypted_content)
decrypted = unpad(decrypted_padded, AES.block_size)
return decrypted
def apiSDGB_2024(data:str, targetApi:str, userAgentExtraData:str, noLog:bool=False, timeout:int=5):
maxRetries = 3
agentExtra = str(userAgentExtraData)
aes = AESPKCS7_2024(AesKey, AesIV)
reqData_encrypted = aes.encrypt(data)
reqData_deflated = zlib.compress(reqData_encrypted)
endpoint = "https://maimai-gm.wahlap.com:42081/Maimai2Servlet/"
if not noLog:
logger.debug(f"开始请求 {targetApi},以 {data}")
retries = 0
while retries < maxRetries:
try:
if useProxy:
logger.debug("使用代理")
httpClient = httpx.Client(proxy=proxyUrl, verify=False)
else:
logger.debug("不使用代理")
httpClient = httpx.Client(verify=False)
responseOriginal = httpClient.post(
url=endpoint + getSDGBApiHash(targetApi),
headers={
"User-Agent": f"{getSDGBApiHash(targetApi)}#{agentExtra}",
"Content-Type": "application/json",
"Mai-Encoding": "1.40",
"Accept-Encoding": "",
"Charset": "UTF-8",
"Content-Encoding": "deflate",
"Expect": "100-continue"
},
content=reqData_deflated,
timeout=timeout
)
if not noLog:
logger.info(f"{targetApi} 请求结果: {responseOriginal.status_code}")
if responseOriginal.status_code == 200:
logger.debug("200 OK!")
else:
errorMessage = f"请求失败: {responseOriginal.status_code}"
logger.error(errorMessage)
raise SDGBRequestError(errorMessage)
responseRAWContent = responseOriginal.content
try:
responseDecompressed = zlib.decompress(responseRAWContent)
logger.debug("成功解压响应!")
except:
logger.warning(f"无法解压,得到的原始响应: {responseRAWContent}")
raise SDGBResponseError("解压失败")
try:
resultResponse = aes.decrypt(responseDecompressed)
logger.debug(f"成功解密响应!")
except:
logger.warning(f"解密失败,得到的原始响应: {responseDecompressed}")
raise SDGBResponseError("解密失败")
if not noLog:
logger.debug(f"响应: {resultResponse}")
return resultResponse
except SDGBRequestError as e:
raise SDGBRequestError("请求格式错误")
except SDGBResponseError as e:
logger.warning(f"将重试一次 Resp Err: {e}")
retries += 2
time.sleep(2)
except Exception as e:
logger.warning(f"将开始重试请求. {e}")
retries += 1
time.sleep(2)
raise SDGBApiError("重试多次仍然无法成功请求服务器")

30
ActionChangeVersion.py Normal file
View File

@@ -0,0 +1,30 @@
from loguru import logger
from Config import *
from HelperLogInOut import apiLogin, apiLogout, generateTimestamp
from HelperFullPlay import implFullPlayAction, generateMusicData
from HelperGetUserThing import implGetUser_
def implWipeTickets(userId: int, currentLoginTimestamp:int, currentLoginResult) -> str:
currentUserCharge = implGetUser_("Charge", userId)
currentUserChargeList = currentUserCharge['userChargeList']
for charge in currentUserChargeList:
charge['stock'] = 0
musicData = generateMusicData()
userAllPatches = {
"upsertUserAll": {
# "userData": [{
# "lastRomVersion": romVersion,
# "lastDataVersion": dataVersion
# }],
"userChargeList": currentUserChargeList,
"userMusicDetailList": [musicData],
"isNewMusicDetailList": "1"
}}
result = implFullPlayAction(userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches)
return result

45
ActionScoreRecord.py Normal file
View File

@@ -0,0 +1,45 @@
from loguru import logger
from Config import *
from HelperLogInOut import apiLogin, apiLogout, generateTimestamp
from HelperFullPlay import implFullPlayAction
def implDeleteMusicRecord(userId: int, currentLoginTimestamp:int, currentLoginResult, musicId:int, levelId:int) -> str:
musicData= ({
"musicId": musicId,
"level": levelId,
"playCount": 1,
"achievement": 0,
"comboStatus": 0,
"syncStatus": 0,
"deluxscoreMax": 0,
"scoreRank": 0,
"extNum1": 0
})
userAllPatches = {
"upsertUserAll": {
"userMusicDetailList": [musicData],
"isNewMusicDetailList": "0"
}}
result = implFullPlayAction(userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches)
return result
def implUploadMusicRecord(userId: int, currentLoginTimestamp:int, currentLoginResult, musicId:int, levelId:int, achievement:int, dxScore:int) -> str:
musicData= ({
"musicId": musicId,
"level": levelId,
"playCount": 1,
"achievement": achievement,
"comboStatus": 0,
"syncStatus": 0,
"deluxscoreMax": dxScore,
"scoreRank": 0,
"extNum1": 0
})
userAllPatches = {
"upsertUserAll": {
"userMusicDetailList": [musicData],
"isNewMusicDetailList": "1"
}}
result = implFullPlayAction(userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches)
return result

37
ActionUnlockItem.py Normal file
View File

@@ -0,0 +1,37 @@
from loguru import logger
from Config import *
from HelperLogInOut import apiLogin, apiLogout, generateTimestamp
from HelperUnlockThing import implUnlockThing
def implUnlockSingleItem(itemId: int, itemKind: int, userId: int, currentLoginTimestamp:int, currentLoginResult) -> str:
userItemList = [
{
"itemKind": itemKind,
"itemId": itemId,
"stock": 1,
"isValid": True
}
]
unlockThingResult = implUnlockThing(userItemList, userId, currentLoginTimestamp, currentLoginResult)
return unlockThingResult
def implUnlockMusic(musicToBeUnlocked: int, userId: int, currentLoginTimestamp:int, currentLoginResult) -> str:
userItemList = [
{
"itemKind": 5,
"itemId": musicToBeUnlocked,
"stock": 1,
"isValid": True
},
{
"itemKind": 6,
"itemId": musicToBeUnlocked,
"stock": 1,
"isValid": True
},
]
unlockThingResult = implUnlockThing(userItemList, userId, currentLoginTimestamp, currentLoginResult)
return unlockThingResult

73
ChargeTicket.py Normal file
View File

@@ -0,0 +1,73 @@
import rapidjson as json
import pytz
from datetime import datetime, timedelta
from Config import *
from API_TitleServer import apiSDGB
from HelperGetUserThing import implGetUser_
from loguru import logger
from HelperLogInOut import apiLogin, apiLogout, generateTimestamp
from HelperFullPlay import implFullPlayAction, generateMusicData
from HelperGetUserThing import implGetUser_
def implWipeTickets(userId: int, currentLoginTimestamp:int, currentLoginResult) -> str:
currentUserCharge = implGetUser_("Charge", userId)
currentUserChargeList = currentUserCharge['userChargeList']
for charge in currentUserChargeList:
charge['stock'] = 0
musicData = generateMusicData()
userAllPatches = {
"upsertUserAll": {
"userChargeList": currentUserChargeList,
"userMusicDetailList": [musicData],
"isNewMusicDetailList": "1"
}}
result = implFullPlayAction(userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches)
return result
def apiQueryTicket(userId:int) -> str:
data = json.dumps({
"userId": userId
})
userdata_result = apiSDGB(data, "GetUserChargeApi", userId)
return userdata_result
def apiBuyTicket(userId:int, ticketType:int, price:int, playerRating:int, playCount:int) -> str:
nowTime = datetime.now(pytz.timezone('Asia/Shanghai'))
data = json.dumps({
"userId": userId,
"userChargelog": {
"chargeId": ticketType,
"price": price,
"purchaseDate": nowTime.strftime("%Y-%m-%d %H:%M:%S.0"),
"playCount": playCount,
"playerRating": playerRating,
"placeId": placeId,
"regionId": regionId,
"clientId": clientId
},
"userCharge": {
"chargeId": ticketType,
"stock": 1,
"purchaseDate": nowTime.strftime("%Y-%m-%d %H:%M:%S.0"),
"validDate": (nowTime + timedelta(days=90)).replace(hour=4, minute=0, second=0).strftime("%Y-%m-%d %H:%M:%S")
}
})
return apiSDGB(data, "UpsertUserChargelogApi", userId)
def implBuyTicket(userId:int, ticketType:int):
currentUserData = implGetUser_("Data", userId)
if currentUserData:
playerRating = currentUserData['userData']['playerRating']
playCount = currentUserData['userData'].get('playCount', 0)
else:
return False
getTicketResponseStr = apiBuyTicket(userId, ticketType, ticketType-1, playerRating, playCount)
return getTicketResponseStr

15
Config.py Normal file
View File

@@ -0,0 +1,15 @@
regionId =
regionName = ""
placeId =
placeName = ""
clientId = ""
useProxy = False
proxyUrl = ""
loginBonusDBPath = "./Data/loginBonusDB.xml"
musicDBPath = "./Data/musicDB.json"
loginBonusDBPathFallback = "./maimaiDX-Api/Data/loginBonusDB.xml"
musicDBPathFallback = "./maimaiDX-Api/Data/musicDB.json"

13
GetPreview.py Normal file
View File

@@ -0,0 +1,13 @@
import rapidjson as json
from API_TitleServer import apiSDGB
from Config import *
import time
import random
from loguru import logger
def apiGetUserPreview(userId, noLog:bool=False) -> str:
data = json.dumps({
"userId": int(userId)
})
preview_result = apiSDGB(data, "GetUserPreviewApi", userId, noLog)
return preview_result

70
HelperFullPlay.py Normal file
View File

@@ -0,0 +1,70 @@
import rapidjson as json
from loguru import logger
from Config import *
from API_TitleServer import *
from HelperGetUserThing import implGetUser_
from HelperUploadUserPlayLog import apiUploadUserPlaylog
from HelperUserAll import generateFullUserAll
def generateMusicData():
return {
"musicId": 0,
"level": 0,
"playCount": 0,
"achievement": 0,
"comboStatus": 0,
"syncStatus": 0,
"deluxscoreMax": 0,
"scoreRank": 0,
"extNum1": 0
}
def applyUserAllPatches(userAll, patches):
for key, value in patches.items():
if isinstance(value, dict) and key in userAll and isinstance(userAll[key], dict):
applyUserAllPatches(userAll[key], value)
elif isinstance(value, list) and key in userAll and isinstance(userAll[key], list):
for i, patch_item in enumerate(value):
if i < len(userAll[key]) and isinstance(patch_item, dict) and isinstance(userAll[key][i], dict):
applyUserAllPatches(userAll[key][i], patch_item)
elif i >= len(userAll[key]):
userAll[key].append(patch_item)
else:
userAll[key] = value
def implFullPlayAction(userId: int, currentLoginTimestamp:int, currentLoginResult, musicData, userAllPatches, debugMode=False):
currentUserData = implGetUser_("Data", userId)
currentUserData2 = currentUserData['userData']
currentUploadUserPlaylogApiResult = apiUploadUserPlaylog(userId, musicData, currentUserData2, currentLoginResult['loginId'])
logger.debug(f"上传 UserPlayLog 结果: {currentUploadUserPlaylogApiResult}")
retries = 0
while retries < 3:
currentPlaySpecial = calcPlaySpecial()
currentUserAll = generateFullUserAll(userId, currentLoginResult, currentLoginTimestamp, currentUserData2, currentPlaySpecial)
applyUserAllPatches(currentUserAll, userAllPatches)
if debugMode:
logger.debug("调试模式:构建出的 UserAll 数据:" + json.dumps(currentUserAll, indent=4))
logger.info("Bye!")
return
data = json.dumps(currentUserAll)
try:
currentUserAllResult = json.loads(apiSDGB(data, "UpsertUserAllApi", userId))
except SDGBRequestError:
logger.warning("上传 UserAll 出现 500. 重建数据.")
retries += 1
continue
except Exception:
raise SDGBApiError("邪门错误")
break
else:
raise SDGBRequestError
logger.info("上机:结果:"+ str(currentUserAllResult))
return currentUserAllResult

View File

@@ -0,0 +1,43 @@
from API_TitleServer import *
from HelperLogInOut import apiLogin, apiLogout, generateTimestamp
from Config import *
import rapidjson as json
from HelperMusicDB import getMusicTitle
from loguru import logger
import sys
def getUserMusicDetail(userId:int, nextIndex:int=0, maxCount:int=50) -> dict:
data = json.dumps({
"userId": int(userId),
"nextIndex": nextIndex,
"maxCount": maxCount
})
return json.loads(apiSDGB(data, "GetUserMusicApi", userId))
def getUserFullMusicDetail(userId: int):
currentUserMusicDetailList = []
nextIndex:int|None = None
while nextIndex != 0 or nextIndex is None:
userMusicResponse = getUserMusicDetail(userId, nextIndex or 0)
nextIndex = userMusicResponse['nextIndex']
logger.info(f"NextIndex: {nextIndex}")
if not userMusicResponse['userMusicList']:
break
for currentMusic in userMusicResponse['userMusicList']:
for currentMusicDetail in currentMusic['userMusicDetailList']:
if not currentMusicDetail['playCount'] > 0:
continue
currentUserMusicDetailList.append(currentMusicDetail)
return currentUserMusicDetailList
def parseUserFullMusicDetail(userFullMusicDetailList: list):
musicDetailList = []
for currentMusicDetail in userFullMusicDetailList:
musicDetailList.append({
'歌名': getMusicTitle(currentMusicDetail['musicId']),
'难度': currentMusicDetail['level'],
'分数': currentMusicDetail['achievement'] / 10000,
'DX分数': currentMusicDetail['deluxscoreMax']
})
return musicDetailList

16
HelperGetUserThing.py Normal file
View File

@@ -0,0 +1,16 @@
from loguru import logger
import rapidjson as json
from API_TitleServer import apiSDGB
def implGetUser_(thing:str, userId:int, noLog=False) -> dict:
result = apiGetUserThing(userId, thing, noLog)
userthingDict = json.loads(result)
return userthingDict
def apiGetUserThing(userId:int, thing:str, noLog=False) -> str:
data = json.dumps({
"userId": userId
})
userthing_result = apiSDGB(data, "GetUser" + thing + "Api", userId, noLog)
return userthing_result

49
HelperLogInOut.py Normal file
View File

@@ -0,0 +1,49 @@
import rapidjson as json
import time
from loguru import logger
import random
from Config import *
from API_TitleServer import apiSDGB
def apiLogin(timestamp:int, userId:int, noLog:bool=False) -> dict:
data = json.dumps({
"userId": userId,
"accessCode": "",
"regionId": regionId,
"placeId": placeId,
"clientId": clientId,
"dateTime": timestamp,
"isContinue": False,
"genericFlag": 0,
})
login_result = json.loads(apiSDGB(data, "UserLoginApi", userId, noLog))
if not noLog:
logger.info("登录:结果:"+ str(login_result))
return login_result
def apiLogout(timestamp:int, userId:int, noLog:bool=False) -> dict:
data = json.dumps({
"userId": userId,
"accessCode": "",
"regionId": regionId,
"placeId": placeId,
"clientId": clientId,
"dateTime": timestamp,
"type": 1
})
logout_result = json.loads(apiSDGB(data, "UserLogoutApi", userId, noLog))
if not noLog:
logger.info("登出:结果:"+ str(logout_result))
return logout_result
def generateTimestampLegacy() -> int:
timestamp = int(time.time()) - 60
logger.info(f"生成时间戳: {timestamp}")
return timestamp
def generateTimestamp() -> int:
timestamp = int(time.mktime(time.strptime(time.strftime("%Y-%m-%d 10:00:00"), "%Y-%m-%d %H:%M:%S"))) + random.randint(-600, 600)
logger.info(f"生成时间戳: {timestamp}")
logger.info(f"此时间戳对应的时间为: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp))}")
return timestamp

11
HelperMusicDB.py Normal file
View File

@@ -0,0 +1,11 @@
from MusicDB import musicDB
from loguru import logger
def getMusicTitle(musicId: int) -> str:
musicInfo = musicDB.get(musicId)
if not musicInfo:
logger.warning(f"数据库里未找到此歌曲: {musicId}")
return "R_ERR_MUSIC_ID_NOT_IN_DATABASE"
musicName = musicInfo.get("name")
return musicName

74
HelperUnlockThing.py Normal file
View File

@@ -0,0 +1,74 @@
from loguru import logger
from Config import *
from HelperFullPlay import implFullPlayAction
def implUnlockThing(newUserItemList, userId: int, currentLoginTimestamp:int, currentLoginResult) -> str:
musicData= ({
"musicId": 0,
"level": 0,
"playCount": 0,
"achievement": 0,
"comboStatus": 0,
"syncStatus": 0,
"deluxscoreMax": 0,
"scoreRank": 0,
"extNum1": 0
})
userAllPatches = {
"upsertUserAll": {
"userMusicDetailList": [musicData],
"isNewMusicDetailList": "1",
"userItemList": newUserItemList,
"isNewItemList": "1" * len(newUserItemList)
}}
result = implFullPlayAction(userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches)
return result
itemKindDict = {
"PLATE": 1,
"TITLE": 2,
"ICON": 3,
"MUSIC": 5,
"MUSIC_MASTER": 6,
"MUSIC_RE_MASTER": 7,
"CHARACTER": 9,
"PARTNER": 10,
"FRAME": 11,
"TICKET": 12
}
itemKindzhCNDict = {
"姓名框": "PLATE",
"称号": "TITLE",
"头像": "ICON",
"": "MUSIC",
"紫谱": "MUSIC_MASTER",
"白谱": "MUSIC_RE_MASTER",
"旅行伙伴": "CHARACTER",
"搭档": "PARTNER",
"背景板": "FRAME",
"功能票": "TICKET"
}
partnerList = {
"1": "迪拉熊",
"17": "青柠熊&柠檬熊",
"11": "乙姫",
"12": "拉兹",
"13": "雪纺",
"14": "莎露朵",
"15": "夏玛",
"16": "咪璐库",
"18": "乙姫Splash",
"19": "夏玛UNiVERSE",
"20": "咪璐库UNiVERSE",
"21": "小咪璐库",
"22": "百合咲美香",
"23": "拉兹2023",
"24": "雪纺2023",
"25": "莎露朵2023",
"26": "黒姫",
"27": "俊达萌",
"28": "乙姫2024",
"29": "青柠熊柠檬熊2024"
}

131
HelperUploadUserPlayLog.py Normal file
View File

@@ -0,0 +1,131 @@
import rapidjson as json
import pytz
import time
import random
from datetime import datetime
from loguru import logger
from API_TitleServer import apiSDGB
from Config import *
def apiUploadUserPlaylog(userId:int, musicDataToBeUploaded, currentUserData2, loginId:int) -> str:
data = json.dumps({
"userId": int(userId),
"userPlaylogList": [
{
"userId": 0,
"orderId": 0,
"playlogId": loginId,
"version": 1051000,
"placeId": placeId,
"placeName": placeName,
"loginDate": int(time.time()),
"playDate": datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d'),
"userPlayDate": datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S') + '.0',
"type": 0,
"musicId": int(musicDataToBeUploaded['musicId']),
"level": int(musicDataToBeUploaded['level']),
"trackNo": 1,
"vsMode": 0,
"vsUserName": "",
"vsStatus": 0,
"vsUserRating": 0,
"vsUserAchievement": 0,
"vsUserGradeRank": 0,
"vsRank": 0,
"playerNum": 1,
"playedUserId1": 0,
"playedUserName1": "",
"playedMusicLevel1": 0,
"playedUserId2": 0,
"playedUserName2": "",
"playedMusicLevel2": 0,
"playedUserId3": 0,
"playedUserName3": "",
"playedMusicLevel3": 0,
"characterId1": currentUserData2['charaSlot'][0],
"characterLevel1": random.randint(1000,6500),
"characterAwakening1": 5,
"characterId2": currentUserData2['charaSlot'][1],
"characterLevel2": random.randint(1000,6500),
"characterAwakening2": 5,
"characterId3": currentUserData2['charaSlot'][2],
"characterLevel3": random.randint(1000,6500),
"characterAwakening3": 5,
"characterId4": currentUserData2['charaSlot'][3],
"characterLevel4": random.randint(1000,6500),
"characterAwakening4": 5,
"characterId5": currentUserData2['charaSlot'][4],
"characterLevel5": random.randint(1000,6500),
"characterAwakening5": 5,
"achievement": int(musicDataToBeUploaded['achievement']),
"deluxscore": int(musicDataToBeUploaded['deluxscoreMax']),
"scoreRank": int(musicDataToBeUploaded['scoreRank']),
"maxCombo": 0,
"totalCombo": random.randint(700,900),
"maxSync": 0,
"totalSync": 0,
"tapCriticalPerfect": 0,
"tapPerfect": 0,
"tapGreat": 0,
"tapGood": 0,
"tapMiss": random.randint(1,10),
"holdCriticalPerfect": 0,
"holdPerfect": 0,
"holdGreat": 0,
"holdGood": 0,
"holdMiss": random.randint(1,15),
"slideCriticalPerfect": 0,
"slidePerfect": 0,
"slideGreat": 0,
"slideGood": 0,
"slideMiss": random.randint(1,15),
"touchCriticalPerfect": 0,
"touchPerfect": 0,
"touchGreat": 0,
"touchGood": 0,
"touchMiss": random.randint(1,15),
"breakCriticalPerfect": 0,
"breakPerfect": 0,
"breakGreat": 0,
"breakGood": 0,
"breakMiss": random.randint(1,15),
"isTap": True,
"isHold": True,
"isSlide": True,
"isTouch": True,
"isBreak": True,
"isCriticalDisp": True,
"isFastLateDisp": True,
"fastCount": 0,
"lateCount": 0,
"isAchieveNewRecord": True,
"isDeluxscoreNewRecord": True,
"comboStatus": 0,
"syncStatus": 0,
"isClear": False,
"beforeRating": currentUserData2['playerRating'],
"afterRating": currentUserData2['playerRating'],
"beforeGrade": 0,
"afterGrade": 0,
"afterGradeRank": 1,
"beforeDeluxRating": currentUserData2['playerRating'],
"afterDeluxRating": currentUserData2['playerRating'],
"isPlayTutorial": False,
"isEventMode": False,
"isFreedomMode": False,
"playMode": 0,
"isNewFree": False,
"trialPlayAchievement": -1,
"extNum1": 0,
"extNum2": 0,
"extNum4": 3020,
"extBool1": False,
"extBool2": False
}
]
})
result = apiSDGB(data, "UploadUserPlaylogListApi", userId)
logger.info("上传游玩记录:结果:"+ str(result))
return result

198
HelperUserAll.py Normal file
View File

@@ -0,0 +1,198 @@
import pytz
from datetime import datetime
from Config import *
from HelperGetUserThing import implGetUser_
from HelperGetUserMusicDetail import getUserMusicDetail
from loguru import logger
def isNewMusicType(userId, musicId, level) -> str:
userMusicDetailList = getUserMusicDetail(userId, musicId, 1)['userMusicList'][0]['userMusicDetailList']
logger.info(userMusicDetailList)
try:
if userMusicDetailList[0]['musicId'] == musicId and userMusicDetailList[0]['level'] == level:
logger.info(f"We think {musicId} Level {level} should use EDIT.")
return "0"
except:
return "1"
def generateFullUserAll(userId, currentLoginResult, currentLoginTimestamp, currentUserData2, currentPlaySpecial):
currentUserAll = generateUserAllData(userId, currentLoginResult, currentLoginTimestamp, currentUserData2, currentPlaySpecial)
currentUserExtend = implGetUser_("Extend", userId, True)
currentUserOption = implGetUser_("Option", userId, True)
currentUserRating = implGetUser_("Rating", userId, True)
currentUserActivity = implGetUser_("Activity", userId, True)
currentUserCharge = implGetUser_("Charge", userId, True)
currentUserMissionData = implGetUser_("MissionData", userId, True)
currentUserAll['upsertUserAll']['userExtend'] = [currentUserExtend['userExtend']]
currentUserAll['upsertUserAll']['userOption'] = [currentUserOption['userOption']]
currentUserAll['upsertUserAll']['userRatingList'] = [currentUserRating['userRating']]
currentUserAll['upsertUserAll']['userActivityList'] = [currentUserActivity['userActivity']]
currentUserAll['upsertUserAll']['userChargeList'] = currentUserCharge['userChargeList']
currentUserAll['upsertUserAll']['userWeeklyData'] = currentUserMissionData['userWeeklyData']
return currentUserAll
def generateUserAllData(userId, currentLoginResult, currentLoginTimestamp, currentUserData2, currentPlaySpecial):
data = {
"userId": userId,
"playlogId": currentLoginResult['loginId'],
"isEventMode": False,
"isFreePlay": False,
"upsertUserAll": {
"userData": [
{
"accessCode": "",
"userName": currentUserData2['userName'],
"isNetMember": 1,
"point": currentUserData2['point'],
"totalPoint": currentUserData2['totalPoint'],
"iconId": currentUserData2['iconId'],
"plateId": currentUserData2['plateId'],
"titleId": currentUserData2['titleId'],
"partnerId": currentUserData2['partnerId'],
"frameId": currentUserData2['frameId'],
"selectMapId": currentUserData2['selectMapId'],
"totalAwake": currentUserData2['totalAwake'],
"gradeRating": currentUserData2['gradeRating'],
"musicRating": currentUserData2['musicRating'],
"playerRating": currentUserData2['playerRating'],
"highestRating": currentUserData2['highestRating'],
"gradeRank": currentUserData2['gradeRank'],
"classRank": currentUserData2['classRank'],
"courseRank": currentUserData2['courseRank'],
"charaSlot": currentUserData2['charaSlot'],
"charaLockSlot": currentUserData2['charaLockSlot'],
"contentBit": currentUserData2['contentBit'],
"playCount": currentUserData2['playCount'],
"currentPlayCount": currentUserData2['currentPlayCount'],
"renameCredit": 0,
"mapStock": currentUserData2['mapStock'],
"eventWatchedDate": currentUserData2['eventWatchedDate'],
"lastGameId": "SDGB",
"lastRomVersion": currentUserData2['lastRomVersion'],
"lastDataVersion": currentUserData2['lastDataVersion'],
"lastLoginDate": currentUserData2['lastLoginDate'],
"lastPlayDate": datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S') + '.0',
"lastPlayCredit": 1,
"lastPlayMode": 0,
"lastPlaceId": placeId,
"lastPlaceName": placeName,
"lastAllNetId": 0,
"lastRegionId": regionId,
"lastRegionName": regionName,
"lastClientId": clientId,
"lastCountryCode": "CHN",
"lastSelectEMoney": 0,
"lastSelectTicket": 0,
"lastSelectCourse": currentUserData2['lastSelectCourse'],
"lastCountCourse": 0,
"firstGameId": "SDGB",
"firstRomVersion": currentUserData2['firstRomVersion'],
"firstDataVersion": currentUserData2['firstDataVersion'],
"firstPlayDate": currentUserData2['firstPlayDate'],
"compatibleCmVersion": currentUserData2['compatibleCmVersion'],
"dailyBonusDate": currentUserData2['dailyBonusDate'],
"dailyCourseBonusDate": currentUserData2['dailyCourseBonusDate'],
"lastPairLoginDate": currentUserData2['lastPairLoginDate'],
"lastTrialPlayDate": currentUserData2['lastTrialPlayDate'],
"playVsCount": 0,
"playSyncCount": 0,
"winCount": 0,
"helpCount": 0,
"comboCount": 0,
"totalDeluxscore": currentUserData2['totalDeluxscore'],
"totalBasicDeluxscore": currentUserData2['totalBasicDeluxscore'],
"totalAdvancedDeluxscore": currentUserData2['totalAdvancedDeluxscore'],
"totalExpertDeluxscore": currentUserData2['totalExpertDeluxscore'],
"totalMasterDeluxscore": currentUserData2['totalMasterDeluxscore'],
"totalReMasterDeluxscore": currentUserData2['totalReMasterDeluxscore'],
"totalSync": currentUserData2['totalSync'],
"totalBasicSync": currentUserData2['totalBasicSync'],
"totalAdvancedSync": currentUserData2['totalAdvancedSync'],
"totalExpertSync": currentUserData2['totalExpertSync'],
"totalMasterSync": currentUserData2['totalMasterSync'],
"totalReMasterSync": currentUserData2['totalReMasterSync'],
"totalAchievement": currentUserData2['totalAchievement'],
"totalBasicAchievement": currentUserData2['totalBasicAchievement'],
"totalAdvancedAchievement": currentUserData2['totalAdvancedAchievement'],
"totalExpertAchievement": currentUserData2['totalExpertAchievement'],
"totalMasterAchievement": currentUserData2['totalMasterAchievement'],
"totalReMasterAchievement": currentUserData2['totalReMasterAchievement'],
"playerOldRating": currentUserData2['playerOldRating'],
"playerNewRating": currentUserData2['playerNewRating'],
"banState": 0,
"friendRegistSkip": currentUserData2['friendRegistSkip'],
"dateTime": currentLoginTimestamp
}
],
"userExtend": [],
"userOption": [],
"userGhost": [],
"userCharacterList": [],
"userMapList": [],
"userLoginBonusList": [],
"userRatingList": [],
"userItemList": [],
"userMusicDetailList": [],
"userCourseList": [],
"userFriendSeasonRankingList": [],
"userChargeList": [],
"userFavoriteList": [],
"userActivityList": [],
"userMissionDataList": [],
"userWeeklyData": [],
"userGamePlaylogList": [
{
"playlogId": currentLoginResult['loginId'],
"version": "1.51.00",
"playDate": datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S') + '.0',
"playMode": 0,
"useTicketId": -1,
"playCredit": 1,
"playTrack": 1,
"clientId": clientId,
"isPlayTutorial": False,
"isEventMode": False,
"isNewFree": False,
"playCount": currentUserData2['playCount'],
"playSpecial": currentPlaySpecial,
"playOtherUserId": 0
}
],
"user2pPlaylog": {
"userId1": 0,
"userId2": 0,
"userName1": "",
"userName2": "",
"regionId": 0,
"placeId": 0,
"user2pPlaylogDetailList": []
},
"userIntimateList": [],
"userShopItemStockList": [],
"userGetPointList": [],
"userTradeItemList": [],
"userFavoritemusicList": [],
"userKaleidxScopeList": [],
"isNewCharacterList": "",
"isNewMapList": "",
"isNewLoginBonusList": "",
"isNewItemList": "",
"isNewMusicDetailList": "",
"isNewCourseList": "0",
"isNewFavoriteList": "",
"isNewFriendSeasonRankingList": "",
"isNewUserIntimateList": "",
"isNewFavoritemusicList": "",
"isNewKaleidxScopeList": ""
}
}
return data

19
MusicDB.py Normal file
View File

@@ -0,0 +1,19 @@
import rapidjson as json
from Config import *
from typing import Dict, Union
MusicDBType = Dict[int, Dict[str, Union[int, str]]]
__all__ = ['musicDB']
try:
with open(musicDBPath, 'r', encoding='utf-8') as f:
data = json.load(f)
except FileNotFoundError:
try:
with open(musicDBPathFallback, 'r', encoding='utf-8') as f:
data = json.load(f)
except:
raise FileNotFoundError("musicDB.json 文件不存在!")
musicDB: MusicDBType = {int(k): v for k, v in data.items()}

View File

@@ -1,3 +1,5 @@
# SDGB-API
舞萌dx2025部分API
某游戏的API请自行配置使用
开源协议GPL V3