diff --git a/pyfarm/jobtypes/core/jobtype.py b/pyfarm/jobtypes/core/jobtype.py index 6c665626..f87be37a 100644 --- a/pyfarm/jobtypes/core/jobtype.py +++ b/pyfarm/jobtypes/core/jobtype.py @@ -77,6 +77,11 @@ FROZEN_ENVIRONMENT = ImmutableDict(os.environ.copy()) +try: + WindowError +except NameError: # pragma: no cover + WindowError = OSError + class TaskNotFound(Exception): pass @@ -489,7 +494,7 @@ def tempdir(self, new=False, remove_on_finish=True): os.makedirs(parent_dir) except (OSError, IOError, WindowError) as error: if error.errno != EEXIST: - logger.error("Failed to create %s: %s", parent_dir, e) + logger.error("Failed to create %s: %s", parent_dir, error) raise self._tempdirs.add(parent_dir) diff --git a/tests/test_jobtypes/test_core_jobtype.py b/tests/test_jobtypes/test_core_jobtype.py index 51a278ab..c9cf0787 100644 --- a/tests/test_jobtypes/test_core_jobtype.py +++ b/tests/test_jobtypes/test_core_jobtype.py @@ -16,26 +16,44 @@ import os import re +import tempfile from contextlib import nested +from datetime import datetime, timedelta +from os.path import join, isdir, dirname, basename from uuid import UUID, uuid4 +try: + import grp +except ImportError: + pass + +try: + import pwd +except ImportError: + pass + + from mock import Mock, patch + +from twisted.internet.defer import Deferred from twisted.internet.error import ProcessTerminated, ProcessDone from twisted.python.failure import Failure from voluptuous import Schema, MultipleInvalid from pyfarm.core.utility import ImmutableDict -from pyfarm.core.enums import INTEGER_TYPES, STRING_TYPES, WINDOWS +from pyfarm.core.enums import INTEGER_TYPES, STRING_TYPES, WINDOWS, WorkState from pyfarm.agent.config import config from pyfarm.agent.testutil import TestCase, skipIf -from pyfarm.agent.sysinfo.user import is_administrator +from pyfarm.agent.sysinfo import system, memory, user +from pyfarm.agent.utility import remove_directory +from pyfarm.jobtypes.core.log import STDOUT, STDERR, logpool from pyfarm.jobtypes.core.internals import USER_GROUP_TYPES from pyfarm.jobtypes.core.jobtype import ( - JobType, CommandData, process_stdout, process_stderr, logger) + FROZEN_ENVIRONMENT, JobType, CommandData, logger, + process_stdout, process_stderr) from pyfarm.jobtypes.core.log import STDOUT, STDERR, logpool -IS_ADMIN = is_administrator() - +IS_ADMIN = user.is_administrator() def fake_assignment(): assignment_id = uuid4() @@ -49,8 +67,9 @@ def fake_assignment(): "jobtype": { "name": "Foo", "version": 1}, - "tasks": [{"id": 1, "frame": 1, "attempt": 1}, - {"id": 1, "frame": 1, "attempt": 1}]} + "tasks": [ + {"id": 1, "frame": 1, "attempt": 1}, + {"id": 2, "frame": 2, "attempt": 1}]} config["current_assignments"][assignment_id] = assignment return assignment @@ -154,24 +173,24 @@ def test_validate_group_type(self): CommandData("", group=1.0).validate() @skipIf(WINDOWS, "Non-Windows only") - @skipIf(IS_ADMIN, "Is Administrator") + @skipIf(user.is_administrator(), "Is Administrator") def test_validate_change_user_non_admin_failure(self): with self.assertRaises(EnvironmentError): CommandData("", user=0).validate() @skipIf(WINDOWS, "Non-Windows only") - @skipIf(IS_ADMIN, "Is Administrator") + @skipIf(user.is_administrator(), "Is Administrator") def test_validate_change_group_non_admin_failure(self): with self.assertRaises(EnvironmentError): CommandData("", group=0).validate() @skipIf(WINDOWS, "Non-Windows only") - @skipIf(not IS_ADMIN, "Not Administrator") + @skipIf(not user.is_administrator(), "Not Administrator") def test_validate_change_user_admin(self): CommandData("", user=0).validate() @skipIf(WINDOWS, "Non-Windows only") - @skipIf(not IS_ADMIN, "Not Administrator") + @skipIf(not user.is_administrator(), "Not Administrator") def test_validate_change_group_admin(self): CommandData("", group=0).validate() @@ -214,10 +233,436 @@ def test_set_default_environment(self): class TestJobTypeLoad(TestCase): + # TODO: test to ensure load() does something useful, the below + # are just unittests + def test_schema(self): with self.assertRaises(MultipleInvalid): JobType.load({}) + def test_download_called(self): + assignment = fake_assignment() + deferred = Deferred() + + class JobTypePatch(JobType): + @classmethod + def _download_jobtype(cls, name, version): + self.assertEqual(assignment["jobtype"]["name"], name) + self.assertEqual(assignment["jobtype"]["version"], version) + return deferred + + with nested( + patch.object( + JobType, "_download_jobtype", JobTypePatch._download_jobtype), + patch.object(deferred, "addCallback") + ) as (_, patched_addCallback): + JobType.load(assignment) + + patched_addCallback.assert_called_with( + JobType._jobtype_download_complete, + JobType._cache_key(assignment) + ) + + +class TestJobTypeEmptyMethodSignatures(TestCase): + # Some methods do not have any code in them so while we can't + # write tests against any code we can ensure their APIs, which + # are considered public, don't change. + + def test_prepare_for_job(self): + JobType.prepare_for_job(None) + + def test_cleanup_after_job(self): + JobType.cleanup_after_job(None) + + def spawn_persistent_process(self): + JobType.spawn_persistent_process(None, None) + + def test_process_stdout_line(self): + jobtype = JobType(fake_assignment()) + jobtype.process_stdout_line(None, None) + + def test_process_stderr_line(self): + jobtype = JobType(fake_assignment()) + jobtype.process_stderr_line(None, None) + + with patch.object(jobtype, "process_stdout_line") as patched: + jobtype.process_stderr_line("a", "b") + + patched.assert_called_with("a", "b") + + def test_preprocess_stdout_line(self): + jobtype = JobType(fake_assignment()) + jobtype.preprocess_stdout_line(None, None) + + def test_preprocess_stderr_line(self): + jobtype = JobType(fake_assignment()) + jobtype.preprocess_stderr_line(None, None) + + def test_format_stdout_line(self): + jobtype = JobType(fake_assignment()) + jobtype.format_stdout_line(None, None) + + def test_format_stderr_line(self): + jobtype = JobType(fake_assignment()) + jobtype.format_stderr_line(None, None) + + def test_spawn_persistent_process(self): + JobType.spawn_persistent_process(None, None) + + def test_get_command_data(self): + with self.assertRaises(NotImplementedError): + jobtype = JobType(fake_assignment()) + jobtype.get_command_data() + + def test_before_start(self): + jobtype = JobType(fake_assignment()) + jobtype.before_start() + + def test_before_spawn_process(self): + path = self.create_file() + jobtype = JobType(fake_assignment()) + logpool.open_log(jobtype.uuid, path, ignore_existing=True) + command = CommandData("foobar", ("a", "b", "c")) + + with patch.object(logger, "info") as mocked: + jobtype.before_spawn_process(command, None) + + mocked.assert_called_once_with( + "Starting command: %s", "foobar ('a', 'b', 'c')") + + +class TestJobTypeCloseLogs(TestCase): + def test_close_logs(self): + jobtype = JobType(fake_assignment()) + + with patch.object(logpool, "close_log") as patched: + jobtype._close_logs() + + patched.assert_called_with(jobtype.uuid) + + +class TestJobTypeNode(TestCase): + def test_reraises_notimplemented(self): + def side_effect(): + raise NotImplementedError + + with nested( + patch.object( + system, "machine_architecture", side_effect=side_effect), + self.assertRaises(NotImplementedError) + ): + jobtype = JobType(fake_assignment()) + jobtype.node() + + def test_output(self): + jobtype = JobType(fake_assignment()) + + with nested( + patch.object(memory, "total_ram", return_value=1.1), + patch.object(memory, "free_ram", return_value=2.1), + patch.object(memory, "total_consumption", return_value=3.1) + ): + self.assertEqual( + jobtype.node(), + { + "master_api": config.get("master-api"), + "hostname": config["agent_hostname"], + "agent_id": config["agent_id"], + "id": config["agent_id"], + "cpus": int(config["agent_cpus"]), + "ram": int(config["agent_ram"]), + "total_ram": 1, + "free_ram": 2, + "consumed_ram": 3, + "admin": user.is_administrator(), + "user": user.username(), + "case_sensitive_files": + system.filesystem_is_case_sensitive(), + "case_sensitive_env": + system.environment_is_case_sensitive(), + "machine_architecture": + system.machine_architecture(), + "operating_system": system.operating_system() + } + ) + + +class TestJobTypeAssignments(TestCase): + def test_assignments(self): + assignment = fake_assignment() + jobtype = JobType(assignment) + self.assertEqual(jobtype.assignments(), assignment["tasks"]) + + +class TestJobTypeTempDir(TestCase): + def test_not_new_and_tempdir_already_set(self): + jobtype = JobType(fake_assignment()) + jobtype._tempdir = "foobar" + self.assertEqual(jobtype.tempdir(), "foobar") + + def test_creates_directory(self): + root_directory = tempfile.mkdtemp() + self.addCleanup(remove_directory, root_directory) + config["jobtype_tempdir_root"] = join(root_directory, "$JOBTYPE_UUID") + jobtype = JobType(fake_assignment()) + self.assertTrue(isdir(jobtype.tempdir())) + + def test_path_contains_jobtype_uuid(self): + root_directory = tempfile.mkdtemp() + self.addCleanup(remove_directory, root_directory) + config["jobtype_tempdir_root"] = join(root_directory, "$JOBTYPE_UUID") + jobtype = JobType(fake_assignment()) + tempdir = jobtype.tempdir() + self.assertEqual(basename(dirname(tempdir)), str(jobtype.uuid)) + + def test_ignores_eexist(self): + root_directory = tempfile.mkdtemp() + self.addCleanup(remove_directory, root_directory) + config["jobtype_tempdir_root"] = join(root_directory, "$JOBTYPE_UUID") + jobtype = JobType(fake_assignment()) + os.makedirs(config["jobtype_tempdir_root"].replace( + "$JOBTYPE_UUID", str(jobtype.uuid))) + jobtype.tempdir() + + def test_makedirs_raises_other_errors(self): + root_directory = tempfile.mkdtemp() + self.addCleanup(remove_directory, root_directory) + config["jobtype_tempdir_root"] = join(root_directory, "$JOBTYPE_UUID") + jobtype = JobType(fake_assignment()) + + def side_effect(*args): + raise OSError("Foo", 4242) + + with nested( + patch.object(os, "makedirs", side_effect=side_effect), + self.assertRaises(OSError) + ): + jobtype.tempdir() + + def test_remove_on_finish_does_not_add_parent(self): + # It's important that we do not add the parent directory + # as the path to cleanup. Otherwise we could end up + # cleaning up more than needed. + root_directory = tempfile.mkdtemp() + self.addCleanup(remove_directory, root_directory) + config["jobtype_tempdir_root"] = join(root_directory, "$JOBTYPE_UUID") + jobtype = JobType(fake_assignment()) + jobtype.tempdir() + + for value in jobtype._tempdirs: + self.assertNotEqual(value, dirname(config["jobtype_tempdir_root"])) + + def test_remove_on_finish_adds_child_directory(self): + root_directory = tempfile.mkdtemp() + self.addCleanup(remove_directory, root_directory) + config["jobtype_tempdir_root"] = join(root_directory, "$JOBTYPE_UUID") + jobtype = JobType(fake_assignment()) + tempdir = jobtype.tempdir() + self.assertIn(tempdir, jobtype._tempdirs) + + def test_sets_tempdir(self): + jobtype = JobType(fake_assignment()) + tempdir = jobtype.tempdir() + self.assertEqual(tempdir, jobtype._tempdir) + + def test_new_tempdir_does_not_reset_default(self): + jobtype = JobType(fake_assignment()) + tempdir1 = jobtype.tempdir() + tempdir2 = jobtype.tempdir(new=True) + self.assertNotEqual(tempdir1, tempdir2) + self.assertEqual(jobtype._tempdir, tempdir1) + + +class TestJobTypeGetUidGid(TestCase): + @skipIf(WINDOWS, "Non-Windows only") + def test_get_user_id(self): + jobtype = JobType(fake_assignment()) + + for pwd_struct in pwd.getpwall(): + uid, gid = jobtype.get_uid_gid(pwd_struct.pw_name, None) + self.assertEqual(pwd.getpwuid(uid).pw_uid, uid) + self.assertIsNone(gid) + + @skipIf(WINDOWS, "Non-Windows only") + def test_get_group_id(self): + jobtype = JobType(fake_assignment()) + + for grp_struct in grp.getgrall(): + uid, gid = jobtype.get_uid_gid(None, grp_struct.gr_name) + self.assertIsNone(uid) + self.assertEqual(grp.getgrgid(gid).gr_gid, gid) + + +class TestJobTypeGetCommandList(TestCase): + def test_result_is_tuple(self): + jobtype = JobType(fake_assignment()) + self.assertIsInstance(jobtype.get_command_list([]), tuple) + + def test_does_not_modify_contents(self): + jobtype = JobType(fake_assignment()) + self.assertEqual( + jobtype.get_command_list(["a", "b", "c"]), ("a", "b", "c")) + + def test_expands_variables(self): + config["jobtype_include_os_environ"] = True + jobtype = JobType(fake_assignment()) + + all_keys = ["$%s" % key for key in os.environ.keys()] + self.assertEqual( + jobtype.get_command_list(all_keys), + tuple(map(jobtype.expandvars, all_keys)) + ) + + +class TestJobTypeGetCSVLogPath(TestCase): + POP_CONFIG_KEYS = ["agent_time_offset"] + + def setUp(self): + super(TestJobTypeGetCSVLogPath, self).setUp() + self.current_value = config.get("jobtype_task_log_filename") + config["jobtype_task_log_filename"] = \ + "$YEAR-$MONTH-$DAY_$HOUR-$MINUTE-$SECOND_$JOB_$PROCESS" + config["agent_time_offset"] = 0 + + def tearDown(self): + super(TestJobTypeGetCSVLogPath, self).tearDown() + config["jobtype_task_log_filename"] = self.current_value + + def test_uses_utcnow_for_create_time(self): + jobtype = JobType(fake_assignment()) + now = datetime.utcnow() + filename = basename(jobtype.get_csvlog_path(uuid4())) + year, month, day_hour, minute, second_job_process = filename.split("-") + day, hour = day_hour.split("_") + second, job, uuid_str = second_job_process.split("_") + log_time = datetime( + year=int(year), month=int(month), day=int(day), + hour=int(hour), minute=int(minute), second=int(second) + ) + self.assertDateAlmostEqual( + now, log_time, microsecond_deviation=1000000) + + def test_includes_agent_time_offset(self): + config["agent_time_offset"] = 120 + jobtype = JobType(fake_assignment()) + now = datetime.utcnow() + timedelta(seconds=120) + filename = basename(jobtype.get_csvlog_path(uuid4())) + year, month, day_hour, minute, second_job_process = filename.split("-") + day, hour = day_hour.split("_") + second, job, uuid_str = second_job_process.split("_") + log_time = datetime( + year=int(year), month=int(month), day=int(day), + hour=int(hour), minute=int(minute), second=int(second) + ) + self.assertDateAlmostEqual(now, log_time, microsecond_deviation=1000000) + + def test_uses_provided_create_time(self): + assignment = fake_assignment() + jobtype = JobType(assignment) + now = datetime.utcnow() + uid = uuid4() + filename = basename(jobtype.get_csvlog_path(uid, create_time=now)) + self.assertEqual( + "%s-%02d-%02d_%02d-%02d-%02d_%s_%s" % ( + now.year, now.month, now.day, + now.hour, now.minute, now.second, + assignment["job"]["id"], str(uid).replace("-", "") + ), + filename + ) + + +class TestJobTypeMapPath(TestCase): + def test_path_path_expands_vars(self): + jobtype = JobType(fake_assignment()) + self.assertEqual( + jobtype.map_path("$PATH/foo"), + os.path.expandvars("$PATH/foo")) + + +class TestJobTypeExpandVars(TestCase): + def prepare_config(self): + super(TestJobTypeExpandVars, self).prepare_config() + config["jobtype_expandvars"] = FROZEN_ENVIRONMENT + config["jobtype_default_environment"] = { + "PATH": os.environ["PATH"] + } + + def tearDown(self): + super(TestJobTypeExpandVars, self).tearDown() + config["jobtype_expandvars"] = True + + def test_respects_expand_from_config(self): + config["jobtype_expandvars"] = False + jobtype = JobType(fake_assignment()) + self.assertEqual(jobtype.expandvars("$PATH"), "$PATH") + + def test_respects_expand_keyword_expandvars(self): + config["jobtype_expandvars"] = False + jobtype = JobType(fake_assignment()) + self.assertEqual( + jobtype.expandvars("$PATH", expand=True), + os.path.expandvars("$PATH")) + + config["jobtype_expandvars"] = True + jobtype = JobType(fake_assignment()) + self.assertEqual(jobtype.expandvars("$PATH", expand=False), "$PATH") + + def test_passed_custom_environment(self): + jobtype = JobType(fake_assignment()) + self.assertEqual( + jobtype.expandvars("$FOOBAR", environment={"FOOBAR": "foo"}), + "foo") + + +class TestJobTypeGetLocalTaskState(TestCase): + def test_done(self): + jobtype = JobType(fake_assignment()) + jobtype.finished_tasks.add(1) + self.assertEqual(jobtype.get_local_task_state(1), WorkState.DONE) + + def test_failed(self): + jobtype = JobType(fake_assignment()) + jobtype.failed_tasks.add(2) + self.assertEqual(jobtype.get_local_task_state(2), WorkState.FAILED) + + def test_unknown(self): + jobtype = JobType(fake_assignment()) + self.assertIsNone(jobtype.get_local_task_state(3)) + + +class TestJobTypeIsSuccessful(TestCase): + def test_reason_is_zero(self): + jobtype = JobType(fake_assignment()) + self.assertTrue(jobtype.is_successful(None, 0)) + + def test_reason_non_zero_and_is_integer(self): + jobtype = JobType(fake_assignment()) + self.assertFalse(jobtype.is_successful(None, 1)) + + def test_reason_has_type_and_exit_code(self): + jobtype = JobType(fake_assignment()) + reason = Mock(type=ProcessDone, value=Mock(exitCode=0)) + self.assertTrue(jobtype.is_successful(None, reason)) + + def test_reason_is_not_an_integer(self): + jobtype = JobType(fake_assignment()) + + with self.assertRaises(NotImplementedError): + self.assertFalse(jobtype.is_successful(None, None)) + + +class TestJobTypeProcessStarted(TestCase): + def test_calls_set_task_state(self): + jobtype = JobType(fake_assignment()) + + with patch.object(jobtype, "set_task_state") as mocked: + jobtype.process_started(None) + + for task in jobtype.assignment["tasks"]: + mocked.assert_any_call(task, WorkState.RUNNING) + class TestJobTypeFormatError(TestCase): def test_process_terminated(self):