#!/usr/bin/env python3
# SPDX-License-Identifier: MIT
"""AltAuth proxy: a small HTTP server sitting between Minecraft and Mojang's session service.

It exposes two endpoints that mirror the session server:

* ``POST /session/minecraft/join`` -- called by the game client. The access
  token is checked against Mojang; on success the join is remembered in
  ``cached_profiles`` under its ``serverId``.
* ``GET /session/minecraft/hasJoined`` -- called by the game server. If the
  ``serverId`` was remembered with ``use_altauth`` set, the reply is the
  profile fetched by UUID; otherwise the request is forwarded to Mojang.
"""

# Set this to True if behind an IP-masking proxy (with X-Forwarded-For support)
# while using prevent-proxy-connections
behind_proxy = True

# Address and port to listen on
bind_to = ('127.0.0.1', 8080)

# Should Mojang accounts be allowed to join with this AltAuth proxy?
allow_mojang_accounts = True

# Should banned Microsoft accounts ("UserBannedException") be allowed to join?
# Should accounts with disabled multiplayer ("InsufficientPrivilegesException") be allowed to join?
allowed_microsoft_accounts = ()
# Example:
# allowed_microsoft_accounts = ("InsufficientPrivilegesException", "UserBannedException")

from traceback import format_exc
from collections import namedtuple
from typing import Dict, Optional, Tuple
from json import dumps, loads
from base64 import b64decode, urlsafe_b64decode
from time import time, sleep
from threading import Lock, Thread
from urllib.error import HTTPError
from urllib.parse import parse_qsl, urlencode
from urllib.request import Request, urlopen
from http import HTTPStatus
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler

# Seconds a remembered join stays valid before the cleaner drops it.
PROFILE_TTL_SECONDS = 60
# Upper bound for the body of a join request.
MAX_REQUEST_BYTES = 1024
# Seconds to wait for Mojang before giving up on an upstream request.
UPSTREAM_TIMEOUT_SECONDS = 10

# Body sent with every rejected join request.
FORBIDDEN_BODY = b'{"error":"ForbiddenOperationException","path":"/session/minecraft/join"}'


def moj_request(url, data=None, timeout=UPSTREAM_TIMEOUT_SECONDS) -> Tuple[int, bytes]:
    """Send a request to ``url`` and return ``(status_code, body_bytes)``.

    When ``data`` is given it is JSON-encoded and sent as the request body
    (which makes urllib issue a POST); otherwise a plain GET is made.
    HTTP error statuses are returned like any other response instead of
    being raised. Network-level failures (including the timeout) propagate
    to the caller.
    """
    has_body = data is not None
    request = Request(
        url,
        headers={"Content-Type": "application/json"} if has_body else {},
        data=dumps(data).encode("utf8") if has_body else None,
    )
    try:
        # The context manager closes the connection once the body is read.
        with urlopen(request, timeout=timeout) as response:
            return response.code, response.read()
    except HTTPError as response:
        try:
            return response.code, response.read()
        finally:
            response.close()


CachedProfile = namedtuple('CachedProfile', ('timestamp', 'use_altauth', 'ip', 'uuid'))

# serverId -> remembered join. Shared between the handler threads and the
# cleaner thread, so every access goes through _cache_lock.
cached_profiles: Dict[str, CachedProfile] = {}
_cache_lock = Lock()


def _purge_expired_profiles(cutoff: float) -> None:
    """Drop every cached profile whose timestamp is not newer than ``cutoff``."""
    with _cache_lock:
        expired = [
            server_id
            for server_id, profile in cached_profiles.items()
            if profile.timestamp <= cutoff
        ]
        for server_id in expired:
            del cached_profiles[server_id]


def timeout_cleaner():
    """Run forever, removing joins older than ``PROFILE_TTL_SECONDS`` once per second.

    The dict is pruned in place under the lock rather than rebuilt and
    rebound, so a concurrent insert can neither be lost nor break the
    iteration.
    """
    while True:
        _purge_expired_profiles(time() - PROFILE_TTL_SECONDS)
        sleep(1)


def _decode_jwt_payload(access_token: str) -> dict:
    """Return the (unverified) JSON payload of a JSON Web Token.

    JWT segments are base64url-encoded without padding, so the URL-safe
    alphabet is used and the padding is restored before decoding.
    """
    segment = access_token.split(".")[1]
    padding = "=" * (-len(segment) % 4)
    return loads(urlsafe_b64decode(segment + padding))


class SilentException(Exception):
    """Rejection that is answered with 403 but deliberately not logged."""


class AltAuthRequestHandler(BaseHTTPRequestHandler):
    """Handle the ``join`` (POST) and ``hasJoined`` (GET) session endpoints."""

    def do_POST(self):
        """Handle ``POST /session/minecraft/join`` coming from a game client.

        Responds 404 for any other path, 204 when the join was accepted and
        cached, the upstream status/body when Mojang rejected it, and 403
        with a ForbiddenOperationException body on any local failure.
        """
        try:
            self.close_connection = True
            if self.path != "/session/minecraft/join":
                self.send_response(HTTPStatus.NOT_FOUND)
                self.end_headers()
                return

            content_length = int(self.headers['Content-Length'])
            if content_length > MAX_REQUEST_BYTES:
                raise Exception("Unusual large request (malicious actor?)")
            if content_length < 0:
                # read(-1) would block until the peer closes the connection
                raise Exception("Negative Content-Length (malicious actor?)")

            request = loads(self.rfile.read(content_length))
            access_token = request["accessToken"]
            selected_profile = request["selectedProfile"]
            server_id = request["serverId"]

            token = _decode_jwt_payload(access_token)
            now = time()
            use_altauth = False

            if token["exp"] <= now:  # check token expiration date
                raise SilentException("Expired token")

            if token["iss"] == "Yggdrasil-Auth" and allow_mojang_accounts:
                # Mojang account
                if token["spr"] != selected_profile:
                    raise Exception("UUIDs don't match (malicious actor?)")
                # Valid token (even on other ip): 204
                # Invalid token: 403 {"error": "ForbiddenOperationException", "errorMessage": "Invalid token"}
                code, data = moj_request("https://authserver.mojang.com/validate", data={
                    "accessToken": access_token
                })
                if code != 204:
                    raise Exception("Token invalid for unknown reasons")
                use_altauth = True
            else:
                # Microsoft account
                # Valid token: 204
                # According to wiki.vg Xbox multiplayer disabled: InsufficientPrivilegesException
                # According to wiki.vg Multiplayer banned: UserBannedException
                # Mojang account: 403 {"error":"ForbiddenOperationException","path":"/session/minecraft/join"}
                # Invalid token: 403 {"error":"ForbiddenOperationException","path":"/session/minecraft/join"}
                code, data = moj_request("https://sessionserver.mojang.com/session/minecraft/join", data={
                    "accessToken": access_token,
                    "selectedProfile": selected_profile,
                    "serverId": server_id
                })
                if code == 403 and loads(data)["error"] in allowed_microsoft_accounts:
                    # Explicitly allowed rejection reason: let the player in via AltAuth
                    use_altauth = True
                    code = 204

            if code == 204:
                # NOTE(review): X-Forwarded-For is stored verbatim; if the fronting
                # proxy appends to an existing list this will not be a single
                # address -- confirm against the deployment.
                profile = CachedProfile(
                    timestamp=now,
                    use_altauth=use_altauth,
                    ip=self.headers['X-Forwarded-For'] if behind_proxy else self.client_address[0],
                    uuid=selected_profile
                )
                with _cache_lock:
                    cached_profiles[server_id] = profile

            self.send_response(code)
            self.end_headers()
            if code != 204 and data:
                self.wfile.write(data)
        except SilentException:
            self.send_response(HTTPStatus.FORBIDDEN)
            self.end_headers()
            self.wfile.write(FORBIDDEN_BODY)
        except Exception:
            self.log_message("Exception handling request: %s", format_exc())
            # The client continues the login process with a 500 response, therefore 403 instead
            self.send_response(HTTPStatus.FORBIDDEN)
            self.end_headers()
            self.wfile.write(FORBIDDEN_BODY)

    def do_GET(self):
        """Handle ``GET /session/minecraft/hasJoined`` coming from a game server.

        Responds 404 for any other path. For a ``serverId`` remembered with
        ``use_altauth`` the profile is fetched by UUID; otherwise the query
        is forwarded to Mojang's ``hasJoined``. Any failure is answered with
        204, which is what ``hasJoined`` returns for a failed lookup.
        """
        try:
            self.close_connection = True
            if not self.path.startswith("/session/minecraft/hasJoined?"):
                self.send_response(HTTPStatus.NOT_FOUND)
                self.end_headers()
                return

            # Percent-decode the values so they compare equal to the raw values
            # stored at join time (e.g. the colons of an IPv6 address).
            query = dict(parse_qsl(self.path.split("?", 1)[1], keep_blank_values=True))
            server_id = query["serverId"]
            username = query["username"]
            ip: Optional[str] = query.get("ip")

            # A single pop under the lock: the entry is consumed exactly once
            # and cannot vanish between a membership test and the removal.
            with _cache_lock:
                cached_profile = cached_profiles.pop(server_id, None)

            if cached_profile is not None and ip is not None:
                if ip != cached_profile.ip:
                    raise Exception("IPs don't match (prevent-proxy-connections)")
                # Already verified here; it is not forwarded upstream for joins made through this proxy
                ip = None

            if cached_profile is not None and cached_profile.use_altauth:
                code, data = moj_request(
                    f"https://sessionserver.mojang.com/session/minecraft/profile/{cached_profile.uuid}"
                )
                if data:
                    profile = loads(data)
                    if profile["name"] != username:
                        # Disarm server_id hash collisions and prevent username spoofing
                        raise Exception("Usernames don't match (potential hash collision)")
            else:
                params = {"username": username, "serverId": server_id}
                if ip is not None:
                    params["ip"] = ip
                code, data = moj_request(
                    f"https://sessionserver.mojang.com/session/minecraft/hasJoined?{urlencode(params)}"
                )

            self.send_response(code)
            self.end_headers()
            self.wfile.write(data)
        except Exception:
            self.log_message("Exception handling request: %s", format_exc())
            # 204 matches failure response for hasJoined
            self.send_response(HTTPStatus.NO_CONTENT)
            self.end_headers()

    def log_request(self, code='-', size='-'):
        """Suppress per-request access logging.

        Logging every request may become spammy and may record IPs from
        prevent-proxy-connections (GDPR).
        """
        pass


if __name__ == '__main__':
    Thread(target=timeout_cleaner, name="TimeoutCleanup", daemon=True).start()
    ThreadingHTTPServer(bind_to, AltAuthRequestHandler).serve_forever()