Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions conf/ali_sync.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ CHECK_EXIST: true
# 设置当文件已存在时,是否强制覆盖文件,如果否,则当文件大小或文件类型不一致时才会覆盖
FORCE_OVERRIDE: false

# 设置同步的并发线程数
THREADS_NUM_FOR_SYNC: 3
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

没看到在哪使用了这个配置


# 限制每个线程同步数据的速率为1MB/s=1048576B/s,单位:B/s
SYNC_SPEED: 1048576

# 设置多少个线程同时添加离线任务
THREADS_NUM_FOR_ADD_OFFLINE_TASK: 20

Expand Down
57 changes: 53 additions & 4 deletions src/ali_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
from botocore.client import Config

from pykit import jobq
from pykit import http
from pykit import awssign

report_state_lock = threading.RLock()

Expand Down Expand Up @@ -341,6 +343,52 @@ def compare_file(result, th_status):
return True


def upload_file(resp_object, result):
    """Stream ``resp_object`` to the Baishan bucket with a signed PUT request.

    The body is read from ``resp_object`` in 1 MiB chunks and forwarded to
    the endpoint configured in ``cnf['BAISHAN_ENDPOINT']``. When the optional
    ``SYNC_SPEED`` setting (bytes per second) is present and non-zero, the
    transfer is throttled so the average rate since the start of the upload
    does not exceed it.

    Args:
        resp_object: source object; must expose ``content_length`` and
            ``read(size)``.
        result: dict describing the file; ``result['s3_key']`` is used as the
            destination key.

    Returns:
        None.
    """
    uri = '/%s/%s' % (cnf['BAISHAN_BUCKET_NAME'], result['s3_key'])
    verb = 'PUT'

    # Strip the scheme only when it is really a prefix; a bare host name is
    # used unchanged instead of losing its first characters.
    endpoint = cnf['BAISHAN_ENDPOINT']
    if endpoint.startswith('http://'):
        host = endpoint[len('http://'):]
    elif endpoint.startswith('https://'):
        host = endpoint[len('https://'):]
    else:
        host = endpoint

    headers = {
        'Content-Length': resp_object.content_length,
        'Host': host,
    }

    request = {
        'verb': verb,
        'uri': uri,
        'headers': headers,
    }

    # NOTE(review): 'BAISHAN_ACCESS_KEEY' looks like a typo for
    # 'BAISHAN_ACCESS_KEY' -- confirm against the config file before renaming.
    sign = awssign.Signer(cnf['BAISHAN_ACCESS_KEEY'], cnf['BAISHAN_SECRET_KEY'])
    sign.add_auth(request, query_auth=False, expires=120)

    # NOTE(review): the port is fixed to 80 even for an https:// endpoint.
    cli = http.Client(host, port=80)
    cli.send_request(request['uri'], verb, request['headers'])

    chunk_size = 1024 * 1024

    # SYNC_SPEED is optional; a missing or zero value disables throttling.
    # Assumes cnf is a dict-like mapping (it is subscripted above).
    speed = cnf.get('SYNC_SPEED')

    # Measure elapsed time from the start of the whole body so it is compared
    # against the cumulative expected time, not a single chunk's read time.
    start_time = time.time()
    send_size = 0

    while True:
        buf = resp_object.read(chunk_size)

        # Check for EOF before sending so an empty chunk is never sent or
        # counted; ``not buf`` covers both '' and b''.
        if not buf:
            break

        cli.send_body(buf)
        # Count the bytes actually read; the last chunk is usually short.
        send_size += len(buf)

        if speed:
            # float() avoids integer division on Python 2.
            expect_time = float(send_size) / speed
            time_diff = expect_time - (time.time() - start_time)
            if time_diff > 0:
                time.sleep(time_diff)


def pipe_file(result, th_status):
result['piped'] = True
th_status['piped_n'] = th_status.get('piped_n', 0) + 1
Expand All @@ -349,13 +397,14 @@ def update_pipe_progress(done_bytes, total_bytes):
th_status['pipe_progress'] = (done_bytes, total_bytes)

file_object = result['file_object']
upload_file(file_object, result)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

要替换的是418行的代码


try:
resp_object = oss2_bucket.get_object(
file_object.key, progress_callback=update_pipe_progress)

ali_file_info = validate_and_extract_ali_file_info(resp_object, result)
if ali_file_info == None:
if ali_file_info is None:

result['pipe_failed'] = True
th_status['pipe_failed_n'] = th_status.get('pipe_failed_n', 0) + 1
Expand Down Expand Up @@ -498,7 +547,7 @@ def update_sync_stat(result):
elif 'default_not_override' in result:
ali_sync_state['default_not_override'] += 1

if not 'piped' in result:
if 'piped' not in result:
return

ali_sync_state['piped'] += 1
Expand Down Expand Up @@ -528,7 +577,7 @@ def update_sync_stat(result):
ali_sync_state['pipe_succeed'] += 1
ali_sync_state['pipe_succeed_bytes'] += file_object.size

if not 'compared' in result:
if 'compared' not in result:
return

if 'compare_failed' in result:
Expand Down Expand Up @@ -697,7 +746,7 @@ def sync():
try:
report_sess = {'stop': False}
report_th = _thread(report, (report_sess,))
jobq.run(iter_files(), [(sync_one_file, 3),
jobq.run(iter_files(), [(sync_one_file, cnf['THREADS_NUM_FOR_SYNC']),
(update_sync_stat, 1),
])

Expand Down