-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcli_common.py
275 lines (219 loc) · 9.99 KB
/
cli_common.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
#!/usr/bin/env python3
# cli_common.py
import os
from typing import Dict, List, Optional, Any, Iterator, Tuple, Callable
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.application.run_in_terminal import run_in_terminal
from prompt_toolkit.document import Document
from prompt_toolkit.auto_suggest import AutoSuggest, Suggestion
from prompt_toolkit.completion import Completer, Completion
from prompt_toolkit.validation import Validator, ValidationError
from tabulate import tabulate
from validators import validators
from suggestors import suggestors
class AutoSuggestFromTree(AutoSuggest):
    """Provide inline auto-suggestions from a nested command-tree dict.

    Each whitespace-separated token of the input selects one level of the
    tree. A token that is not a literal key falls back to the first child
    whose ``"type"`` is ``"tagNode"``, which therefore acts as a wildcard
    during traversal.
    """

    def __init__(self, root: Dict[str, Any]):
        # Root of the command tree (nested dicts keyed by token).
        self.root = root

    @staticmethod
    def _descend(node: Dict[str, Any], part: str) -> Optional[Dict[str, Any]]:
        """Return the child of *node* selected by *part*, or ``None``.

        Only dict children count as sub-nodes: metadata entries stored next
        to them (string values such as a description) must never be
        traversed into, otherwise the next step would call ``.get`` on a
        ``str``.
        """
        child = node.get(part)
        if isinstance(child, dict):
            return child
        # No literal match: fall back to the first tagNode child, if any.
        return next(
            (v for v in node.values()
             if isinstance(v, dict) and v.get("type") == "tagNode"),
            None,
        )

    def get_suggestion(self, buffer: Any, document: Document) -> Optional[Suggestion]:
        """Return the text that would extend the current token, if unambiguous.

        Args:
            buffer: The buffer being edited (unused).
            document: Current document; its full ``text`` is tokenised.

        Returns:
            A ``Suggestion`` holding the characters to append so that the
            token being typed grows to the longest common prefix of all
            matching literal (non-tagNode) children, or ``None`` when there
            is nothing to add.
        """
        text = document.text
        parts = text.split()
        if not parts:
            return None

        if text.endswith(" "):
            # The last token is already complete; suggest for the *next*
            # level instead of appending the tail of a sibling of the
            # finished token after the space.
            base_parts, last_part = parts, ""
        else:
            *base_parts, last_part = parts

        node = self.root
        for part in base_parts:
            node = self._descend(node, part)
            if not node:
                return None

        # Only suggest static (non-tagNode) keys
        candidates = [
            key for key, child in node.items()
            if isinstance(child, dict)
            and child.get("type") != "tagNode"
            and key.startswith(last_part)
        ]
        if not candidates:
            return None

        common_prefix = os.path.commonprefix(candidates)
        if common_prefix and common_prefix != last_part:
            return Suggestion(common_prefix[len(last_part):])
        return None
class TreeCompleter(Completer):
    """Provide command completion from a nested command-tree dict.

    Literal children are completed by key. Children whose ``"type"`` is
    ``"tagNode"`` contribute the values produced by the callable registered
    in ``suggestors`` under their ``"suggestor"`` name.
    """

    def __init__(self, tree: Dict[str, Any]):
        # Root of the command tree (nested dicts keyed by token).
        self.tree = tree

    @staticmethod
    def _descend(node: Dict[str, Any], part: str) -> Optional[Dict[str, Any]]:
        """Return the child of *node* selected by *part*, or ``None``.

        Only dict children count as sub-nodes, so metadata values stored in
        the same dict (plain strings) are never traversed into. Without a
        literal match the first tagNode child is used as a wildcard.
        """
        child = node.get(part)
        if isinstance(child, dict):
            return child
        return next(
            (v for v in node.values()
             if isinstance(v, dict) and v.get("type") == "tagNode"),
            None,
        )

    def get_completions(self, document: Document, complete_event: Any) -> Iterator[Completion]:
        """Yield completions for the text before the cursor.

        Args:
            document: Current document; only ``text_before_cursor`` is used.
            complete_event: Completion trigger information (unused).

        Yields:
            ``Completion`` objects for the token under the cursor, or for
            the next token when the text ends with a space.
        """
        text = document.text_before_cursor
        parts = text.split()

        # Must be checked before indexing ``parts``: on empty input there is
        # no last token to look at.
        if not parts:
            yield from self._get_base_completions(self.tree)
            return

        is_mid_token = not text.endswith(" ")
        path_parts = parts[:-1] if is_mid_token else parts
        last_word = parts[-1] if is_mid_token else ""

        node = self.tree
        for part in path_parts:
            node = self._descend(node, part)
            if not node:
                return

        yield from self._get_node_completions(node, last_word)

    def _get_base_completions(self, node: Dict[str, Any]) -> Iterator[Completion]:
        """Yield every literal (non-tagNode) child key of *node*."""
        for key, val in node.items():
            if isinstance(val, dict) and val.get("type") != "tagNode":
                yield Completion(text=key, start_position=0)

    def _get_node_completions(self, node: Dict[str, Any], last_word: str) -> Iterator[Completion]:
        """Yield completions below *node* that start with *last_word*.

        Args:
            node: Tree node whose children are candidate completions.
            last_word: Partial token to match; ``""`` matches everything.
        """
        for key, val in node.items():
            if not isinstance(val, dict):
                continue

            if val.get("type") != "tagNode":
                if key.startswith(last_word):
                    yield Completion(text=key, start_position=-len(last_word))
                continue

            # tagNode: values come from a registered suggestor, if one is
            # named and known; otherwise the node contributes nothing.
            if "suggestor" not in val:
                continue
            sugg_name = val["suggestor"]
            if sugg_name not in suggestors:
                continue
            args = val.get("suggestor_args", [])
            try:
                for option in suggestors[sugg_name](*args):
                    if option.startswith(last_word):
                        yield Completion(text=option, start_position=-len(last_word))
            except Exception:
                # Best effort: a failing suggestor must not break
                # completion, so surface a visible placeholder instead.
                yield Completion(text=f"<error: {sugg_name}>", start_position=0)
class CommandValidator(Validator):
    """Validate typed commands against a nested command-tree dict.

    Tokens matching a literal child are accepted as-is. Any other token is
    checked against the first ``tagNode`` child of the current node, using
    that node's ``"validator"`` entry when it has one.
    """

    def __init__(self, root: Dict[str, Any]):
        # Root of the command tree (nested dicts keyed by token).
        self.root = root

    @staticmethod
    def _find_tag_node(node: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Return the first child of *node* typed ``"tagNode"``, or ``None``."""
        return next(
            (v for v in node.values()
             if isinstance(v, dict) and v.get("type") == "tagNode"),
            None,
        )

    @staticmethod
    def _resolve_validator(tag_node: Dict[str, Any]) -> Optional[Callable[[str], Any]]:
        """Return the predicate configured on *tag_node*, or ``None``.

        ``"enum"`` is handled inline against the node's ``"enum-values"``
        list; every other name is looked up in the ``validators`` registry
        (an unknown name yields ``None``, i.e. no validation).
        """
        validator_type = tag_node.get("validator")
        if not validator_type:
            return None
        if validator_type == "enum":
            allowed = tag_node.get("enum-values", [])
            return lambda value: value in allowed
        return validators.get(validator_type)

    def validate(self, document: Document) -> None:
        """Validate the command against the command tree.

        Args:
            document: Document whose full ``text`` is tokenised and checked.

        Raises:
            ValidationError: When a token consumed by a tagNode is rejected
                by that node's validator. ``cursor_position`` is the offset
                of the offending token in ``document.text``.
        """
        text = document.text
        node = self.root
        search_from = 0

        for part in text.split():
            # Track each token's own offset. A plain ``text.find(part)``
            # returns the first occurrence, which can be inside an earlier
            # token (e.g. the "s" of "show" for the input "show s").
            start = text.find(part, search_from)
            search_from = start + len(part)

            # Only dict children are sub-nodes; string metadata stored in
            # the same dict must not be descended into.
            child = node.get(part)
            if isinstance(child, dict):
                node = child
                continue

            tag_node = self._find_tag_node(node)
            if tag_node is None:
                # Unknown token with no wildcard: stop checking here.
                break

            validator_fn = self._resolve_validator(tag_node)
            if validator_fn and not validator_fn(part):
                validator_type = tag_node.get("validator")
                raise ValidationError(
                    message=f"'{part}' is not a valid {validator_type.replace('-', ' ')}.",
                    cursor_position=start,
                )
            node = tag_node
def print_possible_completions(path: List[str], root: Dict[str, Any]) -> None:
    """Print a help table of what may follow the command *path*.

    Args:
        path: Tokens typed so far, outermost first.
        root: Root of the command tree (nested dicts keyed by token).

    The tree is walked one token at a time. A token that does not name a
    dict child falls back to the first child typed ``"tagNode"``. When the
    path cannot be resolved, or the resolved node offers nothing, the
    message ``No completions found.`` is printed instead of a table.
    """
    node = root
    for part in path:
        if not isinstance(node, dict):
            break  # reported by the isinstance check below

        # Only dict children are sub-nodes. Descending into metadata values
        # (plain strings stored in the same dict) would make the next step
        # fail on ``str`` instead of reporting "no completions".
        child = node.get(part)
        if not isinstance(child, dict):
            child = next(
                (v for v in node.values()
                 if isinstance(v, dict) and v.get("type") == "tagNode"),
                None,
            )
        if child is None:
            print("No completions found.\n")
            return
        node = child

    if not isinstance(node, dict):
        print("No completions found.\n")
        return

    # A node carrying a "command" entry can be executed as typed.
    rows = [["<enter>", "Execute the current command"]] if "command" in node else []
    rows.extend(_get_completion_rows(node))
    if not rows:
        print("No completions found.\n")
        return

    print("\nPossible completions:\n")
    # Prefix every table line (first and continuation) with one space.
    print(" " + tabulate(rows, tablefmt="plain").replace("\n", "\n "))
def _get_completion_rows(node: Dict[str, Any]) -> List[List[str]]:
"""Get completion rows for a node."""
rows = []
for k, v in node.items():
if not isinstance(v, dict):
continue
desc = v.get("description", "")
node_type = v.get("type")
if node_type == "tagNode":
rows.append([k, desc])
if "suggestor" in v:
rows.extend(_get_suggestor_rows(v))
else:
rows.append([k, desc])
return rows
def _get_suggestor_rows(node: Dict[str, Any]) -> List[List[str]]:
    """Return one ``[value, ""]`` row per value offered by *node*'s suggestor.

    The callable is looked up in ``suggestors`` by ``node["suggestor"]`` and
    invoked with ``node["suggestor_args"]`` (no arguments when absent). An
    unknown name or a failing call yields a single placeholder row in place
    of the values.
    """
    name = node["suggestor"]
    if name not in suggestors:
        return [[f"<missing suggestor: {name}>", ""]]

    call_args = node.get("suggestor_args", [])
    try:
        return [[item, ""] for item in suggestors[name](*call_args)]
    except Exception as exc:
        # Report the failure as a table row; the second column carries
        # the exception text.
        return [[f"<error calling {name}>", str(exc)]]
def setup_keybindings(commands_json: Dict[str, Any],
                      print_possible_completions: Callable[[List[str], Dict[str, Any]], None],
                      suggestors: Dict[str, Callable]) -> KeyBindings:
    """Set up key bindings for the CLI.

    Args:
        commands_json: Root of the command tree (nested dicts keyed by token).
        print_possible_completions: Callback invoked with the typed tokens
            and the tree to print the available completions.
        suggestors: Not used by the bindings themselves; kept so existing
            callers keep working.

    Returns:
        A ``KeyBindings`` object with two bindings:

        * ``?`` prints the possible completions for the current input.
        * ``Tab`` completes the token before the cursor to its single match
          (followed by a space) or to the longest common prefix of several
          matches, and otherwise prints the possible completions.
    """
    bindings = KeyBindings()

    def resolve_child(node: Dict[str, Any], part: str) -> Optional[Dict[str, Any]]:
        """Return the dict child of *node* for *part*, else its first tagNode."""
        # Only dict children are sub-nodes; string metadata stored in the
        # same dict must not be traversed into.
        child = node.get(part)
        if isinstance(child, dict):
            return child
        return next(
            (v for v in node.values()
             if isinstance(v, dict) and v.get("type") == "tagNode"),
            None,
        )

    @bindings.add('?', eager=True)
    def show_possible(event):
        buffer = event.app.current_buffer
        parts = buffer.text.split()
        run_in_terminal(lambda: print_possible_completions(parts, commands_json))
        # Inserts nothing, so the '?' itself never reaches the buffer.
        # NOTE(review): presumably kept to trigger the buffer's insert
        # hooks - confirm before removing.
        buffer.insert_text("")

    @bindings.add('tab')
    def autocomplete(event):
        buffer = event.app.current_buffer
        # Work on the text before the cursor: the edits below
        # (delete_before_cursor / insert_text) are cursor-relative, so the
        # token being completed must be the one that ends at the cursor.
        text = buffer.document.text_before_cursor
        parts = text.split()
        if not parts:
            return

        is_mid_token = not text.endswith(" ")
        last_token = parts[-1] if is_mid_token else ""
        last_token_len = len(last_token)

        # Traverse to node
        node = commands_json
        for part in parts[:-1] if is_mid_token else parts:
            node = resolve_child(node, part)
            if not node:
                return

        rows = _get_completion_rows(node)
        if "command" in node:
            rows.insert(0, ["<enter>", "Execute the current command"])
        if not rows:
            return

        # Rows whose name starts with '<' (the "<enter>" row and the
        # placeholder rows) are display-only and never inserted.
        plain_matches = [
            row[0] for row in rows
            if not row[0].startswith('<') and row[0].startswith(last_token)
        ]
        if plain_matches:
            common_prefix = os.path.commonprefix(plain_matches)
            if len(plain_matches) == 1:
                # If there's only one valid match, complete and add a space
                if is_mid_token:
                    buffer.delete_before_cursor(last_token_len)
                buffer.insert_text(plain_matches[0] + " ")
                return
            if common_prefix and common_prefix != last_token:
                # If there's a common prefix longer than current token, complete it
                if is_mid_token:
                    buffer.delete_before_cursor(last_token_len)
                buffer.insert_text(common_prefix)
                return

        # If no direct completion possible, show all possibilities
        run_in_terminal(lambda: print_possible_completions(parts, commands_json))

    return bindings