Skip to content

memory

mlte/store/artifact/underlying/memory.py

Implementation of in-memory artifact store.

InMemoryStore

Bases: ArtifactStore

An in-memory implementation of the MLTE artifact store.

Source code in mlte/store/artifact/underlying/memory.py
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
class InMemoryStore(ArtifactStore):
    """An MLTE artifact store whose contents are held in memory."""

    def __init__(self, uri: StoreURI) -> None:
        """
        Initialize the store with fresh, empty storage.
        :param uri: The store URI, forwarded to the base class
        """
        super().__init__(uri=uri)

        self.storage = MemoryStorage()
        """The storage object handed to every session of this store."""

    def session(self) -> InMemoryStoreSession:
        """
        Create a session handle bound to this store's storage.
        :return: The session handle
        """
        # Sessions receive a reference to the same storage object, so
        # they all operate on the same underlying data.
        handle = InMemoryStoreSession(storage=self.storage)
        return handle

storage = MemoryStorage() instance-attribute

The underlying storage for the store.

session()

Return a session handle for the store instance.

Returns:

Type Description
InMemoryStoreSession

The session handle

Source code in mlte/store/artifact/underlying/memory.py
278
279
280
281
282
283
def session(self) -> InMemoryStoreSession:
    """
    Create a session handle bound to this store's storage.
    :return: The session handle
    """
    handle = InMemoryStoreSession(storage=self.storage)
    return handle

InMemoryStoreSession

Bases: ArtifactStoreSession

A session handle for the in-memory implementation of the MLTE artifact store.

Source code in mlte/store/artifact/underlying/memory.py
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
class InMemoryStoreSession(ArtifactStoreSession):
    """A session handle for the in-memory MLTE artifact store."""

    def __init__(self, *, storage: MemoryStorage) -> None:
        """
        Bind the session to existing storage.
        :param storage: The storage object the session reads and writes
        """
        self.storage = storage
        """A reference to underlying storage."""

    def close(self) -> None:
        """Close the session."""
        # NOTE(Kyle): Closing an in-memory session is a no-op.
        return None

    # -------------------------------------------------------------------------
    # Structural Elements
    # -------------------------------------------------------------------------

    def create_model(self, model: Model) -> Model:
        """
        Register a new model with no versions.
        :param model: The model to create; only its identifier is used
        :raises ErrorAlreadyExists: If the identifier is already in storage
        :return: A model with the same identifier and an empty version list
        """
        model_id = model.identifier
        if model_id in self.storage.models:
            raise errors.ErrorAlreadyExists(f"Model {model_id}")

        self.storage.models[model_id] = ModelWithVersions(identifier=model_id)
        return Model(identifier=model_id, versions=[])

    def read_model(self, model_id: str) -> Model:
        """
        Read a model together with its versions.
        :param model_id: The model identifier
        :raises ErrorNotFound: If the model is not in storage
        :return: The model object
        """
        # Called only for its existence check; the result is rebuilt below.
        self._get_model_with_versions(model_id)
        return self._read_model(model_id)

    def list_models(self) -> List[str]:
        """
        List the identifiers of all stored models.
        :return: The model identifiers
        """
        return list(self.storage.models)

    def delete_model(self, model_id: str) -> Model:
        """
        Remove a model from storage.
        :param model_id: The model identifier
        :raises ErrorNotFound: If the model is not in storage
        :return: The model as it was just before removal
        """
        self._get_model_with_versions(model_id)

        # Build the return value first: it is derived from the entry that
        # is about to be deleted.
        removed = self._read_model(model_id)
        del self.storage.models[model_id]
        return removed

    def create_version(self, model_id: str, version: Version) -> Version:
        """
        Register a new version, with no artifacts, under a model.
        :param model_id: The model identifier
        :param version: The version to create; only its identifier is used
        :raises ErrorNotFound: If the model is not in storage
        :raises ErrorAlreadyExists: If the model already has this version
        :return: A version with the same identifier
        """
        stored_model = self._get_model_with_versions(model_id)

        version_id = version.identifier
        if version_id in stored_model.versions:
            raise errors.ErrorAlreadyExists(f"Version {version_id}")

        stored_model.versions[version_id] = VersionWithArtifacts(
            identifier=version_id
        )
        return Version(identifier=version_id)

    def read_version(self, model_id: str, version_id: str) -> Version:
        """
        Read a version of a model.
        :param model_id: The model identifier
        :param version_id: The version identifier
        :raises ErrorNotFound: If the model or the version is not in storage
        :return: The version object
        """
        # Called only for its existence checks; the result is rebuilt below.
        self._get_version_with_artifacts(model_id, version_id)
        return self._read_version(model_id, version_id)

    def list_versions(self, model_id: str) -> List[str]:
        """
        List the identifiers of all versions of a model.
        :param model_id: The model identifier
        :raises ErrorNotFound: If the model is not in storage
        :return: The version identifiers
        """
        stored_model = self._get_model_with_versions(model_id)
        return list(stored_model.versions)

    def delete_version(self, model_id: str, version_id: str) -> Version:
        """
        Remove a version from a model.
        :param model_id: The model identifier
        :param version_id: The version identifier
        :raises ErrorNotFound: If the model or the version is not in storage
        :return: The version as it was just before removal
        """
        stored_model = self._get_model_with_versions(model_id)
        if version_id not in stored_model.versions:
            raise errors.ErrorNotFound(f"Version {version_id}")

        # Build the return value first: it is derived from the entry that
        # is about to be deleted.
        removed = self._read_version(model_id, version_id)
        del stored_model.versions[version_id]
        return removed

    def _read_model(self, model_id: str) -> Model:
        """
        Lazily construct a Model object on read.
        :param model_id: The model identifier; must already be in storage
        :return: The model object
        """
        assert model_id in self.storage.models, "Broken precondition."
        stored_versions = self.storage.models[model_id].versions
        return Model(
            identifier=model_id,
            versions=[
                self._read_version(model_id, version_id)
                for version_id in stored_versions.keys()
            ],
        )

    def _read_version(self, model_id: str, version_id: str) -> Version:
        """
        Lazily construct a Version object on read.
        :param model_id: The model identifier; must already be in storage
        :param version_id: The version identifier; must exist in the model
        :return: The version object
        """
        assert model_id in self.storage.models, "Broken precondition."
        stored_versions = self.storage.models[model_id].versions
        assert version_id in stored_versions, "Broken precondition."
        return Version(identifier=stored_versions[version_id].identifier)

    def _get_model_with_versions(self, model_id: str) -> ModelWithVersions:
        """
        Get a model with versions from storage.
        :param model_id: The identifier for the model
        :raises ErrorNotFound: If the model is not in storage
        :return: The stored model with its versions
        """
        if model_id not in self.storage.models:
            raise errors.ErrorNotFound(f"Model {model_id}")
        return self.storage.models[model_id]

    # -------------------------------------------------------------------------
    # Artifact
    # -------------------------------------------------------------------------

    def write_artifact(
        self,
        model_id: str,
        version_id: str,
        artifact: ArtifactModel,
        *,
        force: bool = False,
        parents: bool = False,
    ) -> ArtifactModel:
        """
        Store an artifact under a model version.
        :param model_id: The model identifier
        :param version_id: The version identifier
        :param artifact: The artifact to store, keyed by its header identifier
        :param force: If True, replace an artifact with the same identifier
        :param parents: If True, call storeutil.create_parents for the model
            and version first (presumably creating any that are missing)
        :raises ErrorNotFound: If the model or the version is not in storage
        :raises ErrorAlreadyExists: If the artifact exists and force is False
        :return: The artifact that was passed in
        """
        if parents:
            storeutil.create_parents(self, model_id, version_id)

        stored_version = self._get_version_with_artifacts(model_id, version_id)

        artifact_id = artifact.header.identifier
        if artifact_id in stored_version.artifacts and not force:
            raise errors.ErrorAlreadyExists(f"Artifact '{artifact_id}'")

        stored_version.artifacts[artifact_id] = artifact
        return artifact

    def read_artifact(
        self,
        model_id: str,
        version_id: str,
        artifact_id: str,
    ) -> ArtifactModel:
        """
        Read a single artifact.
        :param model_id: The model identifier
        :param version_id: The version identifier
        :param artifact_id: The artifact identifier
        :raises ErrorNotFound: If the model, version or artifact is missing
        :return: The stored artifact
        """
        stored = self._get_version_with_artifacts(model_id, version_id).artifacts

        if artifact_id in stored:
            return stored[artifact_id]
        raise errors.ErrorNotFound(f"Artifact '{artifact_id}'")

    def read_artifacts(
        self,
        model_id: str,
        version_id: str,
        limit: int = 100,
        offset: int = 0,
    ) -> List[ArtifactModel]:
        """
        Read a page of the artifacts of a model version.
        :param model_id: The model identifier
        :param version_id: The version identifier
        :param limit: The length of the slice taken from the artifact list
        :param offset: The index at which the slice starts
        :raises ErrorNotFound: If the model or the version is not in storage
        :return: The artifacts in the slice [offset, offset + limit)
        """
        stored_version = self._get_version_with_artifacts(model_id, version_id)
        everything = list(stored_version.artifacts.values())
        return everything[offset : offset + limit]

    def search_artifacts(
        self,
        model_id: str,
        version_id: str,
        query: Query = Query(),
    ) -> List[ArtifactModel]:
        """
        Read the artifacts of a model version that match a query.
        :param model_id: The model identifier
        :param version_id: The version identifier
        :param query: The query whose filter is applied to each artifact
        :raises ErrorNotFound: If the model or the version is not in storage
        :return: The artifacts for which the filter matches
        """
        stored_version = self._get_version_with_artifacts(model_id, version_id)

        matches: List[ArtifactModel] = []
        for candidate in stored_version.artifacts.values():
            if query.filter.match(candidate):
                matches.append(candidate)
        return matches

    def delete_artifact(
        self,
        model_id: str,
        version_id: str,
        artifact_id: str,
    ) -> ArtifactModel:
        """
        Remove a single artifact.
        :param model_id: The model identifier
        :param version_id: The version identifier
        :param artifact_id: The artifact identifier
        :raises ErrorNotFound: If the model, version or artifact is missing
        :return: The artifact that was removed
        """
        stored = self._get_version_with_artifacts(model_id, version_id).artifacts

        if artifact_id not in stored:
            raise errors.ErrorNotFound(f"Artifact '{artifact_id}'")
        return stored.pop(artifact_id)

    def _get_version_with_artifacts(
        self, model_id: str, version_id: str
    ) -> VersionWithArtifacts:
        """
        Get a version with artifacts from storage.
        :param model_id: The identifier for the model
        :param version_id: The identifier for the version
        :raises ErrorNotFound: If the required structural elements are not present
        :return: The version with associated artifacts
        """
        stored_model = self._get_model_with_versions(model_id)

        if version_id in stored_model.versions:
            return stored_model.versions[version_id]
        raise errors.ErrorNotFound(f"Version {version_id}")

storage = storage instance-attribute

A reference to underlying storage.

close()

Close the session.

Source code in mlte/store/artifact/underlying/memory.py
66
67
68
69
def close(self) -> None:
    """Close the session."""
    # NOTE(Kyle): Closing an in-memory session is a no-op.
    return None

MemoryStorage

A simple storage wrapper for the in-memory store.

Source code in mlte/store/artifact/underlying/memory.py
47
48
49
50
51
class MemoryStorage:
    """Top-level container for the data held by the in-memory store."""

    def __init__(self) -> None:
        """Create storage that contains no models."""
        self.models: Dict[str, ModelWithVersions] = {}
        """The stored models, keyed by model identifier."""

ModelWithVersions

A structure that combines a model with the versions it contains.

Source code in mlte/store/artifact/underlying/memory.py
36
37
38
39
40
41
42
43
44
class ModelWithVersions:
    """Storage record pairing a model identifier with its versions."""

    def __init__(self, *, identifier: str) -> None:
        """
        Create a record for a model that has no versions yet.
        :param identifier: The model identifier
        """
        self.identifier = identifier
        """The model identifier."""

        self.versions: Dict[str, VersionWithArtifacts] = {}
        """The versions in the model, keyed by version identifier."""

identifier = identifier instance-attribute

The model identifier.

versions = {} instance-attribute

The collection of versions in the model.

VersionWithArtifacts

A structure that combines a version with the artifacts it contains.

Source code in mlte/store/artifact/underlying/memory.py
25
26
27
28
29
30
31
32
33
class VersionWithArtifacts:
    """Storage record pairing a version identifier with its artifacts."""

    def __init__(self, *, identifier: str) -> None:
        """
        Create a record for a version that has no artifacts yet.
        :param identifier: The version identifier
        """
        self.identifier = identifier
        """The version identifier."""

        self.artifacts: OrderedDict[str, ArtifactModel] = OrderedDict()
        """The artifacts in the version, keyed by artifact identifier."""

artifacts = OrderedDict() instance-attribute

The artifacts associated with the version.

identifier = identifier instance-attribute

The version identifier.