-
Notifications
You must be signed in to change notification settings - Fork 229
Expand file tree
/
Copy pathfile_io.py
More file actions
360 lines (315 loc) · 11.3 KB
/
file_io.py
File metadata and controls
360 lines (315 loc) · 11.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
"""File I/O operations with a configurable working directory."""
import os
from pathlib import Path
from typing import Optional
from agentscope.message import TextBlock
from agentscope.tool import ToolResponse
from ..utils import read_file_safe, truncate_text_output, TRUNCATION_NOTICE_MARKER
class FileIO:
"""File I/O operations with a configurable working directory."""
def __init__(self, working_dir: str | Path):
"""Initialize FileIO with a working directory.
Args:
working_dir (`str`):
The working directory for resolving relative paths.
"""
self.working_dir = Path(working_dir)
def _resolve_file_path(self, file_path: str) -> str:
"""Resolve file path: use absolute path as-is,
resolve relative path from working_dir.
Args:
file_path: The input file path (absolute or relative).
Returns:
The resolved absolute file path as string.
"""
path = Path(file_path).expanduser()
if path.is_absolute():
return str(path)
else:
return str(self.working_dir / file_path)
async def read_file( # pylint: disable=too-many-return-statements
self,
file_path: str,
start_line: Optional[int] = None,
end_line: Optional[int] = None,
) -> ToolResponse:
"""Read a file. Relative paths resolve from WORKING_DIR.
Use start_line/end_line to read a specific line range (output includes
line numbers). Omit both to read the full file.
Args:
file_path (`str`):
Path to the file.
start_line (`int`, optional):
First line to read (1-based, inclusive).
end_line (`int`, optional):
Last line to read (1-based, inclusive).
"""
# Convert start_line/end_line to int if they are strings
if start_line is not None:
try:
start_line = int(start_line)
except (ValueError, TypeError):
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"Error: start_line must be an integer, got {start_line!r}.",
),
],
)
if end_line is not None:
try:
end_line = int(end_line)
except (ValueError, TypeError):
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"Error: end_line must be an integer, got {end_line!r}.",
),
],
)
file_path = self._resolve_file_path(file_path)
if not os.path.exists(file_path):
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"Error: The file {file_path} does not exist.",
),
],
)
if not os.path.isfile(file_path):
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"Error: The path {file_path} is not a file.",
),
],
)
try:
content = read_file_safe(file_path)
all_lines = content.split("\n")
total = len(all_lines)
# Determine read range
s = max(1, start_line if start_line is not None else 1)
e = min(total, end_line if end_line is not None else total)
if s > total:
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"Error: start_line {s} exceeds file length ({total} lines).",
),
],
)
if s > e:
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"Error: start_line ({s}) > end_line ({e}).",
),
],
)
# Extract selected lines
selected_content = "\n".join(all_lines[s - 1 : e])
# Apply smart truncation (consistent with shell output format)
text = truncate_text_output(
selected_content,
start_line=s,
total_lines=total,
file_path=file_path,
)
# Add continuation hint if partial read without truncation.
# Use TRUNCATION_NOTICE_MARKER format so ToolResultCompactor can
# re-truncate with the correct start_line when compacting old messages.
if text == selected_content and e < total:
content_bytes = len(text.encode("utf-8"))
notice = (
TRUNCATION_NOTICE_MARKER + f"\nThe output above was truncated."
f"\nThe full content is saved to the file "
f"and contains {total} lines in total."
f"\nThis excerpt starts at line {s} and "
f"covers the next {content_bytes} bytes."
"\nIf the current content is not enough, "
f"call `read_file` with file_path={file_path} start_line={e + 1} to read more."
)
text = text + notice
return ToolResponse(
content=[TextBlock(type="text", text=text)],
)
except Exception as e:
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"Error: Read file failed due to \n{e}",
),
],
)
async def write_file(
self,
file_path: str,
content: str,
) -> ToolResponse:
"""Create or overwrite a file. Relative paths resolve from working_dir.
Args:
file_path (`str`):
Path to the file.
content (`str`):
Content to write.
"""
if not file_path:
return ToolResponse(
content=[
TextBlock(
type="text",
text="Error: No `file_path` provided.",
),
],
)
file_path = self._resolve_file_path(file_path)
try:
with open(file_path, "w", encoding="utf-8") as file:
file.write(content)
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"Wrote {len(content)} bytes to {file_path}.",
),
],
)
except Exception as e:
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"Error: Write file failed due to \n{e}",
),
],
)
# pylint: disable=too-many-return-statements
async def edit_file(
self,
file_path: str,
old_text: str,
new_text: str,
) -> ToolResponse:
"""Find-and-replace text in a file. All occurrences of old_text are
replaced with new_text. Relative paths resolve from working_dir.
Args:
file_path (`str`):
Path to the file.
old_text (`str`):
Exact text to find.
new_text (`str`):
Replacement text.
"""
if not file_path:
return ToolResponse(
content=[
TextBlock(
type="text",
text="Error: No `file_path` provided.",
),
],
)
resolved_path = self._resolve_file_path(file_path)
if not os.path.exists(resolved_path):
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"Error: The file {resolved_path} does not exist.",
),
],
)
if not os.path.isfile(resolved_path):
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"Error: The path {resolved_path} is not a file.",
),
],
)
try:
content = read_file_safe(resolved_path)
except Exception as e:
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"Error: Read file failed due to \n{e}",
),
],
)
if old_text not in content:
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"Error: The text to replace was not found in {file_path}.",
),
],
)
new_content = content.replace(old_text, new_text)
write_response = await self.write_file(file_path=resolved_path, content=new_content)
if write_response.content and len(write_response.content) > 0:
write_text = write_response.content[0].get("text", "")
if write_text.startswith("Error:"):
return write_response
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"Successfully replaced text in {file_path}.",
),
],
)
async def append_file(
self,
file_path: str,
content: str,
) -> ToolResponse:
"""Append content to the end of a file. Relative paths resolve from
working_dir.
Args:
file_path (`str`):
Path to the file.
content (`str`):
Content to append.
"""
if not file_path:
return ToolResponse(
content=[
TextBlock(
type="text",
text="Error: No `file_path` provided.",
),
],
)
file_path = self._resolve_file_path(file_path)
try:
with open(file_path, "a", encoding="utf-8") as file:
file.write(content)
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"Appended {len(content)} bytes to {file_path}.",
),
],
)
except Exception as e:
return ToolResponse(
content=[
TextBlock(
type="text",
text=f"Error: Append file failed due to \n{e}",
),
],
)