Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem with CRC check in Cassandra-driver #583 #588

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions medusa/backup_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def orchestrate(config, backup_name_arg, seed_target, stagger, enable_md5_checks
logging.error(err_msg)

delete_snapshot_command = ' '.join(backup.cassandra.delete_snapshot_command(backup.snapshot_tag))
pssh_run_success_cleanup = backup.orchestration_uploads\
pssh_run_success_cleanup = backup.orchestration_uploads \
.pssh_run(backup.hosts,
delete_snapshot_command,
hosts_variables={})
Expand Down Expand Up @@ -152,7 +152,7 @@ def execute(self, cql_session_provider=None):
def _create_snapshots(self):
# Run snapshot in parallel on all nodes,
create_snapshot_command = ' '.join(self.cassandra.create_snapshot_command(self.backup_name))
pssh_run_success = self.orchestration_snapshots.\
pssh_run_success = self.orchestration_snapshots. \
pssh_run(self.hosts,
create_snapshot_command,
hosts_variables={})
Expand Down Expand Up @@ -184,14 +184,16 @@ def _build_backup_cmd(self):

# Use %s placeholders in the below command to have them replaced by pssh using per host command substitution
command = 'mkdir -p {work}; cd {work} && medusa-wrapper {sudo} medusa {config} -vvv backup-node ' \
'--backup-name {backup_name} {stagger} {enable_md5_checks} --mode {mode}' \
'--backup-name {backup_name} {stagger} {enable_md5_checks} --mode {mode} ' \
'--contact-points {contact_points}' \
.format(work=self.work_dir,
sudo='sudo' if medusa.utils.evaluate_boolean(self.config.cassandra.use_sudo) else '',
config=f'--config-file {self.config.file_path}' if self.config.file_path else '',
backup_name=self.backup_name,
stagger=stagger_option,
enable_md5_checks=enable_md5_checks_option,
mode=self.mode)
mode=self.mode,
contact_points=','.join(self.hosts))

logging.debug('Running backup on all nodes with the following command {}'.format(command))

Expand Down
7 changes: 5 additions & 2 deletions medusa/backup_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,18 @@ def stagger(fqdn, storage, tokenmap):
# Called by async thread for backup, called in main thread as synchronous backup.
# Kicks off the node backup unit of work and registers for backup queries.
# No return value for async mode, throws back exception for failed kickoff.
def handle_backup(config, backup_name_arg, stagger_time, enable_md5_checks_flag, mode):
def handle_backup(config, backup_name_arg, stagger_time, enable_md5_checks_flag, mode, contact_points):
start = datetime.datetime.now()
backup_name = backup_name_arg or start.strftime('%Y%m%d%H%M')
monitoring = Monitoring(config=config.monitoring)

try:
logging.debug("Starting backup preparations with Mode: {}".format(mode))
storage = Storage(config=config.storage)
cassandra = Cassandra(config)
cassandra = Cassandra(
config,
contact_points=contact_points.split(',') if contact_points is not None else None
)

storage.storage_driver.prepare_upload()

Expand Down
7 changes: 4 additions & 3 deletions medusa/cassandra_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ class Cassandra(object):
SNAPSHOT_PATTERN = '*/*/snapshots/{}'
SNAPSHOT_PREFIX = 'medusa-'

def __init__(self, config, contact_point=None, release_version=None):
def __init__(self, config, hostname=None, release_version=None, contact_points=None):
self._release_version = release_version
cassandra_config = config.cassandra
self._start_cmd = shlex.split(cassandra_config.start_cmd)
Expand All @@ -341,11 +341,12 @@ def __init__(self, config, contact_point=None, release_version=None):
self._root = config_reader.root
self._commitlog_path = config_reader.commitlog_directory
self._saved_caches_path = config_reader.saved_caches_directory
self._hostname = contact_point if contact_point is not None else config_reader.listen_address
self._hostname = hostname if hostname is not None else config_reader.listen_address
self._storage_port = config_reader.storage_port
self._native_port = config_reader.native_port
self._contact_points = contact_points if contact_points is not None else [self._hostname]
self._cql_session_provider = CqlSessionProvider(
[self._hostname],
self._contact_points,
config)
self._rpc_port = config_reader.rpc_port
self.seeds = config_reader.seeds
Expand Down
5 changes: 3 additions & 2 deletions medusa/medusacli.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,15 @@ def cli(ctx, verbosity, without_log_timestamp, config_file, **kwargs):
'(in addition to size, which is used by default)',
is_flag=True, default=False)
@click.option('--mode', default="differential", type=click.Choice(['full', 'differential']))
@click.option('--contact-points', help='Contact points for the cassandra driver', default=None)
@pass_MedusaConfig
def backup(medusaconfig, backup_name, stagger, enable_md5_checks, mode):
def backup(medusaconfig, backup_name, stagger, enable_md5_checks, mode, contact_points):
"""
Backup single Cassandra node
"""
stagger_time = datetime.timedelta(seconds=stagger) if stagger else None
BackupMan.register_backup(backup_name, is_async=False)
return backup_node.handle_backup(medusaconfig, backup_name, stagger_time, enable_md5_checks, mode)
return backup_node.handle_backup(medusaconfig, backup_name, stagger_time, enable_md5_checks, mode, contact_points)


@cli.command(name='backup-cluster')
Expand Down
5 changes: 3 additions & 2 deletions medusa/service/grpc/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ def AsyncBackup(self, request, context):
BackupMan.register_backup(request.name, is_async=True)
backup_future = executor.submit(backup_node.handle_backup, config=self.config,
backup_name_arg=request.name, stagger_time=None,
enable_md5_checks_flag=False, mode=mode)
enable_md5_checks_flag=False, mode=mode,
contact_points=None)

backup_future.add_done_callback(record_backup_info)
BackupMan.set_backup_future(request.name, backup_future)
Expand Down Expand Up @@ -147,7 +148,7 @@ def Backup(self, request, context):
response.backupName = request.name
BackupMan.register_backup(request.name, is_async=False)
backup_node.handle_backup(config=self.config, backup_name_arg=request.name, stagger_time=None,
enable_md5_checks_flag=False, mode=mode)
enable_md5_checks_flag=False, mode=mode, contact_points=None)
record_status_in_response(response, request.name)
return response
except Exception as e:
Expand Down
3 changes: 2 additions & 1 deletion tests/backup_node_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ def test_handle_backup_async(self, mock_start_backup, mock_storage, mock_cassand
BackupMan.register_backup(test_backup_name, is_async=True)
backup_future = executor.submit(backup_node.handle_backup, config=self.medusa_config,
backup_name_arg=test_backup_name, stagger_time=None,
enable_md5_checks_flag=False, mode="differential")
enable_md5_checks_flag=False, mode="differential",
contact_points=None)
mock_future_instance = MagicMock()
mock_callback = MagicMock()
mock_future_instance.result.return_value = {"foo": "bar"}
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/features/steps/integration_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ def _i_perform_a_backup_of_the_node_named_backupname(context, backup_mode, backu
BackupMan.register_backup(backup_name, is_async=False)
(actual_backup_duration, actual_start, end, node_backup, node_backup_cache, num_files, start, backup_name) = \
backup_node.handle_backup(context.medusa_config, backup_name, None, str(md5_enabled_str).lower() == "enabled",
backup_mode)
backup_mode, None)
context.latest_backup_cache = node_backup_cache


Expand Down