Skip to content

policy_store_service

Class to store group and permission policies.

PolicyStoreService

Handles access to stored policies. It is not a store in the same sense as the rest: it does not derive from Store.

Source code in mlte/store/user/policy/policy_store_service.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class PolicyStoreService:
    """
    Service that checks, saves and removes policies through a pair of mappers.

    It is not a store in the same sense as the rest: it does not derive from Store.
    """

    def __init__(
        self, group_mapper: GroupMapper, permission_mapper: PermissionMapper
    ):
        """
        Keeps references to the mappers this service works through.

        :param group_mapper: Mapper used to read, create and delete groups.
        :param permission_mapper: Mapper used to read, create and delete permissions.
        """
        self.permission_mapper = permission_mapper
        self.group_mapper = group_mapper

    def is_stored(self, policy: Policy) -> bool:
        """
        Tells whether every group and permission of a policy can be read back.

        :param policy: The policy to be checked.
        :return: True if all reads succeed, False as soon as one is not found.
        """
        try:
            for entry in policy.groups:
                # Only the lookup matters here; the returned object is not needed.
                self.group_mapper.read(entry.name)
                for perm in entry.permissions:
                    self.permission_mapper.read(perm.to_str())
        except errors.ErrorNotFound:
            # A single missing group or permission means the policy is incomplete.
            return False
        else:
            # Every lookup succeeded.
            return True

    def save_to_store(self, policy: Policy) -> None:
        """
        Creates the permissions and groups of a policy through the mappers.
        Items that already exist are left untouched.

        :param policy: The policy to be stored.
        """
        for entry in policy.groups:
            # Permissions of a group are created before the group itself.
            for perm in entry.permissions:
                try:
                    self.permission_mapper.create(perm)
                except errors.ErrorAlreadyExists:
                    # Already there, nothing to do for this permission.
                    continue

            try:
                self.group_mapper.create(entry)
            except errors.ErrorAlreadyExists:
                # Already there, nothing to do for this group.
                continue

    def remove_from_store(self, policy: Policy) -> None:
        """
        Deletes the groups of a policy, and then the permissions they referenced.

        :param policy: The policy whose groups and permissions will be deleted.
        """
        # TODO: This is not atomic. A failure while deleting one item may leave the rest dangling.
        # Keyed by permission string so each permission is deleted only once, even if
        # several groups share it.
        pending: dict[str, bool] = {}
        for entry in policy.groups:
            pending.update((perm.to_str(), True) for perm in entry.permissions)
            self.group_mapper.delete(entry.name)

        # Permissions go last, once all groups are gone.
        # TODO: groups outside this policy that use these permissions may be left dangling. It is not
        # trivial to check if a permission is no longer used. Even worse, we may want to keep some of
        # them, even with no groups.
        for key in pending:
            self.permission_mapper.delete(key)

__init__(group_mapper, permission_mapper)

Sets up the group and permission mappers to be used.

Source code in mlte/store/user/policy/policy_store_service.py
13
14
15
16
17
18
def __init__(
    self, group_mapper: GroupMapper, permission_mapper: PermissionMapper
):
    """
    Keeps references to the mappers this service works through.

    :param group_mapper: Mapper stored as ``self.group_mapper``.
    :param permission_mapper: Mapper stored as ``self.permission_mapper``.
    """
    self.permission_mapper = permission_mapper
    self.group_mapper = group_mapper

is_stored(policy)

Checks if this policy is stored in the given mappers.

Parameters:

Name Type Description Default
policy Policy

The policy to be checked.

required
Source code in mlte/store/user/policy/policy_store_service.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def is_stored(self, policy: Policy) -> bool:
    """
    Tells whether every group and permission of a policy can be read back.

    :param policy: The policy to be checked.
    :return: True if all reads succeed, False as soon as one is not found.
    """
    try:
        for entry in policy.groups:
            # Only the lookup matters here; the returned object is not needed.
            self.group_mapper.read(entry.name)
            for perm in entry.permissions:
                self.permission_mapper.read(perm.to_str())
    except errors.ErrorNotFound:
        # A single missing group or permission means the policy is incomplete.
        return False
    else:
        # Every lookup succeeded.
        return True

remove_from_store(policy)

Delete groups and permissions for a resource.

Source code in mlte/store/user/policy/policy_store_service.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def remove_from_store(self, policy: Policy) -> None:
    """
    Deletes the groups of a policy, and then the permissions they referenced.

    :param policy: The policy whose groups and permissions will be deleted.
    """
    # TODO: This is not atomic. A failure while deleting one item may leave the rest dangling.
    # Keyed by permission string so each permission is deleted only once, even if
    # several groups share it.
    pending: dict[str, bool] = {}
    for entry in policy.groups:
        pending.update((perm.to_str(), True) for perm in entry.permissions)
        self.group_mapper.delete(entry.name)

    # Permissions go last, once all groups are gone.
    # TODO: groups outside this policy that use these permissions may be left dangling. It is not
    # trivial to check if a permission is no longer used. Even worse, we may want to keep some of
    # them, even with no groups.
    for key in pending:
        self.permission_mapper.delete(key)

save_to_store(policy)

Store this policy's groups and permissions in the given store. Ignore errors if it already existed.

Parameters:

Name Type Description Default
policy Policy

The policy to be stored.

required
Source code in mlte/store/user/policy/policy_store_service.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def save_to_store(self, policy: Policy) -> None:
    """
    Creates the permissions and groups of a policy through the mappers.
    Items that already exist are left untouched.

    :param policy: The policy to be stored.
    """
    for entry in policy.groups:
        # Permissions of a group are created before the group itself.
        for perm in entry.permissions:
            try:
                self.permission_mapper.create(perm)
            except errors.ErrorAlreadyExists:
                # Already there, nothing to do for this permission.
                continue

        try:
            self.group_mapper.create(entry)
        except errors.ErrorAlreadyExists:
            # Already there, nothing to do for this group.
            continue