-
Notifications
You must be signed in to change notification settings - Fork 438
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
FEAT: Gradio HiTL Scorer #722
base: main
Are you sure you want to change the base?
Changes from 9 commits
f62e992
c5a24d5
b1df059
829532d
1499dc2
2557cfd
3ba7ca6
53b9b9c
4e279b1
6616eee
0970314
9a45e55
bf0931e
e9959be
ff7b5fb
4941211
b4eb898
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
import asyncio | ||
from pyrit.score.scorer import Scorer | ||
from pyrit.models import Score, PromptRequestPiece | ||
from typing import Optional | ||
|
||
class HumanInTheLoopScorerGradio(Scorer): | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The class or constructor needs a docstring |
||
def __init__(self, *, open_browser=False, scorer: Scorer = None, re_scorers: list[Scorer] = None) -> None: | ||
# Import here to avoid importing rpyc in the main module that might not be installed | ||
from pyrit.ui.rpc import AppRpcServer | ||
|
||
self._scorer = scorer | ||
self._re_scorers = re_scorers | ||
self._rpc_server = AppRpcServer(open_browser=open_browser) | ||
self._rpc_server.start() | ||
|
||
|
||
async def score_async(self, request_response: PromptRequestPiece, *, task: Optional[str] = None) -> list[Score]: | ||
try: | ||
return await asyncio.to_thread(self.score_prompt_manually, request_response, task=task) | ||
except asyncio.CancelledError: | ||
self._rpc_server.stop() | ||
raise | ||
|
||
|
||
def score_prompt_manually(self, request_prompt: PromptRequestPiece, *, task: Optional[str] = None) -> list[Score]: | ||
self._rpc_server.wait_for_client() | ||
self._rpc_server.send_score_prompt(request_prompt) | ||
score = self._rpc_server.wait_for_score() | ||
return [score] | ||
|
||
def validate(self, request_response: PromptRequestPiece, *, task: Optional[str] = None): | ||
pass | ||
|
||
def __del__(self): | ||
self._rpc_server.stop() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
import os | ||
mart123p marked this conversation as resolved.
Show resolved
Hide resolved
|
||
import sys | ||
import subprocess | ||
import traceback | ||
|
||
GLOBAL_MUTEX_NAME = "PyRIT-Gradio" | ||
|
||
def launch_app(open_browser=False): | ||
# Launch a new process to run the gradio UI. | ||
# Locate the python executable and run this file. | ||
current_path = os.path.abspath(__file__) | ||
python_path = sys.executable | ||
|
||
# Start a new process to run it | ||
subprocess.Popen([python_path, current_path, str(open_browser)], creationflags=subprocess.CREATE_NEW_CONSOLE) | ||
|
||
def is_app_running(): | ||
if sys.platform != "win32": | ||
raise NotImplementedError("This function is only supported on Windows.") | ||
return True | ||
|
||
import ctypes.wintypes | ||
|
||
SYNCHRONIZE = 0x00100000 | ||
mutex = ctypes.windll.kernel32.OpenMutexW(SYNCHRONIZE, False, GLOBAL_MUTEX_NAME) | ||
if not mutex: | ||
return False | ||
|
||
# Close the handle to the mutex | ||
ctypes.windll.kernel32.CloseHandle(mutex) | ||
return True | ||
|
||
if __name__ == "__main__": | ||
def create_mutex(): | ||
if sys.platform != "win32": | ||
raise NotImplementedError("This function is only supported on Windows.") | ||
|
||
# TODO make sure to add cross-platform support for this. | ||
mart123p marked this conversation as resolved.
Show resolved
Hide resolved
|
||
import ctypes.wintypes | ||
mutex = ctypes.windll.kernel32.CreateMutexW(None, False, GLOBAL_MUTEX_NAME) | ||
last_error = ctypes.windll.kernel32.GetLastError() | ||
if last_error == 183: # ERROR_ALREADY_EXISTS | ||
return False | ||
return True | ||
|
||
if not create_mutex(): | ||
print("Gradio UI is already running.") | ||
sys.exit(1) | ||
print("Starting Gradio Interface please wait...") | ||
try: | ||
open_browser = False | ||
if len(sys.argv) > 1: | ||
open_browser = sys.argv[1] == "True" | ||
|
||
from scorer import GradioApp | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm guessing this is here so that we don't import it if the gradio extra isn't installed? If so, we import gradio in the scorer file. So that won't help, right? |
||
app = GradioApp() | ||
app.start_gradio(open_browser=open_browser) | ||
except: | ||
# Print the error message and traceback | ||
print(traceback.format_exc()) | ||
input("Press Enter to exit.") |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
import gradio as gr | ||
|
||
from rpc_client import RpcClient | ||
|
||
class ConnectionStatusHandler: | ||
def __init__(self, | ||
is_connected_state: gr.State, | ||
rpc_client: RpcClient): | ||
self.state = is_connected_state | ||
self.server_disconnected = False | ||
self.rpc_client = rpc_client | ||
self.next_prompt = "" | ||
|
||
def setup(self, main_interface: gr.Column, loading_animation: gr.Column, next_prompt_state: gr.State): | ||
self.state.change(fn=self.__on_state_change, inputs=[self.state], outputs=[main_interface, loading_animation, next_prompt_state]) | ||
|
||
connection_status_timer = gr.Timer(1) | ||
connection_status_timer.tick( | ||
fn=self.__check_connection_status, | ||
inputs=[self.state], | ||
outputs=[self.state] | ||
).then( | ||
fn=self.__reconnect_if_needed, | ||
outputs=[self.state] | ||
) | ||
|
||
def set_ready(self): | ||
self.server_disconnected = False | ||
|
||
def set_disconnected(self): | ||
self.server_disconnected = True | ||
|
||
def set_next_prompt(self, next_prompt: str): | ||
self.next_prompt = next_prompt | ||
|
||
def __on_state_change(self, is_connected: bool): | ||
print("Connection status changed to: ", is_connected, " - ", self.next_prompt) | ||
if is_connected: | ||
return [gr.Column(visible=True), gr.Row(visible=False), self.next_prompt] | ||
return [gr.Column(visible=False), gr.Row(visible=True), self.next_prompt] | ||
|
||
def __check_connection_status(self, is_connected: bool): | ||
if self.server_disconnected or not is_connected: | ||
print("Gradio disconnected") | ||
return False | ||
return True | ||
|
||
def __reconnect_if_needed(self): | ||
if self.server_disconnected: | ||
print("Attempting to reconnect") | ||
self.rpc_client.reconnect() | ||
prompt = self.rpc_client.wait_for_prompt() | ||
self.next_prompt = str(prompt.original_value) | ||
self.server_disconnected = False | ||
return True |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this downgrade necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes there's an issue with Gradio
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then we need to document that here otherwise we'll accidentally break it soon.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also this just allows more versions and doesn't actually force downgrading