Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[flake8]
# @see https://flake8.pycqa.org/en/latest/user/configuration.html?highlight=.flake8

exclude =
ckan

# Extended output format.
format = pylint

# Show the source of errors.
show_source = True
statistics = True

max-complexity = 10
max-line-length = 127

# List ignore rules one per line.
ignore =
C901
W503
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ A CKAN extension to hold various security improvements for CKAN, including:
disclose whether or not that email address exists in the DB
* Two Factor Authentication is enforced for all users
* Preventing upload/linking of certain file types for resources
* Stricter access rules to view user profiles

**Please note**:
* This extension has been used and tested against CKAN version 2.7.x on git tag 2.5.0 and earlier
Expand Down Expand Up @@ -89,6 +90,16 @@ You can also achieve this by adding the detected mime type to your blacklist dir

Links are only checked based on the extension in the url, we do not request the file at the linked url to infer the mime type.

### Stricter access rules to view user profiles

If `ckan.auth.public_user_details` is set to False, then access to user profiles will be restricted beyond the CKAN default of 'any authenticated user'.

* 'user_list' permission (affecting the 'user_list' and 'user_autocomplete' APIs) will be restricted to admins (sysadmins, org admins, and group admins). This is necessary in order for admins to add users to their group/org.
* 'user_show' permission will be restricted to admins, plus the owner of the requested user profile.
* 'group_show' permission (affecting the 'group_show' and 'organization_show' APIs) will be restricted to admins of the specific requested group, if the call requests user details with the 'include_users' parameter. If user details are not requested, calls are unrestricted.

If `ckan.auth.public_user_details` is absent or True, no restrictions are imposed.

## Requirements
* The server-side session storage requires the session middleware in CKAN core to be moved near the end of the middleware stack. An example changeset (relevant to CKAN 2.9.3) for this is provided in [ckanext-security.patch](ckanext-security.patch). The installed CKAN core codebase will need to have this patch applied (or similar changes made if not using 2.9.3).
* A running Redis instance to store brute force protection tokens configured with a maxmemory and maxmemory-policy=lru so it overwrites the least recently used item rather than running out of space. This instance should be a different instance from the one used for Harvest items to avoid data loss. [Redis LRU-Cache documentation](https://redis.io/topics/lru-cache).
Expand Down
114 changes: 114 additions & 0 deletions ckanext/security/logic/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,131 @@
Action functions, all sysadmin-only
'''

from ckan import authz, model
from ckan.logic import auth as logic_auth
from ckan.plugins.toolkit import _, asbool, config, \
auth_allow_anonymous_access, chained_auth_function


def security_throttle_user_reset(context, data_dict):
return {'success': False}


def security_throttle_address_reset(context, data_dict):
return {'success': False}


def security_throttle_user_show(context, data_dict):
return {'success': False}


def security_throttle_address_show(context, data_dict):
return {'success': False}


def security_reset_totp(context, data_dict):
return {'success': False}


# User privacy enhancements


@chained_auth_function
def user_list(next_auth, context, data_dict=None):
"""Check whether access to the user list is authorised.
Restricted to admins if 'ckan.auth.public_user_details' is False.
"""
if asbool(config.get('ckan.auth.public_user_details', True)) \
or _requester_is_admin(context):
return next_auth(context, data_dict)
else:
return {'success': False}


@chained_auth_function
@auth_allow_anonymous_access
def user_show(next_auth, context, data_dict):
"""Check whether access to individual user details is authorised.
Restricted to admins or self if 'ckan.auth.public_user_details' is False.
"""
if asbool(config.get('ckan.auth.public_user_details', True)) \
or _requester_is_admin(context):
authorized = True
else:
requester = context.get('user')
id = data_dict.get('id', None)
if id:
user_obj = model.User.get(id)
else:
user_obj = data_dict.get('user_obj', None)
authorized = user_obj and requester in [user_obj.name, user_obj.id]
if authorized:
return next_auth(context, data_dict)

return {'success': False}


@chained_auth_function
@auth_allow_anonymous_access
def group_show(next_auth, context, data_dict):
"""Check whether access to a group is authorised.
If it's just the group metadata, this requires no privileges,
but if user details have been requested and
'ckan.auth.public_user_details' is False, it requires a group admin.
"""
if asbool(config.get('ckan.auth.public_user_details', True)):
authorized = True
else:
requester = context.get('user')
group = logic_auth.get_group_object(context, data_dict)
authorized = (
group.state == 'active'
and not asbool(data_dict.get('include_users', False))
and data_dict.get('object_type', None) != 'user'
) or authz.has_user_permission_for_group_or_org(group.id, requester, 'update')
if authorized:
return next_auth(context, data_dict)
else:
return {'success': False,
'msg': _('User %s not authorized to read group %s') % (requester, group.id)}


def _requester_is_admin(context):
"""Check whether the current user has admin privileges in some group
or organisation.
This is based on the 'update' privilege; see eg
ckan.logic.auth.update.group_edit_permissions.
"""
requester = context.get('user')
return _has_user_permission_for_some_group(requester, 'admin')


def _has_user_permission_for_some_group(user_name, permission):
"""Check if the user has the given permission for any group.
"""
user_id = authz.get_user_id_for_username(user_name, allow_none=True)
if not user_id:
return False
roles = authz.get_roles_with_permission(permission)

if not roles:
return False
# get any groups the user has with the needed role
q = model.Session.query(model.Member) \
.filter(model.Member.table_name == 'user') \
.filter(model.Member.state == 'active') \
.filter(model.Member.capacity.in_(roles)) \
.filter(model.Member.table_id == user_id)
group_ids = []
for row in q.all():
group_ids.append(row.group_id)
# if not in any groups has no permissions
if not group_ids:
return False

# see if any of the groups are active
q = model.Session.query(model.Group) \
.filter(model.Group.state == 'active') \
.filter(model.Group.id.in_(group_ids))

return bool(q.count())
5 changes: 5 additions & 0 deletions ckanext/security/plugin/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# encoding: utf-8

import logging
import ckan.plugins as p

Expand Down Expand Up @@ -97,6 +99,9 @@ def get_auth_functions(self):
auth.security_throttle_address_show,
'security_reset_totp':
auth.security_reset_totp,
'user_list': auth.user_list,
'user_show': auth.user_show,
'group_show': auth.group_show,
}
# END Hooks for IAuthFunctions

Expand Down
3 changes: 1 addition & 2 deletions ckanext/security/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@

import logging

from ckan.views import user
from ckanext.security import utils
from ckan.lib import helpers
from flask import Blueprint, make_response, request
from flask import Blueprint, make_response
from functools import wraps
from ckan.plugins import toolkit as tk

Expand Down