diff --git a/.github/workflows/CompatHelper.yml b/.github/workflows/CompatHelper.yml new file mode 100644 index 0000000..2876f03 --- /dev/null +++ b/.github/workflows/CompatHelper.yml @@ -0,0 +1,16 @@ +name: CompatHelper +on: + schedule: + - cron: '00 00 * * *' + workflow_dispatch: +jobs: + CompatHelper: + runs-on: ubuntu-latest + steps: + - name: Pkg.add("CompatHelper") + run: julia -e 'using Pkg; Pkg.add("CompatHelper")' + - name: CompatHelper.main() + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + COMPATHELPER_PRIV: ${{ secrets.COMPATHELPER_PRIV }} # optional + run: julia -e 'using CompatHelper; CompatHelper.main()' \ No newline at end of file diff --git a/.github/workflows/Documentation.yml b/.github/workflows/Documentation.yml new file mode 100644 index 0000000..717f01e --- /dev/null +++ b/.github/workflows/Documentation.yml @@ -0,0 +1,24 @@ +name: Documentation + +on: + push: + branches: + - master + tags: '*' + pull_request: + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: julia-actions/setup-julia@latest + with: + version: '1.5' + - name: Install dependencies + run: julia --project=docs/ -e 'using Pkg; Pkg.develop(PackageSpec(path=pwd())); Pkg.instantiate()' + - name: Build and deploy + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} # For authentication with GitHub Actions token + DOCUMENTER_KEY: ${{ secrets.DOCUMENTER_KEY }} # For authentication with SSH deploy key + run: julia --project=docs/ docs/make.jl \ No newline at end of file diff --git a/.github/workflows/TagBot.yml b/.github/workflows/TagBot.yml new file mode 100644 index 0000000..e3837ce --- /dev/null +++ b/.github/workflows/TagBot.yml @@ -0,0 +1,12 @@ +name: TagBot +on: + schedule: + - cron: 0 * * * * +jobs: + TagBot: + runs-on: ubuntu-latest + steps: + - uses: JuliaRegistries/TagBot@v1 + with: + token: ${{ secrets.GITHUB_TOKEN }} + ssh: ${{ secrets.DOCUMENTER_KEY }} \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 
100644 index 0000000..1209b4f --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +Manifest.toml +docs/build diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..ebb5710 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,29 @@ +# AzStorage.jl contributor guidelines + +## Issue reporting +If you have found a bug in AzStorage.jl or have any suggestions for changes to +AzStorage.jl functionality, please consider filing an issue using the GitHub +issue tracker. Please do not forget to search for an existing issue +which may already cover your suggestion. + +## Contributing +We try to follow GitHub flow (https://guides.github.com/introduction/flow/) for +making changes to AzStorage.jl. Contributors retain copyright on their +contributions, and the MIT license (https://opensource.org/licenses/MIT) +applies to the contribution. + +The basic steps to making a contribution are as follows, and assume some knowledge of +git: + 1. fork the AzStorage.jl repository + 2. create an appropriately titled branch for your contribution + 3. if applicable, add a unit-test to ensure the functionality of your contribution + (see the `test` subfolder). + 4. run `]test AzStorage` in the `test` folder + 5. make a pull-request + 6. have fun + +## Coding conventions +We try to follow the same coding conventions as https://github.com/JuliaLang/julia. +This primarily means using 4 spaces to indent (no tabs). In addition, we make a +best attempt to follow the guidelines in the style guide chapter of the julia +manual: https://docs.julialang.org/en/v1/manual/style-guide/ diff --git a/LICENSE.md b/LICENSE.md new file mode 100644 index 0000000..e6bcf54 --- /dev/null +++ b/LICENSE.md @@ -0,0 +1,22 @@ +The AzStorage.jl package is licensed under the MIT "Expat" License: + +> Copyright (c) 2020: Chevron U.S.A. Inc. 
+> +> Permission is hereby granted, free of charge, to any person obtaining +> a copy of this software and associated documentation files (the +> "Software"), to deal in the Software without restriction, including +> without limitation the rights to use, copy, modify, merge, publish, +> distribute, sublicense, and/or sell copies of the Software, and to +> permit persons to whom the Software is furnished to do so, subject to +> the following conditions: +> +> The above copyright notice and this permission notice shall be +> included in all copies or substantial portions of the Software. +> +> THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +> EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +> MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +> IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +> CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, +> TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE +> SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
\ No newline at end of file diff --git a/Project.toml b/Project.toml new file mode 100644 index 0000000..1eb0944 --- /dev/null +++ b/Project.toml @@ -0,0 +1,29 @@ +name = "AzStorage" +uuid = "c6697862-1611-5eae-9ef8-48803c85c8d6" +version = "0.17.0" + +[deps] +AbstractStorage = "14dbef02-f468-5f15-853e-5ec8dee7b899" +AzSessions = "f239b30d-ae6b-58be-a2d5-7e9f30e280a9" +AzStorage_jll = "00c928b4-b5f3-54d8-b38d-afd4635c4ad2" +Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f" +HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3" +LightXML = "9c8b4983-aa76-5018-a973-4c85ecc9e179" +Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b" +Sockets = "6462fe0b-24de-5631-8697-dd941f90decc" + +[compat] +julia = "1" +AbstractStorage = "1" +AzSessions = "1" +HTTP = "0.8" +LightXML = "0.9" + +[extras] +Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" +JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6" +Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" +Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" + +[targets] +test = ["Dates", "JSON", "Random", "Test"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..03fe12c --- /dev/null +++ b/README.md @@ -0,0 +1,18 @@ +# AzStorage +Interface to Azure blob storage. + +```julia +using AzSessions, AzStorage + +mysession = AzCCSession(client_id="myclientid", client_secret="myclientsecret", resource="https://storage.azure.com") +container = AzContainer("foo", storageaccount="myaccount", session=mysession) + +mkpath(container) +write(container, "blob", rand(10)) + +readdir(container) + +x = read!(container, "blob", Vector{Float64}(undef, 10)) + +... 
+``` diff --git a/docs/make.jl b/docs/make.jl new file mode 100644 index 0000000..b6a0000 --- /dev/null +++ b/docs/make.jl @@ -0,0 +1,3 @@ +using Documenter, AzStorage + +makedocs(sitename="AzStorage", modules=[AzStorage]) \ No newline at end of file diff --git a/docs/src/example.md b/docs/src/example.md new file mode 100644 index 0000000..bdf7092 --- /dev/null +++ b/docs/src/example.md @@ -0,0 +1,53 @@ +# Example + +Here we show basic usage where we 1) create a container, 2) write a blob to the container, 3) list the contents of the container, 4) read the blob that was previously created, and 5) delete the container and its contents. + +```julia +using Pkg +Pkg.add("AzSessions") +Pkg.add("AzStorage") + +using AzSessions, AzStorage + +# here we use client credentials, but auth-code-flow and device-code flow (etc.) are also available. +# see the AzSessions.jl package for more details on authentication in Azure. +session = AzSession(;protocal=AzClientCredentials, client_id="myclientid", client_secret="verysecret", resource="https://storage.azure.com/") + +# create a handle to an Azure container in an existing storage account +container = AzContainer("foo"; storageaccount="mystorageaccount", session=session) + +# create the container +mkpath(container) + +# write a blob to the container +write(container, "myblob.bin", rand(10)) + +# list the blobs in the container +readdir(container) + +# read the contents of the blob +x = read!(container, "myblob.bin", Vector{Float64}(undef, 10)) + +# remove the container, and its contents +rm(container) +``` + +In addition, we can represent blobs, providing an API that is similar to handling POSIX files. + +```julia +# create a handle to a blob in a container +io = open(AzContainer("foo"; storageaccount="mystorageaccount"), "myblob.bin") +io = joinpath(AzContainer("foo"; storageaccount="mystorageaccount"), "myblob.bin") # this is equivalent to the previous line. 
+ +# write to the blob +write(io, rand(10)) + +# read the blob +x = read!(io, zeros(10)) + +# check that the blob exists +isfile(io) + +# remove the blob +rm(io) +``` diff --git a/docs/src/index.md b/docs/src/index.md new file mode 100644 index 0000000..b337142 --- /dev/null +++ b/docs/src/index.md @@ -0,0 +1,4 @@ +# AzStorage +AzStorage is a Julia API for Azure storage. AzStorage provides methods for interacting +with Azure containers and blobs. In order to obtain reasonable throughput, I/O is threaded via +OpenMP. In the case of writing, Azure block-blobs are used to help organize the threaded I/O. diff --git a/docs/src/reference.md b/docs/src/reference.md new file mode 100644 index 0000000..48587a4 --- /dev/null +++ b/docs/src/reference.md @@ -0,0 +1,30 @@ +# Reference + +## Construct an Azure Container +```@docs +AzContainer +``` + +## Container methods +```@docs +containers +cp +dirname +isdir +mkpath +readdir +rm(::AzContainer) +``` + +## Blob methods +```@docs +filesize +isfile +joinpath +open +read +read! +rm(::AzContainer, ::AbstractString) +rm(::AzStorage.AzObject) +write +``` \ No newline at end of file diff --git a/src/AzStorage.c b/src/AzStorage.c new file mode 100644 index 0000000..98cd749 --- /dev/null +++ b/src/AzStorage.c @@ -0,0 +1,415 @@ +#include +#include +#include +#include +#include +#include +#include + +#define BUFFER_SIZE 2048 +#define MAXIMUM_BACKOFF 256.0 +#define CURLE_TIMEOUT 600L /* seconds (10 minutes) */ + +#define MAX(x, y) (((x) > (y)) ? (x) : (y)) +#define MIN(x, y) (((x) < (y)) ? 
(x) : (y)) + +int +exponential_backoff( + int i) +{ + double sleeptime = MIN(pow(2.0, (double)i), MAXIMUM_BACKOFF) + 1.0*rand()/RAND_MAX; + double sleeptime_seconds = floor(sleeptime); + double sleeptime_nanoseconds = (long)((sleeptime - sleeptime_seconds) * 1000000000.0); + + struct timespec ts_sleeptime, ts_remainingtime; + + ts_sleeptime.tv_sec = (long)sleeptime_seconds; + ts_sleeptime.tv_nsec = (long)sleeptime_nanoseconds; + + return nanosleep(&ts_sleeptime, &ts_remainingtime); +} + +struct ResponseCodes { + long http; + long curl; +}; + +/* +https://docs.microsoft.com/en-us/rest/api/storageservices/common-rest-api-error-codes +https://curl.haxx.se/libcurl/c/libcurl-errors.html +*/ +int +isrestretrycode( + struct ResponseCodes responsecodes) +{ + if (responsecodes.curl == 7 || responsecodes.curl == 55 || responsecodes.curl == 56) { + return 1; + } + if (responsecodes.http == 500 || responsecodes.http == 503) { + return 1; + } + return 0; +} + +void +curl_init() +{ + curl_global_init(CURL_GLOBAL_ALL); +} + +size_t +write_callback_null( + char *ptr, + size_t size, + size_t nmemb, + void *userdata) +{ + return nmemb; +} + +void +curl_authorization( + char *token, + char *authorization) +{ + snprintf(authorization, BUFFER_SIZE, "Authorization: Bearer %s", token); +} + +void +curl_byterange( + char *byterange, + size_t dataoffset, + size_t datasize) +{ + snprintf(byterange, BUFFER_SIZE, "Range: bytes=%ld-%ld", dataoffset, dataoffset+datasize-1); +} + +void +curl_contentlength( + size_t datasize, + char *contentlength) +{ + snprintf(contentlength, BUFFER_SIZE, "Content-Length: %lu", (unsigned long)datasize); +} + +struct DataStruct { + char *data; + size_t datasize; + size_t currentsize; +}; + +size_t +write_callback_readdata( + char *ptr, + size_t size, + size_t nmemb, + void *datavoid) +{ + struct DataStruct *datastruct = (struct DataStruct*)datavoid; + size_t n = size*nmemb; + size_t newsize = datastruct->currentsize + n; + if (newsize > datastruct->datasize) { + 
printf("error: read too many bytes, %d in %s\n", __LINE__, __FILE__); + return 0; + } + memcpy(datastruct->data+datastruct->currentsize, ptr, n); + datastruct->currentsize = newsize; + return n; +} + +struct ResponseCodes +curl_writebytes_block( + char *token, + char *storageaccount, + char *containername, + char *blobname, + char *blockid, + char *data, + size_t datasize, + int verbose) +{ + char authorization[BUFFER_SIZE]; + curl_authorization(token, authorization); + char contentlength[BUFFER_SIZE]; + curl_contentlength(datasize, contentlength); + + struct curl_slist *headers = NULL; + headers = curl_slist_append(headers, "x-ms-version: 2017-11-09"); + headers = curl_slist_append(headers, "Content-Type: application/octet-stream"); + headers = curl_slist_append(headers, contentlength); + headers = curl_slist_append(headers, authorization); + + CURL *curlhandle = curl_easy_init(); + + char url[BUFFER_SIZE]; + snprintf( + url, + BUFFER_SIZE, + "https://%s.blob.core.windows.net/%s/%s?comp=block&blockid=%s", + storageaccount, + containername, + blobname, + blockid); + + curl_easy_setopt(curlhandle, CURLOPT_URL, url); + curl_easy_setopt(curlhandle, CURLOPT_HTTPHEADER, headers); + curl_easy_setopt(curlhandle, CURLOPT_CUSTOMREQUEST, "PUT"); + curl_easy_setopt(curlhandle, CURLOPT_POSTFIELDSIZE, datasize); + curl_easy_setopt(curlhandle, CURLOPT_POSTFIELDS, data); + curl_easy_setopt(curlhandle, CURLOPT_SSL_VERIFYPEER, 0); /* TODO */ + curl_easy_setopt(curlhandle, CURLOPT_VERBOSE, verbose); + curl_easy_setopt(curlhandle, CURLOPT_TIMEOUT, CURLE_TIMEOUT); + curl_easy_setopt(curlhandle, CURLOPT_WRITEFUNCTION, write_callback_null); + + char errbuf[CURL_ERROR_SIZE]; + curl_easy_setopt(curlhandle, CURLOPT_ERRORBUFFER, errbuf); + + long responsecode_http = 200; + CURLcode responsecode_curl = curl_easy_perform(curlhandle); + curl_easy_getinfo(curlhandle, CURLINFO_RESPONSE_CODE, &responsecode_http); + + if ( (responsecode_curl != CURLE_OK || responsecode_http >= 300) && verbose > 0) 
{ + printf("Warning, curl response=%s, http response code=%ld\n", errbuf, responsecode_http); + } + + curl_easy_cleanup(curlhandle); + curl_slist_free_all(headers); + + struct ResponseCodes responsecodes; + responsecodes.http = responsecode_http; + responsecodes.curl = (long)responsecode_curl; + + return responsecodes; +} + +struct ResponseCodes +curl_writebytes_block_retry( + char *token, + char *storageaccount, + char *containername, + char *blobname, + char *blockid, + char *data, + size_t datasize, + int nretry, + int verbose) +{ + int iretry; + struct ResponseCodes responsecodes; + for (iretry = 0; iretry < nretry; iretry++) { + responsecodes = curl_writebytes_block(token, storageaccount, containername, blobname, blockid, data, datasize, verbose); + if (isrestretrycode(responsecodes) == 0) { + break; + } + if (verbose > 0) { + printf("Warning, bad write, retrying, %d/%d, http_responsecode=%ld, curl_responsecode=%ld.\n", iretry+1, nretry, responsecodes.http, responsecodes.curl); + } + if (exponential_backoff(iretry) != 0) { + printf("Warning, unable to sleep in exponential backoff due to failed nanosleep call.\n"); + break; + } + } + return responsecodes; +} + +struct ResponseCodes +curl_writebytes_block_retry_threaded( + char *token, + char *storageaccount, + char *containername, + char *blobname, + char **blockids, + char *data, + size_t datasize, + int nthreads, + int nblocks, + int nretry, + int verbose) +{ + size_t block_datasize = datasize/nblocks; + size_t block_dataremainder = datasize%nblocks; + + int threadid; + long thread_responsecode_http[nthreads]; + long thread_responsecode_curl[nthreads]; + for (threadid = 0; threadid < nthreads; threadid++) { + thread_responsecode_http[threadid] = 200; + thread_responsecode_curl[threadid] = (long)CURLE_OK; + } + +#pragma omp parallel num_threads(nthreads) default(shared) +{ + int threadid = omp_get_thread_num(); + int iblock; +#pragma omp for + for (iblock = 0; iblock < nblocks; iblock++) { + size_t 
block_firstbyte = iblock*block_datasize; + size_t _block_datasize = block_datasize; + if (iblock < block_dataremainder) { + block_firstbyte += iblock; + _block_datasize += 1; + } else { + block_firstbyte += block_dataremainder; + } + + struct ResponseCodes responsecodes = curl_writebytes_block_retry(token, storageaccount, containername, blobname, blockids[iblock], data+block_firstbyte, _block_datasize, nretry, verbose); + thread_responsecode_http[threadid] = MAX(responsecodes.http, thread_responsecode_http[threadid]); + thread_responsecode_curl[threadid] = MAX(responsecodes.curl, thread_responsecode_curl[threadid]); + } +} // end #pragma omp + + struct ResponseCodes responsecodes; + responsecodes.http = (long)200; + responsecodes.curl = (long)CURLE_OK; + for (threadid = 0; threadid < nthreads; threadid++) { + responsecodes.http = MAX(responsecodes.http, thread_responsecode_http[threadid]); + responsecodes.curl = MAX(responsecodes.curl, thread_responsecode_curl[threadid]); + } + return responsecodes; +} + +struct ResponseCodes +curl_readbytes( + char *token, + char *storageaccount, + char *containername, + char *blobname, + char *data, + size_t dataoffset, + size_t datasize, + int verbose) +{ + char authorization[BUFFER_SIZE]; + curl_authorization(token, authorization); + + char byterange[BUFFER_SIZE]; + curl_byterange(byterange, dataoffset, datasize); + + struct curl_slist *headers = NULL; + headers = curl_slist_append(headers, authorization); + headers = curl_slist_append(headers, "x-ms-version: 2017-11-09"); + headers = curl_slist_append(headers, byterange); + + struct DataStruct datastruct; + datastruct.data = data; + datastruct.datasize = datasize; + datastruct.currentsize = 0; + + CURL *curlhandle = curl_easy_init(); + + char url[BUFFER_SIZE]; + snprintf( + url, + BUFFER_SIZE, + "https://%s.blob.core.windows.net/%s/%s", + storageaccount, + containername, + blobname); + + curl_easy_setopt(curlhandle, CURLOPT_URL, url); + curl_easy_setopt(curlhandle, 
CURLOPT_HTTPHEADER, headers); + curl_easy_setopt(curlhandle, CURLOPT_SSL_VERIFYPEER, 0); /* TODO */ + curl_easy_setopt(curlhandle, CURLOPT_TIMEOUT, CURLE_TIMEOUT); + curl_easy_setopt(curlhandle, CURLOPT_VERBOSE, verbose); + curl_easy_setopt(curlhandle, CURLOPT_WRITEFUNCTION, write_callback_readdata); + curl_easy_setopt(curlhandle, CURLOPT_WRITEDATA, (void*)&datastruct); + + char errbuf[CURL_ERROR_SIZE]; + curl_easy_setopt(curlhandle, CURLOPT_ERRORBUFFER, errbuf); + + long responsecode_http = 200; + CURLcode responsecode_curl = curl_easy_perform(curlhandle); + curl_easy_getinfo(curlhandle, CURLINFO_RESPONSE_CODE, &responsecode_http); + + if ( (responsecode_curl != CURLE_OK || responsecode_http >= 300) && verbose > 0) { + printf("Error, bad read, http response code=%ld, curl response=%s\n", responsecode_http, errbuf); + } + + curl_easy_cleanup(curlhandle); + curl_slist_free_all(headers); + + struct ResponseCodes responsecodes; + responsecodes.http = responsecode_http; + responsecodes.curl = (long)responsecode_curl; + + return responsecodes; +} + +struct ResponseCodes +curl_readbytes_retry( + char *token, + char *storageaccount, + char *containername, + char *blobname, + char *data, + size_t dataoffset, + size_t datasize, + int nretry, + int verbose) +{ + struct ResponseCodes responsecodes; + int iretry; + for (iretry = 0; iretry < nretry; iretry++) { + responsecodes = curl_readbytes(token, storageaccount, containername, blobname, data, dataoffset, datasize, verbose); + if (isrestretrycode(responsecodes) == 0) { + break; + } + if (verbose > 0) { + printf("Warning, bad read, retrying, %d/%d, http responsecode=%ld, curl responsecode=%ld.\n", iretry+1, nretry, responsecodes.http, responsecodes.curl); + } + if (exponential_backoff(iretry) != 0) { + printf("Warning, exponential backoff failed\n"); + break; + } + } + return responsecodes; +} + +struct ResponseCodes +curl_readbytes_retry_threaded( + char *token, + char *storageaccount, + char *containername, + char 
*blobname, + char *data, + size_t dataoffset, + size_t datasize, + int nthreads, + int nretry, + int verbose) +{ + size_t thread_datasize = datasize/nthreads; + size_t thread_dataremainder = datasize%nthreads; + + long thread_responsecode_http[nthreads]; + long thread_responsecode_curl[nthreads]; + +#pragma omp parallel for num_threads(nthreads) +for (int threadid = 0; threadid < nthreads; threadid++) { + /* one loop iteration per byte range: every range is read and every result slot is written even if OpenMP supplies fewer than nthreads threads */ + size_t thread_firstbyte = threadid*thread_datasize; + size_t _thread_datasize = thread_datasize; + if (threadid < thread_dataremainder) { + thread_firstbyte += threadid; + _thread_datasize += 1; + } else { + thread_firstbyte += thread_dataremainder; + } + + struct ResponseCodes responsecodes = curl_readbytes_retry(token, storageaccount, containername, blobname, data+thread_firstbyte, dataoffset+thread_firstbyte, _thread_datasize, nretry, verbose); + thread_responsecode_http[threadid] = responsecodes.http; + thread_responsecode_curl[threadid] = responsecodes.curl; +} /* end pragma omp */ + long responsecode_http = 200; + long responsecode_curl = (long)CURLE_OK; + int threadid; + for (threadid = 0; threadid < nthreads; threadid++) { + responsecode_http = MAX(responsecode_http, thread_responsecode_http[threadid]); + responsecode_curl = MAX(responsecode_curl, thread_responsecode_curl[threadid]); + } + struct ResponseCodes responsecodes; + responsecodes.http = responsecode_http; + responsecodes.curl = responsecode_curl; + + return responsecodes; +} diff --git a/src/AzStorage.jl b/src/AzStorage.jl new file mode 100644 index 0000000..0b9c4e4 --- /dev/null +++ b/src/AzStorage.jl @@ -0,0 +1,601 @@ +module AzStorage + +using AbstractStorage, AzSessions, AzStorage_jll, Base64, HTTP, LightXML, Serialization, Sockets + +function __init__() + ccall((:curl_init, libAzStorage), Cvoid, ()) +end + +mutable struct AzContainer{A<:AzSessionAbstract} <: Container + storageaccount::String + containername::String + prefix::String + session::A + nthreads::Int + nretry::Int + verbose::Int +end + +function 
Base.copy(container::AzContainer) + AzContainer( + container.storageaccount, + container.containername, + container.prefix, + copy(container.session), + container.nthreads, + container.nretry, + container.verbose) +end + +struct AzObject + container::AzContainer + name::String +end + +""" + open(container, blobname) -> AzObject + +Create a handle to an Azure blob with the name `blobname::String` in the +Azure storage container: `container::AzContainer`. + +# Example: +```julia +io = open(AzContainer("mycontainer"; storageaccount="myaccount"), "foo.bin") +write(io, rand(10)) +``` +""" +function Base.open(container::AzContainer, name) + mkpath(container) + AzObject(container, string(name)) +end + +""" + joinpath(container, blobname) -> AzObject + +Create a handle to an Azure blob with the name `blobname::String` in the +Azure storage container: `container::AzContainer`. + +# Example: +```julia +io = joinpath(AzContainer("mycontainer"; storageaccount="myaccount"), "foo.bin") +write(io, rand(10)) +``` +""" +Base.joinpath(container::AzContainer, name...) = open(container, join(name, '/')) + +Base.close(object::AzObject) = nothing + +const __OAUTH_SCOPE = "offline_access+openid+https://storage.azure.com/user_impersonation" + +""" + container = AzContainer("containername"; storageaccount="myaccount", kwargs...) + +`container` is a handle to a new or existing Azure container in the `myaccount` storage account. +The storage account must already exist. + +# Additional keyword arguments +* `session=AzSession(;lazy=true,scope=$__OAUTH_SCOPE)` user credentials (see AzSessions.jl package). +* `nthreads=Sys.CPU_THREADS` number of system threads that OpenMP will use to thread I/O. +* `nretry=10` number of retries to the Azure service (when Azure throws a retryable error) before throwing an error. +* `verbose=0` verbosity flag passed to libcurl. + +# Notes +The container name can contain "/"'s. 
If this is the case, then the string preceding the first "/" will +be the container name, and the string that remains will be pre-pended to the blob names. This allows Azure +to present blobs in a pseudo-directory structure. +""" +function AzContainer(containername::AbstractString; storageaccount, session=AzSession(;lazy=true, scope=__OAUTH_SCOPE), nthreads=Sys.CPU_THREADS, nretry=10, verbose=0, prefix="") + name = split(containername, '/') + _containername = name[1] + prefix *= lstrip('/'*join(name[2:end], '/'), '/') + AzContainer(String(storageaccount), String(_containername), String(prefix), session, nthreads, nretry, verbose) +end + +AbstractStorage.Container(::Type{<:AzContainer}, d::Dict, session=AzSession(;lazy=true, scope=__OAUTH_SCOPE)) = + AzContainer(d["storageaccount"], d["containername"], d["prefix"], session, d["nthreads"], d["nretry"], d["verbose"]) + +struct ResponseCodes + http::Int64 + curl::Int64 +end + +# https://docs.microsoft.com/en-us/rest/api/storageservices/common-rest-api-error-codes +const RETRYABLE_HTTP_ERRORS = ( + 500, # Internal server error + 503) # Service unavailable + +function isretryable(e::HTTP.StatusError) + e.status ∈ RETRYABLE_HTTP_ERRORS && (return true) + false +end +isretryable(e::Base.IOError) = true +isretryable(e::HTTP.IOExtras.IOError) = isretryable(e.e) +isretryable(e::Base.EOFError) = true +isretryable(e::Sockets.DNSError) = Base.uverrorname(e.code) == "EAI_NONAME" ? false : true +isretryable(e) = false + +function retrywarn(i, s, e) + @debug "retry $i, sleeping for $s seconds, e=$e" +end + +macro retry(retries, ex::Expr) + quote + local r + for i = 1:$(esc(retries)) + try + r = $(esc(ex)) + break + catch e + (i < $(esc(retries)) && isretryable(e)) || rethrow(e) # the final attempt rethrows instead of sleeping and then reading an unassigned `r` + maximum_backoff = 256 + s = min(2.0^(i-1), maximum_backoff) + rand() + retrywarn(i, s, e) + sleep(s) + end + end + r + end +end + +""" + mkpath(container) + +create an Azure container from the handle `container::AzContainer`. 
If the container +already exists, then this is a no-op. +""" +function Base.mkpath(c::AzContainer) + if !iscontainer(c) + @retry c.nretry HTTP.request( + "PUT", + "https://$(c.storageaccount).blob.core.windows.net/$(c.containername)?restype=container", + Dict( + "Authorization"=>"Bearer $(token(c.session))", + "x-ms-version"=>"2017-11-09"), + retry = false) + end + nothing +end + +const _MINBYTES_PER_BLOCK = 32_000_000 +const _MAXBYTES_PER_BLOCK = 100_000_000 +const _MAXBLOCKS_PER_BLOB = 50_000 + +nblocks_error1() = error("data is too large for a block-blob: too many blocks") +nblocks_error2() = error("data is too large for a block-blob: too many bytes per block") +function nblocks(nthreads::Integer, nbytes::Integer) + nblocks = ceil(Int, nbytes/_MAXBYTES_PER_BLOCK + eps(Float64)) + if nblocks < nthreads + bytes_per_block = max(cld(nbytes, nthreads), _MINBYTES_PER_BLOCK) # spread the bytes over the threads, but never below the minimum block size + nblocks = max(1, ceil(Int, nbytes/bytes_per_block)) + end + nblocks > _MAXBLOCKS_PER_BLOB && nblocks_error1() + bytes_per_block = div(nbytes, nblocks) + (rem(nbytes, nblocks) == 0 ? 0 : 1) + bytes_per_block > _MAXBYTES_PER_BLOCK && nblocks_error2() + nblocks +end + +addprefix(c::AzContainer, o) = c.prefix == "" ? 
o : normpath("$(c.prefix)/$o") + +function writebytes(c::AzContainer, o::AbstractString, data::DenseArray{UInt8}; contenttype="application/octet-stream") + function writebytes_blob(c, o, data, contenttype) + @retry c.nretry HTTP.request( + "PUT", + "https://$(c.storageaccount).blob.core.windows.net/$(c.containername)/$(addprefix(c,o))", + Dict( + "Authorization" => "Bearer $(token(c.session))", + "x-ms-version" => "2017-11-09", + "Content-Length" => "$(length(data))", + "Content-Type" => contenttype, + "x-ms-blob-type" => "BlockBlob"), + data, + retry = false, + verbose = c.verbose) + nothing + end + + function putblocklist(c, o, blockids) + xdoc = XMLDocument() + xroot = create_root(xdoc, "BlockList") + for blockid in blockids + add_text(new_child(xroot, "Uncommitted"), blockid) + end + blocklist = string(xdoc) + + @retry c.nretry HTTP.request( + "PUT", + "https://$(c.storageaccount).blob.core.windows.net/$(c.containername)/$(addprefix(c,o))?comp=blocklist", + Dict( + "x-ms-version" => "2017-11-09", + "Authorization" => "Bearer $(token(c.session))", + "Content-Type" => "application/octet-stream", + "Content-Length" => "$(length(blocklist))"), + blocklist, + retry = false) + nothing + end + + function writebytes_block(c, o, data, _nblocks) + t = token(c.session) + l = ceil(Int, log10(_nblocks)) + blockids = [base64encode(lpad(blockid-1, l, '0')) for blockid in 1:_nblocks] + _blockids = [HTTP.escapeuri(blockid) for blockid in blockids] + r = ccall((:curl_writebytes_block_retry_threaded, libAzStorage), ResponseCodes, + (Cstring, Cstring, Cstring, Cstring, Ptr{Cstring}, Ptr{UInt8}, Csize_t, Cint, Cint, Cint, Cint), + t, c.storageaccount, c.containername, addprefix(c,o), _blockids, data, length(data), c.nthreads, _nblocks, c.nretry, c.verbose) + r.http >= 300 && error("writebytes_block: error code $(r.http)") + r.curl > 0 && error("curl error, code=$(r.curl)") + + putblocklist(c, o, blockids) + end + + _nblocks = nblocks(c.nthreads, length(data)) + if _nblocks > 1 + 
writebytes_block(c, o, data, _nblocks) + else + writebytes_blob(c, o, data, contenttype) + end + nothing +end + +""" + write(container, "blobname", data::AbstractString; contenttype="text/plain") + +Write the string `data` to a blob with name `blobname` in `container::AzContainer`. +Optionally, one can specify the content-type of the blob using the `contenttype` keyword argument. +For example: `contenttype="text/plain"`, `contenttype="application/json"`, etc. +""" +Base.write(c::AzContainer, o::AbstractString, data::AbstractString; contenttype="text/plain") = + writebytes(c, o, transcode(UInt8, data); contenttype=contenttype) + +""" + write(container, "blobname", data::DenseArray) + +Write the array `data` to a blob with the name `blobname` in `container::AzContainer`. +""" +Base.write(c::AzContainer, o::AbstractString, data::DenseArray{T}) where {T<:Number} = + writebytes(c, o, unsafe_wrap(Vector{UInt8}, convert(Ptr{UInt8}, pointer(data)), length(data)*sizeof(T), own=false); contenttype="application/octet-stream") + +function Base.write(c::AzContainer, o::AbstractString, data::AbstractArray) + io = IOBuffer() + serialize(io, data) + writebytes(c, o, take!(io); contenttype="application/octet-stream") +end + +""" + write(io::AzObject, data) + +write data to `io::AzObject`. 
+ +# Example +``` +io = open(AzContainer("mycontainer";storageaccount="mystorageaccount"), "foo.bin") +write(io, rand(10)) +x = read!(io, zeros(10)) +``` +""" +Base.write(o::AzObject, data) = write(o.container, o.name, data) + +nthreads_effective(nthreads::Integer, nbytes::Integer) = clamp(div(nbytes, _MINBYTES_PER_BLOCK), 1, nthreads) + +function readbytes!(c::AzContainer, o::AbstractString, data::DenseArray{UInt8}; offset=0) + function readbytes_serial!(c, o, data, offset) + HTTP.open( + "GET", + "https://$(c.storageaccount).blob.core.windows.net/$(c.containername)/$(addprefix(c,o))", + Dict( + "Authorization" => "Bearer $(token(c.session))", + "x-ms-version" => "2017-11-09", + "Range" => "bytes=$offset-$(offset+length(data)-1)"), + retry = false, + verbose = c.verbose) do io + read!(io, data) + end + nothing + end + + function readbytes_threaded!(c, o, data, offset, _nthreads) + t = token(c.session) + r = ccall((:curl_readbytes_retry_threaded, libAzStorage), ResponseCodes, + (Cstring, Cstring, Cstring, Cstring, Ptr{UInt8}, Csize_t, Csize_t, Cint, Cint, Cint), + t, c.storageaccount, c.containername, addprefix(c,o), data, offset, length(data), _nthreads, c.nretry, c.verbose) + r.http >= 300 && error("readbytes_threaded!: error code $(r.http)") + r.curl > 0 && error("curl error, code=$(r.curl))") + nothing + end + + _nthreads = nthreads_effective(c.nthreads, length(data)) + if _nthreads > 1 + readbytes_threaded!(c, o, data, offset, _nthreads) + else + readbytes_serial!(c, o, data, offset) + end + data +end + +""" + read(container, "blobname", String) + +returns the contents of the blob "blobname" in `container::AzContainer` as a string. 
+""" +Base.read(c::AzContainer, o::AbstractString, T::Type{String}) = String(readbytes!(c, o, Vector{UInt8}(undef, filesize(c,o)))) + +""" + read!(container, "blobname", data; offset=0) + +read from the blob "blobname" in `container::AzContainer` into `data::DenseArray`, and +where `offset` specifies the number of elements of `eltype(data)` (i.e. `offset*sizeof(eltype(data))` bytes) in the blob to skip before reading. This +method returns `data`. For example, +``` +data = read!(AzContainer("foo";storageaccount="bar"), "baz.bin", Vector{Float32}(undef,10)) +``` +""" +function Base.read!(c::AzContainer, o::AbstractString, data::DenseArray{T}; offset=0) where {T<:Number} + _data = unsafe_wrap(Array, convert(Ptr{UInt8}, pointer(data)), length(data)*sizeof(T), own=false) + readbytes!(c, o, _data; offset=offset*sizeof(T)) + data +end + +""" + data = read(container, "blobname") + +read from a blob "blobname" in `container::AzContainer` into new memory by deserializing the blob's contents. +""" +function Base.read(c::AzContainer, o::String) + io = IOBuffer(readbytes!(c, o, Array{UInt8}(undef, filesize(c, o)))) + deserialize(io) +end + +""" + read(object, String) + +read a string from `object::AzObject`. + +# Example +``` +io = open(AzContainer("mycontainer";storageaccount="mystorageaccount"), "foo.txt") +read(io, String) +``` +""" +Base.read(o::AzObject, T::Type{String}) = read(o.container, o.name, String) + +""" + read!(object, x; offset=0) -> x + +read data from `object::AzObject` into `x::DenseArray`, +and return `x`. `offset` is an integer that can be +used to specify the number of elements of `eltype(x)` to skip at the start of the object before reading. + +# Example +``` +io = open(AzContainer("mycontainer";storageaccount="mystorageaccount"), "foo.txt") +read!(io, Vector{Float64}(undef, 10)) +``` +""" +Base.read!(o::AzObject, data; offset=0) = read!(o.container, o.name, data; offset=offset) + +""" + readdir(container) + +list of objects in a container. 
+""" +function Base.readdir(c::AzContainer; filterlist=true) + marker = "" + names = String[] + while true + r = @retry c.nretry HTTP.request( + "GET", + "https://$(c.storageaccount).blob.core.windows.net/$(c.containername)?restype=container&comp=list&marker=$marker", + Dict( + "Authorization" => "Bearer $(token(c.session))", + "x-ms-version" => "2017-11-09"), + retry = false) + xroot = root(parse_string(String(r.body))) + blobs = xroot["Blobs"][1]["Blob"] + _names = String[content(blob["Name"][1]) for blob in blobs] + if filterlist && c.prefix != "" # keep only blobs under "<prefix>/" and strip that leading prefix (first occurrence only) + _names = String[replace(_name, normpath(c.prefix*"/")=>""; count=1) for _name in _names if startswith(_name, normpath(c.prefix*"/"))] + end + append!(names, _names) # grows in place and keeps names a Vector{String}, also when a page is empty + marker = content(xroot["NextMarker"][1]) # the listing is paged; an empty marker ends the loop + marker == "" && break + end + names +end + +""" + dirname(container) + +Returns the name of the Azure container that `container::AzContainer` is a handler to. +""" +function Base.dirname(c::AzContainer) + local nm + if c.prefix == "" + nm = c.containername + else + nm = normpath(c.containername * "/" * c.prefix) + end + nm +end + +AbstractStorage.session(c::AzContainer) = c.session + +function AbstractStorage.scrubsession!(c::AzContainer) + scrub!(c.session) + c +end + +function AbstractStorage.scrubsession(c::AzContainer) + _c = copy(c) + scrub!(_c.session) + _c +end + +""" + isfile(container, "blobname") + +Returns true if the blob "blobname" exists in `container::AzContainer`. +""" +Base.isfile(c::AzContainer, object::AbstractString) = object ∈ readdir(c) + +""" + isfile(object::AzObject) + +Returns true if the blob corresponding to `object` exists. +""" +Base.isfile(o::AzObject) = isfile(o.container, o.name) + +iscontainer(c::AzContainer) = c.containername ∈ containers(storageaccount=c.storageaccount, session=c.session, nretry=c.nretry) + +""" + isdir(container) + +Returns true if `container::AzContainer` exists. If the container has a non-empty prefix, at least one blob must also be listed under that prefix. 
+""" +function Base.isdir(c::AzContainer) + if !iscontainer(c) + return false + end + if c.prefix == "" + return true + end + !isempty(readdir(c)) # with a prefix, the "directory" exists only if some blob is listed under it +end + +""" + containers(;storageaccount="mystorageaccount", session=AzSession(;lazy=true, scope=__OAUTH_SCOPE)) + +list all containers in a given storage account. +""" +function containers(;storageaccount, session=AzSession(;lazy=true, scope=__OAUTH_SCOPE), nretry=5) + marker = "" + names = String[] + while true + r = @retry nretry HTTP.request( + "GET", + "https://$storageaccount.blob.core.windows.net/?comp=list&marker=$marker", + Dict( + "Authorization" => "Bearer $(token(session))", + "x-ms-version" => "2017-11-09"), + retry = false) + xroot = root(parse_string(String(r.body))) + xcontainers = xroot["Containers"][1]["Container"] # named so that it does not shadow this function + append!(names, String[content(xcontainer["Name"][1]) for xcontainer in xcontainers]) + marker = content(xroot["NextMarker"][1]) # the listing is paged; an empty marker ends the loop + marker == "" && break + end + names +end + +""" + filesize(container, "blobname") + +Returns the size of the blob "blobname" that is in `container::AzContainer` +""" +function Base.filesize(c::AzContainer, o::AbstractString) + r = @retry c.nretry HTTP.request( + "HEAD", + "https://$(c.storageaccount).blob.core.windows.net/$(c.containername)/$(addprefix(c,o))", + Dict( + "Authorization" => "Bearer $(token(c.session))", + "x-ms-version" => "2017-11-09"), + retry = false) + n = 0 # returned as-is when the response carries no Content-Length header + for header in r.headers + if lowercase(header.first) == "content-length" # HTTP header field names are case-insensitive + n = parse(Int, header.second) + end + end + n +end + +""" + filesize(object::AzObject) + +Returns the size of the blob corresponding to `object::AzObject` +""" +Base.filesize(o::AzObject) = filesize(o.container, o.name) + +""" + rm(container, "blobname") + +remove the blob "blobname" from `container::AzContainer`. If the removal fails, a warning is logged and no error is thrown. 
+""" +function Base.rm(c::AzContainer, o::AbstractString) + try + @retry c.nretry HTTP.request( + "DELETE", + "https://$(c.storageaccount).blob.core.windows.net/$(c.containername)/$(addprefix(c,o))", + Dict( + "Authorization" => "Bearer $(token(c.session))", + "x-ms-version" => "2017-11-09"), + retry = false) + catch # best effort: any failure of the request is reduced to a warning + @warn "error removing $(c.containername)/$(addprefix(c,o))" + end + nothing +end + +""" + rm(object::AzObject) + +remove the blob corresponding to `object::AzObject`. Forwards to `rm(container, "blobname")`. +""" +Base.rm(o::AzObject) = rm(o.container, o.name) + +""" + rm(container) + +remove `container::AzContainer` and all of its blobs. If the container has a non-empty prefix, only the blobs listed under that prefix are removed, and the container itself is removed only if it holds no other blobs afterwards. Failures are logged as a warning and are not rethrown. +""" +function Base.rm(c::AzContainer) + function _rm(c::AzContainer) # deletes the container itself + @retry c.nretry HTTP.request( + "DELETE", + "https://$(c.storageaccount).blob.core.windows.net/$(c.containername)?restype=container", + Dict( + "Authorization" => "Bearer $(token(c.session))", + "x-ms-version" => "2017-11-09"), + retry = false) + end + + try + if c.prefix == "" + _rm(c) + else + for o in readdir(c) # names relative to the prefix; rm(c, o) adds the prefix back via addprefix + rm(c, o) + end + isempty(readdir(c; filterlist=false)) && _rm(c) # unfiltered listing: blobs outside the prefix keep the container alive + end + catch + @warn "error removing $(c.containername)" + end + + nothing +end + +""" + cp(container_src, container_dst) + +copy `container_src::AzContainer` and its blobs to `container_dst::AzContainer`. One copy request is made for each blob returned by `readdir(container_src)`. 
+""" +function Base.cp(src::AzContainer, dst::AzContainer) + mkpath(dst) # NOTE(review): presumably creates the destination container when missing; mkpath(::AzContainer) is defined outside this chunk + + blobs = readdir(src) # blob names relative to the prefix of src + for blob in blobs + @retry dst.nretry HTTP.request( + "PUT", + "https://$(dst.storageaccount).blob.core.windows.net/$(dst.containername)/$(addprefix(dst,blob))", + Dict( + "Authorization" => "Bearer $(token(dst.session))", + "x-ms-version" => "2017-11-09", + "x-ms-copy-source" => "https://$(src.storageaccount).blob.core.windows.net/$(src.containername)/$(addprefix(src,blob))"), # the source is named by URL; this request sends no blob data + retry = false) + end + nothing +end + +export AzContainer, containers + +end diff --git a/src/makefile b/src/makefile new file mode 100644 index 0000000..c919038 --- /dev/null +++ b/src/makefile @@ -0,0 +1,6 @@ +all: + gcc `curl-config --cflags` -O3 -fopenmp -fPIC -c AzStorage.c + gcc -shared -fopenmp -o libAzStorage.so AzStorage.o `curl-config --libs` ${LDFLAGS} + +clean: + rm -rf *.so *.o diff --git a/test/runtests.jl b/test/runtests.jl new file mode 100644 index 0000000..6d71648 --- /dev/null +++ b/test/runtests.jl @@ -0,0 +1,288 @@ +using AbstractStorage, AzSessions, AzStorage, Dates, JSON, Random, Test + +session = AzSession(read(joinpath(homedir(),"session.json"), String)) +storageaccount = ENV["STORAGE_ACCOUNT"] +@info "storageaccount=$storageaccount" + +for container in containers(;storageaccount=storageaccount,session=session) # WARNING: removes every container found in the storage account before the tests start + rm(AzContainer(container;storageaccount=storageaccount,session=session)) +end +@info "sleeping for 60 seconds to ensure Azure clean-up from any previous run" +sleep(60) + +@testset "Containers, list" begin + sleep(1) + r = lowercase(randstring(MersenneTwister(millisecond(now())+0))) # container name suffix, seeded from the current millisecond + c = AzContainer("foo-$r-a", storageaccount=storageaccount, session=session) + mkpath(c) + + write(c, "bar", "one") + write(c, "baz", "two") + + l = readdir(c) + + @test "bar" ∈ l + @test "baz" ∈ l + + containers(storageaccount=c.storageaccount, session=c.session) + @test isdir(c) + rm(c) + @test !isdir(c) +end + +@testset "Containers, prefix, list" begin + sleep(1) + r = 
lowercase(randstring(MersenneTwister(millisecond(now())+1))) + c = AzContainer("foo-$r-b", prefix="prefix", storageaccount=storageaccount, session=session) + mkpath(c) + + write(c, "bar", "one") + write(c, "baz", "two") + + l = readdir(c) # default listing: names are relative to the prefix + + @test "bar" ∈ l + @test "baz" ∈ l + + l = readdir(c, filterlist=false) # unfiltered listing: full blob names, prefix included + @test "prefix/bar" ∈ l + @test "prefix/baz" ∈ l + + containers(storageaccount=c.storageaccount, session=c.session) + @test isdir(c) + rm(c) + @test !isdir(c) +end + +@testset "Containers, prefix, list, alt construction" begin + sleep(1) + r = lowercase(randstring(MersenneTwister(millisecond(now())+2))) + c = AzContainer("foo-$r-c/prefix", storageaccount=storageaccount, session=session) # prefix given as part of the name; the listings below expect the same result as prefix="prefix" + mkpath(c) + + write(c, "bar", "one") + write(c, "baz", "two") + + l = readdir(c) + + @test "bar" ∈ l + @test "baz" ∈ l + + l = readdir(c, filterlist=false) + @test "prefix/bar" ∈ l + @test "prefix/baz" ∈ l + + containers(storageaccount=c.storageaccount, session=c.session) + @test isdir(c) + rm(c) + @test !isdir(c) +end + +@testset "Containers, dirname" begin + sleep(1) + r = lowercase(randstring(MersenneTwister(millisecond(now())+3))) + c = AzContainer("foo-$r-d", storageaccount=storageaccount, session=session) # no mkpath: dirname only uses the fields of the handle + @test dirname(c) == "foo-$r-d" +end + +@testset "Containers, prefix, dirname" begin + sleep(1) + r = lowercase(randstring(MersenneTwister(millisecond(now())+4))) + c = AzContainer("foo-$r-e", prefix="prefix", storageaccount=storageaccount, session=session) + @test dirname(c) == "foo-$r-e/prefix" +end + +@testset "Containers, size, prefix=$prefix" for prefix in ("", "prefix") + sleep(1) + suffix = prefix == "" ? 
"-foo" : "-bar" + r = lowercase(randstring(MersenneTwister(millisecond(now())+5))) + c = AzContainer("foo-$r-f$suffix", prefix=prefix, storageaccount=storageaccount, session=session) + mkpath(c) + + write(c, "bar", rand(UInt8,10)) + write(c, "baz", rand(UInt8,11)) + @test filesize(c, "bar") == 10 + @test filesize(c, "baz") == 11 + + rm(c) +end + +@testset "Containers, bytes, nthreads=$nthreads, prefix=$prefix" for nthreads in (1, 2), prefix in ("","prefix") + sleep(1) + suffix = prefix == "" ? "-foo" : "-bar" + r = lowercase(randstring(MersenneTwister(millisecond(now())+6))) + c = AzContainer("foo-$r-g$suffix", prefix=prefix, storageaccount=storageaccount, session=session, nthreads=nthreads) + mkpath(c) + + N = 10 + if nthreads == 2 + N = round(Int, AzStorage._MINBYTES_PER_BLOCK * nthreads * 5 * (1+rand()) / 8) # number of Float64 (8 bytes each): between 5 and 10 times _MINBYTES_PER_BLOCK bytes per thread + nblks = AzStorage.nblocks(nthreads, N*8) + bytes_per_block = div(8*N, nblks) + (rem(8*N, nblks) == 0 ? 0 : 1) # ceiling of 8N/nblks, only reported below + @info "bytes_per_block=$bytes_per_block, AzStorage._MAXBYTES_PER_BLOCK=$(AzStorage._MAXBYTES_PER_BLOCK)" + end + + x = rand(N) + write(c, "bar", x) + y = read!(c, "bar", Vector{Float64}(undef, N)) + @test x ≈ y + rm(c) +end + +@testset "Containers, bytes, nested folder, prefix=$prefix" for prefix in ("","prefix") + sleep(1) + r = lowercase(randstring(MersenneTwister(millisecond(now())+7))) + suffix = prefix == "" ? "-foo" : "-bar" + c = AzContainer("foo-$r-e$suffix", prefix=prefix, storageaccount=storageaccount, session=session) + mkpath(c) + + x = rand(10) + write(c, "bar/baz", x) # blob name containing a "/" + y = read!(c, "bar/baz", Vector{Float64}(undef, 10)) + @test x ≈ y + rm(c) +end + +@testset "Containers, string, prefix=$prefix" for prefix in ("", "prefix") + sleep(1) + r = lowercase(randstring(MersenneTwister(millisecond(now())+8))) + suffix = prefix == "" ? 
"-foo" : "-bar" + c = AzContainer("foo-$r-f$suffix", prefix=prefix, storageaccount=storageaccount, session=session) + mkpath(c) + + write(c, "bar", "hello world\n") + @test read(c, "bar", String) == "hello world\n" + rm(c) +end + +@testset "Containers, rm, prefix=$prefix" for prefix in ("prefix","") + sleep(1) + r = lowercase(randstring(MersenneTwister(millisecond(now())+9))) + suffix = prefix == "" ? "-foo" : "-bar" + c = AzContainer("foo-$r-g$suffix", prefix=prefix, storageaccount=storageaccount, session=session) + mkpath(c) + write(c, "bar", "bar\n") + write(c, "foo", "foo\n") + rm(c, "bar") + @test readdir(c) == ["foo"] + rm(c, "baz") # "baz" was never written: removing a missing blob must not throw, and must leave the listing unchanged + @test readdir(c) == ["foo"] + rm(c) +end + +@testset "Containers, isfile, prefix=$prefix" for prefix in ("","prefix") + sleep(1) + r = lowercase(randstring(MersenneTwister(millisecond(now())+10))) + suffix = prefix == "" ? "-foo" : "-bar" + c = AzContainer("foo-$r-h$suffix", prefix=prefix, storageaccount=storageaccount, session=session) + mkpath(c) + write(c, "bar", "bar\n") + @test isfile(c, "bar") + write(c, "bar/baz", "bar\n") # a blob named "bar" and a blob named "bar/baz" are expected to coexist + @test isfile(c, "bar/baz") + @test !isfile(c, "notanobject") + rm(c) +end + +@testset "Containers, cp, prefix=$prefix" for prefix in ("", "prefix") + sleep(1) + r = lowercase(randstring(MersenneTwister(millisecond(now())+11))) + suffix = prefix == "" ? 
"-foo" : "-bar" + src = AzContainer("foo-$r-i$suffix", prefix=prefix, storageaccount=storageaccount, session=session) # pass the loop's prefix so that the prefixed copy path is exercised as well + r = lowercase(randstring(MersenneTwister(millisecond(now())+12))) + dst = AzContainer("foo-$r-j$suffix", prefix=prefix, storageaccount=storageaccount, session=session) + + mkpath(src) + write(src, "bar", "one") + write(src, "baz", "two") + + cp(src, dst) + + @test read(dst, "bar", String) == "one" + @test read(dst, "baz", String) == "two" + + rm(src) + rm(dst) +end + +@testset "Containers, json" begin + sleep(1) + r = lowercase(randstring(MersenneTwister(millisecond(now())+13))) + c = AzContainer("foo-$r-k", storageaccount=storageaccount, session=session, nthreads=2, nretry=10) + mkpath(c) + _c = Container(AzContainer, JSON.parse(json(c)), c.session) # round trip through JSON; the session is supplied separately + @test _c.storageaccount == storageaccount + @test _c.containername == "foo-$r-k" + @test _c.nretry == 10 + @test _c.nthreads == 2 + rm(c) +end + +@testset "Object, bytes" begin + sleep(1) + r = lowercase(randstring(MersenneTwister(millisecond(now())+13))) + c = AzContainer("foo-$r-k", storageaccount=storageaccount, session=session, nthreads=2, nretry=10) + io = open(c, "bar") + x = rand(10) + write(io, x) + _x = read!(io, zeros(10)) + rm(c) + @test x ≈ _x +end + +@testset "Object, string" begin + sleep(1) + r = lowercase(randstring(MersenneTwister(millisecond(now())+13))) + c = AzContainer("foo-$r-k", storageaccount=storageaccount, session=session, nthreads=2, nretry=10) + io = open(c, "bar") + write(io, "hello") + x = read(io, String) + rm(c) + @test x == "hello" +end + +@testset "Object, isfile" begin + sleep(1) + r = lowercase(randstring(MersenneTwister(millisecond(now())+13))) + c = AzContainer("foo-$r-k", storageaccount=storageaccount, session=session, nthreads=2, nretry=10) + io = open(c, "bar") + write(io, "hello") + @test isfile(io) + rm(c) +end + +@testset "Object, rm" begin + sleep(1) + r = lowercase(randstring(MersenneTwister(millisecond(now())+13))) + c = AzContainer("foo-$r-k", storageaccount=storageaccount, 
session=session, nthreads=2, nretry=10) + io = open(c, "bar") + write(io, "hello") + @test isfile(io) + rm(io) # removes only the blob; the container is removed below + @test !isfile(io) + rm(c) +end + +@testset "Object, joinpath" begin + sleep(1) + r = lowercase(randstring(MersenneTwister(millisecond(now())+13))) + c = AzContainer("foo-$r-k", storageaccount=storageaccount, session=session, nthreads=2, nretry=10) + io = joinpath(c, "bar", "baz") # NOTE(review): presumably an object handle for the blob "bar/baz"; joinpath(::AzContainer, ...) is defined outside this chunk + write(io, "hello") + @test read(io, String) == "hello" + rm(c) +end + +# this failed because of a bug in the block-list when using exactly 10 blocks +# Anusha found the failing example. +@testset "Anusha's example" begin + sleep(1) + r = lowercase(randstring(MersenneTwister(millisecond(now())+14))) + c = AzContainer("foo-$r-l", storageaccount=storageaccount, session=session, nthreads=2, nretry=10) + mkpath(c) + x = rand(2801,13821) # 2801*13821 Float64 values, about 310 MB + write(c, "bar", x) + _x = read!(c, "bar", Array{Float64,2}(undef,2801,13821)) + @test x ≈ _x + rm(c) +end