Skip to content

store

Implementation of relational database system artifact store.

RDBSArtifactMapper

Bases: ArtifactMapper

Relational database mapper for the artifact resource.

Source code in mlte/store/artifact/underlying/rdbs/store.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
class RDBSArtifactMapper(ArtifactMapper):
    """Relational DB mapper for the artifact resource."""

    def __init__(
        self, storage: RDBStorage, validators: CompositeValidator
    ) -> None:
        super().__init__(validators=validators)

        self.storage = storage
        """A reference to underlying storage."""

    def read(
        self, artifact_id: str, model_and_version: tuple[str, str]
    ) -> ArtifactModel:
        """
        Read an artifact from the DB store.

        :param artifact_id: The identifier of the artifact to read
        :param model_and_version: A (model id, version id) tuple locating the artifact
        :return: The artifact, as a model
        """
        model_id, version_id = model_and_version
        with Session(self.storage.engine) as session:
            artifact, _ = DBReader.get_artifact(
                model_id, version_id, artifact_id, session
            )
            return artifact

    def delete(
        self, artifact_id: str, model_and_version: tuple[str, str]
    ) -> ArtifactModel:
        """
        Delete an artifact from the DB store.

        :param artifact_id: The identifier of the artifact to delete
        :param model_and_version: A (model id, version id) tuple locating the artifact
        :return: The artifact as it was before being deleted
        """
        model_id, version_id = model_and_version
        with Session(self.storage.engine) as session:
            artifact, artifact_orm = DBReader.get_artifact(
                model_id, version_id, artifact_id, session
            )
            session.delete(artifact_orm)
            session.commit()
            return artifact

    def list(self, model_and_version: tuple[str, str]) -> list[str]:
        """
        List the identifiers of the artifacts of a model version.

        :param model_and_version: A (model id, version id) tuple
        :return: The identifiers of all artifacts found for that tuple
        """
        model_id, version_id = model_and_version
        with Session(self.storage.engine) as session:
            artifacts = DBReader.get_artifacts(model_id, version_id, session)
            return [artifact.header.identifier for artifact in artifacts]

    # -------------------------------------------------------------------------
    # Internal helpers.
    # -------------------------------------------------------------------------

    def _read_artifact(
        self, artifact_id: str, model_and_version: tuple[str, str]
    ) -> tuple[ArtifactModel, DBArtifact]:
        """
        Looks for an artifact in the DB store, returns it as a model and as an ORM object,
        or throws an ErrorNotFound if not present.

        NOTE: the ORM object is returned after the session that loaded it has been
        closed, so callers should not rely on it for further DB operations.
        """
        model_id, version_id = model_and_version
        with Session(self.storage.engine) as session:
            artifact, artifact_orm = DBReader.get_artifact(
                model_id, version_id, artifact_id, session
            )
            return artifact, artifact_orm

    def _store_artifact(
        self,
        artifact: ArtifactModel,
        model_and_version: tuple[str, str],
    ) -> ArtifactModel:
        """
        Writes an artifact to the store, replacing any artifact that already
        has the same identifier for the given model and version.

        :param artifact: The artifact to write
        :param model_and_version: A (model id, version id) tuple locating the artifact
        :return: The artifact, as read back from the store after the write
        """
        model_id, version_id = model_and_version

        # The lookup, the removal of the previous entry and the insertion all use
        # the same session, so the ORM object being deleted belongs to the session
        # deleting it, and the whole replacement is committed as a single unit.
        with Session(self.storage.engine) as session:
            original_orm: Optional[DBArtifact] = None
            try:
                _, original_orm = DBReader.get_artifact(
                    model_id, version_id, artifact.header.identifier, session
                )
            except errors.ErrorNotFound:
                # It is ok if we don't find it, we will just create it instead of editing.
                pass

            # To simulate edition, we delete the current object and create a new one.
            # TODO: Change this so we are actually editing the object in the DB.
            if original_orm is not None:
                session.delete(original_orm)
            updated_orm = main_factory.create_artifact_orm(
                artifact,
                model_id,
                version_id,
                artifact.header.level,
                session,
            )
            session.add(updated_orm)

            # Create or edit changes in DB.
            session.commit()

        return self.read(artifact.header.identifier, model_and_version)

storage = storage instance-attribute

A reference to underlying storage.

RDBSModelMapper

Bases: ModelMapper

Relational database mapper for the model resource.

Source code in mlte/store/artifact/underlying/rdbs/store.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
class RDBSModelMapper(ModelMapper):
    """Mapper for the model resource, backed by relational DB storage."""

    def __init__(self, storage: RDBStorage) -> None:
        self.storage = storage
        """A reference to underlying storage."""

    def create(self, model: Model, context: Any = None) -> Model:
        """
        Create a new model entry, with no versions, in the DB.

        :param model: The model to create; only its identifier is used
        :param context: Not used by this implementation
        :return: A model with the given identifier and an empty version list
        :raises errors.ErrorAlreadyExists: If the identifier is already in use
        """
        with Session(self.storage.engine) as session:
            try:
                DBReader.get_model(model.identifier, session)
            except errors.ErrorNotFound:
                # Nothing stored under this identifier yet, so we can add it.
                session.add(DBModel(name=model.identifier, versions=[]))
                session.commit()
                return Model(identifier=model.identifier, versions=[])
            else:
                raise errors.ErrorAlreadyExists(
                    f"Model with identifier {model.identifier} already exists."
                )

    def read(self, model_id: str, context: Any = None) -> Model:
        """
        Read a model from the DB.

        :param model_id: The identifier of the model to read
        :param context: Not used by this implementation
        :return: The model
        """
        with Session(self.storage.engine) as session:
            found, _ = DBReader.get_model(model_id, session)
            return found

    def list(self, context: Any = None) -> list[str]:
        """
        List the names of all models stored in the DB.

        :param context: Not used by this implementation
        :return: The model names
        """
        with Session(self.storage.engine) as session:
            return [row.name for row in session.scalars(select(DBModel))]

    def delete(self, model_id: str, context: Any = None) -> Model:
        """
        Delete a model from the DB.

        :param model_id: The identifier of the model to delete
        :param context: Not used by this implementation
        :return: The model as it was before being deleted
        """
        with Session(self.storage.engine) as session:
            removed, removed_orm = DBReader.get_model(model_id, session)
            session.delete(removed_orm)
            session.commit()
            return removed

storage = storage instance-attribute

A reference to underlying storage.

RDBSVersionMapper

Bases: VersionMapper

Relational database mapper for the version resource.

Source code in mlte/store/artifact/underlying/rdbs/store.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
class RDBSVersionMapper(VersionMapper):
    """Mapper for the version resource, backed by relational DB storage."""

    def __init__(self, storage: RDBStorage) -> None:
        self.storage = storage
        """A reference to underlying storage."""

    def create(self, version: Version, model_id: str) -> Version:
        """
        Create a new version entry for an existing model.

        :param version: The version to create; only its identifier is used
        :param model_id: The identifier of the model the version belongs to
        :return: A version with the given identifier
        :raises errors.ErrorAlreadyExists: If the model already has that version
        """
        with Session(self.storage.engine) as session:
            try:
                DBReader.get_version(model_id, version.identifier, session)
            except errors.ErrorNotFound:
                # The version is new; the parent model is looked up to get its DB id.
                _, parent_orm = DBReader.get_model(model_id, session)

                # Insert the version linked to its parent model.
                session.add(
                    DBVersion(name=version.identifier, model_id=parent_orm.id)
                )
                session.commit()
                return Version(identifier=version.identifier)
            else:
                raise errors.ErrorAlreadyExists(
                    f"Version with identifier {version.identifier} for model {model_id} already exists."
                )

    def read(self, version_id: str, model_id: str) -> Version:
        """
        Read a version of a model from the DB.

        :param version_id: The identifier of the version to read
        :param model_id: The identifier of the model the version belongs to
        :return: The version
        """
        with Session(self.storage.engine) as session:
            found, _ = DBReader.get_version(model_id, version_id, session)
            return found

    def list(self, model_id: str) -> list[str]:
        """
        List the names of all versions stored for a model.

        :param model_id: The identifier (name) of the model
        :return: The version names
        """
        # Versions are matched to the model through its DB id, and the model
        # itself is selected by name.
        query = (
            select(DBVersion)
            .where(DBVersion.model_id == DBModel.id)
            .where(DBModel.name == model_id)
        )
        with Session(self.storage.engine) as session:
            return [row.name for row in session.scalars(query)]

    def delete(self, version_id: str, model_id: str) -> Version:
        """
        Delete a version of a model from the DB.

        :param version_id: The identifier of the version to delete
        :param model_id: The identifier of the model the version belongs to
        :return: The version as it was before being deleted
        """
        with Session(self.storage.engine) as session:
            removed, removed_orm = DBReader.get_version(
                model_id, version_id, session
            )
            session.delete(removed_orm)
            session.commit()
            return removed

storage = storage instance-attribute

A reference to underlying storage.

RelationalDBArtifactStore

Bases: ArtifactStore

A DB implementation of the MLTE artifact store.

Source code in mlte/store/artifact/underlying/rdbs/store.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class RelationalDBArtifactStore(ArtifactStore):
    """An MLTE artifact store implemented on top of a relational DB."""

    def __init__(self, uri, **kwargs):
        super().__init__(uri=uri)

        # DBBase is cast only so that it type-checks as the declarative base.
        declarative_base = typing.cast(DeclarativeBase, DBBase)
        self.storage = RDBStorage(
            uri,
            base_class=declarative_base,
            init_tables_func=init_artifact_store_tables,
            **kwargs,
        )
        """The relational DB storage."""

    def session(self) -> RelationalDBArtifactStoreSession:
        """
        Return a session handle for the store instance.
        :return: The session handle
        """
        handle = RelationalDBArtifactStoreSession(
            storage=self.storage, validators=self.validators
        )
        return handle

storage = RDBStorage(uri, base_class=(typing.cast(DeclarativeBase, DBBase)), init_tables_func=init_artifact_store_tables, **kwargs) instance-attribute

The relational DB storage.

session()

Return a session handle for the store instance.

Returns:

Type Description
RelationalDBArtifactStoreSession

The session handle

Source code in mlte/store/artifact/underlying/rdbs/store.py
52
53
54
55
56
57
58
59
def session(self) -> RelationalDBArtifactStoreSession:
    """
    Build a session handle that shares this store's storage and validators.
    :return: The session handle
    """
    handle = RelationalDBArtifactStoreSession(
        storage=self.storage, validators=self.validators
    )
    return handle

RelationalDBArtifactStoreSession

Bases: ArtifactStoreSession

A relational DB implementation of the MLTE artifact store session.

Source code in mlte/store/artifact/underlying/rdbs/store.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
class RelationalDBArtifactStoreSession(ArtifactStoreSession):
    """An MLTE artifact store session implemented on top of a relational DB."""

    def __init__(
        self, storage: RDBStorage, validators: CompositeValidator
    ) -> None:
        # All three mappers below share the same storage reference.
        self.storage = storage
        """A reference to underlying storage."""

        self.version_mapper = RDBSVersionMapper(storage=self.storage)
        """The mapper to version CRUD."""

        self.model_mapper = RDBSModelMapper(storage=self.storage)
        """The mapper to model CRUD."""

        self.artifact_mapper = RDBSArtifactMapper(
            storage=self.storage,
            validators=validators,
        )
        """The mapper to artifact CRUD."""

    def close(self) -> None:
        """Close the session by closing the underlying storage."""
        self.storage.close()

artifact_mapper = RDBSArtifactMapper(storage=storage, validators=validators) instance-attribute

The mapper to artifact CRUD.

model_mapper = RDBSModelMapper(storage=storage) instance-attribute

The mapper to model CRUD.

storage = storage instance-attribute

A reference to underlying storage.

version_mapper = RDBSVersionMapper(storage=storage) instance-attribute

The mapper to version CRUD.

close()

Close the session.

Source code in mlte/store/artifact/underlying/rdbs/store.py
93
94
95
def close(self) -> None:
    """
    Close the session.

    Delegates to the ``close`` method of the underlying storage.
    """
    underlying_storage = self.storage
    underlying_storage.close()

init_artifact_store_tables(engine)

Pre-populate tables.

Source code in mlte/store/artifact/underlying/rdbs/store.py
62
63
64
65
def init_artifact_store_tables(engine: Engine):
    """
    Pre-populate tables.

    Opens a session on the given engine and hands it to ``init_artifact_types``.

    :param engine: The DB engine to open the session on
    """
    with Session(engine) as db_session:
        init_artifact_types(db_session)