|
| 1 | +// Copyright 2025 The Cockroach Authors. |
| 2 | +// |
| 3 | +// Use of this software is governed by the CockroachDB Software License |
| 4 | +// included in the /LICENSE file. |
| 5 | + |
| 6 | +package bulkutil |
| 7 | + |
| 8 | +import ( |
| 9 | + "context" |
| 10 | + "net/url" |
| 11 | + |
| 12 | + "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" |
| 13 | + "github.com/cockroachdb/cockroach/pkg/cloud" |
| 14 | + "github.com/cockroachdb/cockroach/pkg/security/username" |
| 15 | + "github.com/cockroachdb/errors" |
| 16 | +) |
| 17 | + |
| 18 | +// ExternalStorageMux is a utility for managing multiple cloud storage instances. |
| 19 | +// The main motivator for this is each node has its own nodelocal://<node-id> |
| 20 | +// instance. |
| 21 | +type ExternalStorageMux struct { |
| 22 | + factory cloud.ExternalStorageFromURIFactory |
| 23 | + storeInstances map[string]cloud.ExternalStorage |
| 24 | + user username.SQLUsername |
| 25 | +} |
| 26 | + |
| 27 | +// NewExternalStorageMux creates a new ExternalStorageMux that caches external storage |
| 28 | +// instances. This is particularly useful for nodelocal storage where each node |
| 29 | +// has its own storage instance (nodelocal://1/, nodelocal://2/, etc.). |
| 30 | +func NewExternalStorageMux( |
| 31 | + factory cloud.ExternalStorageFromURIFactory, user username.SQLUsername, |
| 32 | +) *ExternalStorageMux { |
| 33 | + return &ExternalStorageMux{ |
| 34 | + factory: factory, |
| 35 | + storeInstances: make(map[string]cloud.ExternalStorage), |
| 36 | + user: user, |
| 37 | + } |
| 38 | +} |
| 39 | + |
| 40 | +// Close closes all cached storage instances. |
| 41 | +func (c *ExternalStorageMux) Close() error { |
| 42 | + var err error |
| 43 | + for _, store := range c.storeInstances { |
| 44 | + err = errors.CombineErrors(err, store.Close()) |
| 45 | + } |
| 46 | + return err |
| 47 | +} |
| 48 | + |
| 49 | +// StoreFile splits a URI into its storage prefix and file path, caching the |
| 50 | +// storage instance for reuse. For example, "nodelocal://1/import/123/file.sst" |
| 51 | +// is split into storage "nodelocal://1" and path "/import/123/file.sst". |
| 52 | +func (c *ExternalStorageMux) StoreFile( |
| 53 | + ctx context.Context, uri string, |
| 54 | +) (storageccl.StoreFile, error) { |
| 55 | + prefix, filepath, err := c.splitURI(uri) |
| 56 | + if err != nil { |
| 57 | + return storageccl.StoreFile{}, err |
| 58 | + } |
| 59 | + prefixKey := prefix.String() |
| 60 | + store, ok := c.storeInstances[prefixKey] |
| 61 | + if !ok { |
| 62 | + storage, err := c.factory(ctx, prefix.String(), c.user) |
| 63 | + if err != nil { |
| 64 | + return storageccl.StoreFile{}, err |
| 65 | + } |
| 66 | + c.storeInstances[prefixKey] = storage |
| 67 | + store = storage |
| 68 | + } |
| 69 | + return storageccl.StoreFile{ |
| 70 | + Store: store, |
| 71 | + FilePath: filepath, |
| 72 | + }, nil |
| 73 | +} |
| 74 | + |
| 75 | +// splitURI splits a URI into its prefix (scheme + host) and path components. |
| 76 | +// For example, "nodelocal://1/import/123/file.sst" becomes: |
| 77 | +// - prefix: url.URL{Scheme: "nodelocal", Host: "1"} |
| 78 | +// - path: "/import/123/file.sst" |
| 79 | +func (c *ExternalStorageMux) splitURI(uri string) (url.URL, string, error) { |
| 80 | + parsed, err := url.Parse(uri) |
| 81 | + if err != nil { |
| 82 | + return url.URL{}, "", errors.Wrap(err, "failed to parse external storage uri") |
| 83 | + } |
| 84 | + |
| 85 | + path := parsed.Path |
| 86 | + parsed.Path = "" |
| 87 | + |
| 88 | + return *parsed, path, nil |
| 89 | +} |
0 commit comments