|
26 | 26 | import yaml
|
27 | 27 |
|
28 | 28 | import testflinger_device_connectors
|
| 29 | +from testflinger_device_connectors.fw_devices.firmware_update import ( |
| 30 | + detect_device, |
| 31 | +) |
29 | 32 |
|
30 | 33 |
|
31 | 34 | class ProvisioningError(Exception):
|
@@ -115,6 +118,59 @@ def stop(self):
|
115 | 118 |
|
116 | 119 |
|
117 | 120 | class DefaultDevice:
|
| 121 | + def firmware_update(self, args): |
| 122 | + """Default method for processing firmware update commands""" |
| 123 | + with open(args.config) as configfile: |
| 124 | + config = yaml.safe_load(configfile) |
| 125 | + testflinger_device_connectors.configure_logging(config) |
| 126 | + testflinger_device_connectors.logmsg( |
| 127 | + logging.INFO, "BEGIN firmware_update" |
| 128 | + ) |
| 129 | + |
| 130 | + test_opportunity = testflinger_device_connectors.get_test_opportunity( |
| 131 | + args.job_data |
| 132 | + ) |
| 133 | + fw_config = test_opportunity.get("firmware_update_data") |
| 134 | + ignore_failure = fw_config.get("ignore_failure", False) |
| 135 | + version = fw_config.get("version") |
| 136 | + device_ip = config["device_ip"] |
| 137 | + target_device_username = "ubuntu" |
| 138 | + exitcode = 0 |
| 139 | + supported_version = ["latest"] |
| 140 | + |
| 141 | + if version not in supported_version: |
| 142 | + testflinger_device_connectors.logmsg( |
| 143 | + logging.INFO, |
| 144 | + "Fail to provide version in firmware_update_data. " |
| 145 | + + "Current supported version: latest", |
| 146 | + ) |
| 147 | + exitcode = 1 |
| 148 | + else: |
| 149 | + try: |
| 150 | + target_device = detect_device( |
| 151 | + device_ip, target_device_username |
| 152 | + ) |
| 153 | + target_device.get_fw_info() |
| 154 | + if version == "latest": |
| 155 | + reboot_required = target_device.upgrade() |
| 156 | + if reboot_required: |
| 157 | + target_device.reboot() |
| 158 | + update_succeeded = target_device.check_results() |
| 159 | + if not update_succeeded: |
| 160 | + exitcode = 1 |
| 161 | + except Exception as e: |
| 162 | + testflinger_device_connectors.logmsg( |
| 163 | + logging.ERROR, f"Firmware Update failed: {str(e)}" |
| 164 | + ) |
| 165 | + exitcode = 1 |
| 166 | + finally: |
| 167 | + testflinger_device_connectors.logmsg( |
| 168 | + logging.INFO, "END firmware_update" |
| 169 | + ) |
| 170 | + if ignore_failure: |
| 171 | + exitcode = 0 |
| 172 | + return exitcode |
| 173 | + |
118 | 174 | def runtest(self, args):
|
119 | 175 | """Default method for processing test commands"""
|
120 | 176 | with open(args.config) as configfile:
|
|
0 commit comments