Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added src/V0.2.3/ULP/.DS_Store
Binary file not shown.
Binary file added src/V0.2.3/ULP/RPI_PICO-20240222-v1.22.2.uf2
Binary file not shown.
Binary file added src/V0.2.3/ULP/flash_nuke.uf2
Binary file not shown.
63 changes: 59 additions & 4 deletions src/V0.2.3/battery.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
"""Battery Management System for TI's BQ27441-G1A fuel gauge IC."""

#7 21

# pylint: disable=import-error,invalid-name
import struct
import time
import machine

verbose = False # [Set true when debugging]


class BQ27441:
"""BQ27441 Battery Management System class for TI's BQ27441-G1A fuel gauge IC."""
Expand Down Expand Up @@ -54,18 +58,33 @@ def _extended_block_write(self, subclass_id, offset, payload):
self._wr(0x61, b"\x00") # BlockDataControl = 0
self._wr(0x3E, bytes([subclass_id])) # DataClass
self._wr(0x3F, bytes([offset // 32])) # Block offset 0-7
time.sleep_ms(10) #This delay is needed for the BQ27441 to process the command

# 2 read current 32-byte buffer
buf = bytearray(self._rd(0x40, 32))
# 3 modify
buf[offset % 32 : offset % 32 + len(payload)] = payload
start = offset % 32
buf[offset % 32: offset % 32 + len(payload)] = payload
# 4 write back whole buffer (only touched bytes actually needed)
self._wr(0x40, buf)

# 5 checksum
csum = (0xFF - (sum(buf) & 0xFF)) & 0xFF
self._wr(0x60, bytes([csum]))

# # 6: **Read back the bytes that were written to verify success**
# readback = bytearray(self._rd(0x40 + start, len(payload)))
# if readback != bytearray(payload):
# if verbose:
# print("[EXT-WRITE][FAIL] subclass=0x{0:02X} offset=0x{1:02X} "
# "wrote={2} read={3} 可能原因: 字节序错误 / 未进入CFGUPDATE / 校验和错误 / I2C失败"
# .format(subclass_id, offset, payload.hex(), readback.hex()))
# else:
# if verbose:
# print("[EXT-WRITE][OK] subclass=0x{0:02X} offset=0x{1:02X} data={2}"
# .format(subclass_id, offset, readback.hex()))
# return bytes(readback)

# ---------- public one-shot initialiser ----------
def initialise(
self,
Expand All @@ -87,10 +106,10 @@ def initialise(
if CALIBRATION:
# 1 Design Capacity & Terminate Voltage (State 0x52, block 0)
self._extended_block_write(
0x52, 0x0A, struct.pack("<H", design_capacity_mAh)
0x52, 0x0A, struct.pack(">H", design_capacity_mAh)
)
self._extended_block_write(
0x52, 0x10, struct.pack("<H", terminate_voltage_mV)
0x52, 0x10, struct.pack(">H", terminate_voltage_mV)
)

# 2 clear OpConfig BIE (Registers 0x40, byte 0x40)
Expand All @@ -111,7 +130,7 @@ def initialise(

def remain_capacity(self):
"""Get the remaining capacity in mAh."""
return self._rd_word(0x1C)
return self._rd_word(0x0C)

def voltage_V(self):
"""Get the battery voltage in volts."""
Expand All @@ -126,6 +145,42 @@ def avg_current_mA(self):
raw = self._rd_word(0x10)
return raw - 0x10000 if raw & 0x8000 else raw

def i2c_get_Control(self):
"""Get the control register value."""
return self._rd_word(0x00)

def i2c_get_flags(self):
"""Get the flags register value."""
return self._rd_word(0x06)

def i2c_get_stateofcharge(self):
"""Get the state of charge register value in %."""
return self._rd_word(0x1C)

def i2c_get_NominalAvailableCapacity(self):
"""Get the nominal available capacity register value in mAh."""
return self._rd_word(0x08)

def i2c_get_FullAvailableCapacity(self):
"""Get the full available capacity register value in mAh."""
return self._rd_word(0x0A)

def i2c_get_FullChargeCapacity(self):
"""Get the full charge capacity register value in mAh."""
return self._rd_word(0x0E)

def i2c_get_StandbyCurrent(self):
"""Get the standby current register value in mA."""
return self._rd_word(0x12)

def i2c_get_StateOfHealth(self):
"""Get the state of health register value."""
return self._rd_word(0x20)

def i2c_get_DesignCapacity(self):
"""Get the design capacity register value in mAh."""
return self._rd_word(0x3c)

# Add additional methods here for other registers you need to interface with


Expand Down
142 changes: 142 additions & 0 deletions src/V0.2.3/upload_win.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
简化版 RP2040 烧录脚本 for Window
Rongbo Hu 7 22
1.运行本脚本(注意:记得在本文件夹中放一个ULP文件夹)
2.按住BOOT键插入RP2040
3.等待脚本自动烧录
"""

import os
import time
import shutil
import subprocess

UPLOAD_FILES = [
"bin/loading1.bin",
"bin/loading2.bin",
"eink_driver_sam.py",
"pamir_uart_protocols.py",
"neopixel_controller.py",
"power_manager.py",
"battery.py",
"debug_handler.py",
"uart_handler.py",
"threaded_task_manager.py",
"main.py"
]

def find_rp2040_drive():
"""查找包含 INFO_UF2.txt 的盘符"""
if os.name == 'nt':
drives = [f"{chr(i)}:\\" for i in range(65, 91) if os.path.exists(f"{chr(i)}:\\")]
else:
drives = [d.path for d in os.scandir('/mnt') if d.is_dir()]
for drive in drives:
if os.path.exists(os.path.join(drive, "INFO_UF2.txt")):
return drive
return None

def wait_for_rp2040(message, timeout=30):
"""等待RP2040插入"""
print(message)
start_time = time.time()
while time.time() - start_time < timeout:
drive = find_rp2040_drive()
if drive:
print(f"[OK] RP2040已连接:{drive}")
return drive
time.sleep(1)
print("[ERROR] 超时未检测到RP2040")
return None

def copy_uf2(drive, uf2_path):
"""复制UF2文件到RP2040"""
try:
shutil.copy(uf2_path, os.path.join(drive, os.path.basename(uf2_path)))
print(f"[OK] 成功复制 {os.path.basename(uf2_path)} 到 {drive}")
return True
except Exception as e:
print(f"[ERROR] UF2复制失败: {e}")
return False

def wait_for_mpremote(port="COM162", timeout=60):
"""验证mpremote是否能连接到指定端口"""
print(f"尝试通过 mpremote 连接到 {port} ...")
start_time = time.time()
while time.time() - start_time < timeout:
try:
result = subprocess.run(["mpremote", "connect", port, "exec", "print('connected')"],
capture_output=True, text=True, timeout=5)
if "connected" in result.stdout:
print(f"[OK] 成功连接到 {port}")
return True
except Exception as e:
print(f"[WARNING] 连接失败: {e}")
time.sleep(1)
print(f"[ERROR] 无法连接到 {port}")
return False


PORT = "COM162" # 你从 Thonny 得到的端口号

def upload_files(port="COM162"):
"""通过mpremote上传文件到指定端口"""
for f in UPLOAD_FILES:
if not os.path.exists(f):
print(f"[ERROR] 文件不存在: {f}")
return False
cmd = ["mpremote", "connect", port, "fs", "cp", f, f":/{os.path.basename(f)}"]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
if result.returncode == 0:
print(f"[OK] 上传成功: {os.path.basename(f)}")
else:
print(f"[ERROR] 上传失败: {result.stderr.strip()}")
return False
except Exception as e:
print(f"[ERROR] 上传出错: {e}")
return False
return True


def main():
print("========== RP2040 烧录工具 ==========")

# 1. 等待第一次插入(BOOT模式)
drive = wait_for_rp2040("请按住BOOT插入RP2040...", timeout=30)
if not drive:
return

# 2. 烧录flash_nuke.uf2
if not copy_uf2(drive, "ULP/flash_nuke.uf2"):
return

print("等待RP2040重新挂载...")
time.sleep(5)

# 3. 等待再次出现盘符
drive = wait_for_rp2040("请等待RP2040重新连接...", timeout=30)
if not drive:
return

# 4. 烧录RPI_PICO-20240222-v1.22.2.uf2
if not copy_uf2(drive, "ULP/RPI_PICO-20240222-v1.22.2.uf2"):
return

print("等待RP2040进入Micropython模式...")
time.sleep(8)

# 5. 等待mpremote设备连接
if not wait_for_mpremote(PORT):
return

# 6. 上传文件
if not upload_files(PORT):
return

print("========== 烧录完成 ==========")

if __name__ == "__main__":
main()