Handles access to stored policies. It is not a store in the same sense as the others and does not derive from Store.
Source code in mlte/store/user/policy/policy_store_service.py
class PolicyStoreService:
    """
    Front end for reading, writing and deleting a policy's groups and permissions.

    Unlike the other stores it does not derive from Store; it only forwards to
    the group and permission mappers it is given.
    """

    def __init__(
        self, group_mapper: GroupMapper, permission_mapper: PermissionMapper
    ):
        """
        Keep references to the mappers every other method works through.

        :param group_mapper: Mapper used to read, create and delete groups.
        :param permission_mapper: Mapper used to read, create and delete permissions.
        """
        self.group_mapper, self.permission_mapper = (
            group_mapper,
            permission_mapper,
        )

    def is_stored(self, policy: Policy) -> bool:
        """
        Report whether every group and permission of the policy can be read back.

        Lookups are done by group name and by permission string, and the values
        the mappers return are discarded. A True result therefore means "all
        entries exist", not "the stored group carries the same permissions as
        this policy".

        :param policy: The policy to be checked.
        :return: False as soon as one lookup raises ErrorNotFound, True otherwise.
        """
        try:
            for grp in policy.groups:
                self.group_mapper.read(grp.name)
                for perm in grp.permissions:
                    self.permission_mapper.read(perm.to_str())
        except errors.ErrorNotFound:
            # One missing group or permission makes the policy incomplete.
            complete = False
        else:
            complete = True
        return complete

    def save_to_store(self, policy: Policy) -> None:
        """
        Create the policy's permissions and groups through the mappers.

        An entry that already exists is left as it is: ErrorAlreadyExists is
        swallowed and nothing is updated here.
        NOTE(review): a group stored earlier under the same name is not
        compared with the one in this policy, so callers that rely on the
        stored group matching the policy should verify that separately.

        :param policy: The policy to be stored.
        """
        for grp in policy.groups:
            # Permissions of a group are created before the group itself.
            for perm in grp.permissions:
                try:
                    self.permission_mapper.create(perm)
                except errors.ErrorAlreadyExists:
                    # Already present: keep the stored entry.
                    continue
            try:
                self.group_mapper.create(grp)
            except errors.ErrorAlreadyExists:
                # Already present: keep the stored entry.
                continue

    def remove_from_store(self, policy: Policy) -> None:
        """
        Delete the policy's groups, then every permission those groups listed.

        No exception is caught here, so a failing delete stops the method and
        leaves whatever was removed before it gone and the rest in place.

        :param policy: The policy whose groups and permissions are removed.
        """
        # TODO: This is not atomic. Error deleting one part may leave the rest dangling.
        # Insertion-ordered dict used as a set, so each permission is deleted once.
        pending: dict[str, None] = {}
        for grp in policy.groups:
            pending.update(dict.fromkeys(p.to_str() for p in grp.permissions))
            self.group_mapper.delete(grp.name)

        # TODO: note that this may leave other groups using these permissions dangling. Not trivial to check if
        # a permission is no longer used. Even worse, we may want to leave some of them, even with no groups.
        for permission_str in pending:
            self.permission_mapper.delete(permission_str)
__init__(group_mapper, permission_mapper)
Sets up the group and permissions mapper to be used.
Source code in mlte/store/user/policy/policy_store_service.py
def __init__(
    self, group_mapper: GroupMapper, permission_mapper: PermissionMapper
):
    """
    Keep references to the mappers every other method works through.

    :param group_mapper: Mapper used to read, create and delete groups.
    :param permission_mapper: Mapper used to read, create and delete permissions.
    """
    self.group_mapper, self.permission_mapper = (
        group_mapper,
        permission_mapper,
    )
is_stored(policy)
Checks if this policy is stored in the given mappers.
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| policy | Policy | The policy to be checked. | required |
Source code in mlte/store/user/policy/policy_store_service.py
def is_stored(self, policy: Policy) -> bool:
    """
    Report whether every group and permission of the policy can be read back.

    Lookups are done by group name and by permission string, and the values
    the mappers return are discarded. A True result therefore means "all
    entries exist", not "the stored group carries the same permissions as
    this policy".

    :param policy: The policy to be checked.
    :return: False as soon as one lookup raises ErrorNotFound, True otherwise.
    """
    try:
        for grp in policy.groups:
            self.group_mapper.read(grp.name)
            for perm in grp.permissions:
                self.permission_mapper.read(perm.to_str())
    except errors.ErrorNotFound:
        # One missing group or permission makes the policy incomplete.
        complete = False
    else:
        complete = True
    return complete
remove_from_store(policy)
Delete groups and permissions for a resource.
Source code in mlte/store/user/policy/policy_store_service.py
def remove_from_store(self, policy: Policy) -> None:
    """
    Delete the policy's groups, then every permission those groups listed.

    No exception is caught here, so a failing delete stops the method and
    leaves whatever was removed before it gone and the rest in place.

    :param policy: The policy whose groups and permissions are removed.
    """
    # TODO: This is not atomic. Error deleting one part may leave the rest dangling.
    # Insertion-ordered dict used as a set, so each permission is deleted once.
    pending: dict[str, None] = {}
    for grp in policy.groups:
        pending.update(dict.fromkeys(p.to_str() for p in grp.permissions))
        self.group_mapper.delete(grp.name)

    # TODO: note that this may leave other groups using these permissions dangling. Not trivial to check if
    # a permission is no longer used. Even worse, we may want to leave some of them, even with no groups.
    for permission_str in pending:
        self.permission_mapper.delete(permission_str)
save_to_store(policy)
Store this policy's groups and permissions in the given store.
Ignore errors if it already existed.
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| policy | Policy | The policy to be stored. | required |
Source code in mlte/store/user/policy/policy_store_service.py
def save_to_store(self, policy: Policy) -> None:
    """
    Create the policy's permissions and groups through the mappers.

    An entry that already exists is left as it is: ErrorAlreadyExists is
    swallowed and nothing is updated here.
    NOTE(review): a group stored earlier under the same name is not
    compared with the one in this policy, so callers that rely on the
    stored group matching the policy should verify that separately.

    :param policy: The policy to be stored.
    """
    for grp in policy.groups:
        # Permissions of a group are created before the group itself.
        for perm in grp.permissions:
            try:
                self.permission_mapper.create(perm)
            except errors.ErrorAlreadyExists:
                # Already present: keep the stored entry.
                continue
        try:
            self.group_mapper.create(grp)
        except errors.ErrorAlreadyExists:
            # Already present: keep the stored entry.
            continue