90 changes: 73 additions & 17 deletions fred/cogs/crashes.py
@@ -12,6 +12,8 @@
from zipfile import ZipFile

import re2
import regex as regex_fallback
import re as std_re

re2.set_fallback_notification(re2.FALLBACK_WARNING)

@@ -47,6 +49,61 @@ async def regex_with_timeout(*args, **kwargs):
        raise ValueError(args[0]) from e


def pattern_uses_lookaround(pattern: str) -> bool:
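    """Heuristic check for lookaround constructs ((?=, (?!, (?<=, (?<!), which re2 cannot compile.

    Illustrative examples:
        pattern_uses_lookaround(r"(?<=v\.)[\d.]+")                 -> True
        pattern_uses_lookaround(r"Net CL: (?P<game_version>\d+)")  -> False  (named group, not lookaround)
    """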
    return bool(std_re.search(r"\(\?=|\(\?!|\(\?<=|\(\?<!", pattern))


async def re2_search_with_timeout(pattern: str, text: str, flags=0):
    try:
        return await asyncio.wait_for(asyncio.to_thread(re2.search, pattern, text, flags=flags), REGEX_LIMIT)
    except asyncio.TimeoutError:
        raise TimeoutError(
            f"A regex timed out after {REGEX_LIMIT} seconds! \n"
            f"pattern: ({pattern}) \n"
            f"flags: {flags} \n"
            f"on text of length {len(text)}"
        )


async def safe_search(pattern: str, text: str, flags=0):
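    """Async regex search that prefers re2 but degrades gracefully.

    Patterns with lookaround skip re2 entirely; anything re2 rejects at
    compile time (re2.RegexError) is retried with the full-featured `regex` module.
    """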
    # If the pattern contains lookaround constructs, go straight to the fallback
    if pattern_uses_lookaround(pattern):
        return await asyncio.to_thread(regex_fallback.search, pattern, text, flags=flags)
    try:
        return await re2_search_with_timeout(pattern, text, flags=flags)
    except re2.RegexError:
        # fall back to the full-featured `regex` module
        return await asyncio.to_thread(regex_fallback.search, pattern, text, flags=flags)


def safe_search_sync(pattern: str, text: str, flags=0):
    if pattern_uses_lookaround(pattern):
        return regex_fallback.search(pattern, text, flags=flags)
    try:
        return re2.search(pattern, text, flags=flags)
    except re2.RegexError:
        return regex_fallback.search(pattern, text, flags=flags)


def safe_sub(pattern: str, repl, string: str, flags=0):
    if pattern_uses_lookaround(pattern):
        return regex_fallback.sub(pattern, repl, string, flags=flags)
    try:
        return re2.sub(pattern, repl, string, flags=flags)
    except re2.RegexError:
        return regex_fallback.sub(pattern, repl, string, flags=flags)


def safe_findall(pattern: str, string: str, flags=0):
    if pattern_uses_lookaround(pattern):
        return regex_fallback.findall(pattern, string, flags=flags)
    try:
        return re2.findall(pattern, string, flags=flags)
    except re2.RegexError:
        return regex_fallback.findall(pattern, string, flags=flags)
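# Note: safe_search_sync, safe_sub and safe_findall deliberately mirror the
# dispatch in safe_search (lookaround -> `regex` module, otherwise re2 with a
# fallback on re2.RegexError). If more wrappers are needed, a small factory
# could generate them; a sketch (not part of this patch) is:
#
#     def _make_safe(re2_fn, fallback_fn):
#         def wrapper(pattern, *args, flags=0):
#             if pattern_uses_lookaround(pattern):
#                 return fallback_fn(pattern, *args, flags=flags)
#             try:
#                 return re2_fn(pattern, *args, flags=flags)
#             except re2.RegexError:
#                 return fallback_fn(pattern, *args, flags=flags)
#         return wrapper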



class Crashes(FredCog):

    type CrashJob = Coroutine[Any, Any, list[CrashResponse]]
@@ -174,7 +231,8 @@ def formatted_chunks_of_100_mod_references() -> Generator[str, None, None]:

    async def mass_regex(self, text: str) -> AsyncIterator[CrashResponse]:
        for crash in config.Crashes.fetch_all():
            if match := await regex_with_timeout(crash["crash"], text, flags=re2.IGNORECASE | re2.S):
            # Use safe_search so patterns with lookaround fall back to the full `regex` package
            if match := await safe_search(crash["crash"], text, flags=re2.IGNORECASE | re2.S):
                if str(crash["response"]).startswith(self.bot.command_prefix):
                    if command := config.Commands.fetch(crash["response"].strip(self.bot.command_prefix)):
                        command_response = command["content"]
@@ -188,19 +246,19 @@ async def mass_regex(self, text: str) -> AsyncIterator[CrashResponse]:
                        )
                else:

                    def replace_response_value_with_captured(m: re2.Match) -> str:
                    def replace_response_value_with_captured(m) -> str:
                        group = int(m.group(1))
                        if group > len(match.groups()):
                            return f"{{Group {group} not captured in crash regex!}}"
                        return match.group(group)

                    response = re2.sub(r"{(\d+)}", replace_response_value_with_captured, str(crash["response"]))
                    response = safe_sub(r"{(\d+)}", replace_response_value_with_captured, str(crash["response"]))
                    yield CrashResponse(name=crash["name"], value=response, inline=True)

    async def detect_and_fetch_pastebin_content(self, text: str) -> str:
        if match := re2.search(r"(https://pastebin.com/\S+)", text):
        if match := await safe_search(r"(https://pastebin.com/\S+)", text):
            self.logger.info("Found a pastebin link! Fetching text.")
            url = re2.sub(r"(?<=bin.com)/", "/raw/", match.group(1))
            url = safe_sub(r"(?<=bin.com)/", "/raw/", match.group(1))
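            # the (?<=bin.com)/ lookbehind makes safe_sub take the fallback path,
            # e.g. https://pastebin.com/AbCd -> https://pastebin.com/raw/AbCd (illustrative ID)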
            async with self.bot.web_session.get(url) as response:
                return await response.text()
        else:
@@ -215,7 +273,7 @@ async def process_text(self, text: str, filename="") -> list[CrashResponse]:

        responses.extend(await self.process_text(await self.detect_and_fetch_pastebin_content(text)))

        if match := re2.search(r"([^\n]*Critical error:.*Engine exit[^\n]*\))", text, flags=re2.I | re2.M | re2.S):
        if match := await safe_search(r"([^\n]*Critical error:.*Engine exit[^\n]*\))", text, flags=re2.I | re2.M | re2.S):
            filename = os.path.basename(filename)
            crash = match.group(1)
            responses.append(
@@ -278,9 +336,7 @@ def _ext_filter(ext: str) -> bool:
        return ext in ("png", "log", "txt", "zip", "json")

    async def _obtain_attachments(self, message: Message) -> AsyncGenerator[tuple[str, IO | Exception], None, None]:
        cdn_links = re2.findall(
            r"(https://(?:cdn.discordapp.com|media.discordapp.net)/attachments/\S+)", message.content
        )
        cdn_links = safe_findall(r"(https://(?:cdn.discordapp.com|media.discordapp.net)/attachments/\S+)", message.content)

        yield bool(cdn_links or message.attachments)

@@ -553,14 +609,14 @@ def _get_fg_log_details(log_file: IO[bytes]):
        # It used to matter more when we were using slower regex libraries. - Borketh

        lines: list[bytes] = log_file.readlines()
        vanilla_info_search_area = filter(lambda l: re2.match("^LogInit", l), map(lambda b: b.decode(), lines))
        vanilla_info_search_area = filter(lambda l: safe_search_sync("^LogInit", l), map(lambda b: b.decode(), lines))

        info = {}
        patterns = [
            re2.compile(r"Net CL: (?P<game_version>\d+)"),
            re2.compile(r"Command Line:(?P<cli>.*)"),
            re2.compile(r"Base Directory:(?P<path>.+)"),
            re2.compile(r"Launcher ID: (?P<launcher>\w+)"),
            r"Net CL: (?P<game_version>\d+)",
            r"Command Line:(?P<cli>.*)",
            r"Base Directory:(?P<path>.+)",
            r"Launcher ID: (?P<launcher>\w+)",
        ]

        # This loop sequentially finds information,
@@ -571,17 +627,17 @@ def _get_fg_log_details(log_file: IO[bytes]):
        for line in vanilla_info_search_area:
            if not patterns:
                break
            elif match := re2.search(patterns[0], line):
            elif match := safe_search_sync(patterns[0], line):
                info |= match.groupdict()
                patterns.pop(0)
        else:
            logger.info("Didn't find all four pieces of information normally found in a log!")
            logger.debug(json.dumps(info, indent=2))

        mod_loader_logs = filter(lambda l: re2.match("LogSatisfactoryModLoader", l), map(lambda b: b.decode(), lines))
        mod_loader_logs = filter(lambda l: safe_search_sync("^LogSatisfactoryModLoader", l), map(lambda b: b.decode(), lines))

        for line in mod_loader_logs:
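            # (?<=v\.) is a lookbehind, so this pattern takes the fallback path;
            # e.g. a line containing "v.3.6.1" yields {"sml": "3.6.1"} (illustrative)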
            if match := re2.search(r"(?<=v\.)(?P<sml>[\d.]+)", line):
            if match := safe_search_sync(r"(?<=v\.)(?P<sml>[\d.]+)", line):
                info |= match.groupdict()
                break

2 changes: 1 addition & 1 deletion fred/fred_commands/_command_utils.py
@@ -8,7 +8,7 @@
logger = new_logger("[Command/Crash Search]")


def search(table: Type[Commands | Crashes], pattern: str, column: str, force_fuzzy: bool) -> (str | list[str], bool):
def search(table: Type[Commands | Crashes], pattern: str, column: str, force_fuzzy: bool) -> tuple[str | list[str], bool]:
"""Returns the top three results based on the result"""

if column not in dir(table):