Skip to content

catalog_group

mlte/store/catalog/catalog_group.py

MLTE catalog store group interface implementation.

CatalogStoreGroup

A group of catalog stores.

Source code in mlte/store/catalog/catalog_group.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
class CatalogStoreGroup:
    """
    A group of catalog stores.
    """

    def __init__(self):
        """Initialization."""
        self.catalogs: Dict[str, CatalogStore] = {}
        """Dictionary with all catalogs in this group."""

    def add_catalog_from_uri(self, id: str, uri: str, overwrite: bool = False):
        """
        Adds a catalog by indicating its uri.

        :param id: A string to identify the catalog.
        :param uri: The store URI that describes the store to be used for the catalog.
        :param overwrite: Add catalog to list even if id is already stored, pointing dictionary id to the new URI.
        """
        catalog = create_catalog_store(uri, id)
        self.add_catalog(id, catalog, overwrite)

    def add_catalog(
        self, id: str, catalog_store: CatalogStore, overwrite: bool = False
    ):
        """
        Adds a catalog.

        :param id: A string to identify the catalog.
        :param catalog_store: The catalog store to add.
        :param overwrite: Add catalog to list even if id is already stored, pointing dictionary id to the new URI.
        """
        if id in self.catalogs and not overwrite:
            raise ErrorAlreadyExists(
                f"Catalog with id {id} already exists in group."
            )
        self.catalogs[id] = catalog_store

    def remove_catalog(self, id: str):
        """Removes the given catalog."""
        if id not in self.catalogs:
            raise ErrorNotFound(f"Catalog with id {id} was not found in group.")

        del self.catalogs[id]

    def session(self) -> CatalogStoreGroupSession:
        """
        Return a session handle for the store instance.
        :return: The session handle
        """
        return CatalogStoreGroupSession(self.catalogs)

catalogs = {} instance-attribute

Dictionary with all catalogs in this group.

__init__()

Initialization.

Source code in mlte/store/catalog/catalog_group.py
25
26
27
28
def __init__(self):
    """Initialization."""
    self.catalogs: Dict[str, CatalogStore] = {}
    """Dictionary with all catalogs in this group."""

add_catalog(id, catalog_store, overwrite=False)

Adds a catalog.

Parameters:

Name Type Description Default
id str

A string to identify the catalog.

required
catalog_store CatalogStore

The catalog store to add.

required
overwrite bool

Add catalog to list even if id is already stored, pointing dictionary id to the new URI.

False
Source code in mlte/store/catalog/catalog_group.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def add_catalog(
    self, id: str, catalog_store: CatalogStore, overwrite: bool = False
):
    """
    Adds a catalog.

    :param id: A string to identify the catalog.
    :param catalog_store: The catalog store to add.
    :param overwrite: Add catalog to list even if id is already stored, pointing dictionary id to the new URI.
    """
    if id in self.catalogs and not overwrite:
        raise ErrorAlreadyExists(
            f"Catalog with id {id} already exists in group."
        )
    self.catalogs[id] = catalog_store

add_catalog_from_uri(id, uri, overwrite=False)

Adds a catalog by indicating its uri.

Parameters:

Name Type Description Default
id str

A string to identify the catalog.

required
uri str

The store URI that describes the store to be used for the catalog.

required
overwrite bool

Add catalog to list even if id is already stored, pointing dictionary id to the new URI.

False
Source code in mlte/store/catalog/catalog_group.py
30
31
32
33
34
35
36
37
38
39
def add_catalog_from_uri(self, id: str, uri: str, overwrite: bool = False):
    """
    Adds a catalog by indicating its uri.

    :param id: A string to identify the catalog.
    :param uri: The store URI that describes the store to be used for the catalog.
    :param overwrite: Add catalog to list even if id is already stored, pointing dictionary id to the new URI.
    """
    catalog = create_catalog_store(uri, id)
    self.add_catalog(id, catalog, overwrite)

remove_catalog(id)

Removes the given catalog.

Source code in mlte/store/catalog/catalog_group.py
57
58
59
60
61
62
def remove_catalog(self, id: str):
    """Removes the given catalog."""
    if id not in self.catalogs:
        raise ErrorNotFound(f"Catalog with id {id} was not found in group.")

    del self.catalogs[id]

session()

Return a session handle for the store instance.

Returns:

Type Description
CatalogStoreGroupSession

The session handle

Source code in mlte/store/catalog/catalog_group.py
64
65
66
67
68
69
def session(self) -> CatalogStoreGroupSession:
    """
    Return a session handle for the store instance.
    :return: The session handle
    """
    return CatalogStoreGroupSession(self.catalogs)

CatalogStoreGroupSession

Bases: StoreSession

Sessions for all catalogs in a group.

Source code in mlte/store/catalog/catalog_group.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
class CatalogStoreGroupSession(StoreSession):
    """Sessions for all catalogs in a group."""

    def __init__(self, catalogs: Dict[str, CatalogStore]):
        """Initialize a session instance for each catalog."""
        self.sessions: Dict[str, CatalogStoreSession] = {}
        """Sessions for all catalogs in the group."""

        for id, catalog in catalogs.items():
            self.sessions[id] = catalog.session()

    def close(self) -> None:
        """Close all sessions."""
        for _, session in self.sessions.items():
            session.close()

    def get_session(self, catalog_id: str) -> CatalogStoreSession:
        """Gets a session for the given catalog store id."""
        if catalog_id not in self.sessions:
            raise ErrorNotFound(
                f"Catalog id {catalog_id} was not found in registered list of catalogs -"
            )
        return self.sessions[catalog_id]

    def list_details(
        self,
        catalog_id: Optional[str] = None,
        limit: int = 100,
        offset: int = 0,
    ) -> List[CatalogEntry]:
        """
        Read entries within limit and offset.
        :param catalog_id: The identifier of the catalog to read from; if not given, read from all catalogs.
        :param limit: The limit on entries to read
        :param offset: The offset on entries to read
        :return: The read entries
        """
        if catalog_id is not None:
            if catalog_id not in self.sessions:
                raise ErrorNotFound(
                    f"Catalog id {catalog_id} was not found in list of catalogs."
                )

            catalog_session = self.sessions[catalog_id]
            return catalog_session.entry_mapper.list_details(
                limit=limit, offset=offset
            )
        else:
            # Go over all catalogs, reading from each one, and grouping results.
            results: List[CatalogEntry] = []
            for catalog_id, session in self.sessions.items():
                partial_results = session.entry_mapper.list_details(
                    limit=limit, offset=offset
                )
                results.extend(partial_results)
            return results[offset : offset + limit]

    def search(
        self,
        catalog_id: Optional[str] = None,
        query: Query = Query(),
    ) -> List[CatalogEntry]:
        """
        Read a collection of entries, optionally filtered.
        :param catalog_id: The identifier of the catalog to read from; if not given, search on all catalogs.
        :param query: The entry query to apply
        :return: A collection of entries that satisfy the filter
        """
        if catalog_id is not None:
            if catalog_id not in self.sessions:
                raise ErrorNotFound(
                    f"Catalog id {catalog_id} was not found in list of catalogs."
                )

            catalog_session = self.sessions[catalog_id]
            entries = typing.cast(
                List[CatalogEntry], catalog_session.entry_mapper.search(query)
            )

            # Ensure they are marked as coming from this catalog.
            for entry in entries:
                entry.header.catalog_id = catalog_id
            return entries
        else:
            # Go over all catalogs, reading from each one, and grouping results.
            results: List[CatalogEntry] = []
            for catalog_id, session in self.sessions.items():
                # Get results for this catalog.
                partial_results = typing.cast(
                    List[CatalogEntry], session.entry_mapper.search(query)
                )

                # Ensure they are marked as coming from this catalog.
                for entry in partial_results:
                    entry.header.catalog_id = catalog_id

                # Add them to the overall lilst.
                results.extend(partial_results)
            return results

sessions = {} instance-attribute

Sessions for all catalogs in the group.

__init__(catalogs)

Initialize a session instance for each catalog.

Source code in mlte/store/catalog/catalog_group.py
75
76
77
78
79
80
81
def __init__(self, catalogs: Dict[str, CatalogStore]):
    """Initialize a session instance for each catalog."""
    self.sessions: Dict[str, CatalogStoreSession] = {}
    """Sessions for all catalogs in the group."""

    for id, catalog in catalogs.items():
        self.sessions[id] = catalog.session()

close()

Close all sessions.

Source code in mlte/store/catalog/catalog_group.py
83
84
85
86
def close(self) -> None:
    """Close all sessions."""
    for _, session in self.sessions.items():
        session.close()

get_session(catalog_id)

Gets a session for the given catalog store id.

Source code in mlte/store/catalog/catalog_group.py
88
89
90
91
92
93
94
def get_session(self, catalog_id: str) -> CatalogStoreSession:
    """Gets a session for the given catalog store id."""
    if catalog_id not in self.sessions:
        raise ErrorNotFound(
            f"Catalog id {catalog_id} was not found in registered list of catalogs -"
        )
    return self.sessions[catalog_id]

list_details(catalog_id=None, limit=100, offset=0)

Read entries within limit and offset.

Parameters:

Name Type Description Default
catalog_id Optional[str]

The identifier of the catalog to read from; if not given, read from all catalogs.

None
limit int

The limit on entries to read

100
offset int

The offset on entries to read

0

Returns:

Type Description
List[CatalogEntry]

The read entries

Source code in mlte/store/catalog/catalog_group.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def list_details(
    self,
    catalog_id: Optional[str] = None,
    limit: int = 100,
    offset: int = 0,
) -> List[CatalogEntry]:
    """
    Read entries within limit and offset.
    :param catalog_id: The identifier of the catalog to read from; if not given, read from all catalogs.
    :param limit: The limit on entries to read
    :param offset: The offset on entries to read
    :return: The read entries
    """
    if catalog_id is not None:
        if catalog_id not in self.sessions:
            raise ErrorNotFound(
                f"Catalog id {catalog_id} was not found in list of catalogs."
            )

        catalog_session = self.sessions[catalog_id]
        return catalog_session.entry_mapper.list_details(
            limit=limit, offset=offset
        )
    else:
        # Go over all catalogs, reading from each one, and grouping results.
        results: List[CatalogEntry] = []
        for catalog_id, session in self.sessions.items():
            partial_results = session.entry_mapper.list_details(
                limit=limit, offset=offset
            )
            results.extend(partial_results)
        return results[offset : offset + limit]

search(catalog_id=None, query=Query())

Read a collection of entries, optionally filtered.

Parameters:

Name Type Description Default
catalog_id Optional[str]

The identifier of the catalog to read from; if not given, search on all catalogs.

None
query Query

The entry query to apply

Query()

Returns:

Type Description
List[CatalogEntry]

A collection of entries that satisfy the filter

Source code in mlte/store/catalog/catalog_group.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def search(
    self,
    catalog_id: Optional[str] = None,
    query: Query = Query(),
) -> List[CatalogEntry]:
    """
    Read a collection of entries, optionally filtered.
    :param catalog_id: The identifier of the catalog to read from; if not given, search on all catalogs.
    :param query: The entry query to apply
    :return: A collection of entries that satisfy the filter
    """
    if catalog_id is not None:
        if catalog_id not in self.sessions:
            raise ErrorNotFound(
                f"Catalog id {catalog_id} was not found in list of catalogs."
            )

        catalog_session = self.sessions[catalog_id]
        entries = typing.cast(
            List[CatalogEntry], catalog_session.entry_mapper.search(query)
        )

        # Ensure they are marked as coming from this catalog.
        for entry in entries:
            entry.header.catalog_id = catalog_id
        return entries
    else:
        # Go over all catalogs, reading from each one, and grouping results.
        results: List[CatalogEntry] = []
        for catalog_id, session in self.sessions.items():
            # Get results for this catalog.
            partial_results = typing.cast(
                List[CatalogEntry], session.entry_mapper.search(query)
            )

            # Ensure they are marked as coming from this catalog.
            for entry in partial_results:
                entry.header.catalog_id = catalog_id

            # Add them to the overall lilst.
            results.extend(partial_results)
        return results

ManagedCatalogGroupSession

Bases: ManagedSession

A simple context manager for store sessions.

Source code in mlte/store/catalog/catalog_group.py
173
174
175
176
177
class ManagedCatalogGroupSession(ManagedSession):
    """A simple context manager for store sessions."""

    def __enter__(self) -> CatalogStoreGroupSession:
        return cast(CatalogStoreGroupSession, self.session)