forked from continuedev/continue
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchat.py
299 lines (246 loc) · 12.7 KB
/
chat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
import json
from textwrap import dedent
from typing import Any, Coroutine, List
from pydantic import Field
from ...libs.util.strings import remove_quotes_and_escapes
from .main import EditHighlightedCodeStep
from .core.core import DisplayErrorStep, MessageStep
from ...core.main import FunctionCall, Models
from ...core.main import ChatMessage, Step, step_to_json_schema
from ...core.sdk import ContinueSDK
from ...libs.llm.openai import OpenAI
from ...libs.llm.maybe_proxy_openai import MaybeProxyOpenAI
import openai
import os
from dotenv import load_dotenv
from directory_tree import display_tree
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
openai.api_key = OPENAI_API_KEY
FREE_USAGE_STEP_NAME = "Please enter OpenAI API key"
class SimpleChatStep(Step):
name: str = "Generating Response..."
manage_own_chat_context: bool = True
description: str = ""
messages: List[ChatMessage] = None
async def run(self, sdk: ContinueSDK):
# Check if proxy server API key
if isinstance(sdk.models.default, MaybeProxyOpenAI) and (sdk.models.default.api_key is None or sdk.models.default.api_key.strip() == "") and len(list(filter(lambda x: not x.step.hide, sdk.history.timeline))) >= 10 and len(list(filter(lambda x: x.step.name == FREE_USAGE_STEP_NAME, sdk.history.timeline))) == 0:
await sdk.run_step(MessageStep(
name=FREE_USAGE_STEP_NAME,
message=dedent("""\
To make it easier to use Continue, you're getting limited free usage. When you have the chance, please enter your own OpenAI key in `~/.continue/config.py`. You can open the file by using the '/config' slash command in the text box below.
Here's an example of how to edit the file:
```python
...
config=ContinueConfig(
...
models=Models(
default=MaybeProxyOpenAI(api_key="<API_KEY>", model="gpt-4"),
medium=MaybeProxyOpenAI(api_key="<API_KEY>", model="gpt-3.5-turbo")
)
)
```
You can also learn more about customizations [here](https://continue.dev/docs/customization).
"""),
))
messages = self.messages or await sdk.get_chat_context()
generator = sdk.models.default.stream_chat(
messages, temperature=sdk.config.temperature)
async for chunk in generator:
if sdk.current_step_was_deleted():
# So that the message doesn't disappear
self.hide = False
break
if "content" in chunk:
self.description += chunk["content"]
await sdk.update_ui()
self.name = remove_quotes_and_escapes(await sdk.models.medium.complete(
f"Write a short title for the following chat message: {self.description}"))
self.chat_context.append(ChatMessage(
role="assistant",
content=self.description,
summary=self.name
))
# TODO: Never actually closing.
await generator.aclose()
class AddFileStep(Step):
name: str = "Add File"
description = "Add a file to the workspace. Should always view the directory tree before this."
filename: str
file_contents: str
async def describe(self, models: Models) -> Coroutine[Any, Any, Coroutine[str, None, None]]:
return f"Added a file named `{self.filename}` to the workspace."
async def run(self, sdk: ContinueSDK):
await sdk.add_file(self.filename, self.file_contents)
await sdk.ide.setFileOpen(os.path.join(sdk.ide.workspace_directory, self.filename))
class DeleteFileStep(Step):
name: str = "Delete File"
description = "Delete a file from the workspace."
filename: str
async def describe(self, models: Models) -> Coroutine[Any, Any, Coroutine[str, None, None]]:
return f"Deleted a file named `{self.filename}` from the workspace."
async def run(self, sdk: ContinueSDK):
await sdk.delete_file(self.filename)
class AddDirectoryStep(Step):
name: str = "Add Directory"
description = "Add a directory to the workspace."
directory_name: str
async def describe(self, models: Models) -> Coroutine[Any, Any, Coroutine[str, None, None]]:
return f"Added a directory named `{self.directory_name}` to the workspace."
async def run(self, sdk: ContinueSDK):
try:
await sdk.add_directory(self.directory_name)
except FileExistsError:
self.description = f"Directory {self.directory_name} already exists."
class RunTerminalCommandStep(Step):
name: str = "Run Terminal Command"
description: str = "Run a terminal command."
command: str
async def run(self, sdk: ContinueSDK):
self.description = f"Copy this command and run in your terminal:\n\n```bash\n{self.command}\n```"
class ViewDirectoryTreeStep(Step):
name: str = "View Directory Tree"
description: str = "View the directory tree to learn which folder and files exist. You should always do this before adding new files."
async def describe(self, models: Models) -> Coroutine[Any, Any, Coroutine[str, None, None]]:
return f"Viewed the directory tree."
async def run(self, sdk: ContinueSDK):
self.description = f"```\n{display_tree(sdk.ide.workspace_directory, True, max_depth=2)}\n```"
class EditFileStep(Step):
name: str = "Edit File"
description: str = "Edit a file in the workspace that is not currently open."
filename: str = Field(
..., description="The name of the file to edit.")
instructions: str = Field(
..., description="The instructions to edit the file.")
hide: bool = True
async def run(self, sdk: ContinueSDK):
await sdk.edit_file(self.filename, self.instructions)
class ChatWithFunctions(Step):
user_input: str
functions: List[Step] = [AddFileStep(filename="", file_contents=""),
EditFileStep(filename="", instructions=""),
EditHighlightedCodeStep(user_input=""),
ViewDirectoryTreeStep(), AddDirectoryStep(directory_name=""),
DeleteFileStep(filename=""), RunTerminalCommandStep(command="")]
name: str = "Input"
manage_own_chat_context: bool = True
description: str = ""
hide: bool = True
async def run(self, sdk: ContinueSDK):
await sdk.update_ui()
step_name_step_class_map = {
step.name.replace(" ", ""): step.__class__ for step in self.functions}
functions = [step_to_json_schema(
function) for function in self.functions]
self.chat_context.append(ChatMessage(
role="user",
content=self.user_input,
summary=self.user_input
))
last_function_called_index_in_history = None
last_function_called_name = None
last_function_called_params = None
while True:
was_function_called = False
func_args = ""
func_name = ""
msg_content = ""
msg_step = None
gpt350613 = OpenAI(model="gpt-3.5-turbo-0613")
await sdk.start_model(gpt350613)
async for msg_chunk in gpt350613.stream_chat(await sdk.get_chat_context(), functions=functions):
if sdk.current_step_was_deleted():
return
if "content" in msg_chunk and msg_chunk["content"] is not None:
msg_content += msg_chunk["content"]
# if last_function_called_index_in_history is not None:
# while sdk.history.timeline[last_function_called_index].step.hide:
# last_function_called_index += 1
# sdk.history.timeline[last_function_called_index_in_history].step.description = msg_content
if msg_step is None:
msg_step = MessageStep(
name="Chat",
message=msg_chunk["content"]
)
await sdk.run_step(msg_step)
else:
msg_step.description = msg_content
await sdk.update_ui()
elif "function_call" in msg_chunk or func_name != "":
was_function_called = True
if "function_call" in msg_chunk:
if "arguments" in msg_chunk["function_call"]:
func_args += msg_chunk["function_call"]["arguments"]
if "name" in msg_chunk["function_call"]:
func_name += msg_chunk["function_call"]["name"]
if not was_function_called:
self.chat_context.append(ChatMessage(
role="assistant",
content=msg_content,
summary=msg_content
))
break
else:
last_function_called = func_name
if func_name == "python" and "python" not in step_name_step_class_map:
# GPT must be fine-tuned to believe this exists, but it doesn't always
func_name = "EditHighlightedCodeStep"
func_args = json.dumps({"user_input": self.user_input})
# self.chat_context.append(ChatMessage(
# role="assistant",
# content=None,
# function_call=FunctionCall(
# name=func_name,
# arguments=func_args
# ),
# summary=f"Called function {func_name}"
# ))
# self.chat_context.append(ChatMessage(
# role="user",
# content="The 'python' function does not exist. Don't call it. Try again to call another function.",
# summary="'python' function does not exist."
# ))
# msg_step.hide = True
# continue
# Call the function, then continue to chat
func_args = "{}" if func_args == "" else func_args
try:
fn_call_params = json.loads(func_args)
except json.JSONDecodeError:
raise Exception(
"The model returned invalid JSON. Please try again")
self.chat_context.append(ChatMessage(
role="assistant",
content=None,
function_call=FunctionCall(
name=func_name,
arguments=func_args
),
summary=f"Called function {func_name}"
))
last_function_called_index_in_history = sdk.history.current_index + 1
if func_name not in step_name_step_class_map:
raise Exception(
f"The model tried to call a function ({func_name}) that does not exist. Please try again.")
# if func_name == "AddFileStep":
# step_to_run.hide = True
# self.description += f"\nAdded file `{func_args['filename']}`"
# elif func_name == "AddDirectoryStep":
# step_to_run.hide = True
# self.description += f"\nAdded directory `{func_args['directory_name']}`"
# else:
# self.description += f"\n`Running function {func_name}`\n\n"
if func_name == "EditHighlightedCodeStep":
fn_call_params["user_input"] = self.user_input
elif func_name == "EditFile":
fn_call_params["instructions"] = self.user_input
step_to_run = step_name_step_class_map[func_name](
**fn_call_params)
if last_function_called_name is not None and last_function_called_name == func_name and last_function_called_params is not None and last_function_called_params == fn_call_params:
# If it's calling the same function more than once in a row, it's probably looping and confused
return
last_function_called_name = func_name
last_function_called_params = fn_call_params
await sdk.run_step(step_to_run)
await sdk.update_ui()