diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..ee7a7cc --- /dev/null +++ b/.flake8 @@ -0,0 +1,20 @@ +[flake8] +# @see https://flake8.pycqa.org/en/latest/user/configuration.html?highlight=.flake8 + +exclude = + ckan + +# Extended output format. +format = pylint + +# Show the source of errors. +show_source = True +statistics = True + +max-complexity = 10 +max-line-length = 127 + +# List ignore rules one per line. +ignore = + C901 + W503 diff --git a/README.md b/README.md index d9f79b4..74920ff 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ A CKAN extension to hold various security improvements for CKAN, including: disclose whether or not that email address exists in the DB * Two Factor Authentication is enforced for all users * Preventing upload/linking of certain file types for resources +* Stricter access rules to view user profiles **Please note**: * This extension has been used and tested against CKAN version 2.7.x on git tag 2.5.0 and earlier @@ -89,6 +90,16 @@ You can also achieve this by adding the detected mime type to your blacklist dir Links are only checked based on the extension in the url, we do not request the file at the linked url to infer the mime type. +### Stricter access rules to view user profiles + +If `ckan.auth.public_user_details` is set to False, then access to user profiles will be restricted beyond the CKAN default of 'any authenticated user'. + +* 'user_list' permission (affecting the 'user_list' and 'user_autocomplete' APIs) will be restricted to admins (sysadmins, org admins, and group admins). This is necessary in order for admins to add users to their group/org. +* 'user_show' permission will be restricted to admins, plus the owner of the requested user profile. +* 'group_show' permission (affecting the 'group_show' and 'organization_show' APIs) will be restricted to admins of the specific requested group, if the call requests user details with the 'include_users' parameter. If user details are not requested, calls are unrestricted. + +If `ckan.auth.public_user_details` is absent or True, no restrictions are imposed. + ## Requirements * The server-side session storage requires the session middleware in CKAN core to be moved near the end of the middleware stack. An example changeset (relevant to CKAN 2.9.3) for this is provided in [ckanext-security.patch](ckanext-security.patch). The installed CKAN core codebase will need to have this patch applied (or similar changes made if not using 2.9.3). * A running Redis instance to store brute force protection tokens configured with a maxmemory and maxmemory-policy=lru so it overwrites the least recently used item rather than running out of space. This instance should be a different instance from the one used for Harvest items to avoid data loss. [Redis LRU-Cache documentation](https://redis.io/topics/lru-cache). diff --git a/ckanext/security/logic/auth.py b/ckanext/security/logic/auth.py index a1ad4a1..9d8b8b9 100644 --- a/ckanext/security/logic/auth.py +++ b/ckanext/security/logic/auth.py @@ -2,17 +2,131 @@ Action functions, all sysadmin-only ''' +from ckan import authz, model +from ckan.logic import auth as logic_auth +from ckan.plugins.toolkit import _, asbool, config, \ + auth_allow_anonymous_access, chained_auth_function + + def security_throttle_user_reset(context, data_dict): return {'success': False} + def security_throttle_address_reset(context, data_dict): return {'success': False} + def security_throttle_user_show(context, data_dict): return {'success': False} + def security_throttle_address_show(context, data_dict): return {'success': False} + def security_reset_totp(context, data_dict): return {'success': False} + + +# User privacy enhancements + + +@chained_auth_function +def user_list(next_auth, context, data_dict=None): + """Check whether access to the user list is authorised. + Restricted to admins if 'ckan.auth.public_user_details' is False. + """ + if asbool(config.get('ckan.auth.public_user_details', True)) \ + or _requester_is_admin(context): + return next_auth(context, data_dict) + else: + return {'success': False} + + +@chained_auth_function +@auth_allow_anonymous_access +def user_show(next_auth, context, data_dict): + """Check whether access to individual user details is authorised. + Restricted to admins or self if 'ckan.auth.public_user_details' is False. + """ + if asbool(config.get('ckan.auth.public_user_details', True)) \ + or _requester_is_admin(context): + authorized = True + else: + requester = context.get('user') + id = data_dict.get('id', None) + if id: + user_obj = model.User.get(id) + else: + user_obj = data_dict.get('user_obj', None) + authorized = user_obj and requester in [user_obj.name, user_obj.id] + if authorized: + return next_auth(context, data_dict) + + return {'success': False} + + +@chained_auth_function +@auth_allow_anonymous_access +def group_show(next_auth, context, data_dict): + """Check whether access to a group is authorised. + If it's just the group metadata, this requires no privileges, + but if user details have been requested and + 'ckan.auth.public_user_details' is False, it requires a group admin. + """ + if asbool(config.get('ckan.auth.public_user_details', True)): + authorized = True + else: + requester = context.get('user') + group = logic_auth.get_group_object(context, data_dict) + authorized = ( + group.state == 'active' + and not asbool(data_dict.get('include_users', False)) + and data_dict.get('object_type', None) != 'user' + ) or authz.has_user_permission_for_group_or_org(group.id, requester, 'update') + if authorized: + return next_auth(context, data_dict) + else: + return {'success': False, + 'msg': _('User %s not authorized to read group %s') % (requester, group.id)} + + +def _requester_is_admin(context): + """Check whether the current user has admin privileges in some group + or organisation. + This is based on the 'update' privilege; see eg + ckan.logic.auth.update.group_edit_permissions. + """ + requester = context.get('user') + return _has_user_permission_for_some_group(requester, 'admin') + + +def _has_user_permission_for_some_group(user_name, permission): + """Check if the user has the given permission for any group. + """ + user_id = authz.get_user_id_for_username(user_name, allow_none=True) + if not user_id: + return False + roles = authz.get_roles_with_permission(permission) + + if not roles: + return False + # get any groups the user has with the needed role + q = model.Session.query(model.Member) \ + .filter(model.Member.table_name == 'user') \ + .filter(model.Member.state == 'active') \ + .filter(model.Member.capacity.in_(roles)) \ + .filter(model.Member.table_id == user_id) + group_ids = [] + for row in q.all(): + group_ids.append(row.group_id) + # if not in any groups has no permissions + if not group_ids: + return False + + # see if any of the groups are active + q = model.Session.query(model.Group) \ + .filter(model.Group.state == 'active') \ + .filter(model.Group.id.in_(group_ids)) + + return bool(q.count()) diff --git a/ckanext/security/plugin/__init__.py b/ckanext/security/plugin/__init__.py index f027db3..cd12c9e 100644 --- a/ckanext/security/plugin/__init__.py +++ b/ckanext/security/plugin/__init__.py @@ -1,3 +1,5 @@ +# encoding: utf-8 + import logging import ckan.plugins as p @@ -97,6 +99,9 @@ def get_auth_functions(self): auth.security_throttle_address_show, 'security_reset_totp': auth.security_reset_totp, + 'user_list': auth.user_list, + 'user_show': auth.user_show, + 'group_show': auth.group_show, } # END Hooks for IAuthFunctions diff --git a/ckanext/security/views.py b/ckanext/security/views.py index 7c4948a..08725c7 100644 --- a/ckanext/security/views.py +++ b/ckanext/security/views.py @@ -2,10 +2,9 @@ import logging -from ckan.views import user from ckanext.security import utils from ckan.lib import helpers -from flask import Blueprint, make_response, request +from flask import Blueprint, make_response from functools import wraps from ckan.plugins import toolkit as tk