Skip to content

Commit

Permalink
when needed, refresh tokens in the retry-loop
Browse files Browse the repository at this point in the history
  • Loading branch information
samtkaplan committed Feb 7, 2023
1 parent 7c89c2b commit 61c0c31
Show file tree
Hide file tree
Showing 5 changed files with 407 additions and 164 deletions.
245 changes: 168 additions & 77 deletions src/AzStorage.c
Original file line number Diff line number Diff line change
Expand Up @@ -476,9 +476,12 @@ curl_refresh_tokens(
{
unsigned long current_time = (unsigned long) time(NULL);
struct ResponseCodes responsecodes;

responsecodes.http = 200;
responsecodes.curl = (long)CURLE_OK;
responsecodes.retry_after = 0;

if (current_time < (*expiry - 600)) { /* 10 minute grace period */
responsecodes.http = 200;
responsecodes.curl = (long)CURLE_OK;
return responsecodes;
}

Expand All @@ -487,10 +490,7 @@ curl_refresh_tokens(
} else if (refresh_token != NULL) {
responsecodes = curl_refresh_tokens_from_refresh_token(bearer_token, refresh_token, expiry, scope, resource, clientid, tenant, verbose, connect_timeout, read_timeout);
} else {
printf("Unable to refresh tokens without either a refresh token or a client secret");
responsecodes.curl = 1000;
responsecodes.http = 1000;
responsecodes.retry_after = 0;
printf("Warning: unable to refresh token.");
}

return responsecodes;
Expand Down Expand Up @@ -550,17 +550,42 @@ write_callback_readdata(

struct ResponseCodes
curl_writebytes_block(
char *token,
char *storageaccount,
char *containername,
char *blobname,
char *blockid,
char *data,
size_t datasize,
int verbose,
long connect_timeout,
long read_timeout)
omp_lock_t *token_lock,
char *token,
char *refresh_token,
unsigned long *expiry,
char *scope,
char *resource,
char *tenant,
char *clientid,
char *client_secret,
char *storageaccount,
char *containername,
char *blobname,
char *blockid,
char *data,
size_t datasize,
int nretry,
int verbose,
long connect_timeout,
long read_timeout)
{
omp_set_lock(token_lock);
curl_refresh_tokens_retry(
token,
refresh_token,
expiry,
scope,
resource,
clientid,
client_secret,
tenant,
nretry,
verbose,
connect_timeout,
read_timeout);
omp_unset_lock(token_lock);

char authorization[BUFFER_SIZE];
curl_authorization(token, authorization);
char contentlength[BUFFER_SIZE];
Expand Down Expand Up @@ -633,22 +658,30 @@ curl_writebytes_block(

struct ResponseCodes
curl_writebytes_block_retry(
char *token,
char *storageaccount,
char *containername,
char *blobname,
char *blockid,
char *data,
size_t datasize,
int nretry,
int verbose,
long connect_timeout,
long read_timeout)
omp_lock_t *token_lock,
char *token,
char *refresh_token,
unsigned long *expiry,
char *scope,
char *resource,
char *tenant,
char *clientid,
char *client_secret,
char *storageaccount,
char *containername,
char *blobname,
char *blockid,
char *data,
size_t datasize,
int nretry,
int verbose,
long connect_timeout,
long read_timeout)
{
int iretry;
struct ResponseCodes responsecodes;
for (iretry = 0; iretry < nretry; iretry++) {
responsecodes = curl_writebytes_block(token, storageaccount, containername, blobname, blockid, data, datasize, verbose, connect_timeout, read_timeout);
responsecodes = curl_writebytes_block(token_lock, token, refresh_token, expiry, scope, resource, tenant, clientid, client_secret, storageaccount, containername, blobname, blockid, data, datasize, nretry, verbose, connect_timeout, read_timeout);
if (isrestretrycode(responsecodes) == 0) {
break;
}
Expand All @@ -665,19 +698,26 @@ curl_writebytes_block_retry(

struct ResponseCodes
curl_writebytes_block_retry_threaded(
char *token,
char *storageaccount,
char *containername,
char *blobname,
char **blockids,
char *data,
size_t datasize,
int nthreads,
int nblocks,
int nretry,
int verbose,
long connect_timeout,
long read_timeout)
char *token,
char *refresh_token,
unsigned long *expiry,
char *scope,
char *resource,
char *tenant,
char *clientid,
char *client_secret,
char *storageaccount,
char *containername,
char *blobname,
char **blockids,
char *data,
size_t datasize,
int nthreads,
int nblocks,
int nretry,
int verbose,
long connect_timeout,
long read_timeout)
{
size_t block_datasize = datasize/nblocks;
size_t block_dataremainder = datasize%nblocks;
Expand All @@ -690,6 +730,9 @@ curl_writebytes_block_retry_threaded(
thread_responsecode_curl[threadid] = (long)CURLE_OK;
}

omp_lock_t token_lock;
omp_init_lock(&token_lock);

#pragma omp parallel num_threads(nthreads) default(shared)
{
int threadid = omp_get_thread_num();
Expand All @@ -705,12 +748,14 @@ curl_writebytes_block_retry_threaded(
block_firstbyte += block_dataremainder;
}

struct ResponseCodes responsecodes = curl_writebytes_block_retry(token, storageaccount, containername, blobname, blockids[iblock], data+block_firstbyte, _block_datasize, nretry, verbose, connect_timeout, read_timeout);
struct ResponseCodes responsecodes = curl_writebytes_block_retry(&token_lock, token, refresh_token, expiry, scope, resource, tenant, clientid, client_secret, storageaccount, containername, blobname, blockids[iblock], data+block_firstbyte, _block_datasize, nretry, verbose, connect_timeout, read_timeout);
thread_responsecode_http[threadid] = MAX(responsecodes.http, thread_responsecode_http[threadid]);
thread_responsecode_curl[threadid] = MAX(responsecodes.curl, thread_responsecode_curl[threadid]);
}
} // end #pragma omp

omp_destroy_lock(&token_lock);

struct ResponseCodes responsecodes;
responsecodes.http = (long)200;
responsecodes.curl = (long)CURLE_OK;
Expand All @@ -723,17 +768,42 @@ curl_writebytes_block_retry_threaded(

struct ResponseCodes
curl_readbytes(
char *token,
char *storageaccount,
char *containername,
char *blobname,
char *data,
size_t dataoffset,
size_t datasize,
int verbose,
long connect_timeout,
long read_timeout)
omp_lock_t *token_lock,
char *token,
char *refresh_token,
unsigned long *expiry,
char *scope,
char *resource,
char *tenant,
char *clientid,
char *client_secret,
char *storageaccount,
char *containername,
char *blobname,
char *data,
size_t dataoffset,
size_t datasize,
int nretry,
int verbose,
long connect_timeout,
long read_timeout)
{
omp_set_lock(token_lock);
curl_refresh_tokens_retry(
token,
refresh_token,
expiry,
scope,
resource,
clientid,
client_secret,
tenant,
nretry,
verbose,
connect_timeout,
read_timeout);
omp_unset_lock(token_lock);

char authorization[BUFFER_SIZE];
curl_authorization(token, authorization);

Expand Down Expand Up @@ -808,22 +878,30 @@ curl_readbytes(

struct ResponseCodes
curl_readbytes_retry(
char *token,
char *storageaccount,
char *containername,
char *blobname,
char *data,
size_t dataoffset,
size_t datasize,
int nretry,
int verbose,
long connect_timeout,
long read_timeout)
omp_lock_t *token_lock,
char *token,
char *refresh_token,
unsigned long *expiry,
char *scope,
char *resource,
char *tenant,
char *clientid,
char *client_secret,
char *storageaccount,
char *containername,
char *blobname,
char *data,
size_t dataoffset,
size_t datasize,
int nretry,
int verbose,
long connect_timeout,
long read_timeout)
{
struct ResponseCodes responsecodes;
int iretry;
for (iretry = 0; iretry < nretry; iretry++) {
responsecodes = curl_readbytes(token, storageaccount, containername, blobname, data, dataoffset, datasize, verbose, connect_timeout, read_timeout);
responsecodes = curl_readbytes(token_lock, token, refresh_token, expiry, scope, resource, tenant, clientid, client_secret, storageaccount, containername, blobname, data, dataoffset, datasize, nretry, verbose, connect_timeout, read_timeout);
if (isrestretrycode(responsecodes) == 0) {
break;
}
Expand All @@ -840,25 +918,35 @@ curl_readbytes_retry(

struct ResponseCodes
curl_readbytes_retry_threaded(
char *token,
char *storageaccount,
char *containername,
char *blobname,
char *data,
size_t dataoffset,
size_t datasize,
int nthreads,
int nretry,
int verbose,
long connect_timeout,
long read_timeout)
char *token,
char *refresh_token,
unsigned long *expiry,
char *scope,
char *resource,
char *tenant,
char *clientid,
char *client_secret,
char *storageaccount,
char *containername,
char *blobname,
char *data,
size_t dataoffset,
size_t datasize,
int nthreads,
int nretry,
int verbose,
long connect_timeout,
long read_timeout)
{
size_t thread_datasize = datasize/nthreads;
size_t thread_dataremainder = datasize%nthreads;

long thread_responsecode_http[nthreads];
long thread_responsecode_curl[nthreads];

omp_lock_t token_lock;
omp_init_lock(&token_lock);

#pragma omp parallel num_threads(nthreads)
{
int threadid = omp_get_thread_num();
Expand All @@ -871,10 +959,13 @@ curl_readbytes_retry_threaded(
thread_firstbyte += thread_dataremainder;
}

struct ResponseCodes responsecodes = curl_readbytes_retry(token, storageaccount, containername, blobname, data+thread_firstbyte, dataoffset+thread_firstbyte, _thread_datasize, nretry, verbose, connect_timeout, read_timeout);
struct ResponseCodes responsecodes = curl_readbytes_retry(&token_lock, token, refresh_token, expiry, scope, resource, tenant, clientid, client_secret, storageaccount, containername, blobname, data+thread_firstbyte, dataoffset+thread_firstbyte, _thread_datasize, nretry, verbose, connect_timeout, read_timeout);
thread_responsecode_http[threadid] = responsecodes.http;
thread_responsecode_curl[threadid] = responsecodes.curl;
} /* end pragma omp */

omp_destroy_lock(&token_lock);

long responsecode_http = 200;
long responsecode_curl = (long)CURLE_OK;
int threadid;
Expand Down
Loading

0 comments on commit 61c0c31

Please sign in to comment.