Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions conf/ali_sync.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ CHECK_EXIST: true
# 设置当文件已存在时,是否强制覆盖文件,如果否,则当文件大小或文件类型不一致时才会覆盖
FORCE_OVERRIDE: false

# 设置同步的并发线程数
THREADS_NUM_FOR_SYNC: 3
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

没看到在哪使用了这个配置


# 限制每个线程同步数据的速率为1Mb/s=1048576B/s,单位:B/s
SYNC_SPEED: 1048576

# 设置多少个线程同时添加离线任务
THREADS_NUM_FOR_ADD_OFFLINE_TASK: 20

Expand Down
77 changes: 66 additions & 11 deletions src/ali_sync.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python2
# coding:utf-8

import ssl
import base64
import copy
import errno
Expand All @@ -19,6 +20,8 @@
from botocore.client import Config

from pykit import jobq
from pykit import http
from pykit import awssign

report_state_lock = threading.RLock()

Expand Down Expand Up @@ -341,6 +344,64 @@ def compare_file(result, th_status):
return True


def upload_file(resp_object, result, ali_file_info):
    """Stream a source object into the Baishan S3-compatible store via PUT.

    Arguments:
        resp_object:   streaming response of the source object; must expose
                       a `content_length` attribute and a `read(size)` method.
        result:        per-file sync record; `result['s3_key']` is the
                       destination object key.
        ali_file_info: dict holding 'content_type' and a 'meta' dict of
                       user metadata extracted from the source object.

    The upload rate is throttled to cnf['SYNC_SPEED'] bytes per second
    when that setting is not None.
    """
    uri = '/%s/%s' % (cnf['BAISHAN_BUCKET_NAME'], result['s3_key'])
    verb = 'PUT'

    endpoint = cnf['BAISHAN_ENDPOINT']
    https_ctx = None
    port = 80
    # startswith: the scheme must be a prefix; the old `in` test would also
    # match a scheme string appearing anywhere inside the endpoint.
    if endpoint.startswith('http://'):
        host = endpoint[len('http://'):]
    elif endpoint.startswith('https://'):
        host = endpoint[len('https://'):]
        # NOTE(review): this private ssl API disables certificate
        # verification entirely -- confirm the endpoint is an internal,
        # trusted host before keeping this.
        https_ctx = ssl._create_unverified_context()
        port = 443
    else:
        host = endpoint

    headers = {
        'Content-Length': resp_object.content_length,
        'Host': host,
        'x-amz-acl': cnf['FILE_ACL'],
        'Content-Type': ali_file_info['content_type'],
    }

    # carry the source object's user metadata over as x-amz-meta-* headers
    for k, v in ali_file_info['meta'].items():
        headers['x-amz-meta-' + k] = v

    request = {
        'verb': verb,
        'uri': uri,
        'headers': headers,
    }

    sign = awssign.Signer(cnf['BAISHAN_ACCESS_KEY'], cnf['BAISHAN_SECRET_KEY'])
    sign.add_auth(request, query_auth=False, expires=120)

    cli = http.Client(host, port=port, https_context=https_ctx)
    cli.send_request(request['uri'], verb, request['headers'])

    send_size = 0
    start_time = time.time()
    while True:
        buf = resp_object.read(1024 * 1024)
        if buf == '':
            break

        if cnf['SYNC_SPEED'] is not None:
            # float() avoids Python 2 integer division truncating the
            # expected elapsed time, which would let the throttle drift
            # faster than the configured rate.
            expect_time = float(send_size) / cnf['SYNC_SPEED']
            act_time = time.time() - start_time
            time_diff = expect_time - act_time

            if time_diff > 0:
                time.sleep(time_diff)

        cli.send_body(buf)
        # count the bytes actually read: the final chunk is usually shorter
        # than 1MB, so the old fixed 1024*1024 increment over-counted it
        # and over-throttled the tail of the transfer.
        send_size += len(buf)

    # NOTE(review): the HTTP response is never read here, so a failed PUT
    # would go undetected -- verify pykit.http.Client raises on error, or
    # add an explicit status-code check after the body is sent.


def pipe_file(result, th_status):
result['piped'] = True
th_status['piped_n'] = th_status.get('piped_n', 0) + 1
Expand All @@ -355,19 +416,13 @@ def update_pipe_progress(done_bytes, total_bytes):
file_object.key, progress_callback=update_pipe_progress)

ali_file_info = validate_and_extract_ali_file_info(resp_object, result)
if ali_file_info == None:
if ali_file_info is None:

result['pipe_failed'] = True
th_status['pipe_failed_n'] = th_status.get('pipe_failed_n', 0) + 1
return False

extra_args = {
'ACL': cnf['FILE_ACL'],
'ContentType': ali_file_info['content_type'],
'Metadata': ali_file_info['meta'],
}
s3_client.upload_fileobj(resp_object, cnf['BAISHAN_BUCKET_NAME'],
result['s3_key'], ExtraArgs=extra_args)
upload_file(resp_object, result, ali_file_info)

result['pipe_succeed'] = True
th_status['pipe_succeed_n'] = th_status.get('pipe_succeed_n', 0) + 1
Expand Down Expand Up @@ -498,7 +553,7 @@ def update_sync_stat(result):
elif 'default_not_override' in result:
ali_sync_state['default_not_override'] += 1

if not 'piped' in result:
if 'piped' not in result:
return

ali_sync_state['piped'] += 1
Expand Down Expand Up @@ -528,7 +583,7 @@ def update_sync_stat(result):
ali_sync_state['pipe_succeed'] += 1
ali_sync_state['pipe_succeed_bytes'] += file_object.size

if not 'compared' in result:
if 'compared' not in result:
return

if 'compare_failed' in result:
Expand Down Expand Up @@ -697,7 +752,7 @@ def sync():
try:
report_sess = {'stop': False}
report_th = _thread(report, (report_sess,))
jobq.run(iter_files(), [(sync_one_file, 3),
jobq.run(iter_files(), [(sync_one_file, cnf['THREADS_NUM_FOR_SYNC']),
(update_sync_stat, 1),
])

Expand Down