Skip to content

Commit

Permalink
[AIRFLOW-2267] Airflow DAG level access
Browse files Browse the repository at this point in the history
Make sure you have checked _all_ steps below.

### JIRA
- [x] My PR addresses the following [Airflow JIRA]
(https://issues.apache.org/jira/browse/AIRFLOW/)
issues and references them in the PR title. For
example, "\[AIRFLOW-XXX\] My Airflow PR"
    -
https://issues.apache.org/jira/browse/AIRFLOW-2267
    - In case you are fixing a typo in the
documentation you can prepend your commit with
\[AIRFLOW-XXX\], code changes always need a JIRA
issue.

### Description
- [x] Here are some details about my PR, including
screenshots of any UI changes:
 Provide DAG level access for airflow.  The detail
design could be found at https://docs.google.com/d
ocument/d/1qs26lE9kAuCY0Qa0ga-80EQ7d7m4s-590lhjtMB
jmxw/edit#

### Tests
- [x] My PR adds the following unit tests __OR__
does not need testing for this extremely good
reason:
Unit tests are added.

### Commits
- [x] My commits all reference JIRA issues in
their subject lines, and I have squashed multiple
commits if they address the same issue. In
addition, my commits follow the guidelines from
"[How to write a good git commit
message](http://chris.beams.io/posts/git-
commit/)":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not
"adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

- [x] Passes `git diff upstream/master -u --
"*.py" | flake8 --diff`

Closes apache#3197 from feng-tao/airflow-2267
  • Loading branch information
Tao feng authored and mistercrunch committed Jul 16, 2018
1 parent 17ddbcf commit f3f2eb3
Show file tree
Hide file tree
Showing 16 changed files with 1,285 additions and 231 deletions.
8 changes: 8 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ assists users migrating to a new version.

## Airflow Master

### DAG level Access Control for new RBAC UI

Extend and enhance new Airflow RBAC UI to support DAG level ACL. Each dag now has two permissions(one for write, one for read) associated('can_dag_edit', 'can_dag_read').
The admin will create new role, associate the dag permission with the target dag and assign that role to users. That user can only access / view the certain dags on the UI
that he has permissions on. If a new role wants to access all the dags, the admin could associate dag permissions on an artificial view(``all_dags``) with that role.

We also provide a new cli command(``sync_perm``) to allow admin to auto sync permissions.

### Setting UTF-8 as default mime_charset in email utils

### Add a configuration variable(default_dag_run_display_number) to control numbers of dag run for display
Expand Down
18 changes: 18 additions & 0 deletions airflow/bin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1282,6 +1282,9 @@ def create_user(args):
if password != password_confirmation:
raise SystemExit('Passwords did not match!')

if appbuilder.sm.find_user(args.username):
print('{} already exist in the db'.format(args.username))
return
user = appbuilder.sm.add_user(args.username, args.firstname, args.lastname,
args.email, role, password)
if user:
Expand Down Expand Up @@ -1342,6 +1345,16 @@ def list_dag_runs(args, dag=None):
print(record)


@cli_utils.action_logging
def sync_perm(args): # noqa
if settings.RBAC:
appbuilder = cached_appbuilder()
print('Update permission, view-menu for all existing roles')
appbuilder.sm.sync_roles()
else:
print('The sync_perm command only works for rbac UI.')


Arg = namedtuple(
'Arg', ['flags', 'help', 'action', 'default', 'nargs', 'type', 'choices', 'metavar'])
Arg.__new__.__defaults__ = (None, None, None, None, None, None, None)
Expand Down Expand Up @@ -1924,6 +1937,11 @@ class CLIFactory(object):
'args': ('role', 'username', 'email', 'firstname', 'lastname',
'password', 'use_random_password'),
},
{
'func': sync_perm,
'help': "Update existing role's permissions.",
'args': tuple(),
}
)
subparsers_dict = {sp['func'].__name__: sp for sp in subparsers}
dag_subparsers = (
Expand Down
5 changes: 4 additions & 1 deletion airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,10 @@ def configure_orm(disable_connection_pool=False):
setup_event_handlers(engine, reconnect_timeout)

Session = scoped_session(
sessionmaker(autocommit=False, autoflush=False, bind=engine))
sessionmaker(autocommit=False,
autoflush=False,
bind=engine,
expire_on_commit=False))


def dispose_orm():
Expand Down
2 changes: 1 addition & 1 deletion airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2517,7 +2517,7 @@ def hidden_field_formatter(view, context, model, name):
)
column_list = ('key', 'val', 'is_encrypted',)
column_filters = ('key', 'val')
column_searchable_list = ('key', 'val')
column_searchable_list = ('key', 'val', 'is_encrypted',)
column_default_sort = ('key', False)
form_widget_args = {
'is_encrypted': {'disabled': True},
Expand Down
25 changes: 17 additions & 8 deletions airflow/www_rbac/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
csrf = CSRFProtect()


def create_app(config=None, testing=False, app_name="Airflow"):
def create_app(config=None, session=None, testing=False, app_name="Airflow"):
global app, appbuilder
app = Flask(__name__)
app.wsgi_app = ProxyFix(app.wsgi_app)
Expand Down Expand Up @@ -66,10 +66,20 @@ def create_app(config=None, testing=False, app_name="Airflow"):
configure_logging()

with app.app_context():

from airflow.www_rbac.security import AirflowSecurityManager
security_manager_class = app.config.get('SECURITY_MANAGER_CLASS') or \
AirflowSecurityManager

if not issubclass(security_manager_class, AirflowSecurityManager):
raise Exception(
"""Your CUSTOM_SECURITY_MANAGER must now extend AirflowSecurityManager,
not FAB's security manager.""")

appbuilder = AppBuilder(
app,
db.session,
security_manager_class=app.config.get('SECURITY_MANAGER_CLASS'),
db.session if not session else session,
security_manager_class=security_manager_class,
base_template='appbuilder/baselayout.html')

def init_views(appbuilder):
Expand Down Expand Up @@ -126,12 +136,11 @@ def init_views(appbuilder):
# Otherwise, when the name of a view or menu is changed, the framework
# will add the new Views and Menus names to the backend, but will not
# delete the old ones.
appbuilder.security_cleanup()

init_views(appbuilder)

from airflow.www_rbac.security import init_roles
init_roles(appbuilder)
security_manager = appbuilder.sm
security_manager.sync_roles()

from airflow.www_rbac.api.experimental import endpoints as e
# required for testing purposes otherwise the module retains
Expand Down Expand Up @@ -164,14 +173,14 @@ def root_app(env, resp):
return [b'Apache Airflow is not at this location']


def cached_app(config=None, testing=False):
def cached_app(config=None, session=None, testing=False):
global app, appbuilder
if not app or not appbuilder:
base_url = urlparse(conf.get('webserver', 'base_url'))[2]
if not base_url or base_url == '/':
base_url = ""

app, _ = create_app(config, testing)
app, _ = create_app(config, session, testing)
app = DispatcherMiddleware(root_app, {base_url: app})
return app

Expand Down
34 changes: 33 additions & 1 deletion airflow/www_rbac/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import functools
import pendulum
from io import BytesIO as IO
from flask import after_this_request, request, g
from flask import after_this_request, redirect, request, url_for, g
from airflow import models, settings


Expand Down Expand Up @@ -91,3 +91,35 @@ def zipper(response):
return f(*args, **kwargs)

return view_func


def has_dag_access(**dag_kwargs):
"""
Decorator to check whether the user has read / write permission on the dag.
"""
def decorator(f):
@functools.wraps(f)
def wrapper(self, *args, **kwargs):
has_access = self.appbuilder.sm.has_access
dag_id = request.args.get('dag_id')
# if it is false, we need to check whether user has write access on the dag
can_dag_edit = dag_kwargs.get('can_dag_edit', False)

# 1. check whether the user has can_dag_edit permissions on all_dags
# 2. if 1 false, check whether the user
# has can_dag_edit permissions on the dag
# 3. if 2 false, check whether it is can_dag_read view,
# and whether user has the permissions
if (
has_access('can_dag_edit', 'all_dags') or
has_access('can_dag_edit', dag_id) or (not can_dag_edit and
(has_access('can_dag_read',
'all_dags') or
has_access('can_dag_read',
dag_id)))):
return f(self, *args, **kwargs)
else:
return redirect(url_for(self.appbuilder.sm.auth_view.
__class__.__name__ + ".login"))
return wrapper
return decorator
Loading

0 comments on commit f3f2eb3

Please sign in to comment.