AltAuth/proxy.py

187 Zeilen
7.0 KiB
Python

#!/usr/bin/env python3
# SPDX-License-Identifier: MIT
from collections import namedtuple
from typing import Dict
from json import dumps, loads
from base64 import b64decode
from time import time, sleep
from threading import Thread
from urllib.error import HTTPError
from urllib.request import Request, urlopen
from http import HTTPStatus
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
# Set this to True if behind an ip masking proxy (with X-Forwarded-For support) while using prevent-proxy-connections
behind_proxy = False
# Should mojang accounts be allowed to join with this AltAuth proxy
allow_mojang_accounts = True
# Should banned microsoft accounts ("UserBannedException") be allowed to join?
# Should accounts with disabled multiplayer ("InsufficientPrivilegesException") be allowed to join?
allowed_microsoft_accounts = ("InsufficientPrivilegesException", "UserBannedException")
def moj_request(url, data=None):
try:
response = urlopen(Request(
url,
headers={
"Content-Type": "application/json"
} if data else {},
data=dumps(data).encode("utf8") if data else None
))
return response.code, response.read()
except HTTPError as response:
return response.code, response.read()
CachedProfile = namedtuple('CachedProfile', ('timestamp', 'use_altauth', 'ip', 'uuid'))
cached_profiles: Dict[str, CachedProfile] = {}
def timeout_cleaner():
global cached_profiles
while True:
timeout = time() - 60
cached_profiles = {
serverId: profile
for serverId, profile in cached_profiles.items()
if profile.timestamp > timeout
}
sleep(1)
class AltAuthRequestHandler(BaseHTTPRequestHandler):
def do_POST(self):
try:
self.close_connection = True
if self.path != "/session/minecraft/join":
self.send_response(HTTPStatus.NOT_FOUND)
self.end_headers()
return
content_length = int(self.headers['Content-Length'])
if content_length > 1024:
raise Exception()
request = loads(self.rfile.read(content_length))
access_token = request["accessToken"]
selected_profile = request["selectedProfile"]
server_id = request["serverId"]
token = loads(b64decode(access_token.split(".")[1] + "==")) # Decode the JSON Web Token
now = time()
use_altauth = False
if token["exp"] >= now: # check token expiration date
raise Exception()
if token["iss"] == "Yggdrasil-Auth" and allow_mojang_accounts: # Mojang account
if token["spr"] != selected_profile:
raise Exception()
# Valid token (even on other ip): 204
# Invalid token: 403 {"error": "ForbiddenOperationException", "errorMessage": "Invalid token"}
code, data = moj_request("https://authserver.mojang.com/validate", data={
"accessToken": access_token
})
if code != 204:
raise Exception()
use_altauth = True
else: # Microsoft account
# Valid token: 204
# According to wiki.vg Xbox multiplayer disabled: InsufficientPrivilegesException
# According to wiki.vg Multiplayer banned: UserBannedException
# Mojang account: 403 {"error":"ForbiddenOperationException","path":"/session/minecraft/join"}
# Invalid token: 403 {"error":"ForbiddenOperationException","path":"/session/minecraft/join"}
code, data = moj_request("https://sessionserver.mojang.com/session/minecraft/join", data={
"accessToken": access_token,
"selectedProfile": selected_profile,
"serverId": server_id
})
if code == 403:
if loads(data)["error"] in allowed_microsoft_accounts:
use_altauth = True
code = 204
if code == 204:
cached_profiles[server_id] = CachedProfile(
timestamp=now,
use_altauth=use_altauth,
ip=self.headers['X-Forwarded-For'] if behind_proxy else self.client_address[0],
uuid=selected_profile
)
self.send_response(code)
self.end_headers()
if code != 204 and data:
self.wfile.write(data)
except BaseException as e:
print(e)
# The client continues the login process with a 500 response, therefore 403 instead
self.send_response(HTTPStatus.FORBIDDEN)
self.end_headers()
self.wfile.write(b'{"error":"ForbiddenOperationException","path":"/session/minecraft/join"}')
def do_GET(self):
try:
self.close_connection = True
if not self.path.startswith("/session/minecraft/hasJoined?"):
self.send_response(HTTPStatus.NOT_FOUND)
self.end_headers()
return
query = {
attribute.split("=")[0]: attribute.split("=")[1]
for attribute in self.path.split("?", 1)[1].split("&")
}
server_id = query["serverId"]
username = query["username"]
altauth_client = server_id in cached_profiles
cached_profile = cached_profiles.pop(server_id) if altauth_client else None
if altauth_client and "ip" in query:
if query.pop("ip") != cached_profile.ip:
raise Exception()
if altauth_client and cached_profile.use_altauth:
code, data = moj_request(
f"https://sessionserver.mojang.com/session/minecraft/profile/{cached_profile.uuid}"
)
if data:
profile = loads(data)
if profile["name"] != username: # Disarm server_id hash collisions and prevent username spoofing
raise Exception()
elif "ip" in query:
code, data = moj_request(
f"https://sessionserver.mojang.com/session/minecraft/hasJoined?username={username}&serverId={server_id}&ip={query['ip']}"
)
else:
code, data = moj_request(
f"https://sessionserver.mojang.com/session/minecraft/hasJoined?username={username}&serverId={server_id}"
)
self.send_response(code)
self.end_headers()
self.wfile.write(data)
except BaseException as e:
print(e)
self.send_response(HTTPStatus.FORBIDDEN)
self.end_headers()
if __name__ == '__main__':
Thread(target=timeout_cleaner, name="TimeoutCleanup", daemon=True).start()
ThreadingHTTPServer(('127.0.0.1', 8080), AltAuthRequestHandler).serve_forever()