-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlog_downloader.py
142 lines (105 loc) · 4.08 KB
/
log_downloader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import os
import re
import subprocess
import time
import shutil
from pathlib import Path
from loguru import logger
from git import Repo
import wmi
# Folder where the FRC Driver Station writes its own event logs.
DRIVERSTATION_LOGS_DIRECTORY = Path("C:/Users/Public/Documents/FRC/Log Files")
# Suffix (with leading dot) of Driver Station event logs.
DRIVERSTATION_FILE_EXTENSION = ".dsevents"
# Extension (without leading dot) of the robot log files looked for on USB drives.
LOG_FILE_EXTENSION = "wpilog"


def env_flag(name, default=False):
    """Interpret the environment variable *name* as a boolean switch.

    Args:
        name: Name of the environment variable to read.
        default: Value returned when the variable is not set at all.

    Returns:
        ``default`` when the variable is unset; ``False`` when it is empty or
        one of the usual "off" spellings (``0``, ``false``, ``no``, ``off``,
        ``n``, ``f``, case-insensitive); ``True`` for any other value.
    """
    raw_value = os.getenv(name)
    if raw_value is None:
        return default
    # bool("false") and bool("0") are both True, so a plain bool() cast would
    # make it impossible to switch the flag off without unsetting the variable.
    return raw_value.strip().lower() not in {"", "0", "false", "no", "off", "n", "f"}


# Directory the logs are saved to. Falls back to the current working directory
# when LOGS_DIR is unset or empty, instead of failing at import time with a
# TypeError from Path(None).
LOGS_DIR = Path(os.getenv("LOGS_DIR") or Path.cwd())
# When true, download_logs() skips every file that is_competition_log() rejects.
DOWNLOAD_COMPETITION_LOGS = env_flag("DOWNLOAD_COMPETITION_LOGS")
def open_latest_log_in_advantage_scope():
    """Open the most recently modified log file in ``LOGS_DIR``.

    The newest ``.wpilog`` file (by modification time) is handed to the
    Windows shell, which launches the application associated with that file
    type. The log message assumes that application is AdvantageScope.

    Every failure is logged and swallowed so that a problem opening the
    viewer never interrupts the caller.
    """
    try:
        latest_log = max(
            LOGS_DIR.glob(f"*.{LOG_FILE_EXTENSION}"),
            key=lambda candidate: candidate.stat().st_mtime,
            default=None,
        )
        if latest_log is None:
            logger.warning("No log files found")
            return

        logger.info(f"Opening {latest_log.name} with AdvantageScope")
        # "start" is a cmd.exe built-in rather than an executable, so handing
        # it to subprocess.Popen without a shell cannot find a program to run.
        # os.startfile asks the shell to open the file directly.
        os.startfile(str(latest_log))
    except Exception as error:
        logger.error(f"Failed to open log file: {error}")
def commit_and_push_log(repo, log_file):
    """Stage, commit and push the local copy of *log_file*.

    Args:
        repo: GitPython repository object used for the index and the remote.
        log_file: Path of the log that was downloaded. Only its ``name`` is
            used; the file that gets staged is the one with that name in the
            current working directory.

    Nothing is raised: a failure to stage or commit is logged with a
    traceback and skips the push; a failed push is logged as an error.
    """
    file_name = log_file.name

    try:
        # A list of plain strings is the form index.add() accepts across
        # GitPython versions.
        repo.index.add([str(Path.cwd() / file_name)])
        repo.index.commit(f"Add log file: {file_name}")
        logger.info(f"Committed {file_name} to repository")
    except Exception as error:
        logger.exception(f"Error committing {file_name}: {error}")
        return

    try:
        push_results = repo.remote().push()
        # push() signals most failures through the returned PushInfo flags
        # (or an empty result list when the push failed outright) instead of
        # raising, so the results must be inspected before reporting success.
        failures = [
            str(result.summary).strip()
            for result in push_results
            if result.flags & result.ERROR
        ]
        if failures or not push_results:
            detail = "; ".join(failures) or "remote returned no push results"
            logger.error(f"Error pushing commit for {file_name}: {detail}")
        else:
            logger.info(f"Successfully pushed commit for {file_name}")
    except Exception as error:
        logger.error(f"Error pushing commit for {file_name}: {error}")
def get_file_signature(file_path):
    """Build the identity used to detect already-downloaded files.

    Args:
        file_path: ``Path`` of an existing file.

    Returns:
        A ``(name, size_in_bytes)`` tuple for the file.
    """
    base_name = file_path.name
    byte_count = file_path.stat().st_size
    return (base_name, byte_count)
def is_file_downloaded(log_file):
    """Report whether ``LOGS_DIR`` already contains a copy of *log_file*.

    Two files count as the same when their ``get_file_signature`` values
    (file name and size) are equal.

    Args:
        log_file: ``Path`` of the candidate log on the source drive.

    Returns:
        ``True`` if a log file directly inside ``LOGS_DIR`` has the same
        signature, ``False`` otherwise.
    """
    # The source signature does not change while scanning, so compute it once
    # instead of stat-ing the source file again for every existing log.
    wanted_signature = get_file_signature(log_file)
    return any(
        get_file_signature(existing_file) == wanted_signature
        for existing_file in LOGS_DIR.glob(f"*.{LOG_FILE_EXTENSION}")
    )
def get_usb_drives():
    """Collect the root paths of the removable logical disks reported by WMI.

    Returns:
        A set of ``Path`` objects, one per disk whose ``DriveType`` is 2,
        each built from the disk's ``DeviceID`` followed by ``/``.
    """
    removable_drive_type = 2  # DriveType value treated as "removable"
    logical_disks = wmi.WMI().Win32_LogicalDisk()
    return {
        Path(f"{disk.DeviceID}/")
        for disk in logical_disks
        if disk.DriveType == removable_drive_type
    }
def copy_file(log_file):
    """Copy *log_file* into the current working directory, keeping metadata.

    Args:
        log_file: ``Path`` of the file to copy.

    Returns:
        ``True`` when the file was copied, ``False`` when the copy failed
        (the failure is logged, not raised).
    """
    try:
        # Read the size before copying and inside the try block, so a drive
        # that disappears mid-operation is reported as a failed copy instead
        # of raising from the log statement afterwards.
        size_in_bytes = log_file.stat().st_size
        shutil.copy2(log_file, Path.cwd())
    except OSError as error:
        logger.error(f"Failed to copy {log_file.name}: {error}")
        return False

    logger.info(f"Copied {log_file.name} ({size_in_bytes} bytes)")
    return True


def download_log_file(log_file, repo):
    """Download one robot log and the Driver Station log with the same name.

    The robot log is copied into the current working directory and, only if
    that copy succeeded, committed and pushed through *repo*. Afterwards the
    Driver Station event log sharing the robot log's stem is copied as well;
    that second file is not committed here.

    Args:
        log_file: ``Path`` of the robot log on the source drive.
        repo: GitPython repository object passed to ``commit_and_push_log``.
    """
    # Committing after a failed copy would only produce a second, misleading
    # error for a file that is not there.
    if copy_file(log_file):
        commit_and_push_log(repo, log_file)

    # Append the suffix rather than using with_suffix(): with_suffix() would
    # cut off everything after the last dot of a stem that itself has a dot.
    ds_log = DRIVERSTATION_LOGS_DIRECTORY / f"{log_file.stem}{DRIVERSTATION_FILE_EXTENSION}"
    copy_file(ds_log)
def is_competition_log(log_file) -> bool:
    """Tell whether the file name carries a competition-match marker.

    A name qualifies when it contains an underscore immediately followed by
    a lowercase ``p``, ``q`` or ``e`` anywhere in it.
    NOTE(review): presumably these stand for practice / qualification /
    elimination matches; confirm against the logger's naming scheme.

    Args:
        log_file: ``Path`` whose ``name`` is inspected.
    """
    marker = re.search(r"_[pqe]", log_file.name)
    if marker is None:
        return False
    return True
def download_logs(drive_path, repo):
    """Download every new log file found anywhere under *drive_path*.

    Files are searched recursively by ``LOG_FILE_EXTENSION``. When
    ``DOWNLOAD_COMPETITION_LOGS`` is set, files rejected by
    ``is_competition_log`` are ignored. Files already present according to
    ``is_file_downloaded`` are skipped with an info message; everything else
    goes through ``download_log_file``.

    Any exception raised while scanning or downloading is logged as an
    access error for the drive and ends the scan.

    Args:
        drive_path: ``Path`` of the drive root to scan.
        repo: Repository object forwarded to ``download_log_file``.
    """
    try:
        search_pattern = f"**/*.{LOG_FILE_EXTENSION}"
        for candidate in drive_path.glob(search_pattern):
            is_wanted = not DOWNLOAD_COMPETITION_LOGS or is_competition_log(candidate)
            if not is_wanted:
                continue

            if not is_file_downloaded(candidate):
                download_log_file(candidate, repo)
                continue

            logger.info(f"Skipped {candidate.name} - already exists")
    except Exception as problem:
        logger.error(f"Error accessing {drive_path}: {problem}")
def monitor_drives():
    """Poll for newly attached removable drives and import their logs.

    Opens the git repository in the current working directory first; if that
    fails the error is logged and the function returns. Otherwise it loops
    forever: once per second it compares the current set of removable drives
    with the previous one and, for each drive that has just appeared,
    downloads its logs and then opens the newest log in the viewer.
    """
    logger.info(f"Monitoring for USB drives. Saving logs to {LOGS_DIR}")
    known_drives = set()

    try:
        repo = Repo()
        logger.info("Git repository initialized")
    except Exception as error:
        logger.exception(f"Error initializing git repository: {error}")
        return

    # NOTE(review): the nesting below was reconstructed from a source whose
    # indentation was lost; it assumes the viewer is opened once per newly
    # detected drive and that the bookkeeping and sleep run once per poll.
    while True:
        attached_drives = get_usb_drives()

        for drive in attached_drives - known_drives:
            logger.info(f"New drive detected: {drive}")
            download_logs(drive, repo)
            open_latest_log_in_advantage_scope()

        known_drives = attached_drives
        time.sleep(1)
if __name__ == "__main__":
    # Create the destination directory together with any missing parents;
    # without parents=True a LOGS_DIR nested under a folder that does not
    # exist yet makes mkdir() fail before anything else runs.
    LOGS_DIR.mkdir(parents=True, exist_ok=True)
    # Everything below works relative to the logs directory: copied files,
    # the git repository and the script's own log file.
    os.chdir(LOGS_DIR)
    logger.add(
        "logfile.log",
        rotation="100 MB",
        retention="1 week",
        compression="zip",
        format="{time} | {level} | {message}",
        backtrace=True,
        diagnose=True,
    )
    monitor_drives()