Add virtual env creation in Celery workers #24192
Replies: 20 comments
-
Thanks for opening your first issue here! Be sure to follow the issue template!
-
I think this is a somewhat bigger task to tackle, but an interesting candidate to discuss. I remember a discussion on the devlist where we covered different options for doing it (preparing wheel packages for tasks was one of them, for example). However, this idea is much simpler, can be implemented much more easily, and I think it might serve the purpose you described pretty well. It does not involve a lot of changes. It would be rather similar to what cgroup_task_runner already does. Conceptually it could be a "virtualenv_task_runner" that launches a process in another virtualenv (creating the venv first if it does not exist), very similar to what cgroup_task_runner does when creating a new cgroup process.
I think there is possibly non-trivial work, though, in the dependency management of Airflow itself when crossed with the other requirements you want to add. What you are really looking at is not only creating a virtualenv and adding your packages there; you also have to make sure that Airflow is installed in this virtualenv, including some (possibly all?) of the packages that come with Airflow. This is the only way to make sure that all operators will work within such an environment. But if we could assume that we copy "all" standard Airflow dependencies and that only the "extra" ones differ between venvs, that might work rather well and could be implemented easily. I think python venv will install symlinks to the packages by default when they are already installed in the "main" environment, so this might even be pretty efficient in terms of space used.
I wonder what others think about it @kaxil @ashb @mik-laj @turbaszek. I know you might know some potential problems with that approach, but at first look it seems rather reasonable and rather easy to implement?
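To make the "create the venv if it does not exist, then launch the process in it" idea concrete, here is a minimal sketch using only the standard library. It is not Airflow code and not how cgroup_task_runner is implemented; `venv_python`, `ensure_venv` and `run_in_venv` are hypothetical helper names, and installing Airflow plus the extra packages into the venv is deliberately left out.

```python
import os
import subprocess
import venv
from pathlib import Path


def venv_python(venv_dir: Path) -> Path:
    """Return the interpreter path inside a venv (the layout differs on Windows)."""
    if os.name == "nt":
        return venv_dir / "Scripts" / "python.exe"
    return venv_dir / "bin" / "python"


def ensure_venv(venv_dir: Path, with_pip: bool = True) -> Path:
    """Create the venv only if its interpreter is missing, then return that interpreter."""
    python = venv_python(venv_dir)
    if not python.exists():
        # Hypothetical policy: "exists" simply means the interpreter file is there.
        venv.EnvBuilder(with_pip=with_pip).create(venv_dir)
    return python


def run_in_venv(venv_dir: Path, args: list) -> subprocess.CompletedProcess:
    """Run `python <args>` with the venv's interpreter instead of the worker's own."""
    python = ensure_venv(venv_dir)
    return subprocess.run(
        [str(python), *args], check=True, capture_output=True, text=True
    )
```

A task runner built along these lines would pass the task command as `args`. The sketch says nothing yet about which packages end up in the venv, which is the harder part discussed in this thread.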
-
Yeah, as long as we use |
-
The problem is that without 'airflow' in the venv NONE of the operators will work. You need at least the airflow package plus a number of dependent packages; otherwise Airflow's in-task functionality such as Jinja templating, variables, XCom and a number of others will not work. Also, if you would like to use any of the "provider" operators, they won't work either without the providers (and their dependent) packages. That's what I meant by non-trivial work if you want to limit that and only install 'airflow and a few more packages': determining which packages to install in such a venv is far from trivial and error-prone. So I think the only viable solution is to clone the current "airflow" installation, and then you can have different venvs for different extra packages. Otherwise (if you do not want to install airflow plus dependent packages) you should simply use PythonVirtualenvOperator, as you mentioned above.
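One stdlib mechanism that goes in the direction of "reuse the existing installation and add only the extras" is the `--system-site-packages` flag of `python -m venv`. The sketch below only builds the commands; `build_venv_commands` is a hypothetical helper. A loud assumption: the flag exposes the site-packages of the base interpreter, so it would only help if Airflow is installed there and not in another venv. Whether that holds for a given deployment needs checking.

```python
import sys
from pathlib import Path


def build_venv_commands(venv_dir: Path, extra_requirements: list) -> list:
    """Return the commands to create a venv and install only the extra packages."""
    # POSIX layout assumed; on Windows the interpreter lives under Scripts/.
    venv_python = venv_dir / "bin" / "python"
    commands = [
        # The new venv can import packages from the base interpreter's site-packages.
        [sys.executable, "-m", "venv", "--system-site-packages", str(venv_dir)],
    ]
    if extra_requirements:
        # Only the per-venv extras are installed into the venv itself.
        commands.append(
            [str(venv_python), "-m", "pip", "install", *extra_requirements]
        )
    return commands
```

Each returned command could be handed to `subprocess.run(cmd, check=True)`. Note that pip may still install a different version of a package that is already visible from the base installation if an extra requirement demands it, which brings back the dependency-conflict question raised above.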
-
My first "off the cuff" thought is that this would be something that lives outside Airflow, and it's just the context you run the Celery workers in. The venv task runner idea does sound like a good way of tackling this if we do want to include it, but it would be slow. On reading the issue subject my first thought was:
It depends how many venvs you wanted, I guess.
-
You're right, that's why I'm suggesting we allow the user to choose between two methods:
I don't want this feature to make Airflow launchable only by using a venv or only by using the global env.
Well, yes and no.
There is a major issue with that approach: it makes upgrading versions harder, because the user has to build some kind of deployment script that shuts down the worker gracefully and launches a new one every time they want to upgrade any package.
-
What we are talking about is caching some resources between task runs (in this case the .venv). The problem is that this introduces non-deterministic behaviour. To begin with, execution time depends on whether the .venv is there or not. But it's more than that. What happens if new dependencies are released in the meantime? Do we care about having exactly the same versions of the dependencies in different .venvs? What happens if you have two versions of the same .venv on different workers and your task executes on either the older or the newer version (or is executed on one, fails, and then gets retried on the other)? As a result, should we always require pinning the versions? Should we invalidate the cached .venv if the "pinned" version changed? How do we determine when we should change and rebuild the .venv? I am not saying it is impossible to solve, I am just saying that it needs to be thought about rather carefully.
More than that, maybe we do not have to think that much and can apply some best practices that are already out there. This is actually very similar to what https://github.com/actions/cache (and any other CI) does. And maybe this is something we should do as well: introduce a generic caching approach that we can apply to the .venv or any other "cacheable" resource. We've been discussing other optimisations and the concept of "task affinity". We've already discussed several times that there should be a way of sharing some "cached" state between different tasks of the same kind and reusing that state. For example, machine learning models for workers that have GPUs, where the same model loaded by one task could be reused by many other tasks (saving the time for model loading, which is often a lengthy operation because CPU and GPU memory is not shared; it's only shared in the new M1 from Apple).
I am not saying we should do it all together, but I see that caching and affinity share a lot of common problems (and solutions), and maybe we should think more broadly and propose a general caching/affinity solution that could be applied to other cases.
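One common way to handle the "when do we rebuild" question, in the spirit of how CI caches key their entries, is to derive the venv directory name from the requirements themselves. The sketch below is only an illustration under that assumption; `venv_cache_key` and `unpinned` are hypothetical helpers, and the `==` check is a deliberately naive test for pinning.

```python
import hashlib


def venv_cache_key(requirements: list, python_version: str) -> str:
    """Derive a stable key from the requirement strings and the Python version."""
    # Sorting makes the key independent of the order the user listed packages in.
    normalized = sorted(r.strip() for r in requirements if r.strip())
    payload = "\n".join([python_version, *normalized])
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]


def unpinned(requirements: list) -> list:
    """Return requirements without an exact '==' pin (naive string check)."""
    return [r for r in requirements if "==" not in r]
```

With such a key, changing a pinned version produces a new directory instead of mutating a venv that running tasks may still be using. It does not help for unpinned requirements, because the same key can then resolve to different package versions on different workers, which is exactly the non-determinism described above.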
-
To be honest, I think the best approach here is not to solve it and to let the user know about this behavior. I don't think it is such bad behavior: if a task has already started and there is now a new version, you wouldn't stop the task to rerun it. It makes sense that it keeps running and the next task uses the new version.
I would love to see some caching mechanism in Airflow for other things as well (shared Java JARs, for example), but a generic cache solution IMHO will only complicate this feature, as right now it works out of the box (create the venv if the folder doesn't exist, run pip install, and that's it), something like:
-
@zacharya19 Can you please describe in detail the algorithm you want to propose for upgrading/managing the venv? The one proposed does not describe the details. When should it happen? Are you going to try to upgrade to newer dependencies every time a new task is run? Which component of the process is doing it? How does it play out with other tasks running at the same time and possibly using the same venv at the time of the upgrade? I think you heavily underestimate the complexity involved in the upgrade/creation of the venvs. Remember that this is not only a problem of different venvs running on different workers. LocalExecutor and CeleryExecutor will run multiple tasks in parallel on the same machine. Should they share the same venv? What happens if different tasks have different "pinned" versions specified, for example? Will they override each other's venv? Or when one task has a pinned version and the other does not, what happens then? I think this requires at least a design doc describing the whole algorithm (and likely it already requires creating and discussing an Airflow Improvement Proposal, https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals) and a collective discussion of all such cases on the devlist.
This will not work in a number of the cases described above. I think when you look at all the scenarios and edge cases involved, you will find that you have the same complexity as any other caching solution and we will end up having to solve them, so it's likely better to solve them as a 'general' case. Remember the quote?
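To illustrate just one of these questions (several tasks on the same machine wanting the same venv at the same time), here is a minimal sketch of "build once under a lock" using only the standard library. Everything in it is an assumption for illustration: `exclusive_lock`, `build_once`, the `.complete` marker and the `.lock` file name are hypothetical, and a lock left behind by a crashed process is not handled.

```python
import os
import time
from contextlib import contextmanager
from pathlib import Path


@contextmanager
def exclusive_lock(lock_path: Path, timeout: float = 60.0, poll: float = 0.1):
    """Hold a lock file created with O_EXCL; wait while another process holds it."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
            break
        except FileExistsError:
            if time.monotonic() >= deadline:
                raise TimeoutError(f"could not acquire {lock_path}")
            time.sleep(poll)
    try:
        yield
    finally:
        os.close(fd)
        os.unlink(lock_path)


def build_once(target: Path, build) -> Path:
    """Run build(target) at most once; later callers reuse the finished result."""
    marker = target / ".complete"  # written only after a successful build
    if marker.exists():
        return target
    target.parent.mkdir(parents=True, exist_ok=True)
    with exclusive_lock(target.parent / (target.name + ".lock")):
        # Re-check inside the lock: another task may have finished meanwhile.
        if not marker.exists():
            build(target)
            marker.touch()
    return target
```

This only covers concurrent creation. Upgrading a venv while other tasks are running in it, or tasks with conflicting pins sharing one name, would need additional rules on top of it.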
-
As a first step, I suggest a simple solution:
This solution is fairly simple to implement and covers the basic issues I described in the issue.
I'm all for starting to think about a generic solution, but I don't think this feature has to be bulletproof. If you still think my solution isn't good enough, I can try to think of something else or maybe propose a solution for a generic caching manager.
-
I suggest you write to the devlist and propose it as a new feature there. I'd love to hear what others think about it.
-
Just to explain: I have my opinion (voiced above) and I think the non-deterministic behaviour will lead to many questions from users and to debugging problems that will be hard to answer. Personally I think we need a more bullet-proof solution. This is a new feature to add and it should be discussed on the devlist (like anything else of this caliber). We are really community-driven and no single person can make a decision. For me this sounds like an interesting feature; the level of "bulletproofness" of it should be discussed on the devlist and I would love to hear what others say. Whatever the community decides, I am happy with.
-
In my opinion, if you start to have multiple teams working on the same Airflow deployment, then virtualenv solves the problem only at the Python level, which in my opinion is not sufficient. I personally would be in favour of a worker/queue per env/team (as @ashb suggested). Managing deps at the deployment level seems to be a more reliable approach than managing dependencies via an application (Airflow) whose main task is scheduling and executing tasks. But as @potiuk said, we should move this discussion to the devlist to gather wider feedback.
-
As I said, the issue here is that the user has to create some kind of complex deployment script, because running Airflow inside a venv requires a restart when updating the packages.
-
Still haven't got access, so I'm just going to write down another suggestion I had in mind -
That way, we gain a much more bullet-proof solution while still giving the user the option to control it inside Airflow without the need to create a worker per venv.
-
I believe someone asked you for your ID to give you access (in response to your mail) - did you send it?
-
@potiuk Didn't get the response email, weird, but anyway sent my Wiki ID now.
-
@zacharya19 - maybe you are not subscribed to the devlist? By default, even when you 'reply to all' on the devlist, the reply goes to the devlist only. So if you are not subscribed, you might not get the answer. You can find the 'how to subscribe' info here: https://airflow.apache.org/community/
-
Thanks, subscribed and created the AIP - https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-37+Virtualenv+management+inside+Airflow. Also, sent an email to start a discussion on my solution.
-
AIP created. It has not been picked up and led by anyone, but maybe someone will. Converting it into a discussion, since it makes no sense to keep it as an issue.
-
Description
Add the option for the Celery worker to create a new virtual env, install some packages, and run the airflow run command inside it (based on executor_config params).
Really nice to have: reusable virtual envs that can be shared between tasks with the same params (based on user configuration).
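As a rough illustration of what "based on executor_config params" could look like, here is a sketch of how a worker might read a venv request from a task's executor_config dict. The `"venv"` key, its `name` and `requirements` fields, and the `VenvSpec` class are all hypothetical; nothing like this exists in Airflow today.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple


@dataclass(frozen=True)
class VenvSpec:
    """Hypothetical description of the venv a task asks to run in."""
    name: str
    requirements: Tuple[str, ...] = ()


def venv_spec_from_executor_config(
    executor_config: Optional[Dict[str, Any]],
) -> Optional[VenvSpec]:
    """Extract the hypothetical "venv" section, or None to use the worker's own env."""
    section = (executor_config or {}).get("venv")
    if section is None:
        return None
    if "name" not in section:
        raise ValueError('executor_config["venv"] needs a "name"')
    return VenvSpec(
        name=section["name"],
        requirements=tuple(section.get("requirements", ())),
    )
```

A task would then pass something like `executor_config={"venv": {"name": "team-a", "requirements": ["pandas==1.3.5"]}}`, and tasks using the same name could share one env, which is the reuse described above.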
Use case / motivation
Once you get to the point where you want to create a cluster for different types of Python tasks and you have multiple teams working on the same cluster, you need to start splitting the business logic code into different Python packages to allow better version control and avoid the need to restart the workers when deploying new util code.
I think it would be amazing if we can allow creating new virtual envs as part of Airflow and control the package versions.
I know that PythonVirtualenvOperator exists, but:
for ShortCircuitOperator or BranchPythonOperator or any kind of new Python-based operator, I have to create a new operator that inherits from PythonVirtualenvOperator and duplicates the desired functionality.
Are you willing to submit a PR?
Yes, would love to.
Related Issues
Not that I can find.