1+ import os
2+ import sys
13import uuid
2- from collections . abc import Iterable
4+ from dataclasses import dataclass
35from importlib .metadata import version
4- from typing import Final , cast , get_args
6+ from pathlib import Path
7+ from typing import Any , Final , TypedDict , cast
8+ from urllib .parse import unquote_to_bytes
59
610import attr
711import cattrs
812import lsprotocol .types as lsp
913from ast_grep_py import SgRoot
10- from pygls import server
11- from pygls .workspace import TextDocument
14+ from pygls .server import LanguageServer
1215
1316from auto_typing_final .transform import (
1417 IMPORT_STYLES_TO_IMPORT_CONFIGS ,
1922 make_replacements ,
2023)
2124
22- LSP_SERVER = server .LanguageServer (name = "auto-typing-final" , version = version ("auto-typing-final" ), max_workers = 5 )
23- IMPORT_CONFIG : ImportConfig | None = None
2425
26+ # From Python 3.13: https://github.com/python/cpython/blob/0790418a0406cc5419bfd9d718522a749542bbc8/Lib/pathlib/_local.py#L815
27+ def path_from_uri (uri : str ) -> Path | None :
28+ if not uri .startswith ("file:" ):
29+ return None
30+ path = uri [5 :]
31+ if path [:3 ] == "///" :
32+ # Remove empty authority
33+ path = path [2 :]
34+ elif path [:12 ] == "//localhost/" :
35+ # Remove 'localhost' authority
36+ path = path [11 :]
37+ if path [:3 ] == "///" or (path [:1 ] == "/" and path [2 :3 ] in ":|" ):
38+ # Remove slash before DOS device/UNC path
39+ path = path [1 :]
40+ if path [1 :2 ] == "|" :
41+ # Replace bar with colon in DOS drive
42+ path = path [:1 ] + ":" + path [2 :]
43+
44+ path_ : Final = Path (os .fsdecode (unquote_to_bytes (path )))
45+ if not path_ .is_absolute ():
46+ return None
47+ return path_
2548
26- @attr .define
27- class Fix :
28- message : str
29- text_edits : list [lsp .TextEdit ]
49+
50+ ClientSettings = TypedDict ("ClientSettings" , {"import-style" : ImportStyle })
51+ FullClientSettings = TypedDict ("FullClientSettings" , {"auto-typing-final" : ClientSettings })
3052
3153
3254@attr .define
33- class DiagnosticData :
34- fix : Fix
55+ class Fix :
56+ message : str
57+ text_edits : list [lsp .TextEdit | lsp .AnnotatedTextEdit ]
3558
3659
3760def make_import_text_edit (import_text : str ) -> lsp .TextEdit :
@@ -52,108 +75,127 @@ def make_text_edit(edit: Edit) -> lsp.TextEdit:
5275 )
5376
5477
55- def make_diagnostics (source : str ) -> Iterable [lsp .Diagnostic ]:
56- if not IMPORT_CONFIG :
57- return
58- result : Final = make_replacements (root = SgRoot (source , "python" ).root (), import_config = IMPORT_CONFIG )
59-
60- for replacement in result .replacements :
61- if replacement .operation_type == AddFinal :
62- fix_message = f"{ LSP_SERVER .name } : Add { IMPORT_CONFIG .value } "
63- diagnostic_message = f"Missing { IMPORT_CONFIG .value } "
64- else :
65- fix_message = f"{ LSP_SERVER .name } : Remove { IMPORT_CONFIG .value } "
66- diagnostic_message = f"Unexpected { IMPORT_CONFIG .value } "
67-
68- fix = Fix (message = fix_message , text_edits = [make_text_edit (edit ) for edit in replacement .edits ])
69- if result .import_text :
70- fix .text_edits .append (make_import_text_edit (result .import_text ))
71-
72- for applied_edit in replacement .edits :
73- node_range = applied_edit .node .range ()
74- yield lsp .Diagnostic (
75- range = lsp .Range (
76- start = lsp .Position (line = node_range .start .line , character = node_range .start .column ),
77- end = lsp .Position (line = node_range .end .line , character = node_range .end .column ),
78- ),
79- message = diagnostic_message ,
80- severity = lsp .DiagnosticSeverity .Warning ,
81- source = LSP_SERVER .name ,
82- data = cattrs .unstructure (DiagnosticData (fix = fix )),
83- )
84-
78+ @dataclass (init = False )
79+ class Service :
80+ ls_name : str
81+ ignored_paths : list [Path ]
82+ import_config : ImportConfig
8583
86- def make_fixall_text_edits (source : str ) -> Iterable [lsp .TextEdit ]:
87- if not IMPORT_CONFIG :
88- return
89- result : Final = make_replacements (root = SgRoot (source , "python" ).root (), import_config = IMPORT_CONFIG )
84+ def __init__ (self , ls_name : str , settings : Any ) -> None : # noqa: ANN401
85+ self .ls_name = ls_name
9086
91- for replacement in result . replacements :
92- for edit in replacement . edits :
93- yield make_text_edit ( edit )
87+ executable_path : Final = Path ( sys . executable )
88+ if executable_path . parent . name == "bin" :
89+ self . ignored_paths = [ executable_path . parent . parent ]
9490
95- if result .import_text :
96- yield make_import_text_edit (result .import_text )
91+ try :
92+ validated_settings : Final = cattrs .structure (settings , FullClientSettings )
93+ except cattrs .BaseValidationError :
94+ return
95+ self .import_config = IMPORT_STYLES_TO_IMPORT_CONFIGS [validated_settings ["auto-typing-final" ]["import-style" ]]
9796
97+ def make_diagnostics (self , source : str ) -> list [lsp .Diagnostic ]:
98+ replacement_result : Final = make_replacements (
99+ root = SgRoot (source , "python" ).root (), import_config = self .import_config
100+ )
101+ result : Final = []
102+
103+ for replacement in replacement_result .replacements :
104+ if replacement .operation_type == AddFinal :
105+ fix_message = f"{ self .ls_name } : Add { self .import_config .value } "
106+ diagnostic_message = f"Missing { self .import_config .value } "
107+ else :
108+ fix_message = f"{ self .ls_name } : Remove { self .import_config .value } "
109+ diagnostic_message = f"Unexpected { self .import_config .value } "
110+
111+ fix = Fix (message = fix_message , text_edits = [make_text_edit (edit ) for edit in replacement .edits ])
112+ if replacement_result .import_text :
113+ fix .text_edits .append (make_import_text_edit (replacement_result .import_text ))
114+
115+ for applied_edit in replacement .edits :
116+ node_range = applied_edit .node .range ()
117+ result .append (
118+ lsp .Diagnostic (
119+ range = lsp .Range (
120+ start = lsp .Position (line = node_range .start .line , character = node_range .start .column ),
121+ end = lsp .Position (line = node_range .end .line , character = node_range .end .column ),
122+ ),
123+ message = diagnostic_message ,
124+ severity = lsp .DiagnosticSeverity .Warning ,
125+ source = self .ls_name ,
126+ data = cattrs .unstructure (fix ),
127+ )
128+ )
129+ return result
98130
99- def make_workspace_edit (text_document : TextDocument , text_edits : list [lsp .TextEdit ]) -> lsp .WorkspaceEdit :
100- return lsp .WorkspaceEdit (
101- document_changes = [
102- lsp .TextDocumentEdit (
103- text_document = lsp .OptionalVersionedTextDocumentIdentifier (
104- uri = text_document .uri , version = text_document .version
105- ),
106- edits = text_edits , # type: ignore[arg-type]
107- )
131+ def make_fix_all_text_edits (self , source : str ) -> list [lsp .TextEdit | lsp .AnnotatedTextEdit ]:
132+ replacement_result : Final = make_replacements (
133+ root = SgRoot (source , "python" ).root (), import_config = self .import_config
134+ )
135+ result : Final [list [lsp .TextEdit | lsp .AnnotatedTextEdit ]] = [
136+ make_text_edit (edit ) for replacement in replacement_result .replacements for edit in replacement .edits
108137 ]
109- )
138+ if replacement_result .import_text :
139+ result .append (make_import_text_edit (replacement_result .import_text ))
140+ return result
141+
142+ def path_is_ignored (self , uri : str ) -> bool :
143+ if path := path_from_uri (uri ):
144+ return any (path .is_relative_to (ignored_path ) for ignored_path in self .ignored_paths )
145+ return False
146+
147+
148+ class CustomLanguageServer (LanguageServer ):
149+ service : Service | None = None
150+
151+
152+ LSP_SERVER = CustomLanguageServer (name = "auto-typing-final" , version = version ("auto-typing-final" ), max_workers = 5 )
110153
111154
112155@LSP_SERVER .feature (lsp .INITIALIZE )
113- async def initialize (_ : lsp .InitializeParams ) -> None : ...
156+ def initialize (_ : lsp .InitializeParams ) -> None : ...
114157
115158
116159@LSP_SERVER .feature (lsp .INITIALIZED )
117- async def initialized (_ : lsp .InitializedParams ) -> None :
118- await LSP_SERVER .register_capability_async (
160+ async def initialized (ls : CustomLanguageServer , _ : lsp .InitializedParams ) -> None :
161+ await ls .register_capability_async (
119162 params = lsp .RegistrationParams (
120163 registrations = [
121164 lsp .Registration (
122165 id = str (uuid .uuid4 ()),
123166 method = lsp .WORKSPACE_DID_CHANGE_CONFIGURATION ,
124- register_options = lsp .DidChangeConfigurationRegistrationOptions (section = LSP_SERVER .name ),
125- )
167+ register_options = lsp .DidChangeConfigurationRegistrationOptions (section = ls .name ),
168+ ),
126169 ]
127170 )
128171 )
129172
130173
131174@LSP_SERVER .feature (lsp .WORKSPACE_DID_CHANGE_CONFIGURATION )
132- def workspace_did_change_configuration (params : lsp .DidChangeConfigurationParams ) -> None :
133- if (
134- isinstance (params .settings , dict )
135- and (settings := params .settings .get (LSP_SERVER .name ))
136- and isinstance (settings , dict )
137- and (import_style := settings .get ("import-style" ))
138- and (import_style in get_args (ImportStyle ))
139- ):
140- global IMPORT_CONFIG # noqa: PLW0603
141- IMPORT_CONFIG = IMPORT_STYLES_TO_IMPORT_CONFIGS [import_style ]
175+ def workspace_did_change_configuration (ls : CustomLanguageServer , params : lsp .DidChangeConfigurationParams ) -> None :
176+ ls .service = Service (ls_name = ls .name , settings = params .settings )
177+ for text_document in ls .workspace .text_documents .values ():
178+ ls .publish_diagnostics (text_document .uri , diagnostics = ls .service .make_diagnostics (text_document .source ))
142179
143180
144181@LSP_SERVER .feature (lsp .TEXT_DOCUMENT_DID_OPEN )
145182@LSP_SERVER .feature (lsp .TEXT_DOCUMENT_DID_SAVE )
146183@LSP_SERVER .feature (lsp .TEXT_DOCUMENT_DID_CHANGE )
147184def did_open_did_save_did_change (
185+ ls : CustomLanguageServer ,
148186 params : lsp .DidOpenTextDocumentParams | lsp .DidSaveTextDocumentParams | lsp .DidChangeTextDocumentParams ,
149187) -> None :
150- text_document : Final = LSP_SERVER .workspace .get_text_document (params .text_document .uri )
151- LSP_SERVER .publish_diagnostics (text_document .uri , diagnostics = list (make_diagnostics (text_document .source )))
188+ if not ls .service :
189+ return
190+ if ls .service .path_is_ignored (params .text_document .uri ):
191+ return
192+ text_document : Final = ls .workspace .get_text_document (params .text_document .uri )
193+ ls .publish_diagnostics (text_document .uri , diagnostics = ls .service .make_diagnostics (text_document .source ))
152194
153195
154196@LSP_SERVER .feature (lsp .TEXT_DOCUMENT_DID_CLOSE )
155- def did_close (params : lsp .DidCloseTextDocumentParams ) -> None :
156- LSP_SERVER .publish_diagnostics (params .text_document .uri , [])
197+ def did_close (ls : CustomLanguageServer , params : lsp .DidCloseTextDocumentParams ) -> None :
198+ ls .publish_diagnostics (params .text_document .uri , [])
157199
158200
159201@LSP_SERVER .feature (
@@ -162,31 +204,40 @@ def did_close(params: lsp.DidCloseTextDocumentParams) -> None:
162204 code_action_kinds = [lsp .CodeActionKind .QuickFix , lsp .CodeActionKind .SourceFixAll ], resolve_provider = True
163205 ),
164206)
165- def code_action (params : lsp .CodeActionParams ) -> list [lsp .CodeAction ] | None :
207+ def code_action (ls : CustomLanguageServer , params : lsp .CodeActionParams ) -> list [lsp .CodeAction ] | None :
166208 requested_kinds : Final = params .context .only or {lsp .CodeActionKind .QuickFix , lsp .CodeActionKind .SourceFixAll }
167209 actions : Final [list [lsp .CodeAction ]] = []
168210
169211 if lsp .CodeActionKind .QuickFix in requested_kinds :
170- text_document : Final = LSP_SERVER .workspace .get_text_document (params .text_document .uri )
212+ text_document : Final = ls .workspace .get_text_document (params .text_document .uri )
171213 our_diagnostics : Final = [
172- diagnostic for diagnostic in params .context .diagnostics if diagnostic .source == LSP_SERVER .name
214+ diagnostic for diagnostic in params .context .diagnostics if diagnostic .source == ls .name
173215 ]
174216
175217 for diagnostic in our_diagnostics :
176- data = cattrs .structure (diagnostic .data , DiagnosticData )
218+ fix = cattrs .structure (diagnostic .data , Fix )
177219 actions .append (
178220 lsp .CodeAction (
179- title = data . fix .message ,
221+ title = fix .message ,
180222 kind = lsp .CodeActionKind .QuickFix ,
181- edit = make_workspace_edit (text_document = text_document , text_edits = data .fix .text_edits ),
223+ edit = lsp .WorkspaceEdit (
224+ document_changes = [
225+ lsp .TextDocumentEdit (
226+ text_document = lsp .OptionalVersionedTextDocumentIdentifier (
227+ uri = text_document .uri , version = text_document .version
228+ ),
229+ edits = fix .text_edits ,
230+ )
231+ ]
232+ ),
182233 diagnostics = [diagnostic ],
183234 )
184235 )
185236
186237 if our_diagnostics :
187238 actions .append (
188239 lsp .CodeAction (
189- title = f"{ LSP_SERVER .name } : Fix All" ,
240+ title = f"{ ls .name } : Fix All" ,
190241 kind = lsp .CodeActionKind .QuickFix ,
191242 data = params .text_document .uri ,
192243 edit = None ,
@@ -197,10 +248,9 @@ def code_action(params: lsp.CodeActionParams) -> list[lsp.CodeAction] | None:
197248 if lsp .CodeActionKind .SourceFixAll in requested_kinds :
198249 actions .append (
199250 lsp .CodeAction (
200- title = f"{ LSP_SERVER .name } : Fix All" ,
251+ title = f"{ ls .name } : Fix All" ,
201252 kind = lsp .CodeActionKind .SourceFixAll ,
202253 data = params .text_document .uri ,
203- edit = None ,
204254 diagnostics = params .context .diagnostics ,
205255 ),
206256 )
@@ -209,18 +259,19 @@ def code_action(params: lsp.CodeActionParams) -> list[lsp.CodeAction] | None:
209259
210260
211261@LSP_SERVER .feature (lsp .CODE_ACTION_RESOLVE )
212- def resolve_code_action (params : lsp .CodeAction ) -> lsp .CodeAction :
213- text_document : Final = LSP_SERVER .workspace .get_text_document (cast (str , params .data ))
214- params .edit = lsp .WorkspaceEdit (
215- document_changes = [
216- lsp .TextDocumentEdit (
217- text_document = lsp .OptionalVersionedTextDocumentIdentifier (
218- uri = text_document .uri , version = text_document .version
219- ),
220- edits = list (make_fixall_text_edits (text_document .source )),
221- )
222- ],
223- )
262+ def resolve_code_action (ls : CustomLanguageServer , params : lsp .CodeAction ) -> lsp .CodeAction :
263+ if ls .service :
264+ text_document : Final = ls .workspace .get_text_document (cast (str , params .data ))
265+ params .edit = lsp .WorkspaceEdit (
266+ document_changes = [
267+ lsp .TextDocumentEdit (
268+ text_document = lsp .OptionalVersionedTextDocumentIdentifier (
269+ uri = text_document .uri , version = text_document .version
270+ ),
271+ edits = ls .service .make_fix_all_text_edits (text_document .source ),
272+ )
273+ ],
274+ )
224275 return params
225276
226277
0 commit comments