Skip to content

authorization

mlte/backend/api/auth/authorization.py

Setup of OAuth based authorization checks.

AuthorizedUser = Annotated[BasicUser, Depends(get_authorized_user)] module-attribute

Type alias to simplify use of get user.

oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f'{settings.API_PREFIX}{TOKEN_ENDPOINT_URL}') module-attribute

Security scheme to be used.

get_authorized_user(token, resource) async

Given a token, gets the authenticated user and checks if it has access to resources.

Parameters:

Name Type Description Default
token Annotated[str, Depends(oauth2_scheme)]

A JWT bearer access token with user information.

required
resource Annotated[Permission, Depends(get_current_resource)]

A Permission object indicating the resource and method we are checking for.

required

Returns:

Type Description
BasicUser

A User data structure, with a User that has access to the resources.

Source code in mlte/backend/api/auth/authorization.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
async def get_authorized_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    resource: Annotated[Permission, Depends(get_current_resource)],
) -> BasicUser:
    """
    Given a token, gets the authenticated user and checks if it has access to resources.

    :param token: A JWT bearer access token with user information.
    :param resource: A Permission object describing the resource and method being checked.
    :return: A BasicUser built from the stored user that has access to the resource.
    :raises HTTPAuthException: If the token can not be decoded, or the username it
        contains is not found in the user store.
    :raises HTTPException: With a FORBIDDEN status code if the user is disabled or
        is not authorized for the requested resource.
    """
    # Validate token and get username.
    try:
        username = get_username_from_token(token, state.token_key)
    except Exception as ex:
        # Any failure while decoding is reported as an invalid token; the original
        # exception is chained explicitly so the root cause stays in tracebacks.
        # NOTE(review): the "error_decription" keyword spelling is kept as-is,
        # presumably it matches HTTPAuthException's signature - confirm.
        raise HTTPAuthException(
            error="invalid_token",
            error_decription=f"Could not decode token: {ex}",
        ) from ex

    # Check if user in token exists.
    user = None
    with state_stores.user_store_session() as user_store:
        user = user_store.user_mapper.read(username)
    if user is None:
        raise HTTPAuthException(
            error="invalid_token",
            error_decription="Username in token was not found.",
        )

    # Check if user is enabled to be used.
    if user.disabled:
        raise HTTPException(
            status_code=codes.FORBIDDEN, detail="User is inactive"
        )

    # Check proper authorizations.
    if not is_authorized(user, resource):
        raise HTTPException(
            status_code=codes.FORBIDDEN,
            detail="User is not authorized to access this resource.",
        )

    # Convert to simple user version to avoid including hashed password.
    return BasicUser(**user.to_json())

get_current_resource(request) async

Gets a resource permission description for the current resource, method and model.

Source code in mlte/backend/api/auth/authorization.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
async def get_current_resource(request: Request) -> Permission:
    """
    Gets a resource permission description for the current resource, method and model.

    The resource type is parsed from the request path (without the API prefix). The
    resource id is taken from the third "/"-separated piece of that path when there
    is one; otherwise it is looked up in the JSON body, if the request has one.

    :param request: The incoming request being authorized.
    :return: A Permission with the resource type, resource id (or None) and method.
    :raises RuntimeError: If no resource type can be parsed from the URL.
    """
    # Parse URL for resource type.
    url = request.url.path

    # Strip the API prefix only from the start of the path. Removing every
    # occurrence would also delete path segments (such as a resource id) that
    # happen to contain the prefix text, and the permission check would then be
    # evaluated against the wrong resource id.
    prefix = settings.API_PREFIX
    if prefix and url.startswith(prefix):
        resource_url = url[len(prefix):]
    else:
        resource_url = url

    resource_type = ResourceType.get_type_from_url(resource_url)
    if resource_type is None:
        raise RuntimeError(
            f"Could not parse resource type from URL: {resource_url}"
        )

    # Parse URL and body for resource id, if any.
    # NOTE(review): index 2 assumes resource_url looks like "/<type>/<id>/...",
    # so that splitting yields ["", "<type>", "<id>", ...] - confirm.
    resource_id = None
    url_parts = resource_url.split("/")
    if len(url_parts) > 2:
        resource_id = url_parts[2]
    else:
        try:
            # If the URL didn't have an id, try to get it from a JSON body.
            data = await request.json()
        except JSONDecodeError:
            # This request had no JSON body, thus no id.
            data = None

        # Only a JSON object can carry an id key; other valid JSON payloads
        # (null, numbers, lists, strings) are treated as having no id.
        if isinstance(data, dict):
            id_key = UserStore.ID_MAP[resource_type]
            resource_id = data.get(id_key)

    # Get method.
    method = MethodType[request.method]

    # NOTE: if this is a search, treat it as if it was a GET method.
    # This is used so that read permissions will be applied to searches, while allowing
    # it to send complex queries through its body.
    if url.endswith("/search"):
        method = MethodType.GET

    # Build and return the resource permission description
    return Permission(
        resource_type=resource_type, resource_id=resource_id, method=method
    )

get_username_from_token(token, key)

Obtains a user from an encoded token, if the token is valid.

Source code in mlte/backend/api/auth/authorization.py
81
82
83
84
85
def get_username_from_token(token: str, key: str) -> str:
    """
    Obtains the username stored in an encoded token, if the token is valid.

    :param token: The encoded user token.
    :param key: The key passed to the decoder along with the token.
    :return: The username attribute of the decoded token.
    """
    # Decoding is delegated to jwt.decode_user_token, which is expected to check
    # the token's format and expiration; any error it raises propagates to the caller.
    return jwt.decode_user_token(token, key).username

is_authorized(current_user, resource)

Checks if the current user is authorized to access the current resource.

Source code in mlte/backend/api/auth/authorization.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def is_authorized(current_user: BasicUser, resource: Permission) -> bool:
    """
    Checks if the current user is authorized to access the current resource.

    :param current_user: The user whose role and groups are inspected.
    :param resource: The permission description of the resource being accessed. For
        non-admin users, a USER resource whose id is USER_ME_ID has its id replaced
        in place with the current user's username before permissions are checked.
    :return: True if the user is an admin or any permission in any of its groups
        grants access to the resource, False otherwise.
    """
    # Admins always get access, no further checks are needed.
    if current_user.role == RoleType.ADMIN:
        return True

    # Special case: the "me" user id stands for the user making the request.
    refers_to_self = (
        resource.resource_type == ResourceType.USER
        and resource.resource_id == USER_ME_ID
    )
    if refers_to_self:
        resource.resource_id = current_user.username

    logging.info(f"Checking authorization for resource: {resource}")

    # Access is granted as soon as one permission of one group allows it;
    # if none does, access is denied.
    return any(
        permission.grants_access(resource)
        for group in current_user.groups
        for permission in group.permissions
    )