270 lines
9.7 KiB
Python
270 lines
9.7 KiB
Python
import zlib
|
|
import hashlib
|
|
import httpx
|
|
from loguru import logger
|
|
import random
|
|
import time
|
|
from ctypes import c_int32
|
|
from Crypto.Cipher import AES
|
|
from Crypto.Util.Padding import pad, unpad
|
|
from Config import *
|
|
from typing import Optional
|
|
import certifi
|
|
|
|
# Protocol-generation switch: True selects the "2024" credential set, False
# selects the other (default) set.
use2024Api = False

# AesKey / AesIV feed the AES-CBC helpers in this module; ObfuscateParam is the
# salt appended to the API name before it is MD5-hashed (see getSDGBApiHash).
AesKey, AesIV, ObfuscateParam = (
    ("n7bx6:@Fg_:2;5E89Phy7AyIcpxEQ:R@", ";;KjR1C3hgB1ovXa", "BEs2D5vW")
    if use2024Api
    else ("a>32bVP7v<63BVLkY[xM>daZ1s9MBP<R", "d6xHIKq]1J]Dt^ue", "B44df8yT")
)
|
|
|
|
class SDGBApiError(Exception):
    """Root of this module's exception hierarchy.

    Raised directly when every request attempt has failed; the more specific
    request/response errors derive from it.
    """
|
|
|
|
class SDGBRequestError(SDGBApiError):
    """The server answered with a non-200 HTTP status.

    The request helpers in this module propagate it immediately instead of
    retrying.
    """
|
|
|
|
class SDGBResponseError(SDGBApiError):
    """The response body could not be decrypted or decompressed.

    The request helpers in this module treat it as retryable.
    """
|
|
|
|
class aes_pkcs7:
    """AES-CBC cipher with PKCS#7 padding, configured from text key and IV.

    A new cipher object is created for every ``encrypt`` / ``decrypt`` call,
    so each call starts from the configured IV.
    """

    def __init__(self, key: str, iv: str):
        """Store the UTF-8 encoded key and IV and select CBC mode.

        Args:
            key: AES key as text; it is UTF-8 encoded before use.
            iv: Initialisation vector as text; it is UTF-8 encoded before use.
        """
        self.key = key.encode('utf-8')
        self.iv = iv.encode('utf-8')
        self.mode = AES.MODE_CBC

    def encrypt(self, content: bytes) -> bytes:
        """Pad ``content`` to the AES block size and return the ciphertext."""
        cipher = AES.new(self.key, self.mode, self.iv)
        return cipher.encrypt(pad(content, AES.block_size))

    def decrypt(self, content: bytes) -> bytes:
        """Decrypt ``content`` and strip its padding.

        Errors raised by the cipher or by ``unpad`` (e.g. malformed padding)
        propagate to the caller unchanged.
        """
        cipher = AES.new(self.key, self.mode, self.iv)
        return unpad(cipher.decrypt(content), AES.block_size)

    def pkcs7unpadding(self, text):
        """Strip manual PKCS#7 padding from ``text`` (``str`` or ``bytes``).

        The last element encodes how many trailing elements to drop. An empty
        input raises ``IndexError``, as there is no padding element to read.
        """
        last = text[-1]
        # Indexing bytes already yields an int; only a str element needs
        # ord(). Calling ord() unconditionally raised TypeError for bytes.
        count = last if isinstance(last, int) else ord(last)
        return text[:len(text) - count]

    def pkcs7padding(self, text):
        """Append manual PKCS#7 padding for a 16-byte block size.

        For ``str`` input the pad length is computed from the UTF-8 byte
        length and the pad is appended as characters; for ``bytes`` input it
        is appended as bytes. A full block is added when the input is already
        block-aligned.
        """
        bs = 16
        if isinstance(text, bytes):
            padding = bs - len(text) % bs
            return text + bytes([padding]) * padding
        padding = bs - len(text.encode('utf-8')) % bs
        return text + chr(padding) * padding
|
|
|
|
def getSDGBApiHash(api, obfuscateParam=None):
    """Return the hex MD5 digest that stands in for an API name on the wire.

    The digest is computed over ``api + "MaimaiChn" + salt``.

    Args:
        api: Plain API name.
        obfuscateParam: Salt to use. When omitted (``None``) the module-level
            ``ObfuscateParam`` is used, which is the original behaviour.

    Returns:
        32-character lowercase hexadecimal digest.
    """
    salt = ObfuscateParam if obfuscateParam is None else obfuscateParam
    return hashlib.md5((api + "MaimaiChn" + salt).encode()).hexdigest()
|
|
|
|
# zlib stream headers for a 32 KiB window without a preset dictionary, one per
# compression-level class (RFC 1950). Checking only b'\x78\x9c' would treat
# streams compressed at any other level as plain text.
_SDGB_ZLIB_HEADERS = (b'\x78\x01', b'\x78\x5e', b'\x78\x9c', b'\x78\xda')


def _decodeSDGBResponse(aes, responseContentRaw, noLog=False):
    """Decrypt a raw response body and decompress it when it is zlib-framed.

    Raises:
        SDGBResponseError: Decryption, decompression or UTF-8 decoding failed.
    """
    try:
        responseContentDecrypted = aes.decrypt(responseContentRaw)
    except Exception as e:
        logger.warning(f"[Stage 3] Decryption FAILED. Raw Content: {responseContentRaw}, Error: {e}")
        raise SDGBResponseError("Decryption failed") from e
    if not noLog:
        logger.debug("[Stage 3] Decryption SUCCESS.")

    try:
        if responseContentDecrypted.startswith(_SDGB_ZLIB_HEADERS):
            logger.debug("[Stage 4] Zlib detected, decompressing...")
            responseContentFinal = zlib.decompress(responseContentDecrypted).decode('utf-8')
        else:
            logger.warning(f"[Stage 4] Not Zlib Format!! using raw content: {responseContentDecrypted}")
            responseContentFinal = responseContentDecrypted.decode('utf-8')
    except (zlib.error, UnicodeDecodeError) as e:
        # Only decode failures are mapped here; a bare ``except`` would also
        # swallow KeyboardInterrupt / SystemExit and turn them into retries.
        logger.warning(f"解压失败,原始响应: {responseContentDecrypted}")
        raise SDGBResponseError("解压失败") from e

    if not noLog:
        logger.debug(f"[Stage 4] Process OK, Content: {responseContentFinal}")

    # The text is returned either way; the check only decides which message
    # is logged.
    if responseContentFinal.startswith('{') and responseContentFinal.endswith('}'):
        logger.debug("[Stage 5] Response is JSON, returning.")
    else:
        logger.warning("[Stage 5] Response is not JSON, returning as is, take care!")
    return responseContentFinal


def apiSDGB(
    data: str,
    targetApi: str,
    userAgentExtraData: str,
    noLog: bool = False,
    timeout: int = 5,
    maxRetries: int = 3,
) -> str:
    """POST one encrypted request to the Maimai2Servlet endpoint.

    The payload is UTF-8 encoded, zlib-compressed and then AES-encrypted. The
    response is AES-decrypted and, when it starts with a zlib header,
    decompressed before being decoded as UTF-8.

    Args:
        data: Request body text.
        targetApi: API name; its salted MD5 hash is used as the URL suffix and
            as the User-Agent prefix.
        userAgentExtraData: Value appended after ``#`` in the User-Agent
            (converted with ``str``).
        noLog: When true, skip the log lines that contain the request data,
            the status code and the decoded response. Some diagnostic lines
            are still emitted.
        timeout: Timeout passed to the HTTP request.
        maxRetries: Maximum number of attempts.

    Returns:
        The decoded response text. It is not guaranteed to be JSON; a warning
        is logged when it does not look like a JSON object.

    Raises:
        SDGBRequestError: The server returned a non-200 status (not retried).
        SDGBApiError: Every attempt failed.
    """
    agentExtra = str(userAgentExtraData)
    aes = aes_pkcs7(AesKey, AesIV)
    endpoint = "https://maimai-gm.wahlap.com:42081/Maimai2Servlet/"

    requestDataFinal = aes.encrypt(zlib.compress(data.encode('utf-8')))

    # Loop-invariant request parts are built once instead of on every attempt.
    apiHash = getSDGBApiHash(targetApi)
    requestHeaders = {
        "User-Agent": f"{apiHash}#{agentExtra}",
        "Content-Type": "application/json",
        "Mai-Encoding": "1.50",
        "Accept-Encoding": "",
        "Charset": "UTF-8",
        "Content-Encoding": "deflate",
        "Expect": "100-continue",
    }

    if not noLog:
        logger.debug(f"[Stage 1] 准备开始请求 {targetApi},以 {data}")

    lastError = None
    retries = 0
    while retries < maxRetries:
        try:
            # NOTE(review): TLS certificate verification is disabled
            # (verify=False), as in the original code; confirm this is
            # intentional for this endpoint.
            clientOptions = {"verify": False}
            if useProxy and proxyUrl:
                logger.debug("使用代理")
                clientOptions["proxy"] = proxyUrl

            # The context manager closes the client on every exit path.
            with httpx.Client(**clientOptions) as httpClient:
                response = httpClient.post(
                    url=endpoint + apiHash,
                    headers=requestHeaders,
                    content=requestDataFinal,
                    timeout=timeout,
                )

            if not noLog:
                logger.info(f"[Stage 2] {targetApi} 请求结果: {response.status_code}")

            if response.status_code != 200:
                errorMessage = f"[Stage 2] 请求失败: {response.status_code}"
                logger.error(errorMessage)
                raise SDGBRequestError(errorMessage)

            return _decodeSDGBResponse(aes, response.content, noLog)

        except SDGBRequestError as e:
            # A bad status is not retried.
            logger.error(f"请求格式错误: {e}")
            raise
        except SDGBResponseError as e:
            logger.warning(f"响应错误,将重试: {e}")
            lastError = e
            retries += 1
            time.sleep(2)
        except Exception as e:
            logger.warning(f"请求失败,将重试: {e}")
            lastError = e
            retries += 1
            time.sleep(2)

    raise SDGBApiError("重试多次仍然无法成功请求服务器") from lastError
|
|
|
|
def calcPlaySpecial():
    """Return a random signed 32-bit value derived by bit reversal.

    A cryptographically seeded random multiplier in ``[1, 1037933]`` is scaled
    by 2069 and offset by 1024; the low 32 bits of that number are mirrored
    (bit 0 becomes bit 31, and so on) and reinterpreted as a signed 32-bit
    integer.
    """
    # 1024 stands in for GameManager.CalcSpecialNum() per the original comment.
    seed = random.SystemRandom().randint(1, 1037933) * 2069 + 1024

    remaining = seed & 0xFFFFFFFF  # low 32 bits, same bit pattern as the int32
    mirrored = 0
    for _ in range(32):
        mirrored = (mirrored << 1) | (remaining & 1)
        remaining >>= 1

    # Reinterpret the unsigned 32-bit pattern as a signed int32.
    return c_int32(mirrored).value
|
|
|
|
class AESPKCS7_2024:
    """AES-CBC cipher with PKCS#7 padding that accepts text or bytes input.

    A new cipher object is created for every ``encrypt`` / ``decrypt`` call,
    so each call starts from the configured IV.
    """

    def __init__(self, key: str, iv: str):
        """Store the UTF-8 encoded key and IV and select CBC mode."""
        self.key = key.encode('utf-8')
        self.iv = iv.encode('utf-8')
        self.mode = AES.MODE_CBC

    @staticmethod
    def _toBytes(content) -> bytes:
        """Normalise ``content`` to ``bytes``; text is UTF-8 encoded.

        Raises:
            TypeError: ``content`` is neither text nor a bytes-like object.
        """
        if isinstance(content, str):
            return content.encode('utf-8')
        if isinstance(content, (bytes, bytearray, memoryview)):
            return bytes(content)
        raise TypeError(
            f"content must be str or bytes-like, not {type(content).__name__}"
        )

    def encrypt(self, content) -> bytes:
        """Pad ``content`` to the AES block size and return the ciphertext.

        ``content`` may be ``str`` (UTF-8 encoded first) or bytes-like. Without
        the normalisation step, bytes input would reach ``pad`` through an
        unassigned local and raise ``UnboundLocalError``.
        """
        encodedData = self._toBytes(content)
        cipher = AES.new(self.key, self.mode, self.iv)
        return cipher.encrypt(pad(encodedData, AES.block_size))

    def decrypt(self, encrypted_content: bytes) -> bytes:
        """Decrypt ``encrypted_content`` and strip its padding.

        The result is raw ``bytes``; no text decoding is performed. Errors
        raised by the cipher or by ``unpad`` propagate unchanged.
        """
        cipher = AES.new(self.key, self.mode, self.iv)
        return unpad(cipher.decrypt(encrypted_content), AES.block_size)
|
|
|
|
def apiSDGB_2024(
    data: str,
    targetApi: str,
    userAgentExtraData: str,
    noLog: bool = False,
    timeout: int = 5,
    maxRetries: int = 3,
) -> bytes:
    """POST one request using the 2024 framing (encrypt first, then compress).

    The payload is AES-encrypted and then zlib-compressed. The response is
    zlib-decompressed and then AES-decrypted.

    Args:
        data: Request body text.
        targetApi: API name; its salted MD5 hash is used as the URL suffix and
            as the User-Agent prefix.
        userAgentExtraData: Value appended after ``#`` in the User-Agent
            (converted with ``str``).
        noLog: When true, skip the log lines that contain the request data,
            the status code and the decrypted response.
        timeout: Timeout passed to the HTTP request.
        maxRetries: Retry budget (default 3, the previously hard-coded
            value). A transport error costs 1; a response-decoding error
            costs 2.

    Returns:
        The decrypted response as raw ``bytes`` (no text decoding is done).

    Raises:
        SDGBRequestError: The server returned a non-200 status (not retried).
        SDGBApiError: The retry budget was exhausted.
    """
    agentExtra = str(userAgentExtraData)
    aes = AESPKCS7_2024(AesKey, AesIV)
    reqData_encrypted = aes.encrypt(data)
    reqData_deflated = zlib.compress(reqData_encrypted)
    endpoint = "https://maimai-gm.wahlap.com:42081/Maimai2Servlet/"

    # Loop-invariant request parts are built once instead of on every attempt.
    apiHash = getSDGBApiHash(targetApi)
    requestHeaders = {
        "User-Agent": f"{apiHash}#{agentExtra}",
        "Content-Type": "application/json",
        "Mai-Encoding": "1.40",
        "Accept-Encoding": "",
        "Charset": "UTF-8",
        "Content-Encoding": "deflate",
        "Expect": "100-continue",
    }

    if not noLog:
        logger.debug(f"开始请求 {targetApi},以 {data}")

    lastError = None
    retries = 0
    while retries < maxRetries:
        try:
            # NOTE(review): TLS certificate verification is disabled
            # (verify=False), as in the original code; confirm this is
            # intentional for this endpoint.
            clientOptions = {"verify": False}
            # Require a proxy URL as well as the flag, matching apiSDGB.
            if useProxy and proxyUrl:
                logger.debug("使用代理")
                clientOptions["proxy"] = proxyUrl
            else:
                logger.debug("不使用代理")

            # The context manager closes the client on every exit path, so
            # no client is left open after an attempt.
            with httpx.Client(**clientOptions) as httpClient:
                responseOriginal = httpClient.post(
                    url=endpoint + apiHash,
                    headers=requestHeaders,
                    content=reqData_deflated,
                    timeout=timeout,
                )

            if not noLog:
                logger.info(f"{targetApi} 请求结果: {responseOriginal.status_code}")

            if responseOriginal.status_code != 200:
                errorMessage = f"请求失败: {responseOriginal.status_code}"
                logger.error(errorMessage)
                raise SDGBRequestError(errorMessage)
            logger.debug("200 OK!")

            responseRAWContent = responseOriginal.content

            try:
                responseDecompressed = zlib.decompress(responseRAWContent)
            except zlib.error as e:
                logger.warning(f"无法解压,得到的原始响应: {responseRAWContent}")
                raise SDGBResponseError("解压失败") from e
            logger.debug("成功解压响应!")

            try:
                resultResponse = aes.decrypt(responseDecompressed)
            except Exception as e:
                logger.warning(f"解密失败,得到的原始响应: {responseDecompressed}")
                raise SDGBResponseError("解密失败") from e
            logger.debug("成功解密响应!")

            if not noLog:
                logger.debug(f"响应: {resultResponse}")
            return resultResponse

        except SDGBRequestError as e:
            # A bad status is not retried; keep the status-bearing error as
            # the cause of the one that is raised.
            raise SDGBRequestError("请求格式错误") from e
        except SDGBResponseError as e:
            logger.warning(f"将重试一次 Resp Err: {e}")
            lastError = e
            # Costs two units of the budget, so with the default budget a
            # decoding failure allows one further attempt.
            retries += 2
            time.sleep(2)
        except Exception as e:
            logger.warning(f"将开始重试请求. {e}")
            lastError = e
            retries += 1
            time.sleep(2)

    raise SDGBApiError("重试多次仍然无法成功请求服务器") from lastError
|