diff --git a/_test/test_mpi/job_dispatcher_common_tests.m b/_test/test_mpi/job_dispatcher_common_tests.m
index f4ba64767a..37cf73f98d 100644
--- a/_test/test_mpi/job_dispatcher_common_tests.m
+++ b/_test/test_mpi/job_dispatcher_common_tests.m
@@ -215,6 +215,52 @@ function my_delete(varargin)
             assertTrue(is_file(file3a));
         end
 
+        function test_fail_condense(obj, varargin)
+            % Checks that the failure report of a job is printed when the
+            % 'debug' option of parallel_config is true, and that with
+            % 'debug' false nothing is printed and the log file holds the same text.
+            if obj.ignore_test
+                skipTest('test_fail_condense is disabled');
+            end
+
+            % NOTE(review): the cell array inside [...] turns the whole
+            % concatenation into a cell array, and struct() expands a cell
+            % array value into a struct array -- confirm JETester expects this
+            common_param = struct('filepath', obj.working_dir, ...
+                'filename_template', ...
+                ['test_JD_', obj.cluster_name, {'A','B','C'}, 'L%d_nf%d.txt'], ...
+                'fail_for_labsN', 2);
+
+            file1 = fullfile(obj.working_dir, ['test_JD_', obj.cluster_name,'A', 'L1_nf1.txt']);
+            file2 = fullfile(obj.working_dir, ['test_JD_', obj.cluster_name,'B', 'L2_nf1.txt']);
+            file3 = fullfile(obj.working_dir, ['test_JD_', obj.cluster_name,'C', 'L3_nf1.txt']);
+
+            jd = JobDispatcher(['test_job_', obj.cluster_name, '_fail_condense']);
+            % char (single quotes), like the char concatenations used for file1..file3
+            log_filename = sprintf('par_fail_%s.log', jd.job_id);
+
+            files = {file1, file2, file3, log_filename};
+            co = onCleanup(@()(obj.my_delete(files{:})));
+
+            [outputs, n_failed, ~, jd] = jd.start_job('JETester', common_param, 36, true, 3, true, 1);
+
+            clob = set_temporary_config_options(parallel_config, 'debug', true);
+            out_full = evalc("jd.display_fail_job_results(outputs, n_failed,3)");
+
+            clear clob;
+
+            clob = set_temporary_config_options(parallel_config, 'debug', false);
+            out_silenced = evalc("jd.display_fail_job_results(outputs, n_failed,3)");
+
+            assertTrue(contains(out_full, 'failed'));
+            assertTrue(isempty(out_silenced));
+
+            % not named "log": that would shadow the built-in log function
+            log_contents = fileread(log_filename);
+
+            assertEqual(log_contents, out_full);
+        end
+
         function test_job_with_logs_3workers(obj, varargin)
             if obj.ignore_test
                 skipTest('test_job_with_logs_3workers is disabled');
diff --git a/herbert_core/classes/MPIFramework/@JobDispatcher/private/display_fail_jobs_.m b/herbert_core/classes/MPIFramework/@JobDispatcher/private/display_fail_jobs_.m
index 
5233267d0f..b907a2cfac 100644
--- a/herbert_core/classes/MPIFramework/@JobDispatcher/private/display_fail_jobs_.m
+++ b/herbert_core/classes/MPIFramework/@JobDispatcher/private/display_fail_jobs_.m
@@ -1,107 +1,162 @@
-function display_fail_jobs_(obj,outputs,n_failed,n_workers,Err_code)
-% Auxiliary method to display job results if the job have
-% failed
+function display_fail_jobs_(obj, outputs, n_failed, n_workers, err_code)
+% Auxiliary method to display job results if the job has failed
+%
+% When the 'debug' option of parallel_config is true, the report is printed
+% to the command window. Otherwise it is written to the file
+% par_fail_<job_id>.log and nothing is displayed.
+%
 % Input:
-% Outputs -- usually cellarray of the results, returned by a
+% outputs -- usually cellarray of the results, returned by a
 %             parallel job
+%
 % n_failed -- number of tasks failed as the result of parallel
 %             job
+%
 % n_workers-- number of labs used by parallel job initially
 %
-% Err_code -- the text string in the form
+% err_code -- the text string in the form
 %             ERROR_CLASS:error_reason to form identifier of
 %             the exception to throw
-%             If this paraemter is empty, it does not throw anything.
+%             If this parameter is empty or omitted, nothing is thrown.
 % Throws:
-% First exception returned from the cluster if such exceptions
-% are present or exception with Err_code as MExeption.identifier
-% if no errors returned
+% An exception with err_code as MException.identifier, whose message
+% states how many tasks have failed and, when the report went to the
+% log file, the name of that file.
 %
 
-mEXceptions_outputs = false(size(outputs));
+debug_flag = get(parallel_config, 'debug');
+
+% err_code is optional: without it the failure is reported but not thrown
+throw_on_failure = exist('err_code', 'var') && ~isempty(err_code);
+
+% the report goes to the screen in debug mode and to the log file otherwise
+to_screen = debug_flag;
+log_file = sprintf('par_fail_%s.log', obj.job_id);
+fh = -1;
+if ~to_screen
+    fh = fopen(log_file, 'w');
+    if fh < 0
+        % the log can not be created: show the report rather than lose it
+        to_screen = true;
+    else
+        clob = onCleanup(@() fclose(fh));
+    end
+end
+
+MExceptions_outputs = false(size(outputs));
+
 if iscell(outputs)
-    fprintf('Job %s have failed. Outputs: \n',obj.job_id);
+    debug_print('Job %s failed. Outputs: \n', obj.job_id);
+
     for i=1:numel(outputs)
-        if isa(outputs{i},'MException')
-            mEXceptions_outputs(i) = true;
-            fprintf('Task N%d failed. Error %s; Message %s\n',...
-                i,outputs{i}.identifier,outputs{i}.message);
-        elseif isfield(outputs{i},'error') && isa(outputs{i}.error,'MException')
-            mEXceptions_outputs(i) = true;
-            fprintf('Task N%d failed. Reason: %s\n',...
-                i,outputs{i}.fail_reason);
-
+        if isa(outputs{i}, 'MException')
+            MExceptions_outputs(i) = true;
+            debug_print('Task %d failed. Error %s; Message %s\n', ...
+                i, outputs{i}.identifier, outputs{i}.message);
+
+        elseif isfield(outputs{i}, 'error') && isa(outputs{i}.error, 'MException')
+            MExceptions_outputs(i) = true;
+            debug_print('Task %d failed. Reason: %s\n', ...
+                i, outputs{i}.fail_reason);
+
         else
-            mEXceptions_outputs(i) = false;
-            fprintf('Task N%d failed. Outputs: \n',i);
+            MExceptions_outputs(i) = false;
+            debug_print('Task %d failed. Outputs: \n', i);
+
             if isempty(outputs{i})
-                fprintf('[]\n');
+                debug_print('[]\n');
             else
-                disp(outputs{i});
+                % '%s' stops '%' or '\' in the value being read as a format
+                debug_print('%s', disp2str(outputs{i}));
             end
         end
     end
-elseif isempty(mEXceptions_outputs)
+
+elseif isempty(outputs)
     ext_type = class(outputs);
-    fprintf('Job %s have failed sending unhandled exception: %s\n',obj.job_id,ext_type);
-    if ~isempty(Err_code)
-        error(Err_code,'Parallel job have failed throwing unhandled exception: %s',ext_type);
-    end
+    debug_print('Job %s failed with unhandled exception: %s\n', obj.job_id, ext_type);
+
 else
-    mEXceptions_outputs(1) = isa(outputs,'MException');
-    fprintf('Job %s have failed. Output: \n',obj.job_id);
-    disp(outputs);
+
+    MExceptions_outputs(1) = isa(outputs, 'MException');
+    debug_print('Job %s failed. Output: \n', obj.job_id);
+    debug_print('%s', disp2str(outputs));
+
     if numel(outputs) == 1
         disp_exception(outputs);
     end
+
 end
 
-if any(mEXceptions_outputs)
-    if isempty(Err_code)
-        warn_code = 'DISPLAY_FAIL_JOBS:parallel_failure';
-    else
-        warn_code = Err_code;
-    end
-    warning(warn_code ,...
-        ' Number: %d parallel tasks out of total: %d tasks have failed',...
-        n_failed,n_workers)
-    errOutputs = outputs(mEXceptions_outputs);
-    if iscell(errOutputs)
-        for i=1:numel(errOutputs)
-            disp(['***** Error output N ',num2str(i)]);
-            disp_exception(errOutputs{i});
+
+message = sprintf('%d of %d tasks in job %s have failed', ...
+    n_failed, n_workers, obj.job_id);
+
+if ~iscell(outputs) && isempty(outputs)
+    % class() is evaluated here: ext_type above is not set when outputs
+    % is an empty cell array
+    message = [message, sprintf(' with unhandled exception: %s', class(outputs))];
+
+elseif any(MExceptions_outputs)
+
+    err_outputs = outputs(MExceptions_outputs);
+
+    if iscell(err_outputs)
+        for i=1:numel(err_outputs)
+            debug_print('***** Error output N %d\n', i);
+            disp_exception(err_outputs{i});
         end
     else
-        disp_exception(errOutputs);
-    end
-    if ~isempty(Err_code)
-        error(Err_code,'Parallel job have failed, producing errors above.');
+        disp_exception(err_outputs);
     end
+
 else
-    if ~isempty(Err_code)
-        error(Err_code,...
-            ' Number: %d parallel tasks out of total: %d tasks have failed without returning the reason',...
-            n_failed,n_workers)
-    end
+    message = [message, ' without returning the reason'];
 end
 
 
-function disp_exception(errOutput)
-%
-if isa(errOutput,'MException')
-    disp(getReport(errOutput))
-elseif iscell(errOutput)
-    disp('***************************************************************');
-    disp(errOutput);
-    for i=1:numel(errOutput)
-        sprintf(' Cell %d, contains: %s\n',i,evalc('disp(errOutput{i}'));
-        disp_exception(errOutput{i});
+if throw_on_failure
+    if to_screen
+        error(err_code, '%s.', message);
+    else
+        error(err_code, '%s, errors recorded in %s.', message, log_file);
+    end
+end
+
+
+function disp_exception(err_output)
+% Report one failure output; cell arrays are reported element by element
+
+if isa(err_output, 'MException')
+    debug_print('%s', disp2str(getReport(err_output)));
+
+elseif iscell(err_output)
+
+    % the loop index is not called i: i belongs to the parent function and
+    % would be shared with it
+    for k=1:numel(err_output)
+        debug_print(' Cell %d, contains: \n', k);
+        disp_exception(err_output{k});
     end
-    disp('***************************************************************');
-elseif isfield(errOutput,'error') && isa(errOutput.error,'MException')
-    for i=1:numel(errOutput.error)
-        disp(getReport(errOutput.error(i)));
+
+elseif isfield(err_output, 'error') && isa(err_output.error, 'MException')
+
+    for k=1:numel(err_output.error)
+        debug_print('%s', disp2str(getReport(err_output.error(k))));
     end
+
 else
-    disp('unknown type of error returned')
+    debug_print('unknown type of error: \n %s', disp2str(err_output));
+
 end
+end
+
+function debug_print(varargin)
+% fprintf-style output sent to the screen or to the log file
+    if to_screen
+        fprintf(varargin{:});
+    else
+        fprintf(fh, varargin{:});
+    end
+end
+end