Skip to content

Commit

Permalink
storage: Implement the provider repair operation and supporting apis.…
Browse files Browse the repository at this point in the history
… Cleanup and refactoring.
  • Loading branch information
patniemeyer committed Jan 24, 2024
1 parent 7b999a7 commit 6dc21d0
Show file tree
Hide file tree
Showing 29 changed files with 1,426 additions and 285 deletions.
2 changes: 1 addition & 1 deletion str-twincoding/clean.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
rm -rf *.encoded recovered.dat file_1KB.dat file_1MB.dat repository encoding/repository server/repository
rm -rf *.encoded recovered.dat file_1KB.dat file_1MB.dat repository encoding/repository server/repository examples/data


11 changes: 11 additions & 0 deletions str-twincoding/docs/usage.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,18 @@ providers.sh list
monitor.sh --update 1

# Push the file by name
# (Observe the availability of the file in the monitor)
storage.sh push foo_file.dat

# Delete a shard from one of the providers
# (Observe the availability is reduced as a unique shard is lost)
storage.sh delete_shard --provider 5001 foo_file.dat --node_type 0 --node_index 0

# Request that the provider rebuild the lost node from specified other nodes in the cluster.
storage.sh request_repair --to_provider 5001 foo_file.dat --node_type 0 --node_index 0 --from_providers 5002 5003 5004
...


# Shut down the servers
examples/test-cluster.sh stop

4 changes: 2 additions & 2 deletions str-twincoding/encoding/file_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import numpy as np
from tqdm import tqdm

from storage.storage_model import NodeType, EncodedFile, assert_rs
from storage.storage_model import NodeType, EncodedFile
from storage.repository import Repository

from encoding.chunks import ChunksReader, open_output_file
Expand All @@ -34,7 +34,7 @@ def __init__(self,
org_file_length: int = None # original file length without encoder padding
):

assert_rs(node_type)
node_type.assert_reed_solomon()
self.k = node_type.k
self.transpose = node_type.transpose
self.node_type = node_type
Expand Down
6 changes: 3 additions & 3 deletions str-twincoding/encoding/file_encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from encoding.chunks import ChunkReader
from encoding.twin_coding import rs_generator_matrix, Code, twin_code
from storage.storage_model import EncodedFile, NodeType0, NodeType1, assert_rs
from storage.storage_model import EncodedFile, NodeType0, NodeType1
from storage.repository import Repository
from tqdm import tqdm
import time
Expand All @@ -31,8 +31,8 @@ def __init__(self,
output_path: str = None,
overwrite: bool = False):

assert_rs(node_type0)
assert_rs(node_type1)
node_type0.assert_reed_solomon()
node_type1.assert_reed_solomon()
assert node_type0.k == node_type1.k, "The two node types must have the same k."
assert node_type0.n > node_type0.k and node_type1.n > node_type1.k, "The node type must have n > k."

Expand Down
8 changes: 5 additions & 3 deletions str-twincoding/encoding/node_recovery_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@

from encoding.chunks import ChunksReader, open_output_file
from encoding.twin_coding import rs_generator_matrix
from storage.storage_model import NodeType, NodeType1, assert_rs
from storage.storage_model import NodeType, NodeType1
from storage.repository import Repository


# Consume recovery files from k nodes of the opposite type to recover a lost node's data.
# Consume recovery files from k nodes to recover a lost shard's data.
# The recovered shard type will be the opposite of the recovery source node type.
# See `node_recovery_source.py` for generating the recovery files.
#
# Note: We will parallelize this in a future update.
Expand All @@ -26,7 +27,7 @@ def __init__(self,
output_path: str = None,
overwrite: bool = False):

assert_rs(recovery_source_node_type)
recovery_source_node_type.assert_reed_solomon()
self.recovery_source_node_type = recovery_source_node_type
self.k = recovery_source_node_type.k

Expand Down Expand Up @@ -59,6 +60,7 @@ def map_files(files_dir: str,
return OrderedDict(sorted(files.items(), key=lambda x: x[1])[:k])

def recover_node(self):
print(f"Recovering node to: {self.output_path}")
GF = galois.GF(2 ** 8)
G = rs_generator_matrix(GF, self.recovery_source_node_type.k, self.recovery_source_node_type.n)
with open_output_file(output_path=self.output_path, overwrite=self.overwrite) as out:
Expand Down
142 changes: 101 additions & 41 deletions str-twincoding/encoding/node_recovery_source.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import time
import galois
from icecream import ic
from tqdm import tqdm

from storage.storage_model import NodeType, NodeType1
from storage.renderable import Renderable
from storage.storage_model import NodeType
from storage.repository import Repository
from encoding.chunks import ChunkReader, open_output_file
from encoding.twin_coding import rs_generator_matrix
Expand All @@ -14,28 +16,77 @@
#
# Note: We will parallelize this in a future update.
#
# TODO: We don't currently have a way to assert that the source and client nodes are of opposite types.
#
class NodeRecoverySource(ChunkReader):
def __init__(self,
# Node information for the recover node (client node).
recover_node_type: NodeType,
recover_node_index: int,
# Node information for this node (source node)
data_path: str,
# Output
output_path: str = None,
overwrite: bool = False):
class NodeRecoverySource(ChunkReader, Renderable):

# This is the file-level init.
# @see `NodeRecoverySource.for_repo()`, which works at the repository level.
def __init__(
        self,
        # Node information for the recover node (client node).
        recover_node_type: NodeType,
        recover_node_index: int,

        # Node information for the source "helper" node.
        data_path: str,

        # Output
        output_path: str = None,
        overwrite: bool = False
):
    """
    Prepare a recovery-file generator for one helper shard.

    :param recover_node_type: node type of the shard being recovered
        (this source node is assumed to be of the opposite type).
    :param recover_node_index: index of the shard being recovered; must be < n.
    :param data_path: path to this helper node's shard data.
    :param output_path: where to write the recovery file
        (defaults to ``recover_<index>.dat``).
    :param overwrite: allow clobbering an existing output file.
    """
    super().__init__(path=data_path, chunk_size=recover_node_type.k)
    # Only Reed-Solomon encoded node types are currently supported.
    recover_node_type.assert_reed_solomon()
    self.recover_node_type = recover_node_type
    assert recover_node_index < recover_node_type.n, "Recover node index must be less than n."
    self.recover_node_index = recover_node_index
    self.output_path = output_path or f"recover_{recover_node_index}.dat"
    self.overwrite = overwrite

# Identity is keyed on the recovery output path, presumably so instances can
# be collected in sets/dicts (one task per recovery file).
# NOTE(review): __eq__ is not visible in this view — confirm it is defined
# consistently with this hash (equal objects must hash equal).
def __hash__(self):
    return hash(self.output_path)

@classmethod
def for_repo(
        cls,
        repo: Repository,
        filename: str,

        # Node information for the recovering node (client node).
        recover_node_type: NodeType,
        recover_node_index: int,

        # Node information for the source "helper" node.
        source_node_type: NodeType,
        source_node_index: int,

        overwrite: bool = False
):
    """
    Alternate constructor: build a NodeRecoverySource using the repository's
    file-layout conventions to locate the helper shard (input) and the
    recovery file (output), instead of taking raw paths.

    :param repo: repository providing shard/recovery-file path conventions.
    :param filename: name of the encoded file within the repository.
    :param recover_node_type: node type of the shard being recovered.
    :param recover_node_index: index of the shard being recovered.
    :param source_node_type: node type of this helper node; must be the
        opposite type of ``recover_node_type``.
    :param source_node_index: index of this helper node's shard.
    :param overwrite: allow clobbering an existing recovery file.
    """
    # The source and recover nodes must be of opposing types (0, 1).
    assert recover_node_type.type != source_node_type.type, "Node types must be different."

    # The input path of the helper node's shard (expected=True: it must exist).
    helper_shard_path = repo.shard_path(
        filename, node_type=source_node_type.type, node_index=source_node_index, expected=True)

    # The output path of the recovery file (expected=False: it will be created).
    recovery_file_path = repo.recovery_file_path(
        filename,
        recover_node_type=recover_node_type.type,
        recover_node_index=recover_node_index,
        helper_node_index=source_node_index,
        expected=False)

    return cls(
        recover_node_type=recover_node_type,
        recover_node_index=recover_node_index,
        data_path=helper_shard_path,
        output_path=recovery_file_path,
        overwrite=overwrite
    )

# Generate the node recovery file for the client node
def generate(self):
def render(self):
GF = galois.GF(2 ** 8)
# The encoding vector of the failed node is the i'th column of the generator matrix of its type.
G = rs_generator_matrix(GF, self.recover_node_type.k, self.recover_node_type.n)
Expand All @@ -52,32 +103,41 @@ def generate(self):


if __name__ == '__main__':
file = 'file_1KB.dat'
repo = Repository.default()

# The recovering node
recover_node_encoding = NodeType1(k=3, n=5, encoding='reed_solomon')
recover_node_index = 0
# Test requesting a single recovery file from a provider
# ./storage.sh request_recovery_file --provider http://localhost:8080 --overwrite --recover_node_type 0
# --recover_node_index 0 --source_node_index 0 file_1KB.dat

# Use three helper nodes of type 0 to generate recovery files for a node of type 1.
helper_node_type = 0
for helper_node_index in range(3):
# The input path of the helper node's shard
helper_shard_path = repo.shard_path(
file, node_type=helper_node_type, node_index=helper_node_index)
# The output path of the recovery file
recovery_file_path = repo.recovery_file_path(
file,
recover_node_type=recover_node_encoding.type,
recover_node_index=recover_node_index,
helper_node_index=helper_node_index,
expected=False)
def main():
    """
    Example: regenerate recovery files for a lost type-1 shard by running k
    helper nodes of the opposite type (0), one recovery file per helper.
    """
    filename = 'file_1KB.dat'
    repo = Repository.default()

    # The node and shard to recover
    recover_node_type = 1
    recover_node_encoding = NodeType(type=recover_node_type, k=3, n=5, encoding='reed_solomon')
    recover_node_index = 0

    # Helper nodes are the opposite type of the recovering node.
    helper_node_type = 0 if recover_node_type == 1 else 1

    # The helper encoding is loop-invariant: same parameters, opposite type.
    helper_node_encoding = NodeType(
        type=helper_node_type,
        encoding=recover_node_encoding.encoding,
        k=recover_node_encoding.k,
        n=recover_node_encoding.n
    )

    # Use k helper nodes to generate the k recovery files needed for
    # the recovering node's type. (Was hard-coded range(3); use k so the
    # example tracks the encoding parameters.)
    for helper_node_index in range(recover_node_encoding.k):
        NodeRecoverySource.for_repo(
            repo=repo,
            filename=filename,
            recover_node_type=recover_node_encoding,
            recover_node_index=recover_node_index,
            source_node_type=helper_node_encoding,
            source_node_index=helper_node_index,
            overwrite=True
        ).render()


main()
...
4 changes: 3 additions & 1 deletion str-twincoding/examples/test-cluster.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ if [ -z "$STRHOME" ]; then
echo "STRHOME is not set."
exit 1
fi
app="$STRHOME/server/server_cli.py"
apppy="server/server_cli.py"
app="$STRHOME/$apppy"
data="$STRHOME/examples/data"
mkdir -p "$data"

Expand Down Expand Up @@ -54,6 +55,7 @@ list_all() {
printf "%-10s %-10s %-10s\n" "PID" "PORT" "TIME"

# Find all PIDs for the given process name and extract relevant information
echo "$app"
ps auxw | grep "$app" | grep -v grep | awk '{
pid = $2;
time = $10;
Expand Down
Loading

0 comments on commit 6dc21d0

Please sign in to comment.