From 61c0c31dd7f19f7bfc026b48030401e0f04dc10a Mon Sep 17 00:00:00 2001 From: samkaplan Date: Thu, 2 Feb 2023 15:35:41 -0600 Subject: [PATCH] when needed, refresh tokens in the retry-loop --- src/AzStorage.c | 245 +++++++++++++++++++++++++++++++--------------- src/AzStorage.h | 67 ++++++++----- src/AzStorage.jl | 182 +++++++++++++++++++++++----------- test/Project.toml | 3 + test/runtests.jl | 74 +++++++++++++- 5 files changed, 407 insertions(+), 164 deletions(-) diff --git a/src/AzStorage.c b/src/AzStorage.c index 624e72b..7012469 100644 --- a/src/AzStorage.c +++ b/src/AzStorage.c @@ -476,9 +476,12 @@ curl_refresh_tokens( { unsigned long current_time = (unsigned long) time(NULL); struct ResponseCodes responsecodes; + + responsecodes.http = 200; + responsecodes.curl = (long)CURLE_OK; + responsecodes.retry_after = 0; + if (current_time < (*expiry - 600)) { /* 10 minute grace period */ - responsecodes.http = 200; - responsecodes.curl = (long)CURLE_OK; return responsecodes; } @@ -487,10 +490,7 @@ curl_refresh_tokens( } else if (refresh_token != NULL) { responsecodes = curl_refresh_tokens_from_refresh_token(bearer_token, refresh_token, expiry, scope, resource, clientid, tenant, verbose, connect_timeout, read_timeout); } else { - printf("Unable to refresh tokens without either a refresh token or a client secret"); - responsecodes.curl = 1000; - responsecodes.http = 1000; - responsecodes.retry_after = 0; + printf("Warning: unable to refresh token."); } return responsecodes; @@ -550,17 +550,42 @@ write_callback_readdata( struct ResponseCodes curl_writebytes_block( - char *token, - char *storageaccount, - char *containername, - char *blobname, - char *blockid, - char *data, - size_t datasize, - int verbose, - long connect_timeout, - long read_timeout) + omp_lock_t *token_lock, + char *token, + char *refresh_token, + unsigned long *expiry, + char *scope, + char *resource, + char *tenant, + char *clientid, + char *client_secret, + char *storageaccount, + char *containername, + char *blobname, + char *blockid, + char *data, + size_t datasize, + int nretry, + int verbose, + long connect_timeout, + long read_timeout) { + omp_set_lock(token_lock); + curl_refresh_tokens_retry( + token, + refresh_token, + expiry, + scope, + resource, + clientid, + client_secret, + tenant, + nretry, + verbose, + connect_timeout, + read_timeout); + omp_unset_lock(token_lock); + char authorization[BUFFER_SIZE]; curl_authorization(token, authorization); char contentlength[BUFFER_SIZE]; @@ -633,22 +658,30 @@ curl_writebytes_block( struct ResponseCodes curl_writebytes_block_retry( - char *token, - char *storageaccount, - char *containername, - char *blobname, - char *blockid, - char *data, - size_t datasize, - int nretry, - int verbose, - long connect_timeout, - long read_timeout) + omp_lock_t *token_lock, + char *token, + char *refresh_token, + unsigned long *expiry, + char *scope, + char *resource, + char *tenant, + char *clientid, + char *client_secret, + char *storageaccount, + char *containername, + char *blobname, + char *blockid, + char *data, + size_t datasize, + int nretry, + int verbose, + long connect_timeout, + long read_timeout) { int iretry; struct ResponseCodes responsecodes; for (iretry = 0; iretry < nretry; iretry++) { - responsecodes = curl_writebytes_block(token, storageaccount, containername, blobname, blockid, data, datasize, verbose, connect_timeout, read_timeout); + responsecodes = curl_writebytes_block(token_lock, token, refresh_token, expiry, scope, resource, tenant, clientid, client_secret, storageaccount, containername, blobname, blockid, data, datasize, nretry, verbose, connect_timeout, read_timeout); if (isrestretrycode(responsecodes) == 0) { break; } @@ -665,19 +698,26 @@ curl_writebytes_block_retry( struct ResponseCodes curl_writebytes_block_retry_threaded( - char *token, - char *storageaccount, - char *containername, - char *blobname, - char **blockids, - char *data, - size_t datasize, - int nthreads, - int nblocks, - int nretry, - int verbose, - long connect_timeout, - long read_timeout) + char *token, + char *refresh_token, + unsigned long *expiry, + char *scope, + char *resource, + char *tenant, + char *clientid, + char *client_secret, + char *storageaccount, + char *containername, + char *blobname, + char **blockids, + char *data, + size_t datasize, + int nthreads, + int nblocks, + int nretry, + int verbose, + long connect_timeout, + long read_timeout) { size_t block_datasize = datasize/nblocks; size_t block_dataremainder = datasize%nblocks; @@ -690,6 +730,9 @@ curl_writebytes_block_retry_threaded( thread_responsecode_curl[threadid] = (long)CURLE_OK; } + omp_lock_t token_lock; + omp_init_lock(&token_lock); + #pragma omp parallel num_threads(nthreads) default(shared) { int threadid = omp_get_thread_num(); @@ -705,12 +748,14 @@ curl_writebytes_block_retry_threaded( block_firstbyte += block_dataremainder; } - struct ResponseCodes responsecodes = curl_writebytes_block_retry(token, storageaccount, containername, blobname, blockids[iblock], data+block_firstbyte, _block_datasize, nretry, verbose, connect_timeout, read_timeout); + struct ResponseCodes responsecodes = curl_writebytes_block_retry(&token_lock, token, refresh_token, expiry, scope, resource, tenant, clientid, client_secret, storageaccount, containername, blobname, blockids[iblock], data+block_firstbyte, _block_datasize, nretry, verbose, connect_timeout, read_timeout); thread_responsecode_http[threadid] = MAX(responsecodes.http, thread_responsecode_http[threadid]); thread_responsecode_curl[threadid] = MAX(responsecodes.curl, thread_responsecode_curl[threadid]); } } // end #pragma omp + omp_destroy_lock(&token_lock); + struct ResponseCodes responsecodes; responsecodes.http = (long)200; responsecodes.curl = (long)CURLE_OK; @@ -723,17 +768,42 @@ curl_writebytes_block_retry_threaded( struct ResponseCodes curl_readbytes( - char *token, - char *storageaccount, - char *containername, - char *blobname, - char *data, - size_t dataoffset, - size_t datasize, - int verbose, - long connect_timeout, - long read_timeout) + omp_lock_t *token_lock, + char *token, + char *refresh_token, + unsigned long *expiry, + char *scope, + char *resource, + char *tenant, + char *clientid, + char *client_secret, + char *storageaccount, + char *containername, + char *blobname, + char *data, + size_t dataoffset, + size_t datasize, + int nretry, + int verbose, + long connect_timeout, + long read_timeout) { + omp_set_lock(token_lock); + curl_refresh_tokens_retry( + token, + refresh_token, + expiry, + scope, + resource, + clientid, + client_secret, + tenant, + nretry, + verbose, + connect_timeout, + read_timeout); + omp_unset_lock(token_lock); + char authorization[BUFFER_SIZE]; curl_authorization(token, authorization); @@ -808,22 +878,30 @@ curl_readbytes( struct ResponseCodes curl_readbytes_retry( - char *token, - char *storageaccount, - char *containername, - char *blobname, - char *data, - size_t dataoffset, - size_t datasize, - int nretry, - int verbose, - long connect_timeout, - long read_timeout) + omp_lock_t *token_lock, + char *token, + char *refresh_token, + unsigned long *expiry, + char *scope, + char *resource, + char *tenant, + char *clientid, + char *client_secret, + char *storageaccount, + char *containername, + char *blobname, + char *data, + size_t dataoffset, + size_t datasize, + int nretry, + int verbose, + long connect_timeout, + long read_timeout) { struct ResponseCodes responsecodes; int iretry; for (iretry = 0; iretry < nretry; iretry++) { - responsecodes = curl_readbytes(token, storageaccount, containername, blobname, data, dataoffset, datasize, verbose, connect_timeout, read_timeout); + responsecodes = curl_readbytes(token_lock, token, refresh_token, expiry, scope, resource, tenant, clientid, client_secret, storageaccount, containername, blobname, data, dataoffset, datasize, nretry, verbose, connect_timeout, read_timeout); if (isrestretrycode(responsecodes) == 0) { break; } @@ -840,18 +918,25 @@ curl_readbytes_retry( struct ResponseCodes curl_readbytes_retry_threaded( - char *token, - char *storageaccount, - char *containername, - char *blobname, - char *data, - size_t dataoffset, - size_t datasize, - int nthreads, - int nretry, - int verbose, - long connect_timeout, - long read_timeout) + char *token, + char *refresh_token, + unsigned long *expiry, + char *scope, + char *resource, + char *tenant, + char *clientid, + char *client_secret, + char *storageaccount, + char *containername, + char *blobname, + char *data, + size_t dataoffset, + size_t datasize, + int nthreads, + int nretry, + int verbose, + long connect_timeout, + long read_timeout) { size_t thread_datasize = datasize/nthreads; size_t thread_dataremainder = datasize%nthreads; @@ -859,6 +944,9 @@ curl_readbytes_retry_threaded( long thread_responsecode_http[nthreads]; long thread_responsecode_curl[nthreads]; + omp_lock_t token_lock; + omp_init_lock(&token_lock); + #pragma omp parallel num_threads(nthreads) { int threadid = omp_get_thread_num(); @@ -871,10 +959,13 @@ curl_readbytes_retry_threaded( thread_firstbyte += thread_dataremainder; } - struct ResponseCodes responsecodes = curl_readbytes_retry(token, storageaccount, containername, blobname, data+thread_firstbyte, dataoffset+thread_firstbyte, _thread_datasize, nretry, verbose, connect_timeout, read_timeout); + struct ResponseCodes responsecodes = curl_readbytes_retry(&token_lock, token, refresh_token, expiry, scope, resource, tenant, clientid, client_secret, storageaccount, containername, blobname, data+thread_firstbyte, dataoffset+thread_firstbyte, _thread_datasize, nretry, verbose, connect_timeout, read_timeout); thread_responsecode_http[threadid] = responsecodes.http; thread_responsecode_curl[threadid] = responsecodes.curl; } /* end pragma omp */ + + omp_destroy_lock(&token_lock); + long responsecode_http = 200; long responsecode_curl = (long)CURLE_OK; int threadid; diff --git a/src/AzStorage.h b/src/AzStorage.h index b24dfe0..8f9f4c5 100644 --- a/src/AzStorage.h +++ b/src/AzStorage.h @@ -9,7 +9,8 @@ #include #include -#define BUFFER_SIZE 16000 // this needs to be large to accomodate large OAuth2 tokens +int BUFFER_SIZE = 16000; // this needs to be large to accomodate large OAuth2 tokens + #define API_HEADER_BUFFER_SIZE 512 #define MAXIMUM_BACKOFF 256.0 #define CURLE_TIMEOUT 18000L /* 5 hours */ @@ -70,34 +71,48 @@ curl_authorization( struct ResponseCodes curl_writebytes_block_retry_threaded( - char *token, - char *storageaccount, - char *containername, - char *blobname, - char **blockids, - char *data, - size_t datasize, - int nthreads, - int nblocks, - int nretry, - int verbose, - long connect_timeout, - long read_timeout); + char *token, + char *refresh_token, + unsigned long *expiry, + char *scope, + char *tenant, + char *resource, + char *clientid, + char *client_secret, + char *storageaccount, + char *containername, + char *blobname, + char **blockids, + char *data, + size_t datasize, + int nthreads, + int nblocks, + int nretry, + int verbose, + long connect_timeout, + long read_timeout); struct ResponseCodes curl_readbytes_retry_threaded( - char *token, - char *storageaccount, - char *containername, - char *blobname, - char *data, - size_t dataoffset, - size_t datasize, - int nthreads, - int nretry, - int verbose, - long connect_timeout, - long read_timeout); + char *token, + char *refresh_token, + unsigned long *expiry, + char *scope, + char *tenant, + char *resource, + char *clientid, + char *client_secret, + char *storageaccount, + char *containername, + char *blobname, + char *data, + size_t dataoffset, + size_t datasize, + int nthreads, + int nretry, + int verbose, + long connect_timeout, + long read_timeout); struct ResponseCodes curl_refresh_tokens_retry( diff --git a/src/AzStorage.jl b/src/AzStorage.jl index 9952cb8..bcf8873 100644 --- a/src/AzStorage.jl +++ b/src/AzStorage.jl @@ -22,6 +22,9 @@ const RETRYABLE_CURL_ERRORS = [ # https://docs.microsoft.com/en-us/rest/api/storageservices/versioning-for-the-azure-storage-services const API_VERSION = "2021-08-06" +# buffer size for holding OAuth2 tokens +const BUFFER_SIZE = unsafe_load(cglobal((:BUFFER_SIZE, libAzStorage), Int32)) + function __init__() @ccall libAzStorage.curl_init(length(RETRYABLE_HTTP_ERRORS)::Cint, length(RETRYABLE_CURL_ERRORS)::Cint, RETRYABLE_HTTP_ERRORS::Ptr{Clong}, RETRYABLE_CURL_ERRORS::Ptr{Clong}, API_VERSION::Cstring)::Cvoid end @@ -207,6 +210,65 @@ macro retry(retries, ex::Expr) end end +function new_pointer_array_from_string(input) + _input = transcode(UInt8, input) + output = Vector{UInt8}(undef, BUFFER_SIZE) + copyto!(output, 1, _input, 1, length(_input)) + output[length(_input)+1] = '\0' + output +end + +function authinfo(session::AzSessions.AzClientCredentialsSession) + _token = new_pointer_array_from_string(session.token) + refresh_token = C_NULL + expiry = [floor(UInt64, datetime2unix(session.expiry))] + scope = C_NULL + resource = session.resource + tenant = session.tenant + clientid = session.client_id + client_secret = session.client_secret + _token,refresh_token,expiry,scope,resource,tenant,clientid,client_secret +end + +function authinfo(session::Union{AzSessions.AzDeviceCodeFlowSession,AzSessions.AzAuthCodeFlowSession}) + _token = new_pointer_array_from_string(session.token) + _refresh_token = new_pointer_array_from_string(session.refresh_token) + expiry = [floor(UInt64, datetime2unix(session.expiry))] + scope = session.scope + resource = AzSessions.audience_from_scope(session.scope) + tenant = session.tenant + clientid = session.client_id + client_secret = C_NULL + _token,_refresh_token,expiry,scope,resource,tenant,clientid,client_secret +end + +function authinfo(session::AzSessions.AzVMSession) + refresh_token = C_NULL + expiry = [floor(UInt64, datetime2unix(session.expiry))] + scope = C_NULL + resource = session.resource + tenant = C_NULL + clientid = C_NULL + client_secret = C_NULL + refresh_token,expiry,scope,resource,tenant,clientid,client_secret +end + +function authinfo!(session::AzSessions.AzClientCredentialsSession, _token, refresh_token, expiry) + session.expiry = unix2datetime(expiry[1]) + session.token = unsafe_string(pointer(_token)) +end + +function authinfo!(session::Union{AzSessions.AzDeviceCodeFlowSession,AzSessions.AzAuthCodeFlowSession}, _token, refresh_token, expiry) + session.expiry = unix2datetime(expiry[1]) + session.token = unsafe_string(pointer(_token)) + session.refresh_token = unsafe_string(pointer(refresh_token)) +end + +function authinfo!(session::AzSessions.AzVMSession, _token, refresh_token, expiry) + session.expiry = unix2datetime(expiry[1]) + session.token = unsafe_string(point(_token)) +end + """ mkpath(container) @@ -249,64 +311,66 @@ _normpath(s) = Sys.iswindows() ? replace(normpath(s), "\\"=>"/") : normpath(s) addprefix(c::AzContainer, o) = c.prefix == "" ? o : _normpath("$(c.prefix)/$o") -function writebytes(c::AzContainer, o::AbstractString, data::DenseArray{UInt8}; contenttype="application/octet-stream") - function writebytes_blob(c, o, data, contenttype) - @retry c.nretry HTTP.request( - "PUT", - "https://$(c.storageaccount).blob.core.windows.net/$(c.containername)/$(addprefix(c,o))", - [ - "Authorization" => "Bearer $(token(c.session))", - "x-ms-version" => API_VERSION, - "Content-Length" => "$(length(data))", - "Content-Type" => contenttype, - "x-ms-blob-type" => "BlockBlob" - ], - data, - retry = false, - verbose = c.verbose, - connect_timeout = c.connect_timeout, - readtimeout = c.read_timeout) - nothing - end - - function putblocklist(c, o, blockids) - xdoc = XMLDocument() - xroot = create_root(xdoc, "BlockList") - for blockid in blockids - add_text(new_child(xroot, "Uncommitted"), blockid) - end - blocklist = string(xdoc) +function writebytes_blob(c, o, data, contenttype) + @retry c.nretry HTTP.request( + "PUT", + "https://$(c.storageaccount).blob.core.windows.net/$(c.containername)/$(addprefix(c,o))", + [ + "Authorization" => "Bearer $(token(c.session))", + "x-ms-version" => API_VERSION, + "Content-Length" => "$(length(data))", + "Content-Type" => contenttype, + "x-ms-blob-type" => "BlockBlob" + ], + data, + retry = false, + verbose = c.verbose, + connect_timeout = c.connect_timeout, + readtimeout = c.read_timeout) + nothing +end - @retry c.nretry HTTP.request( - "PUT", - "https://$(c.storageaccount).blob.core.windows.net/$(c.containername)/$(addprefix(c,o))?comp=blocklist", - [ - "x-ms-version" => API_VERSION, - "Authorization" => "Bearer $(token(c.session))", - "Content-Type" => "application/octet-stream", - "Content-Length" => "$(length(blocklist))" - ], - blocklist, - retry = false, - verbose = c.verbose, - connect_timeout = c.connect_timeout, - readtimeout = c.read_timeout) - nothing +function putblocklist(c, o, blockids) + xdoc = XMLDocument() + xroot = create_root(xdoc, "BlockList") + for blockid in blockids + add_text(new_child(xroot, "Uncommitted"), blockid) end + blocklist = string(xdoc) - function writebytes_block(c, o, data, _nblocks) - # heuristic to increase probability that token is valid during the retry logic in AzSessions.c - t = token(c.session; offset=Minute(30)) - l = ceil(Int, log10(_nblocks)) - blockids = [base64encode(lpad(blockid-1, l, '0')) for blockid in 1:_nblocks] - _blockids = [HTTP.escapeuri(blockid) for blockid in blockids] - r = @ccall libAzStorage.curl_writebytes_block_retry_threaded(t::Cstring, c.storageaccount::Cstring, c.containername::Cstring, addprefix(c,o)::Cstring, _blockids::Ptr{Cstring}, - data::Ptr{UInt8}, length(data)::Csize_t, c.nthreads::Cint, _nblocks::Cint, c.nretry::Cint, c.verbose::Cint, c.connect_timeout::Clong, c.read_timeout::Clong)::ResponseCodes - (r.http >= 300 || r.curl > 0) && error("writebytes_block error: http code $(r.http), curl code $(r.curl)") - - putblocklist(c, o, blockids) - end + @retry c.nretry HTTP.request( + "PUT", + "https://$(c.storageaccount).blob.core.windows.net/$(c.containername)/$(addprefix(c,o))?comp=blocklist", + [ + "x-ms-version" => API_VERSION, + "Authorization" => "Bearer $(token(c.session))", + "Content-Type" => "application/octet-stream", + "Content-Length" => "$(length(blocklist))" + ], + blocklist, + retry = false, + verbose = c.verbose, + connect_timeout = c.connect_timeout, + readtimeout = c.read_timeout) + nothing +end + +function writebytes_block(c, o, data, _nblocks) + # heuristic to increase probability that token is valid during the retry logic in AzSessions.c + l = ceil(Int, log10(_nblocks)) + blockids = [base64encode(lpad(blockid-1, l, '0')) for blockid in 1:_nblocks] + _blockids = [HTTP.escapeuri(blockid) for blockid in blockids] + t = token(c.session; offset=Minute(10)) + _token,refresh_token,expiry,scope,resource,tenant,clientid,client_secret = authinfo(c.session) + r = @ccall libAzStorage.curl_writebytes_block_retry_threaded(_token::Ptr{UInt8}, refresh_token::Ptr{UInt8}, expiry::Ptr{Culong}, scope::Cstring, resource::Cstring, tenant::Cstring, + clientid::Cstring, client_secret::Cstring,c.storageaccount::Cstring, c.containername::Cstring, addprefix(c,o)::Cstring, _blockids::Ptr{Cstring}, data::Ptr{UInt8}, + length(data)::Csize_t, c.nthreads::Cint, _nblocks::Cint, c.nretry::Cint, c.verbose::Cint, c.connect_timeout::Clong, c.read_timeout::Clong)::ResponseCodes + (r.http >= 300 || r.curl > 0) && error("writebytes_block error: http code $(r.http), curl code $(r.curl)") + authinfo!(c.session, _token, refresh_token, expiry) + putblocklist(c, o, blockids) +end +function writebytes(c::AzContainer, o::AbstractString, data::DenseArray{UInt8}; contenttype="application/octet-stream") _nblocks = nblocks(c.nthreads, length(data)) if Sys.iswindows() writebytes_blob(c, o, data, contenttype) @@ -487,11 +551,13 @@ function readbytes!(c::AzContainer, o::AbstractString, data::DenseArray{UInt8}; function readbytes_threaded!(c, o, data, offset, _nthreads) # heuristic to increase probability that token is valid during the retry logic in AzSessions.c - t = token(c.session; offset=Minute(30)) - r = @ccall libAzStorage.curl_readbytes_retry_threaded(t::Cstring, c.storageaccount::Cstring, c.containername::Cstring, - addprefix(c,o)::Cstring, data::Ptr{UInt8}, offset::Csize_t, length(data)::Csize_t, _nthreads::Cint, c.nretry::Cint, - c.verbose::Cint, c.connect_timeout::Clong, c.read_timeout::Clong)::ResponseCodes + t = token(c.session; offset=Minute(10)) + _token,refresh_token,expiry,scope,resource,tenant,clientid,client_secret = authinfo(c.session) + r = @ccall libAzStorage.curl_readbytes_retry_threaded(_token::Ptr{UInt8}, refresh_token::Ptr{UInt8}, expiry::Ptr{Culong}, scope::Cstring, resource::Cstring, tenant::Cstring, + clientid::Cstring, client_secret::Cstring, c.storageaccount::Cstring, c.containername::Cstring, addprefix(c,o)::Cstring, data::Ptr{UInt8}, offset::Csize_t, + length(data)::Csize_t, _nthreads::Cint, c.nretry::Cint, c.verbose::Cint, c.connect_timeout::Clong, c.read_timeout::Clong)::ResponseCodes (r.http >= 300 || r.curl > 0) && error("readbytes_threaded! error: http code $(r.http), curl code $(r.curl)") + authinfo!(c.session, _token, refresh_token, expiry) nothing end diff --git a/test/Project.toml b/test/Project.toml index d1bba43..1773b03 100644 --- a/test/Project.toml +++ b/test/Project.toml @@ -1,7 +1,10 @@ [deps] AbstractStorage = "14dbef02-f468-5f15-853e-5ec8dee7b899" AzSessions = "f239b30d-ae6b-58be-a2d5-7e9f30e280a9" +AzStorage_jll = "00c928b4-b5f3-54d8-b38d-afd4635c4ad2" +Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f" Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" +HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3" JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6" Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b" Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" diff --git a/test/runtests.jl b/test/runtests.jl index ee9ea9f..46c318c 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,4 +1,4 @@ -using AbstractStorage, AzSessions, AzStorage, Dates, JSON, Serialization, Test, UUIDs +using AbstractStorage, AzSessions, AzStorage, AzStorage_jll, Base64, Dates, HTTP, JSON, Serialization, Test, UUIDs function robust_mkpath(c) local _c @@ -546,7 +546,7 @@ end @testset "Windows, single thread check" begin r = uuid4() - container = AzContainer("foo-$r.o", storageaccount=storageaccount, session=session, nthreads=2, nretry=10) + container = AzContainer("foo-$r-o", storageaccount=storageaccount, session=session, nthreads=2, nretry=10) if Sys.iswindows() @test container.nthreads == 1 else @@ -556,7 +556,75 @@ end @testset "timeouts" begin r = uuid4() - c = AzContainer("foo-$r.o", storageaccount=storageaccount, session=session, nretry=0, connect_timeout=2, read_timeout=3) + c = AzContainer("foo-$r-o", storageaccount=storageaccount, session=session, nretry=0, connect_timeout=2, read_timeout=3) @test c.connect_timeout == 2 @test c.read_timeout == 3 end + +@testset "C token refresh, write" begin + r = uuid4() + c = AzContainer("foo-$r-o", storageaccount=storageaccount, session=session, nthreads=4, connect_timeout=2, read_timeout=3) + c = robust_mkpath(c) + o = "foo.bin" + + nthreads = c.nthreads + + N = round(Int, AzStorage._MINBYTES_PER_BLOCK * nthreads * 5) + data = rand(UInt8, N) + + _nblocks = AzStorage.nblocks(nthreads, length(data)) + + l = ceil(Int, log10(_nblocks)) + blockids = [base64encode(lpad(blockid-1, l, '0')) for blockid in 1:_nblocks] + _blockids = [HTTP.escapeuri(blockid) for blockid in blockids] + t = token(c.session; offset=Minute(10)) + + c.session.expiry = now() # force a token refresh in the C code + + _token,refresh_token,expiry,scope,resource,tenant,clientid,client_secret = AzStorage.authinfo(c.session) + + r = @ccall libAzStorage.curl_writebytes_block_retry_threaded(_token::Ptr{UInt8}, refresh_token::Ptr{UInt8}, expiry::Ptr{Culong}, scope::Cstring, resource::Cstring, tenant::Cstring, + clientid::Cstring, client_secret::Cstring,c.storageaccount::Cstring, c.containername::Cstring, AzStorage.addprefix(c,o)::Cstring, _blockids::Ptr{Cstring}, data::Ptr{UInt8}, + length(data)::Csize_t, c.nthreads::Cint, _nblocks::Cint, c.nretry::Cint, c.verbose::Cint, c.connect_timeout::Clong, c.read_timeout::Clong)::AzStorage.ResponseCodes + + AzStorage.authinfo!(c.session, _token, refresh_token, expiry) + @test t != c.session.token + + AzStorage.putblocklist(c, o, blockids) + + _data = read!(c, o, Vector{UInt8}(undef, N)) + @test data ≈ _data +end + +@testset "C token refresh, read" begin + r = uuid4() + c = AzContainer("foo-$r-o", storageaccount=storageaccount, session=session, nthreads=4, connect_timeout=2, read_timeout=3) + c = robust_mkpath(c) + o = "foo.bin" + + nthreads = c.nthreads + + N = round(Int, AzStorage._MINBYTES_PER_BLOCK * nthreads * 5) + data = rand(UInt8, N) + + write(c, o, data) + + t = token(c.session; offset=Minute(10)) + + _nthreads = AzStorage.nthreads_effective(c.nthreads, length(data)) + offset = 0 + + c.session.expiry = now() + + _token,refresh_token,expiry,scope,resource,tenant,clientid,client_secret = AzStorage.authinfo(c.session) + + _data = Vector{UInt8}(undef, N) + + r = @ccall libAzStorage.curl_readbytes_retry_threaded(_token::Ptr{UInt8}, refresh_token::Ptr{UInt8}, expiry::Ptr{Culong}, scope::Cstring, resource::Cstring, tenant::Cstring, + clientid::Cstring, client_secret::Cstring, c.storageaccount::Cstring, c.containername::Cstring, AzStorage.addprefix(c,o)::Cstring, _data::Ptr{UInt8}, offset::Csize_t, + length(data)::Csize_t, _nthreads::Cint, c.nretry::Cint, c.verbose::Cint, c.connect_timeout::Clong, c.read_timeout::Clong)::AzStorage.ResponseCodes + + AzStorage.authinfo!(c.session, _token, refresh_token, expiry) + + @test data ≈ _data +end