Source code for couchbase.management.users

from abc import abstractmethod

from couchbase.management.admin import Admin
from couchbase.options import timedelta, forward_args
from couchbase.management.generic import GenericManager
from couchbase_core import mk_formstr, JSONMapping, Mapped
from couchbase.auth import AuthDomain
from couchbase_core._pyport import ulp, with_metaclass, Protocol
from couchbase_core import ABCMeta
from typing import *
from couchbase.options import OptionBlockTimeOut
from couchbase.exceptions import HTTPException, ErrorMapper, NotSupportedWrapper


class GroupNotFoundException(HTTPException):
    """Raised when a requested RBAC group does not exist on the server."""


class UserNotFoundException(HTTPException):
    """Raised when a requested RBAC user does not exist on the server."""


class UserErrorHandler(ErrorMapper):
    """Error mapper describing the RBAC-specific refinements of HTTPException."""

    @staticmethod
    def mapping():
        """Return the exception translation table for user management.

        :return: a dict keyed by the base exception class (HTTPException);
            its value maps a message fragment to the more specific exception
            class defined in this module.
        """
        # Message fragment -> specialised exception class.
        # NOTE(review): presumably matched against the server's error text by
        # ErrorMapper; confirm against its implementation.
        http_refinements = {
            'Unknown group': GroupNotFoundException,
            'Unknown user': UserNotFoundException,
        }
        return {HTTPException: http_refinements}


@UserErrorHandler.wrap
class UserManager(GenericManager):
    """Manager for RBAC users, groups and roles.

    Each public method is preceded by an ``@overload`` stub that only
    documents the keyword form of the call (with an explicit ``timeout``);
    the stub is immediately replaced by the real implementation, which accepts
    an options block and/or keyword arguments and merges them with
    ``forward_args``.
    """

    def __init__(self,  # type: UserManager
                 admin):
        """User Manager

        Programmatic access to the user management REST API:
        https://docs.couchbase.com/server/current/rest-api/rbac.html

        Unless otherwise indicated, all objects SHOULD be immutable.

        :param admin: passed unchanged to ``GenericManager.__init__``.
        """
        super(UserManager, self).__init__(admin)

    @overload
    def get_user(self,  # type: UserManager
                 username,  # type: str
                 domain_name=AuthDomain.Local,  # type: str
                 timeout=None  # type: timedelta
                 ):
        pass

    def get_user(self,  # type: UserManager
                 username,  # type: str
                 domain_name=AuthDomain.Local,  # type: AuthDomain
                 *options,  # type: GetUserOptions
                 **kwargs
                 ):
        # type: (...) -> UserAndMetadata
        """
        Gets a user.

        :param str username: ID of the user.
        :param AuthDomain domain_name: name of the user domain. Defaults to local.
        :param timedelta timeout: the time allowed for the operation to be terminated. This is controlled by the client.

        :returns: An instance of UserAndMetadata.

        :raises: UserNotFoundException
        :raises: InvalidArgumentsException
        Any exceptions raised by the underlying platform
        """
        # Implementation Notes
        # When parsing the "get" and "getAll" responses,
        # take care to distinguish between roles assigned directly to the user (role origin with type="user") and
        # roles inherited from groups (role origin with type="group" and name=<group name>).
        # If the server response does not include an "origins" field for a role,
        # then it was generated by a server version prior to 6.5 and the SDK MUST treat the role as if it had a
        # single origin of type="user".
        #
        # NOTE(review): get_all_users unwraps ``.value`` from the response but
        # this call wraps the response object itself - confirm that is intended.
        response = self._admin_bucket.user_get(domain_name, username,
                                               **forward_args(kwargs, *options))
        return RawUserAndMetadata(response)

    @overload
    def get_all_users(self,  # type: UserManager
                      domain_name=AuthDomain.Local,  # type: str
                      timeout=None  # type: timedelta
                      ):
        pass

    def get_all_users(self,  # type: UserManager
                      domain_name=AuthDomain.Local,  # type: str
                      *options,  # type: GetAllUsersOptions
                      **kwargs
                      ):
        # type: (...) -> Iterable[UserAndMetadata]
        """
        Gets all users of a domain.

        :param domain_name: name of the user domain. Defaults to local.
        :param options: options block merged with the keyword arguments.
        :param timedelta timeout: the time allowed for the operation to be terminated. This is controlled by the client.
        :return: An iterable collection of UserAndMetadata.
        """
        response = self._admin_bucket.users_get(domain=domain_name,
                                                **forward_args(kwargs, *options))
        return [RawUserAndMetadata(entry) for entry in response.value]

    @overload
    def upsert_user(self,  # type: UserManager
                    user,  # type: User
                    domain=AuthDomain.Local,  # type: AuthDomain
                    timeout=None  # type: timedelta
                    ):
        pass

    def upsert_user(self,  # type: UserManager
                    user,  # type: User
                    domain=AuthDomain.Local,  # type: AuthDomain
                    *options,  # type: UpsertUserOptions
                    **kwargs
                    ):
        """
        Creates or updates a user.

        :param User user: the new version of the user.
        :param AuthDomain domain: name of the user domain (local | external). Defaults to local.
        :param timedelta timeout: the time allowed for the operation to be terminated. This is controlled by the client.

        :raises: InvalidArgumentsException
        """
        # Implementation Notes
        #
        # When building the PUT request to send to the REST endpoint, implementations MUST omit the "password" property
        # if it is not present in the given User domain object (so that the password is only changed if the calling code
        # provided a new password).
        #
        # For backwards compatibility with Couchbase Server 6.0 and earlier,
        # the "groups" parameter MUST be omitted if the group list is empty. Couchbase Server 6.5 treats the absent
        # parameter the same as an explicit parameter with no value (removes any existing group associations,
        # which is what we want in this case).
        #
        # NOTE(review): ``options``/``kwargs`` are not forwarded here; only the
        # whitelisted keys found in the user's own dict reach user_upsert, so a
        # ``timeout`` passed by the caller is ignored - confirm whether intended.
        forwarded_keys = {'password', 'roles', 'name', 'timeout'}
        final_opts = {k: v for k, v in user.as_dict.items()
                      if k in forwarded_keys}
        self._admin_bucket.user_upsert(domain, user.username, **final_opts)

    @overload
    def drop_user(self,  # type: UserManager
                  user_name,  # type: str
                  domain=AuthDomain.Local,  # type: AuthDomain
                  timeout=None  # type: timedelta
                  ):
        pass

    def drop_user(self,  # type: UserManager
                  user_name,  # type: str
                  domain=AuthDomain.Local,  # type: AuthDomain
                  *options,  # type: DropUserOptions
                  **kwargs
                  ):
        """
        Removes a user.

        :param str user_name: ID of the user.
        :param AuthDomain domain: name of the user domain. Defaults to local.
        :param timedelta timeout: the time allowed for the operation to be terminated. This is controlled by the client.

        :return: None
        :raises: UserNotFoundException
        :raises: InvalidArgumentsException
        """
        final_args = forward_args(kwargs, *options)
        self._admin_bucket.user_remove(domain, user_name, **final_args)

    @overload
    def get_roles(self,  # type: UserManager
                  timeout=None,  # type: timedelta
                  *options):
        pass

    def get_roles(self,  # type: UserManager
                  *options,  # type: GetRolesOptions
                  **kwargs
                  ):
        # type: (...) -> Iterable[RoleAndDescription]
        """
        Returns the roles supported by the server.

        :param options: misc options
        :param timedelta timeout: the time allowed for the operation to be terminated. This is controlled by the client.
        :return: An iterable collection of RoleAndDescription.
        """
        response = self._admin_bucket.http_request("/settings/rbac/roles/",
                                                   **forward_args(kwargs, *options))
        # Each entry of the response is expanded as keyword arguments.
        return [RoleAndDescription.of(**entry) for entry in response.value]

    @overload
    def get_group(self,  # type: UserManager
                  group_name,  # type: str
                  timeout=None  # type: timedelta
                  ):
        pass

    def get_group(self,  # type: UserManager
                  group_name,  # type: str
                  *options,  # type: GetGroupOptions
                  **kwargs
                  ):
        # type: (...) -> Group
        """
        Get info about the named group.

        :param str group_name: name of the group to get.
        :param timedelta timeout: the time allowed for the operation to be terminated. This is controlled by the client.
        :return: An instance of Group.

        :raises: GroupNotFoundException
        :raises: InvalidArgumentsException
        """
        response = self._admin_bucket.http_request(
            "/settings/rbac/groups/{}".format(group_name),
            **forward_args(kwargs, *options))
        return Group.from_json(response.value)

    @overload
    def get_all_groups(self,  # type: UserManager
                       timeout=None,  # type: timedelta
                       *options  # type: GetAllGroupsOptions
                       ):
        pass

    @NotSupportedWrapper.a_404_means_not_supported
    def get_all_groups(self,  # type: UserManager
                       *options,  # type: GetAllGroupsOptions
                       **kwargs
                       ):
        # type: (...) -> Iterable[Group]
        """
        Get all groups.

        :param timedelta timeout: the time allowed for the operation to be terminated. This is controlled by the client.
        :returns: An iterable collection of Group.
        """
        groups = self._admin_bucket.http_request("/settings/rbac/groups/",
                                                 **forward_args(kwargs, *options))
        return list(map(Group.from_json, groups.value))

    @overload
    def upsert_group(self,  # type: UserManager
                     group,  # type: Group
                     timeout=None  # type: timedelta
                     ):
        pass

    def upsert_group(self,  # type: UserManager
                     group,  # type: Group
                     *options,  # type: UpsertGroupOptions
                     **kwargs
                     ):
        """
        Add or replace a group.

        :warning: Does not appear to work correctly yet - tracked here: https://issues.couchbase.com/browse/PYCBC-667

        :param Group group: the new version of the group.
        :param timedelta timeout: the time allowed for the operation to be terminated. This is controlled by the client.
        :raises: InvalidArgumentsException
        """
        # This endpoint accepts application/x-www-form-urlencoded and requires the data be sent as form data.
        # The name/id should not be included in the form data.
        # Roles should be a comma separated list of strings.
        # If, only if, the role contains a bucket name then the rolename should be suffixed
        # with[<bucket_name>] e.g. bucket_full_access[default],security_admin.
        group_dict = group.as_dict()
        form_data = mk_formstr(group_dict)
        self._admin_bucket.http_request(
            path="/settings/rbac/groups/{}".format(group.name),
            method='PUT',
            content=form_data,
            content_type='application/x-www-form-urlencoded',
            **forward_args(kwargs, *options))

    @overload
    def drop_group(self,  # type: UserManager
                   group_name,  # type: str
                   timeout=None  # type: timedelta
                   ):
        pass

    def drop_group(self,  # type: UserManager
                   group_name,  # type: str
                   *options,  # type: DropGroupOptions
                   **kwargs
                   ):
        """
        Removes a group.

        :param str group_name: name of the group.
        :param timedelta timeout: the time allowed for the operation to be terminated. This is controlled by the client.

        :raises: GroupNotFoundException
        :raises: InvalidArgumentsException
        """
        self._admin_bucket.http_request(
            "/settings/rbac/groups/{}".format(group_name),
            method='DELETE',
            **forward_args(kwargs, *options))
# Concrete tuple produced by Role.decode (via Role.factory).
RawRole = NamedTuple('RawRole', [('name', str), ('bucket', str)])


class Role(with_metaclass(ABCMeta, Mapped)):
    """A role identifies a specific permission.

    CAVEAT: The properties of a role are likely to change with the
    introduction of collection-level permissions. Until then here's what the
    accessor methods look like:
    """

    # Type instantiated by decode().
    factory = RawRole

    @staticmethod
    def defaults():
        """Return the default field values: ``bucket`` defaults to None."""
        return {'bucket': None}

    @staticmethod
    def mappings():
        """Return the field renames: the ``role`` key becomes ``name``."""
        return {'role': 'name'}

    @property
    @abstractmethod
    def name(self):
        pass

    @property
    @abstractmethod
    def bucket(self):
        pass

    def __iter__(self):
        """Iterate over ``name`` then ``bucket``."""
        return iter([self.name, self.bucket])

    def __str__(self):
        return Admin._role_to_str(self)

    @classmethod
    def decode(cls, param):
        """Build a ``cls.factory`` instance from a string, dict or sequence.

        :param param: a role string (first parsed with ``Admin._str_to_role``),
            a dict with ``'role'`` and ``'bucket'`` keys, or any iterable of
            positional factory arguments.
        :return: the result of ``cls.factory(*args)``.
        """
        if isinstance(param, str):
            param = Admin._str_to_role(param)
        # Deliberately a second ``if``: the parsed string is re-examined, so a
        # dict result from _str_to_role is unpacked here as well.
        if isinstance(param, dict):
            args = list(map(param.get, ('role', 'bucket')))
        else:
            args = param
        return cls.factory(*args)


# Concrete tuple named by RoleAndDescription.factory.
RawRoleAndDescription = NamedTuple('RawRoleAndDescription',
                                   [('role', Role),
                                    ('display_name', str),
                                    ('description', str),
                                    ('ce', Any),
                                    ('bucket_name', str),
                                    ('scope_name', str),
                                    ('collection_name', str)])


class RoleAndDescription(with_metaclass(ABCMeta, Mapped)):
    """Associates a role with its name and description.

    This is additional information only present in the "list available roles"
    response.
    """

    factory = RawRoleAndDescription

    @staticmethod
    def defaults():
        """Return the default values for the optional fields."""
        return {'ce': True, 'bucket_name': None, 'scope_name': None,
                'collection_name': None}

    @staticmethod
    def mappings():
        """Return the field renames: ``desc`` -> ``description``, ``name`` -> ``display_name``."""
        return {'desc': 'description', 'name': 'display_name'}

    @property
    @abstractmethod
    def role(self):
        # type: (...) -> Role
        return None

    @property
    @abstractmethod
    def ce(self):
        # NOTE(review): the original annotated this as returning Role, which
        # looks copied from ``role``; the tuple field is typed Any.
        return None

    @property
    @abstractmethod
    def display_name(self):
        # type: (...) -> str
        return None

    @property
    @abstractmethod
    def description(self):
        # type: (...) -> str
        pass

    @classmethod
    def of(cls, *args, **kwargs):
        """Construct an instance by delegating to ``Mapped._of``."""
        return Mapped._of(cls, *args, **kwargs)


class Origin(object):
    def __init__(self):
        """Indicates why the user has a specific role.

        If the type is "user" it means the role is assigned directly to the user. If the type is "group" it means the role is inherited from the group identified by the "name" field."""

    @property
    def type(self):
        # type: (...) -> str
        pass

    @property
    def name(self):
        # type: (...) -> str
        return None


class RoleAndOrigins(object):
    def __init__(self):
        """Associates a role with its origins.

        This is how roles are returned by the "get user" and "get all users" responses."""

    @property
    def role(self):
        # type: (...) -> Role
        pass

    @property
    def origins(self):
        # type: (...) -> List[Origin]
        pass


class User(object):
    """Mutable user record backed by a plain dict of keyword fields."""

    @overload
    def __init__(self, username=None, display_name=None, password=None,
                 groups=None, roles=None):
        pass

    def __init__(self, **raw_data):
        """Store the supplied keyword fields as the user's raw data.

        :param raw_data: user fields; the accessors below read the keys
            ``username``, ``display_name``, ``groups``, ``roles`` and
            ``password``.
        """
        self._raw_data = raw_data

    @property
    def username(self):
        # type: (...) -> str
        return self._raw_data.get('username')

    @property
    def display_name(self):
        # type: (...) -> str
        # Reads the 'display_name' field (the original mistakenly returned
        # the 'username' field here).
        return self._raw_data.get('display_name')

    @property
    def groups(self):
        # type: (...) -> Set[str]
        """names of the groups"""
        return set(self._raw_data.get('groups', []))

    @property
    def roles(self):
        # type: (...) -> Set[Role]
        """only roles assigned directly to the user (not inherited from groups)"""
        return self._raw_data.get('roles')

    @property
    def password(self):
        # type: (...) -> None
        """ From the user's perspective the password property is "write-only".

        The accessor SHOULD be hidden from the user and be visible only to the manager implementation."""
        return self._raw_data.get('password')

    @password.setter
    def password(self, value):
        self._raw_data['password'] = value

    @property
    def as_dict(self):
        """The underlying raw dict itself (not a copy)."""
        return self._raw_data


class UserAndMetadata(object):
    """Models the "get user" / "get all users" response.

    Associates the mutable properties of a user with derived properties such as the effective roles inherited from groups."""

    @property
    def domain(self):
        # type: (...) -> AuthDomain
        """ AuthDomain is an enumeration with values "local" and "external".

        It MAY alternatively be represented as String."""

    @property
    def user(self):
        # type: (...) -> User
        """- returns a new mutable User object each time this method is called.

        Modifying the fields of the returned User MUST have no effect on the UserAndMetadata object it came from."""

    @property
    def effective_roles(self):
        # type: (...) -> Set[Role]
        """all roles, regardless of origin."""

    @property
    def effective_roles_and_origins(self):
        # type: (...) -> List[RoleAndOrigins]
        """same as effectiveRoles, but with origin information included."""
        pass

    @property
    def password_changed(self):
        # type: (...) -> Optional[float]
        pass

    @property
    def external_groups(self):
        # type: (...) -> Set[str]
        pass


class RawUserAndMetadata(UserAndMetadata):
    """UserAndMetadata implementation reading every property from a raw mapping."""

    def __init__(self, raw_data):
        """
        :param raw_data: object exposing ``get(key)``; stored as-is.
        """
        self._raw_data = raw_data

    @property
    def domain(self):
        # type: (...) -> AuthDomain
        """ AuthDomain is an enumeration with values "local" and "external".

        It MAY alternatively be represented as String."""
        return self._raw_data.get('domain')

    @property
    def user(self):
        # type: (...) -> User
        """- returns a new mutable User object each time this method is called.

        Modifying the fields of the returned User MUST have no effect on the UserAndMetadata object it came from."""
        return User(**self._raw_data.get('user'))

    @property
    def effective_roles(self):
        # type: (...) -> Set[Role]
        """all roles, regardless of origin."""
        # A missing key yields an empty set instead of a TypeError.
        # NOTE(review): Role declares abstract properties, so calling it on
        # each entry may fail for non-empty input - confirm.
        return set(map(Role, self._raw_data.get('effective_roles') or ()))

    @property
    def effective_roles_and_origins(self):
        # type: (...) -> List[RoleAndOrigins]
        """same as effectiveRoles, but with origin information included."""
        # A missing key yields an empty list instead of a TypeError.
        # NOTE(review): RoleAndOrigins.__init__ takes no arguments, so this
        # raises TypeError for any non-empty input - needs a real parser.
        return list(map(RoleAndOrigins,
                        self._raw_data.get('effective_roles_and_origins') or ()))

    @property
    def password_changed(self):
        # type: (...) -> Optional[float]
        return self._raw_data.get('password_changed')

    @property
    def external_groups(self):
        # type: (...) -> Set[str]
        # A missing key yields an empty set instead of a TypeError.
        return set(self._raw_data.get('external_groups') or ())


class Group(JSONMapping):
    """A named RBAC group whose remaining fields live in the JSONMapping store."""

    @staticmethod
    def defaults():
        """Return the default field values: empty description and LDAP reference."""
        return {'description': '', 'ldap_group_ref': ''}

    @overload
    def __init__(self, name, description=None, roles=None,
                 ldap_group_reference=None):
        pass

    def __init__(self, name, **kwargs):
        """
        :param name: the group name, kept outside the JSON payload.
        :param kwargs: remaining fields, handed to ``JSONMapping.__init__``
            as a single dict.
        """
        self._name = name
        super(Group, self).__init__(kwargs)

    @property
    def name(self):
        return self._name

    # Property generated by JSONMapping for the 'ldap_group_ref' field.
    ldap_group_reference = JSONMapping._genprop('ldap_group_ref')

    @property
    def roles(self):
        """The group's roles, each decoded with ``Role.decode``, as a set."""
        return set(map(Role.decode, self._raw_json['roles']))

    @roles.setter
    def roles(self, value):
        # Stored in string form, via Admin._role_to_str.
        self._raw_json['roles'] = list(map(Admin._role_to_str, value))

    @staticmethod
    def from_json(kwargs):
        """Build a Group from a mapping whose ``'id'`` entry is the name.

        The input is copied first so the caller's mapping is left untouched.

        :param kwargs: mapping containing ``'id'`` plus the group fields.
        :raises KeyError: if ``'id'`` is absent.
        """
        fields = dict(kwargs)
        name = fields.pop('id')
        return Group(name, **fields)

    def as_dict(self):
        """Return the truthy fields, with roles flattened for form encoding.

        ``roles`` becomes a comma-joined string of URL-quoted role strings
        when present and non-empty, otherwise an empty list.
        """
        # Falsy values (empty strings, empty lists, None) are dropped.
        result = {k: v for k, v in self._raw_json.items() if v}
        if 'roles' in result:
            result['roles'] = ','.join(map(ulp.quote, result['roles']))
        else:
            result['roles'] = []
        return result

    def __eq__(self, other):
        # Defer to the other operand instead of raising AttributeError when
        # compared with something that is not a Group.
        if not isinstance(other, Group):
            return NotImplemented
        return self.name == other.name and self._raw_json == other._raw_json

    def __repr__(self):
        return '{}:{}'.format(self.name, repr(self._raw_json))


class GetUserOptions(OptionBlockTimeOut):
    """Options block accepted by UserManager.get_user."""
    pass


class UpsertUserOptions(OptionBlockTimeOut):
    """Options block accepted by UserManager.upsert_user."""
    pass


class GetRolesOptions(OptionBlockTimeOut):
    """Options block accepted by UserManager.get_roles."""
    pass


class GetGroupOptions(OptionBlockTimeOut):
    """Options block accepted by UserManager.get_group."""
    pass


class GetAllGroupsOptions(OptionBlockTimeOut):
    """Options block accepted by UserManager.get_all_groups."""
    pass


class DropGroupOptions(OptionBlockTimeOut):
    """Options block accepted by UserManager.drop_group."""
    pass


class DropUserOptions(OptionBlockTimeOut):
    """Options block accepted by UserManager.drop_user."""
    pass


class GetAllUsersOptions(OptionBlockTimeOut):
    """Options block accepted by UserManager.get_all_users."""
    pass


class UpsertGroupOptions(OptionBlockTimeOut):
    """Options block accepted by UserManager.upsert_group."""
    pass