Skip to content
This repository was archived by the owner on Feb 15, 2024. It is now read-only.

AJAX_TOKEN parser #5

Merged
merged 8 commits into from
Oct 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Python
__pycache__

#Vim
*.swp
2 changes: 1 addition & 1 deletion NOTICE
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Copyright 2021 Chechkenev Andrey
Copyright 2021 Chechkenev Andrey, lusm554

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Python Aternos API
An unofficial Aternos API written in Python.
It uses requests, cloudscraper and lxml to parse data from [aternos.org](https://aternos.org/).
> Note for vim: if u have problem like this `IndentationError: unindent does not match any outer indentation level`, try out `retab`.

## Using
First you need to install the module:
Expand Down Expand Up @@ -55,7 +56,7 @@ You can find full documentation on the [Project Wiki](https://github.com/DarkCat
## License
[License Notice](NOTICE):
```
Copyright 2021 Chechkenev Andrey
Copyright 2021 Chechkenev Andrey, lusm554

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
11 changes: 11 additions & 0 deletions connect_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from python_aternos import Client as AternosClient

aternos = AternosClient('', password='')

srvs = aternos.servers

print(srvs)

s = srvs[0]

s.start()
44 changes: 44 additions & 0 deletions js2py_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#!/usr/bin/env python3
import base64
import js2py

# Emulate 'atob' function
#print(base64.standard_b64decode('MmlYaDVXNXVFWXE1ZldKSWF6UTY='))

# Test cases
tests = [
"""(() => {window[("A" + "J" + "AX_T" + "OKE" + "N")]=("2iXh5W5u" + "EYq" + "5fWJIa" + "zQ6");})();""",
""" (() => {window[["N","TOKE","AJAX_"].reverse().join('')]=["IazQ6","fWJ","h5W5uEYq5","2iX"].reverse().join('');})();""",
"""(() => {window["AJAX_TOKEN"] = atob("SGVsbG8sIHdvcmxk")})();""",
"""(() => {window[atob('QUpBWF9UT0tFTg==')]=atob('MmlYaDVXNXVFWXE1ZldKSWF6UTY=');})();""",
"""(() => {window["AJAX_TOKEN"] = "1234" })();""",
"""(() => {window[atob('QUpBWF9UT0tFTg==')]="2iXh5W5uEYq5fWJIazQ6";})();""",
]

# Array function to ECMAScript 5.1
def code(f):
return "(function() { " + f[f.index("{")+1 : f.index("}")] + "})();"

# Emulation atob V8
def atob(arg):
return base64.standard_b64decode(str(arg)).decode("utf-8")

presettings = """
let window = {};
"""

ctx = js2py.EvalJs({ 'atob': atob })

'''
ctx.execute(presettings + code(tests[3]))
print(ctx.window)
'''

for f in tests:
try:
c = code(f)
ctx.execute(presettings + c)
print(ctx.window['AJAX_TOKEN'])
except Exception as e:
print(c, '\n', e)

290 changes: 158 additions & 132 deletions python_aternos/atconnect.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,141 +8,167 @@

from . import aterrors

# TEST
import js2py
import base64

# Set obj for js
presettings = """
let window = {};
"""

# Convert array function to CMAScript 5 function
def toECMAScript5Function(f):
return "(function() { " + f[f.index("{")+1 : f.index("}")] + "})();"

# Emulation of atob - https://developer.mozilla.org/en-US/docs/Web/API/atob
def atob(s):
return base64.standard_b64decode(str(s)).decode("utf-8")

REQGET = 0
REQPOST = 1
REQUA = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:68.0) Gecko/20100101 Goanna/4.8 Firefox/68.0 PaleMoon/29.4.0.2'

class AternosConnect:

def __init__(self) -> None:

pass

def parse_token(self, response:Optional[Union[str,bytes]]=None) -> str:

if response == None:
loginpage = self.request_cloudflare(
f'https://aternos.org/go/', REQGET
).content
pagetree = lxml.html.fromstring(loginpage)
else:
pagetree = lxml.html.fromstring(response)

try:
pagehead = pagetree.head
self.token = re.search(
r'const\s+AJAX_TOKEN\s*=\s*["\'](\w+)["\']',
pagehead.text_content()
)[1]
except (IndexError, TypeError):
raise aterrors.AternosCredentialsError(
'Unable to parse TOKEN from the page'
)

return self.token

def generate_sec(self) -> str:

randkey = self.generate_aternos_rand()
randval = self.generate_aternos_rand()
self.sec = f'{randkey}:{randval}'
self.session.cookies.set(
f'ATERNOS_SEC_{randkey}', randval,
domain='aternos.org'
)

return self.sec

def generate_aternos_rand(self, randlen:int=16) -> str:

rand_arr = []
for i in range(randlen+1):
rand_arr.append('')

rand_alphanum = \
self.convert_num(random.random(),36) + \
'00000000000000000'
return (rand_alphanum[2:18].join(rand_arr)[:randlen])

def convert_num(self, num:Union[int,float], base:int) -> str:

result = ''
while num > 0:
result = str(num % base) + result
num //= base
return result

def request_cloudflare(
self, url:str, method:int,
retries:int=10,
params:Optional[dict]=None,
data:Optional[dict]=None,
headers:Optional[dict]=None,
reqcookies:Optional[dict]=None,
sendtoken:bool=False) -> Response:

cftitle = '<title>Please Wait... | Cloudflare</title>'

if sendtoken:
if params == None:
params = {}
params['SEC'] = self.sec
params['TOKEN'] = self.token

if headers == None:
headers = {}
headers['User-Agent'] = REQUA

try:
cookies = self.session.cookies
except AttributeError:
cookies = None

self.session = CloudScraper()
if cookies != None:
self.session.cookies = cookies

if method == REQPOST:
req = self.session.post(
url,
data=data,
headers=headers,
cookies=reqcookies
)
else:
req = self.session.get(
url,
params=params,
headers=headers,
cookies=reqcookies
)

countdown = retries
while cftitle in req.text \
and (countdown > 0):

self.session = CloudScraper()
if cookies != None:
self.session.cookies = cookies
if reqcookies != None:
for cookiekey in reqcookies:
self.session.cookies.set(cookiekey, reqcookies[cookiekey])

time.sleep(1)
if method == REQPOST:
req = self.session.post(
url,
data=data,
headers=headers,
cookies=reqcookies
)
else:
req = self.session.get(
url,
params=params,
headers=headers,
cookies=reqcookies
)
countdown -= 1

return req
def __init__(self) -> None:

pass

def parse_token(self, response:Optional[Union[str,bytes]]=None) -> str:

if response == None:
loginpage = self.request_cloudflare(
f'https://aternos.org/go/', REQGET
).content
pagetree = lxml.html.fromstring(loginpage)
else:
pagetree = lxml.html.fromstring(response)

try:
# fetch text
pagehead = pagetree.head
text = pagehead.text_content()

#search
js_funcs = re.findall(r"\(\(\)(.*?)\)\(\);", text)
token_js_func = js_funcs[1] if len(js_funcs) > 1 else js_funcs[0]

# run js
ctx = js2py.EvalJs({ 'atob': atob })
jsf = toECMAScript5Function(token_js_func)
ctx.execute(presettings + jsf)

self.token = ctx.window['AJAX_TOKEN']
except (IndexError, TypeError):
raise aterrors.AternosCredentialsError(
'Unable to parse TOKEN from the page'
)

return self.token

def generate_sec(self) -> str:

randkey = self.generate_aternos_rand()
randval = self.generate_aternos_rand()
self.sec = f'{randkey}:{randval}'
self.session.cookies.set(
f'ATERNOS_SEC_{randkey}', randval,
domain='aternos.org'
)

return self.sec

def generate_aternos_rand(self, randlen:int=16) -> str:

rand_arr = []
for i in range(randlen+1):
rand_arr.append('')

rand_alphanum = \
self.convert_num(random.random(),36) + \
'00000000000000000'
return (rand_alphanum[2:18].join(rand_arr)[:randlen])

def convert_num(self, num:Union[int,float], base:int) -> str:

result = ''
while num > 0:
result = str(num % base) + result
num //= base
return result

def request_cloudflare(
self, url:str, method:int,
retries:int=10,
params:Optional[dict]=None,
data:Optional[dict]=None,
headers:Optional[dict]=None,
reqcookies:Optional[dict]=None,
sendtoken:bool=False) -> Response:

cftitle = '<title>Please Wait... | Cloudflare</title>'

if sendtoken:
if params == None:
params = {}
params['SEC'] = self.sec
params['TOKEN'] = self.token

if headers == None:
headers = {}
headers['User-Agent'] = REQUA

try:
cookies = self.session.cookies
except AttributeError:
cookies = None

self.session = CloudScraper()
if cookies != None:
self.session.cookies = cookies

if method == REQPOST:
req = self.session.post(
url,
data=data,
headers=headers,
cookies=reqcookies
)
else:
req = self.session.get(
url,
params=params,
headers=headers,
cookies=reqcookies
)

countdown = retries
while cftitle in req.text \
and (countdown > 0):

self.session = CloudScraper()
if cookies != None:
self.session.cookies = cookies
if reqcookies != None:
for cookiekey in reqcookies:
self.session.cookies.set(cookiekey, reqcookies[cookiekey])

time.sleep(1)
if method == REQPOST:
req = self.session.post(
url,
data=data,
headers=headers,
cookies=reqcookies
)
else:
req = self.session.get(
url,
params=params,
headers=headers,
cookies=reqcookies
)
countdown -= 1

return req
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
lxml==4.6.2
requests==2.25.1
cloudscraper==1.2.58
Js2Py==0.71