Commit

Merge pull request #18 from pkit/gzip_support

added support for gzipped image devices

Constantine Peresypkin committed Mar 20, 2014
2 parents a6c2d30 + 3d72f14 commit 1e38b90

Showing 4 changed files with 200 additions and 11 deletions.
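The change lets a job attach a gzipped object to its 'image' device: the proxy and object middleware now inflate 'application/x-gzip' payloads on the fly instead of requiring a plain tar. A minimal client-side sketch, assembled from the new tests below (the paths, object names, and Python 2 StringIO usage mirror the test fixtures and are illustrative only):

import gzip
import json
from StringIO import StringIO

# gzip the image payload in memory, as the proxy test does
image_gz = StringIO()
gz = gzip.GzipFile(mode='wb', fileobj=image_gz)
gz.write('This is image file')
gz.close()
# ...store image_gz.getvalue() as swift://a/c/img.gz with
# content type 'application/x-gzip'

# then point the 'image' device at the gzipped object
conf = json.dumps([{
    'name': 'sort',
    'exec': {'path': 'swift://a/c/exe2'},
    'file_list': [
        {'device': 'stdin', 'path': 'swift://a/c/o'},
        {'device': 'stdout'},
        {'device': 'image', 'path': 'swift://a/c/img.gz'},
    ],
}])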
53 changes: 53 additions & 0 deletions test/unit/test_objectquery.py
@@ -1332,6 +1332,59 @@ def test_QUERY_use_image_file(self):
#self.assertEqual(self.app.logger.log_dict['info'][0][0][0],
# 'Zerovm CDR: 0 0 0 0 1 46 2 56 0 0 0 0')

def test_QUERY_use_gzipped_image(self):
self.setup_zerovm_query()
req = self.zerovm_object_request()
nexefile = StringIO(self._nexescript)
conf = ZvmNode(1, 'sort', 'file://usr/bin/sort')
conf.add_new_channel('stdin', ACCESS_READABLE, parse_location('swift://a/c/o'))
conf.add_new_channel('stdout', ACCESS_WRITABLE)
conf.add_new_channel('image', ACCESS_CDR)
conf = json.dumps(conf, cls=NodeEncoder)
sysmap = StringIO(conf)
with self.create_tar({'usr/bin/sort': nexefile}) as image_tar:
import gzip
image_tar_gz = image_tar + '.gz'
try:
t = open(image_tar, 'rb')
gz = gzip.open(image_tar_gz, 'wb')
gz.writelines(t)
gz.close()
t.close()
with self.create_tar({'image.gz': open(image_tar_gz, 'rb'),
'sysmap': sysmap}) as tar:
length = os.path.getsize(tar)
req.body_file = Input(open(tar, 'rb'), length)
req.content_length = length
resp = self.app.zerovm_query(req)
fd, name = mkstemp()
self.assertEqual(resp.status_int, 200)
for chunk in resp.app_iter:
os.write(fd, chunk)
os.close(fd)
self.assertEqual(os.path.getsize(name), resp.content_length)
tar = tarfile.open(name)
names = tar.getnames()
members = tar.getmembers()
self.assertIn('stdout', names)
self.assertEqual(names[-1], 'stdout')
self.assertEqual(members[-1].size, len(self._sortednumbers))
file = tar.extractfile(members[-1])
self.assertEqual(file.read(), self._sortednumbers)
self.assertEqual(resp.headers['x-nexe-retcode'], '0')
self.assertEqual(resp.headers['x-nexe-status'], 'ok.')
self.assertEqual(resp.headers['x-nexe-validation'], '0')
self.assertEqual(resp.headers['x-nexe-system'], 'sort')
timestamp = normalize_timestamp(time())
self.assertEqual(math.floor(float(resp.headers['X-Timestamp'])),
math.floor(float(timestamp)))
self.assertEquals(resp.headers['content-type'], 'application/x-gtar')
finally:
try:
os.unlink(image_tar_gz)
except OSError:
pass

def test_QUERY_bypass_image_file(self):
self.setup_zerovm_query()
req = self.zerovm_object_request()
81 changes: 81 additions & 0 deletions test/unit/test_proxyquery.py
@@ -414,6 +414,22 @@ def create_tar(self, filelist):
except OSError:
pass

@contextmanager
def create_gzip(self, fname):
gzfd, gzname = mkstemp()
os.close(gzfd)
gz = GzipFile(gzname, mode='wb')
fd = open(fname, 'rb')
gz.write(fd.read())
gz.close()
try:
yield gzname
finally:
try:
os.unlink(gzname)
except OSError:
pass

@contextmanager
def add_sysimage_device(self, sysimage_path):
prosrv = _test_servers[0]
@@ -557,6 +573,35 @@ def test_QUERY_sort_store_stdout(self):
'o2': self.get_sorted_numbers()
})

def test_gzipped_tar(self):
self.setup_QUERY()
conf = [
{
'name': 'sort',
'exec': {'path': 'swift://a/c/exe'},
'file_list': [
{'device': 'stdin', 'path': 'swift://a/c/o'},
{'device': 'stdout', 'path': 'swift://a/c/o2'}
]
}
]
conf = json.dumps(conf)
prosrv = _test_servers[0]
req = self.zerovm_tar_request()
req.headers['content-type'] = 'application/x-gzip'
sysmap = StringIO(conf)
with self.create_tar({CLUSTER_CONFIG_FILENAME: sysmap}) as tar:
with self.create_gzip(tar) as gzname:
req.body_file = open(gzname, 'rb')
req.content_length = os.path.getsize(tar)
res = req.get_response(prosrv)
self.executed_successfully(res)
self.check_container_integrity(prosrv,
'/v1/a/c',
{
'o2': self.get_sorted_numbers()
})

def test_QUERY_sort_store_stdout_stderr(self):
self.setup_QUERY()
conf = [
@@ -928,6 +973,42 @@ def test_QUERY_use_image(self):
str(['This is image file',
pickle.loads(self.get_sorted_numbers())]))

def test_QUERY_use_gzipped_image(self):
self.setup_QUERY()
prolis = _test_sockets[0]
prosrv = _test_servers[0]
nexe =\
r'''
return [open(mnfst.image['path']).read(), sorted(id)]
'''[1:-1]
self.create_object(prolis, '/v1/a/c/exe2', nexe)
image = 'This is image file'
image_gz = StringIO('')
gz = GzipFile(mode='wb', fileobj=image_gz)
gz.write(image)
gz.close()
self.create_object(prolis, '/v1/a/c/img.gz', image_gz.getvalue(),
content_type='application/x-gzip')
conf = [
{
'name': 'sort',
'exec': {'path': 'swift://a/c/exe2'},
'file_list': [
{'device': 'stdin', 'path': 'swift://a/c/o'},
{'device': 'stdout'},
{'device': 'image', 'path': 'swift://a/c/img.gz'}
]
}
]
conf = json.dumps(conf)
req = self.zerovm_request()
req.body = conf
res = req.get_response(prosrv)
self.assertEqual(res.status_int, 200)
self.assertEqual(res.body,
str(['This is image file',
pickle.loads(self.get_sorted_numbers())]))

def test_QUERY_use_large_image(self):
self.setup_QUERY()
prolis = _test_sockets[0]
25 changes: 20 additions & 5 deletions zerocloud/objectquery.py
@@ -18,6 +18,7 @@
import signal

from swift import gettext_ as _
import zlib
from swift.common.swob import Request, Response, HTTPNotFound, \
HTTPPreconditionFailed, HTTPRequestTimeout, HTTPRequestEntityTooLarge, \
HTTPBadRequest, HTTPUnprocessableEntity, HTTPServiceUnavailable, \
@@ -34,6 +35,7 @@
is_image_path, ACCESS_NETWORK, ACCESS_RANDOM, REPORT_VALIDATOR, REPORT_RETCODE, REPORT_ETAG, \
REPORT_CDR, REPORT_STATUS, SwiftPath, REPORT_LENGTH, REPORT_DAEMON, NodeEncoder
from zerocloud.configparser import ClusterConfigParser
from zerocloud.proxyquery import gunzip_iter

from zerocloud.tarstream import UntarStream, TarStream, REGTYPE, BLOCKSIZE, NUL
import zerocloud.thread_pool as zpool
Expand Down Expand Up @@ -582,13 +584,26 @@ def zerovm_query(self, req):
info = untar_stream.get_next_tarinfo()
while info:
if info.offset_data:
channels[info.name] = os.path.join(zerovm_tmp, info.name)
fp = open(channels[info.name], 'ab')
fname = info.name
file_iter = untar_stream.untar_file_iter()
if fname == 'image.gz':
fname = 'image'
file_iter = gunzip_iter(
untar_stream.untar_file_iter(),
self.app.network_chunk_size)
channels[fname] = os.path.join(zerovm_tmp, fname)
fp = open(channels[fname], 'ab')
untar_stream.to_write = info.size
untar_stream.offset_data = info.offset_data
for data in untar_stream.untar_file_iter():
fp.write(data)
perf = "%s %s:%.3f" % (perf, info.name, time.time() - start)
try:
for data in file_iter:
fp.write(data)
perf = "%s %s:%.3f" % (perf, info.name, time.time() - start)
except zlib.error:
return HTTPUnprocessableEntity(
request=req,
body='Failed to inflate gzipped image',
headers=nexe_headers)
fp.close()
info = untar_stream.get_next_tarinfo()
if 'content-length' in req.headers\
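The new except clause above turns a failed inflate into a 422 response. For reference, this is the kind of error zlib raises when the 'image.gz' channel does not actually hold gzip data (standalone sketch, not part of the diff):

import zlib

dec = zlib.decompressobj(16 + zlib.MAX_WBITS)  # expect a gzip header
try:
    dec.decompress('this is not a gzip stream')
except zlib.error:
    # the handler answers this with HTTPUnprocessableEntity (422),
    # body 'Failed to inflate gzipped image', plus the nexe headers
    pass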
52 changes: 46 additions & 6 deletions zerocloud/proxyquery.py
@@ -12,6 +12,7 @@
from eventlet import GreenPile, GreenPool, Queue
from eventlet.green import socket
from eventlet.timeout import Timeout
import zlib

from swiftclient.client import quote

@@ -21,7 +22,8 @@
from swift.proxy.controllers.base import update_headers, delay_denial, \
cors_validation
from swift.common.utils import split_path, get_logger, TRUE_VALUES, \
get_remote_client, ContextPool, cache_from_env, normalize_timestamp, GreenthreadSafeIterator
get_remote_client, ContextPool, cache_from_env, normalize_timestamp, \
GreenthreadSafeIterator
from swift.proxy.server import ObjectController, ContainerController, \
AccountController
from swift.common.bufferedhttp import http_connect
@@ -35,7 +37,8 @@
CLUSTER_CONFIG_FILENAME, NODE_CONFIG_FILENAME, TAR_MIMES, \
POST_TEXT_OBJECT_SYSTEM_MAP, POST_TEXT_ACCOUNT_SYSTEM_MAP, \
merge_headers, update_metadata, DEFAULT_EXE_SYSTEM_MAP, STREAM_CACHE_SIZE, \
ZvmChannel, parse_location, is_swift_path, is_image_path, can_run_as_daemon, SwiftPath, NodeEncoder
ZvmChannel, parse_location, is_swift_path, is_image_path, can_run_as_daemon, \
SwiftPath, NodeEncoder
from zerocloud.configparser import ClusterConfigParser, ClusterConfigParsingError
from zerocloud.tarstream import StringBuffer, UntarStream, \
TarStream, REGTYPE, BLOCKSIZE, NUL, ExtractedFile, Path, ReadError
@@ -604,18 +607,30 @@ def POST(self, req, exe_resp=None, cluster_config=''):
req.bytes_transferred = 0
path_list = [StringBuffer(CLUSTER_CONFIG_FILENAME),
StringBuffer(NODE_CONFIG_FILENAME)]
read_iter = iter(lambda: req.environ['wsgi.input'].read(self.app.network_chunk_size), '')
rdata = req.environ['wsgi.input']
read_iter = iter(lambda: rdata.read(self.app.network_chunk_size), '')
orig_content_type = req.content_type
if req.content_type in ['application/x-gzip']:
orig_iter = read_iter
read_iter = gunzip_iter(orig_iter, self.app.network_chunk_size)
req.content_type = TAR_MIMES[0]
if req.content_type in TAR_MIMES:
# we must have Content-Length set for tar-based requests
# as it will be impossible to stream them otherwise
if not 'content-length' in req.headers:
return HTTPBadRequest(request=req,
body='Must specify Content-Length')
# buffer first blocks of tar file and search for the system map
cached_body = CachedBody(read_iter)
try:
cached_body = CachedBody(read_iter)
except zlib.error:
return HTTPUnprocessableEntity(request=req,
body='Error reading %s stream'
% orig_content_type)
user_image = True
image_resp = Response(app_iter=iter(cached_body),
headers={'Content-Length': req.headers['content-length']})
headers={'Content-Length': req.headers['content-length'],
'Content-Type': req.headers['content-type']})
image_resp.nodes = []
untar_stream = UntarStream(cached_body.cache, path_list)
try:
@@ -1221,8 +1236,11 @@ def _queue_put(conn, data, chunked):

def _send_tar_headers(chunked, data_src):
for conn in data_src.conns:
name = conn['dev']
if name == 'image' and data_src.content_type == 'application/x-gzip':
name = 'image.gz'
info = conn['conn'].tar_stream.create_tarinfo(ftype=REGTYPE,
name=conn['dev'],
name=name,
size=data_src.content_length)
for chunk in conn['conn'].tar_stream.serve_chunk(info):
if not conn['conn'].failed:
Expand Down Expand Up @@ -1272,6 +1290,28 @@ def _get_local_address(node):
return result


def gunzip_iter(data_iter, chunk_size):
dec = zlib.decompressobj(16 + zlib.MAX_WBITS)
unc_data = ''
for chunk in data_iter:
while dec.unconsumed_tail:
while len(unc_data) < chunk_size and dec.unconsumed_tail:
unc_data += dec.decompress(dec.unconsumed_tail,
chunk_size - len(unc_data))
if len(unc_data) == chunk_size:
yield unc_data
unc_data = ''
if unc_data and dec.unconsumed_tail:
chunk += dec.unconsumed_tail
break
unc_data += dec.decompress(chunk, chunk_size - len(unc_data))
if len(unc_data) == chunk_size:
yield unc_data
unc_data = ''
if unc_data:
yield unc_data


def filter_factory(global_conf, **local_conf):
"""
paste.deploy app factory for creating WSGI proxy apps.
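gunzip_iter drives zlib.decompressobj(16 + zlib.MAX_WBITS), where adding 16 to MAX_WBITS tells zlib to expect gzip framing, and re-slices the inflated stream into chunk_size pieces so downstream consumers keep working with bounded buffers. A simplified standalone sketch of the same streaming idea, without the fixed-size re-slicing (illustrative, not the project's code):

import gzip
import zlib
from StringIO import StringIO


def gunzip_stream(data_iter):
    # Inflate a gzip stream piece by piece, yielding whatever zlib
    # produces for each compressed chunk, then flush the remainder.
    dec = zlib.decompressobj(16 + zlib.MAX_WBITS)
    for chunk in data_iter:
        out = dec.decompress(chunk)
        if out:
            yield out
    tail = dec.flush()
    if tail:
        yield tail


# round-trip check against an in-memory gzip stream (Python 2, as in the tests)
buf = StringIO()
gz = gzip.GzipFile(mode='wb', fileobj=buf)
gz.write('0123456789' * 1000)
gz.close()
compressed = buf.getvalue()
pieces = (compressed[i:i + 64] for i in xrange(0, len(compressed), 64))
assert ''.join(gunzip_stream(pieces)) == '0123456789' * 1000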
