Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions _test/test_mpi/job_dispatcher_common_tests.m
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,47 @@ function my_delete(varargin)
assertTrue(is_file(file3a));
end

function test_fail_condense(obj, varargin)
if obj.ignore_test
skipTest('test_fail_condesnse is disabled');
end

common_param = struct('filepath', obj.working_dir, ...
'filename_template', ...
['test_JD_', obj.cluster_name, {'A','B','C'}, 'L%d_nf%d.txt'], ...
'fail_for_labsN', 2);

file1 = fullfile(obj.working_dir, ['test_JD_', obj.cluster_name,'A', 'L1_nf1.txt']);
file2 = fullfile(obj.working_dir, ['test_JD_', obj.cluster_name,'B', 'L2_nf1.txt']);
file3 = fullfile(obj.working_dir, ['test_JD_', obj.cluster_name,'C', 'L3_nf1.txt']);

jd = JobDispatcher(['test_job_', obj.cluster_name, '_fail_condense']);
log_filename = sprintf("par_fail_%s.log", jd.job_id);

files = {file1, file2, file3, log_filename};
co = onCleanup(@()(obj.my_delete(files{:})));


[outputs, n_failed, ~, jd] = jd.start_job('JETester', common_param, 36, true, 3, true, 1);

clob = set_temporary_config_options(parallel_config, 'debug', true);
out_full = evalc("jd.display_fail_job_results(outputs, n_failed,3)");

clear clob;

clob = set_temporary_config_options(parallel_config, 'debug', false);
out_silenced = evalc("jd.display_fail_job_results(outputs, n_failed,3)");

assertTrue(contains(out_full, 'failed'));
assertTrue(isempty(out_silenced));

log = fileread(log_filename);

assertEqual(log, out_full);


end

function test_job_with_logs_3workers(obj, varargin)
if obj.ignore_test
skipTest('test_job_with_logs_3workers is disabled');
Expand Down
Original file line number Diff line number Diff line change
@@ -1,107 +1,144 @@
function display_fail_jobs_(obj,outputs,n_failed,n_workers,Err_code)
% Auxiliary method to display job results if the job have
% failed
function display_fail_jobs_(obj, outputs, n_failed, n_workers, err_code)
% Auxiliary method to display job results if the job has failed
%
% Input:
% Outputs -- usually cellarray of the results, returned by a
% outputs -- usually cellarray of the results, returned by a
% parallel job
%
% n_failed -- number of tasks failed as the result of parallel
% job
%
% n_workers-- number of labs used by parallel job initially
%
% Err_code -- the text string in the form
% err_code -- the text string in the form
% ERROR_CLASS:error_reason to form identifier of
% the exception to throw
% If this paraemter is empty, it does not throw anything.
% If this parameter is empty, it does not throw anything.
% Throws:
% First exception returned from the cluster if such exceptions
% are present or exception with Err_code as MExeption.identifier
% are present or exception with err_code as MException.identifier
% if no errors returned
%

mEXceptions_outputs = false(size(outputs));
debug_flag = get(parallel_config, 'debug');

if ~debug_flag
log = sprintf("par_fail_%s.log", obj.job_id);
fh = fopen(log, 'w');
clob = onCleanup(@() fclose(fh));
end

if ~exist('err_code', 'var') || isempty(err_code)
warn_code = 'HORACE:display_fail_jobs:parallel_failure';
else
warn_code = err_code;
end

MExceptions_outputs = false(size(outputs));

if iscell(outputs)
fprintf('Job %s have failed. Outputs: \n',obj.job_id);
debug_print('Job %s failed. Outputs: \n', obj.job_id);

for i=1:numel(outputs)
if isa(outputs{i},'MException')
mEXceptions_outputs(i) = true;
fprintf('Task N%d failed. Error %s; Message %s\n',...
i,outputs{i}.identifier,outputs{i}.message);
elseif isfield(outputs{i},'error') && isa(outputs{i}.error,'MException')
mEXceptions_outputs(i) = true;
fprintf('Task N%d failed. Reason: %s\n',...
i,outputs{i}.fail_reason);

if isa(outputs{i}, 'MException')
MExceptions_outputs(i) = true;
debug_print('Task %d failed. Error %s; Message %s\n', ...
i, outputs{i}.identifier, outputs{i}.message);

Comment on lines +44 to +47
Copy link
Member

@abuts abuts May 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was playing with various failing jobs during the week and would argue that this should be reconsidered.
When job fails, it prints short list of messages stating worker failed (one row)
Then it prints detailed information.
The detailed information should go into log, that's correct.
But short info about job failure should remain. Then it should be one string
see file .... for details of the issue.

I do not want to investigate this issue too deeply, but have a feeling that modified changes are too brief. The suggested solution would provide much better user experience.

elseif isfield(outputs{i}, 'error') && isa(outputs{i}.error, 'MException')
MExceptions_outputs(i) = true;
debug_print('Task %d failed. Reason: %s\n', ...
i, outputs{i}.fail_reason);

else
mEXceptions_outputs(i) = false;
fprintf('Task N%d failed. Outputs: \n',i);
MExceptions_outputs(i) = false;
debug_print('Task %d failed. Outputs: \n', i);

if isempty(outputs{i})
fprintf('[]\n');
debug_print('[]\n');
else
disp(outputs{i});
debug_print(disp2str(outputs{i}));
end
end
end
elseif isempty(mEXceptions_outputs)

elseif isempty(outputs)
ext_type = class(outputs);
fprintf('Job %s have failed sending unhandled exception: %s\n',obj.job_id,ext_type);
if ~isempty(Err_code)
error(Err_code,'Parallel job have failed throwing unhandled exception: %s',ext_type);
end
debug_print('Job %s failed with unhandled exception: %s\n', obj.job_id, ext_type);

else
mEXceptions_outputs(1) = isa(outputs,'MException');
fprintf('Job %s have failed. Output: \n',obj.job_id);
disp(outputs);

MExceptions_outputs(1) = isa(outputs, 'MException');
debug_print('Job %s failed. Output: \n', obj.job_id);
debug_print(disp2str(outputs));

if numel(outputs) == 1
disp_exception(outputs);
end

end
if any(mEXceptions_outputs)
if isempty(Err_code)
warn_code = 'DISPLAY_FAIL_JOBS:parallel_failure';
else
warn_code = Err_code;
end
warning(warn_code ,...
' Number: %d parallel tasks out of total: %d tasks have failed',...
n_failed,n_workers)
errOutputs = outputs(mEXceptions_outputs);
if iscell(errOutputs)
for i=1:numel(errOutputs)
disp(['***** Error output N ',num2str(i)]);
disp_exception(errOutputs{i});

message = sprintf('%d of %d tasks in job %s have failed', ...
n_failed, n_workers, obj.job_id);

if isempty(outputs)
message = [message, sprintf(' with unhandled exception: %s', ext_type)];

elseif any(MExceptions_outputs)

err_outputs = outputs(MExceptions_outputs);

if iscell(err_outputs)
for i=1:numel(err_outputs)
debug_print(['***** Error output N ', num2str(i)]);
disp_exception(err_outputs{i});
end
else
disp_exception(errOutputs);
end
if ~isempty(Err_code)
error(Err_code,'Parallel job have failed, producing errors above.');
disp_exception(err_outputs);
end

else
if ~isempty(Err_code)
error(Err_code,...
' Number: %d parallel tasks out of total: %d tasks have failed without returning the reason',...
n_failed,n_workers)
end
message = [message, ' without returning the reason'];
end

function disp_exception(errOutput)
%
if isa(errOutput,'MException')
disp(getReport(errOutput))
elseif iscell(errOutput)
disp('***************************************************************');
disp(errOutput);
for i=1:numel(errOutput)
sprintf(' Cell %d, contains: %s\n',i,evalc('disp(errOutput{i}'));
disp_exception(errOutput{i});
if debug_flag
error(err_code, '%s.', message);
else
error(err_code, '%s, errors recorded in %s.', message, log)
end


function disp_exception(err_output)

if isa(err_output, 'MException')
debug_print(disp2str(getReport(err_output)))

elseif iscell(err_output)

for i=1:numel(err_output)
debug_print(' Cell %d, contains: \n', i);
disp_exception(err_output{i});
end
disp('***************************************************************');
elseif isfield(errOutput,'error') && isa(errOutput.error,'MException')
for i=1:numel(errOutput.error)
disp(getReport(errOutput.error(i)));

elseif isfield(err_output, 'error') && isa(err_output.error, 'MException')

for i=1:numel(err_output.error)
debug_print('%s', disp2str(getReport(err_output.error(i))));
end

else
disp('unknown type of error returned')
debug_print('unknown type of error: \n %s', disp2str(err_output));

end

end

function debug_print(varargin)
if debug_flag
fprintf(varargin{:});
else
fprintf(fh, varargin{:});
end
end

end