Source code for couchbase.management.users

from datetime import datetime

from couchbase.options import OptionBlock, OptionBlockTimeOut, timedelta, forward_args
from couchbase.management.generic import GenericManager
from couchbase.auth import AuthDomain
from typing import *
from couchbase.exceptions import (HTTPException, ErrorMapper, InvalidArgumentException,
                                  FeatureNotFoundException, RateLimitedException)
from copy import deepcopy


class GroupNotFoundException(HTTPException):
    """ The RBAC Group was not found"""


class UserNotFoundException(HTTPException):
    """ The RBAC User was not found"""


class UserErrorHandler(ErrorMapper):
    @staticmethod
    def mapping():
        # type (...)->Mapping[str, CBErrorType]
        return {HTTPException: {'Unknown group': GroupNotFoundException,
                                'Unknown user': UserNotFoundException,
                                'Not found': FeatureNotFoundException,
                                'Method Not Allowed': FeatureNotFoundException,
                                '.*Limit\(s\) exceeded\s+\[.*\].*': RateLimitedException}
                }


[docs]@UserErrorHandler.wrap class UserManager(GenericManager): def __init__(self, # type: UserManager admin): """User Manager Programmatic access to the user management REST API: https://docs.couchbase.com/server/current/rest-api/rbac.html Unless otherwise indicated, all objects SHOULD be immutable. :param parent_cluster: """ super(UserManager, self).__init__(admin)
[docs] def get_user(self, username, # type: str *options, # type: GetUserOptions **kwargs # type: Any ): # type: (...) -> UserAndMetadata """ Gets a user. :param str username: ID of the user. :param options: GetUserOptions :param Any kwargs: override corresponding values in the options. :returns: An instance of UserAndMetadata. :raises: UserNotFoundException :raises: InvalidArgumentsException Any exceptions raised by the underlying platform """ # Implementation Notes # When parsing the "get" and "getAll" responses, # take care to distinguish between roles assigned directly to the user (role origin with type="user") and # roles inherited from groups (role origin with type="group" and name=<group name>). # If the server response does not include an "origins" field for a role, # then it was generated by a server version prior to 6.5 and the SDK MUST treat the role as if it had a # single origin of type="user". final_args = forward_args(kwargs, *options) domain = final_args.pop("domain_name", "local") timeout = final_args.pop("timeout", None) return UserAndMetadata.load_from_server( self._admin_bucket.user_get(username, domain, timeout).value)
[docs] def get_all_users(self, *options, # type: GetAllUsersOptions **kwargs # type: Any ): # type: (...) -> Iterable[UserAndMetadata] """ Gets all users. :param options: GetAllUsersOptions :param Any kwargs: override corresponding values in the options. :return: An iterable collection of UserAndMetadata. """ final_args = forward_args(kwargs, *options) domain = final_args.get("domain_name", "local") timeout = final_args.get("timeout", None) return list(map(lambda u: UserAndMetadata.load_from_server(u), self._admin_bucket.users_get(domain, timeout).value))
[docs] def upsert_user(self, user, # type: User *options, # type: UpsertUserOptions **kwargs # type: Any ): """ Creates or updates a user. :param User user: the new version of the user. :param options: UpsertUserOptions :param Any kwargs: override corresponding values in the options. :raises: InvalidArgumentsException """ # Implementation Notes # # When building the PUT request to send to the REST endpoint, implementations MUST omit the "password" property # if it is not present in the given User domain object (so that the password is only changed if the calling code # provided a new password). final_args = forward_args(kwargs, *options) domain = final_args.pop("domain_name", "local") final_args.update({k: v for k, v in user.as_dict.items() if k in { 'password', 'roles', 'name', 'groups'}}) self._admin_bucket.user_upsert(user.username, domain, **final_args)
[docs] def drop_user(self, username, # type: str *options, # type: DropUserOptions **kwargs # type: Any ): """ Removes a user. :param str username: ID of the user. :param options: DropUserOptions :param Any kwargs: override corresponding values in the options. :raises: UserNotFoundException :raises: InvalidArgumentsException Any exceptions raised by the underlying platform """ final_args = forward_args(kwargs, *options) domain = final_args.pop("domain_name", "local") timeout = final_args.get("timeout", None) self._admin_bucket.user_remove(username, domain, timeout)
[docs] def get_roles(self, *options, # type: GetRolesOptions **kwargs # type: Any ): # type: (...) -> Iterable[RoleAndDescription] """ Returns the roles supported by the server. :param options: GetRolesOptions :param Any kwargs: override corresponding values in the options. :return: An iterable collection of RoleAndDescription. """ final_args = forward_args(kwargs, *options) timeout = final_args.get("timeout", None) return list(map(lambda r: RoleAndDescription.load_from_server(r), self._admin_bucket.get_roles(timeout).value))
[docs] def get_group(self, group_name, # type: str *options, # type: GetGroupOptions **kwargs # type: Any ): # type: (...) -> Group """ Gets a group. :param str group_name: name of the group to get. :param options: GetRolesOptions :param Any kwargs: override corresponding values in the options. :return: An instance of Group. :raises: GroupNotFoundException :raises: InvalidArgumentsException Any exceptions raised by the underlying platform """ final_args = forward_args(kwargs, *options) timeout = final_args.get("timeout", None) return Group.load_from_server( self._admin_bucket.group_get(group_name, timeout).value)
[docs] def get_all_groups(self, *options, # type: GetAllGroupsOptions **kwargs # type: Any ): # type: (...) -> Iterable[Group] """ Get all groups. :param timedelta timeout: the time allowed for the operation to be terminated. This is controlled by the client. :returns: An iterable collection of Group. """ final_args = forward_args(kwargs, *options) timeout = final_args.get("timeout", None) return list(map(lambda g: Group.load_from_server(g), self._admin_bucket.groups_get(timeout).value))
[docs] def upsert_group(self, group, # type: Group *options, # type: UpsertGroupOptions **kwargs # type: Any ): """ Creates or updates a group. :param Group group: the new version of the group. :param options: UpsertGroupOptions :param Any kwargs: override corresponding values in the options. :raises: InvalidArgumentsException Any exceptions raised by the underlying platform """ # This endpoint accepts application/x-www-form-urlencoded and requires the data be sent as form data. # The name/id should not be included in the form data. # Roles should be a comma separated list of strings. # If, only if, the role contains a bucket name then the rolename should be suffixed # with[<bucket_name>] e.g. bucket_full_access[default],security_admin. final_args = forward_args(kwargs, *options) final_args.update({k: v for k, v in group.as_dict.items() if k in { 'roles', 'description', 'ldap_group_reference'}}) self._admin_bucket.group_upsert(group.name, **final_args)
[docs] def drop_group(self, group_name, # type: str *options, # type: DropGroupOptions **kwargs # type: Any ): """ Removes a group. :param str group_name: name of the group. :param options: DropGroupOptions :param Any kwargs: override corresponding values in the options. :raises: GroupNotFoundException :raises: InvalidArgumentsException """ self._admin_bucket.http_request("/settings/rbac/groups/{}".format(group_name), method='DELETE', **forward_args(kwargs, *options))
class SetHelper(object): @classmethod def to_set(cls, value, valid_type, display_name): if not value: return value elif isinstance(value, set): cls.validate_all_set_types(value, valid_type, display_name) return value elif isinstance(value, list): cls.validate_all_set_types(value, valid_type, display_name) return set(value) elif isinstance(value, tuple): cls.validate_all_set_types(value, valid_type, display_name) return set(value) elif isinstance(value, valid_type): return set([value]) else: raise InvalidArgumentException( '{} must be of type {}.'.format(display_name, valid_type.__name__)) @classmethod def validate_all_set_types(cls, value, valid_type, display_name): if all(map(lambda r: isinstance(r, type), value)): raise InvalidArgumentException( '{} must contain only objects of type {}.'.format(display_name, valid_type.__name__)) class Role(object): def __init__(self, name=None, # type: str bucket=None, # type: str scope=None, # type: str collection=None, # type: str ): if not name: raise InvalidArgumentException('A role must have a name') self._name = name self._bucket = bucket self._scope = scope self._collection = collection @property def name(self): # type: (...) -> str return self._name @property def bucket(self): # type: (...) -> str return self._bucket @property def scope(self): # type: (...) -> str return self._scope @property def collection(self): # type: (...) -> str return self._collection def to_server_str(self): if self.bucket and self.scope and self.collection: return '{0}[{1}:{2}:{3}]'.format(self.name, self.bucket, self.scope, self.collection) elif self.bucket and self.scope: return '{0}[{1}:{2}]'.format(self.name, self.bucket, self.scope) elif self.bucket: return '{0}[{1}]'.format(self.name, self.bucket) else: return self.name def as_dict(self): return { 'name': self._name, 'bucket': self._bucket, 'scope': self._scope, 'collection': self._collection } def __eq__(self, other): return (isinstance(other, type(self)) and (self._name, self._bucket, self._scope, self._collection) == (other._name, other._bucket, other._scope, other._collection)) def __hash__(self): return hash((self._name, self._bucket, self._scope, self._collection)) @classmethod def load_from_server(cls, json_data): return cls( name=json_data.get('role', None), bucket=json_data.get('bucket_name', None), scope=json_data.get('scope_name', None), collection=json_data.get('collection_name', None) ) class RoleAndDescription(object): def __init__(self, role=None, # type: Role display_name=None, # type: str description=None, # type: str ce=None, # type: bool ): self._role = role self._display_name = display_name self._description = description self._ce = ce @property def role(self): # type: (...) -> Role return self._role @property def display_name(self): # type: (...) -> str return self._display_name @property def description(self): # type: (...) -> str return self._description @property def ce(self): # type: (...) -> bool return self._ce @classmethod def load_from_server(cls, json_data): return cls( role=Role.load_from_server(json_data), display_name=json_data.get('name', None), description=json_data.get('desc', None), ce=json_data.get('ce', None) ) class RoleAndOrigins(object): def __init__(self, role=None, # type: Role origins=[] # type: List[Origin] ): self._role = role self._origins = origins @property def role(self): # type: (...) -> Role return self._role @property def origins(self): # type: (...) -> list[Origin] return self._origins @classmethod def load_from_server(cls, json_data): # RBAC prior to v6.5 does not have origins origin_data = json_data.get('origins', None) return cls( role=Role.load_from_server(json_data), origins=list(map(lambda o: Origin(**o), origin_data)) if origin_data else [] ) class Origin(object): """Indicates why the user has a specific role. If the type is "user" it means the role is assigned directly to the user. If the type is "group" it means the role is inherited from the group identified by the "name" field.""" def __init__(self, type=None, # type: str name=None # type: str ): self._type = type self._name = name @property def type(self): # type: (...) -> str return self._type @property def name(self): # type: (...) -> str return self._name class User(object): def __init__(self, username=None, # type: str display_name=None, # type: str groups=None, # type: Set[str] roles=None, # type: Set[Role] password=None # type: str ): if not username: raise InvalidArgumentException('A user must have a username') self._username = username self._display_name = display_name self._groups = SetHelper.to_set(groups, str, 'Groups') self._roles = SetHelper.to_set(roles, Role, 'Roles') self._password = password @property def username(self): # type: (...) -> str return self._username @property def display_name(self): # type: (...) -> str return self._display_name @display_name.setter def display_name(self, value # type: str ): self._display_name = value @property def groups(self): # type: (...) -> Set[str] """names of the groups""" return self._groups @groups.setter def groups(self, value # type: Set[str] ): self._groups = SetHelper.to_set(value, str, 'Groups') @property def roles(self): # type: (...) -> Set[Role] """only roles assigned directly to the user (not inherited from groups)""" return self._roles @roles.setter def roles(self, value # type: Set[Role] ): self._roles = SetHelper.to_set(value, Role, 'Roles') def password(self, value): self._password = value password = property(None, password) @property def as_dict(self): output = { 'username': self.username, 'name': self.display_name, 'password': self._password } if self.roles: output['roles'] = list(self.roles) if self.groups: output['groups'] = list(self.groups) return output def __eq__(self, other): return (isinstance(other, type(self)) and (self._username, self._display_name, self._groups, self._roles) == (other._username, other._display_name, other._groups, other._roles)) def __hash__(self): return hash((self._username, self._display_name, self._groups, self._roles)) @classmethod def load_from_server(cls, json_data, roles=None): user_roles = roles if not user_roles: set(map(lambda r: Role.load_from_server(r), json_data.get('roles'))) # RBAC prior to v6.5 does not have groups group_data = json_data.get('groups', None) return cls( username=json_data.get('id'), display_name=json_data.get('name'), roles=user_roles, groups=set(group_data) if group_data else None ) class UserAndMetadata(object): """Models the "get user" / "get all users" response. Associates the mutable properties of a user with derived properties such as the effective roles inherited from groups.""" def __init__(self, domain=None, # type: AuthDomain user=None, # type: User effective_roles=[], # type: List[RoleAndOrigins] password_changed=None, # type: datetime external_groups=None, # type: Set[str] **kwargs # type: Any ): self._domain = domain self._user = user self._effective_roles = effective_roles self._password_changed = password_changed self._external_groups = external_groups self._raw_data = kwargs.get('raw_data', None) @property def domain(self): # type: (...) -> AuthDomain """ AuthDomain is an enumeration with values "local" and "external". It MAY alternatively be represented as String.""" return self._domain @property def user(self): # type: (...) -> User """- returns a new mutable User object each time this method is called. Modifying the fields of the returned User MUST have no effect on the UserAndMetadata object it came from.""" return deepcopy(self._user) @property def effective_roles(self): # type: (...) -> List[RoleAndOrigins] """all roles, regardless of origin.""" return self._effective_roles @property def password_changed(self): # type: (...) -> Optional[datetime] return self._password_changed @property def external_groups(self): # type: (...) -> Set[str] return self._external_groups @property def raw_data(self): # type: (...) -> Dict return self._raw_data @classmethod def load_from_server(cls, json_data): effective_roles = list(map(lambda r: RoleAndOrigins.load_from_server(r), json_data.get('roles'))) user_roles = set(r.role for r in effective_roles if any(map(lambda o: o.type == 'user', r.origins)) or len(r.origins) == 0) # RBAC prior to v6.5 does not have groups ext_group_data = json_data.get('external_groups', None) # password_change_date is optional pw_data = json_data.get('password_change_date', None) return cls( domain=AuthDomain.from_str(json_data.get('domain')), effective_roles=effective_roles, user=User.load_from_server(json_data, roles=user_roles), password_changed=datetime.strptime(pw_data, '%Y-%m-%dT%H:%M:%S.%fZ') if pw_data else None, external_groups=set(ext_group_data) if ext_group_data else None, raw_data=json_data ) class Group(object): def __init__(self, name=None, # type: str description=None, # type: str roles=None, # type: Set[Role] ldap_group_reference=None, # type: str **kwargs # type: Any ): if not name: raise InvalidArgumentException('A group must have a name') self._name = name self._description = description self._roles = SetHelper.to_set(roles, Role, 'Roles') self._ldap_group_reference = ldap_group_reference self._raw_data = kwargs.get('raw_data', None) @property def name(self): # type: (...) -> str return self._name @property def description(self): # type: (...) -> str return self._description @description.setter def description(self, value # type: str ): self._description = value @property def roles(self): # type: (...) -> Set[Role] return self._roles @roles.setter def roles(self, value # type: Set[Role] ): self._roles = SetHelper.to_set(value, Role, 'Roles') @property def ldap_group_reference(self): # type: (...) -> str return self._ldap_group_reference @ldap_group_reference.setter def ldap_group_reference(self, value # type: str ): self._ldap_group_reference = value @property def raw_data(self): # type: (...) -> Dict return self._raw_data @property def as_dict(self): return { 'name': self.name, 'description': self.description, 'roles': self.roles, 'ldap_group_reference': self.ldap_group_reference } def __eq__(self, other): return (isinstance(other, type(self)) and (self._name, self._description, self._roles, self._ldap_group_reference) == (other._name, other._description, other._roles, other._ldap_group_reference)) @classmethod def load_from_server(cls, json_data): return cls( json_data.get('id'), description=json_data.get('description'), roles=set(map(lambda r: Role.load_from_server( r), json_data.get('roles'))), ldap_group_referenc=json_data.get('ldap_group_ref') ) class UserOptions(OptionBlock): def __init__(self, domain_name="local", # type: str timeout=None, # type: timedelta **kwargs # type: Any ): """ An OptionBlock with an auth domain and timeout :param str domain_name: name of the user domain (local | external). Defaults to local. :param timedelta timeout: the time allowed for the operation to be terminated. This is controlled by the client. :param kwargs: parameters to pass in to the OptionBlock """ super(UserOptions, self).__init__(domain_name=domain_name, timeout=timeout, **kwargs) class GetUserOptions(UserOptions): pass class GetAllUsersOptions(UserOptions): pass class UpsertUserOptions(UserOptions): pass class DropUserOptions(UserOptions): pass class GetRolesOptions(OptionBlockTimeOut): pass class DropGroupOptions(OptionBlockTimeOut): pass class GetGroupOptions(OptionBlockTimeOut): pass class GetAllGroupsOptions(OptionBlockTimeOut): pass class UpsertGroupOptions(OptionBlockTimeOut): pass