|
5 | 5 | import time
|
6 | 6 | import logging
|
7 | 7 | from pathlib import Path
|
8 |
| -from typing import List, Dict, Any, Optional, TYPE_CHECKING |
| 8 | +from typing import List, Dict, Any, Optional, TYPE_CHECKING, Literal |
9 | 9 |
|
10 | 10 | from parted import ( # type: ignore
|
11 | 11 | Disk, Geometry, FileSystem,
|
@@ -272,76 +272,54 @@ def format_enc(
|
272 | 272 | info(f'luks2 locking device: {dev_path}')
|
273 | 273 | luks_handler.lock()
|
274 | 274 |
|
275 |
| - def lvm_vol_info(self, lv_name: str) -> Optional[LvmVolumeInfo]: |
276 |
| - cmd = 'lvs --reportformat json ' \ |
277 |
| - '--unit B ' \ |
278 |
| - f'-S lv_name={lv_name}' |
279 |
| - |
| 275 | + def _lvm_info(self, cmd: str, name: str, info_type: Literal['lv', 'vg']) -> Optional[Any]: |
280 | 276 | raw_info = SysCommand(cmd).decode().split('\n')
|
281 | 277 |
|
282 | 278 | # for whatever reason the output sometimes contains
|
283 | 279 | # "File descriptor X leaked leaked on vgs invocation
|
284 |
| - data = '\n'.join([l for l in raw_info if 'File descriptor' not in l]) |
| 280 | + data = '\n'.join([raw for raw in raw_info if 'File descriptor' not in raw]) |
285 | 281 |
|
286 |
| - debug(f'LVM info: {data}') |
| 282 | + debug(f'LVM raw: {data}') |
287 | 283 |
|
288 | 284 | reports = json.loads(data)
|
289 | 285 |
|
290 |
| - lv = None |
| 286 | + type_name = f'{info_type}_name' |
291 | 287 |
|
292 | 288 | for report in reports['report']:
|
293 |
| - lvs = report.get('lv', []) |
294 |
| - for lv_data in lvs: |
295 |
| - if lv_data['lv_name'] == lv_name: |
296 |
| - lv = lv_data |
297 |
| - |
298 |
| - if lv is None: |
299 |
| - return None |
| 289 | + entries = report.get(info_type, []) |
| 290 | + for entry in entries: |
| 291 | + if entry[type_name] == name: |
| 292 | + lv_size = int(entry[f'{info_type}_size'][:-1]) |
| 293 | + size = Size(lv_size, Unit.B, SectorSize.default()) |
| 294 | + |
| 295 | + match info_type: |
| 296 | + case 'lv': |
| 297 | + return LvmVolumeInfo( |
| 298 | + lv_name=entry['lv_name'], |
| 299 | + vg_name=entry['vg_name'], |
| 300 | + lv_size=size |
| 301 | + ) |
| 302 | + case 'vg': |
| 303 | + return LvmGroupInfo( |
| 304 | + vg_size=size, |
| 305 | + vg_uuid=entry['vg_uuid'] |
| 306 | + ) |
| 307 | + return None |
300 | 308 |
|
301 |
| - lv_size = int(lv['lv_size'][:-1]) |
302 |
| - size = Size(lv_size, Unit.B, SectorSize.default()) |
| 309 | + def lvm_vol_info(self, lv_name: str) -> Optional[LvmVolumeInfo]: |
| 310 | + cmd = 'lvs --reportformat json ' \ |
| 311 | + '--unit B ' \ |
| 312 | + f'-S lv_name={lv_name}' |
303 | 313 |
|
304 |
| - return LvmVolumeInfo( |
305 |
| - lv_name=lv['lv_name'], |
306 |
| - vg_name=lv['vg_name'], |
307 |
| - lv_size=size |
308 |
| - ) |
| 314 | + return self._lvm_info(cmd, lv_name, 'lv') |
309 | 315 |
|
310 | 316 | def lvm_group_info(self, vg_name: str) -> Optional[LvmGroupInfo]:
|
311 | 317 | cmd = 'vgs --reportformat json ' \
|
312 |
| - '--unit B ' \ |
313 |
| - '-o vg_name,vg_uuid,vg_size ' \ |
314 |
| - f'-S vg_name={vg_name}' |
| 318 | + '--unit B ' \ |
| 319 | + '-o vg_name,vg_uuid,vg_size ' \ |
| 320 | + f'-S vg_name={vg_name}' |
315 | 321 |
|
316 |
| - debug(f'LVM group info: {cmd}') |
317 |
| - raw_info = SysCommand(cmd).decode().strip().split('\n') |
318 |
| - |
319 |
| - # for whatever reason the output sometimes contains |
320 |
| - # "File descriptor X leaked leaked on vgs invocation |
321 |
| - data = '\n'.join([l for l in raw_info if 'File descriptor' not in l]) |
322 |
| - |
323 |
| - debug(f'VG info: {data}') |
324 |
| - |
325 |
| - reports = json.loads(data) |
326 |
| - |
327 |
| - vg = None |
328 |
| - |
329 |
| - for report in reports['report']: |
330 |
| - vgs = report.get('vg', []) |
331 |
| - for vg_data in vgs: |
332 |
| - if vg_data['vg_name'] == vg_name: |
333 |
| - vg = vg_data |
334 |
| - |
335 |
| - if vg is None: |
336 |
| - return None |
337 |
| - |
338 |
| - vg_size = int(vg['vg_size'][:-1]) |
339 |
| - size = Size(vg_size, Unit.B, SectorSize.default()) |
340 |
| - |
341 |
| - return LvmGroupInfo( |
342 |
| - vg_size=size, |
343 |
| - vg_uuid=vg['vg_uuid'] |
344 |
| - ) |
| 322 | + return self._lvm_info(cmd, vg_name, 'vg') |
345 | 323 |
|
346 | 324 | def lvm_vol_reduce(self, vol_path: Path, amount: Size):
|
347 | 325 | val = amount.format_size(Unit.B, include_unit=False)
|
@@ -383,7 +361,7 @@ def lvm_vol_create(self, vg_name: str, volume: LvmVolume, offset: Optional[Size]
|
383 | 361 | worker.poll()
|
384 | 362 | worker.write(b'y\n', line_ending=False)
|
385 | 363 |
|
386 |
| - volume.dev_path = f'/dev/{vg_name}/{volume.name}' |
| 364 | + volume.dev_path = Path(f'/dev/{vg_name}/{volume.name}') |
387 | 365 |
|
388 | 366 | def _perform_partitioning(
|
389 | 367 | self,
|
|
0 commit comments