Porter 是一个数据清洗辅助工具,能够将 MySQL、MongoDB 和文本文件(CSV/TSV/JSON)等数据源数据全量扫描推入到 Redis 队列,支持断点续传、自定义等待延迟和 Batch Size。
$ git clone https://github.com/zhiweio/porter.git && cd porter
$ python3 setup.py install --user
读取方式
reader: [mysql|mongo|json|file|csv]
redis 配置
redis:
host: 主机地址
port: 端口
db: 库
password:
key: 任务名称
queue_key_prefix: 数据队列名前称缀,留空则默认为`porter.queue.`
cache_key_prefix: 缓存名前称缀,留空则默认 `porter.cache.`
MySQL 配置
mysql:
host: 主机地址
port: 端口
db: 数据库
table: 表名
user: 用户名
password: 密码
pk: 表的主键,留空默认 `id`
column: 需要上传的字段名,留空默认选取全部字段
append_db_info: 可选,是否同时上传库名表名信息,true OR false,默认 false
appendices: 可选,指定附加字段数据上传,详细配置见模板文件
MongoDB 配置
mongo:
host: 主机地址
port: 端口
db: 库
collection: 集合
user: 用户名
password: 密码
column: 需要上传的字段名,留空默认选取全部字段
appendices: 同上
文本文件配置
file:
path: 文件路径
delimiter: 文件分隔符
header: true: 使用 header 拼接为 json 格式数据; false: 不处理上传整条数据
appendices: 同上
JSON 文件配置
json:
path: 文件路径
appendices: 同上
MySQL
---
reader: mysql
redis:
host: "127.0.0.1"
port: 6379
db: 56
password: 123456
key: task_read_from_mysql
queue_key_prefix: porter.queue.
cache_key_prefix: porter.cache.
mysql:
host: "127.0.0.1"
port: 3306
db: db_economic
table: t_macroindex
user: test
password: 123456
pk: id
column:
# 指定附加字段一同上传 redis
# appendices 内容格式为 ^\w+:\w+$,会被解析为 key-value,可以为多个
# append_db_info=true 表示同时上传数据库名和表名
MongoDB
---
reader: mongo
...
mongo:
host: "127.0.0.1"
port: 7055
db: iEnterprise
collection: TLDetailData
user: root
password: "123456"
column:
JSON
---
reader: json
...
json:
path: /pathto/data/test.json
# appendices:
# - field_name:"hello world"
CSV
---
reader: file
...
file:
path: /pathto/data/test.csv
delimiter: ","
header: True
# appendices:
# - field_name:"hello world"
普通文件
上传原始数据,不做 json 拼接处理
---
reader: file
...
file:
path: /pathto/data/test.csv
delimiter:
header: false
# appendices:
# - "hello world"
Usage: porter [OPTIONS] [sync|monitor|clear|new]
A command line tool for extracting data and load into Redis.
Options:
-V, --version Show the version and exit.
-f, --config-file PATH Task config file
-l, --limit INTEGER Limit of each reading from data source
[default: 1000]
--limit-scale INTEGER The maximum lengeth of redis queue is (limit
* scale) [default: 3]
--blocking / -B, --no-blocking Enable blocking mode [default: True]
-t, --time-sleep INTEGER Time to wait when up to the maximum limit of
queue [default: 10]
-C, --clean-type [status|queue|all]
Type of redis cache
-T, --task-type [mysql|mongo|json|file|csv]
Type of task template
-o, --output-task-file PATH Save task template into file
-v, --verbose Print debug information
--debug-file PATH File to be used as a stream for DEBUG
logging
-h, --help Show this message and exit.
- sync 同步数据
- monitor 监控任务状态
- clear 清空任务状态
- new 创建新的任务模板
每次读取 100 条数据并上传,超过最大限制数量则停止读取
$ porter sync -f task_template.yaml -l 100
读取全部数据并上传
$ porter sync -f task_template.yaml --no-blocking
打印详细日志
$ porter sync -f task_template.yaml -l 100 -v --debug-file /tmp/porter.log
查看任务进度
$ porter monitor -f task_template.yaml
{
"db": "db_economic",
"table": "t_macroindex",
"count": 61,
"page": 6,
"record": {xxx}
}
清空缓存数据
- all 所有
- status 任务状态
- queue 队列数据
$ porter clear -f task_template.yaml --clean-type all
创建一个任务配置模板
[mysql|mongo|json|file|csv]
$ porter new -T mysql
---
reader: mysql
redis:
host: "127.0.0.1"
port: 6379
db:
password:
key:
queue_key_prefix: porter.queue.
cache_key_prefix: porter.cache.
mysql:
host: "127.0.0.1"
port: 3306
db:
table:
user:
password:
pk:
column:
# 指定附加字段一同上传 redis
# appendices 内容格式为 ^\w+:\w+$,会被解析为 key-value,可以为多个
# append_db_info=true 表示同时上传数据库名和表名
创建任务模板并写入到指定文件
$ porter new -T mysql -o /tmp/mysql.yaml
每个数据同步任务都必须是唯一指定的 cache_key_prefix
+ key
,但是可以多个任务往同一个队列里推数据,即多个任务可以用相同的 queue_key_prefix
+ key