Skip to content

store_session

MLTE artifact store interface session implementation.

ArtifactMapper

Bases: ResourceMapper

An interface for mapping CRUD actions to artifacts.

Source code in mlte/store/artifact/store_session.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
class ArtifactMapper(ResourceMapper):
    """An interface for mapping CRUD actions to artifacts.

    In this class ``read``, ``list``, ``delete`` and ``_store_artifact`` only
    raise ``NotImplementedError``; ``create``, ``edit`` and the
    ``write_artifact*`` helpers are implemented on top of them.
    """

    def create(
        self, new_artifact: ArtifactModel, model_and_version: tuple[str, str]
    ) -> ArtifactModel:
        """
        Store a new artifact, refusing to overwrite an existing one.
        :param new_artifact: The artifact to create
        :param model_and_version: Tuple of (model identifier, version identifier)
        :return: The value returned by ``_store_artifact``
        :raises errors.ErrorAlreadyExists: If ``read`` finds an artifact with the same identifier
        """
        try:
            # If the artifact to create already exists, complain.
            _ = self.read(new_artifact.header.identifier, model_and_version)
            raise errors.ErrorAlreadyExists(
                f"Artifact '{new_artifact.header.identifier}' already exists."
            )
        except errors.ErrorNotFound:
            # Not being found is the expected case when creating.
            return self._store_artifact(new_artifact, model_and_version)

    def read(
        self, artifact_id: str, model_and_version: tuple[str, str]
    ) -> ArtifactModel:
        """Read an artifact by identifier; not implemented in this interface."""
        raise NotImplementedError(ResourceMapper.NOT_IMPLEMENTED_ERROR_MSG)

    # NOTE(review): once defined, the name `list` refers to this method for the
    # rest of the class body; annotations evaluated later in the body should
    # avoid `list[...]` unless annotation evaluation is postponed.
    def list(self, model_and_version: tuple[str, str]) -> list[str]:
        """List artifacts for a model version; not implemented in this interface."""
        raise NotImplementedError(ResourceMapper.NOT_IMPLEMENTED_ERROR_MSG)

    def edit(
        self, artifact: ArtifactModel, model_and_version: tuple[str, str]
    ) -> ArtifactModel:
        """
        Store an updated version of an artifact that must already exist.
        :param artifact: The artifact carrying the new content
        :param model_and_version: Tuple of (model identifier, version identifier)
        :return: The value returned by ``_store_artifact``
        """
        # Reading first makes a missing artifact surface as an error from read()
        # instead of being silently created.
        _ = self.read(artifact.header.identifier, model_and_version)
        return self._store_artifact(artifact, model_and_version)

    def delete(
        self, artifact_id: str, model_and_version: tuple[str, str]
    ) -> ArtifactModel:
        """Delete an artifact by identifier; not implemented in this interface."""
        raise NotImplementedError(ResourceMapper.NOT_IMPLEMENTED_ERROR_MSG)

    def _add_header_data(
        self, artifact: ArtifactModel, user: Optional[str]
    ) -> ArtifactModel:
        """
        Adds time and creator data to the artifact header.
        The artifact is modified in place and the same object is returned.
        :param artifact: The artifact whose header is updated
        :param user: Value stored as the header's creator (may be None)
        """
        artifact.header.timestamp = int(time.time())
        artifact.header.creator = user
        return artifact

    def write_artifact_with_header(
        self,
        model_id: str,
        version_id: str,
        artifact: ArtifactModel,
        *,
        force: bool = False,
        user: Optional[str] = None,
    ) -> ArtifactModel:
        """
        Write an artifact, generating the timestamp and adding creator. Internally calls the actual write_artifact implementation.
        :param model_id: The identifier for the model
        :param version_id: The identifier for the model version
        :param artifact: The artifact
        :param force: Overwrite the artifact if it already exists (default: False)
        :param user: The username of the user executing this action.
        :return: The value returned by ``write_artifact``
        """
        artifact = self._add_header_data(artifact, user)
        return self.write_artifact(
            model_id,
            version_id,
            artifact,
            force=force,
        )

    def write_artifact(
        self,
        model_id: str,
        version_id: str,
        artifact: ArtifactModel,
        *,
        force: bool = False,
    ) -> ArtifactModel:
        """
        Write an artifact, creating it or (with ``force``) overwriting it.
        :param model_id: The identifier for the model
        :param version_id: The identifier for the model version
        :param artifact: The artifact
        :param force: Overwrite the artifact if it already exists (default: False)
        :return: The value returned by ``edit`` (overwrite) or ``create`` (new artifact)
        :raises errors.ErrorAlreadyExists: If the artifact exists and ``force`` is False
        :raises errors.ErrorNotFound: If the lookup failed for something other than the artifact itself
        """
        artifact_id = artifact.header.identifier
        self.validators.validate_all(artifact)
        target = (model_id, version_id)
        try:
            _ = self.read(artifact_id, target)
            if force:
                return self.edit(artifact, target)
            # It exists and we are not "forcing" (overwriting/editing): complain.
            raise errors.ErrorAlreadyExists(
                f"Artifact '{artifact_id}' (force param prevents overwriting it)"
            )
        except errors.ErrorNotFound as ex:
            if "Artifact" not in str(ex):
                # The model or version was not found, so we can't move forward.
                # Bare raise re-raises the active exception unchanged.
                raise
            # The artifact itself did not exist; we want to create it.
            return self.create(artifact, target)

    def _store_artifact(
        self, artifact: ArtifactModel, model_and_version: tuple[str, str]
    ) -> ArtifactModel:
        """Writes an artifact to the store; not implemented in this interface."""
        raise NotImplementedError(ResourceMapper.NOT_IMPLEMENTED_ERROR_MSG)

write_artifact(model_id, version_id, artifact, *, force=False)

Write an artifact.

Parameters:

Name Type Description Default
model_id str

The identifier for the model

required
version_id str

The identifier for the model version

required
artifact ArtifactModel

The artifact

required
force bool

Overwrite the artifact if it already exists (default: False)

False
Source code in mlte/store/artifact/store_session.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
def write_artifact(
    self,
    model_id: str,
    version_id: str,
    artifact: ArtifactModel,
    *,
    force: bool = False,
) -> ArtifactModel:
    """
    Write an artifact, creating it or (with ``force``) overwriting it.
    :param model_id: The identifier for the model
    :param version_id: The identifier for the model version
    :param artifact: The artifact
    :param force: Overwrite the artifact if it already exists (default: False)
    :return: The value returned by ``edit`` (overwrite) or ``create`` (new artifact)
    :raises errors.ErrorAlreadyExists: If the artifact exists and ``force`` is False
    :raises errors.ErrorNotFound: If the lookup failed for something other than the artifact itself
    """
    artifact_id = artifact.header.identifier
    self.validators.validate_all(artifact)
    target = (model_id, version_id)
    try:
        _ = self.read(artifact_id, target)
        if force:
            return self.edit(artifact, target)
        # It exists and we are not "forcing" (overwriting/editing): complain.
        raise errors.ErrorAlreadyExists(
            f"Artifact '{artifact_id}' (force param prevents overwriting it)"
        )
    except errors.ErrorNotFound as ex:
        if "Artifact" not in str(ex):
            # The model or version was not found, so we can't move forward.
            # Bare raise re-raises the active exception unchanged.
            raise
        # The artifact itself did not exist; we want to create it.
        return self.create(artifact, target)

write_artifact_with_header(model_id, version_id, artifact, *, force=False, user=None)

Write an artifact, generating the timestamp and adding creator. Internally calls the actual write_artifact implementation.

Parameters:

Name Type Description Default
model_id str

The identifier for the model

required
version_id str

The identifier for the model version

required
artifact ArtifactModel

The artifact

required
force bool

Overwrite the artifact if it already exists (default: False)

False
user Optional[str]

The username of the user executing this action.

None
Source code in mlte/store/artifact/store_session.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def write_artifact_with_header(
    self,
    model_id: str,
    version_id: str,
    artifact: ArtifactModel,
    *,
    force: bool = False,
    user: Optional[str] = None,
) -> ArtifactModel:
    """
    Fill in the artifact's header data (via ``_add_header_data``) and then write it through ``write_artifact``.
    :param model_id: The identifier for the model
    :param version_id: The identifier for the model version
    :param artifact: The artifact
    :param force: Overwrite the artifact if it already exists (default: False)
    :param user: The username of the user executing this action.
    :return: The value returned by ``write_artifact``
    """
    # Header data is filled in first so the written artifact carries it.
    stamped = self._add_header_data(artifact, user)
    return self.write_artifact(model_id, version_id, stamped, force=force)

ArtifactStoreSession

Bases: StoreSession

The base class for all implementations of the MLTE artifact store session.

Source code in mlte/store/artifact/store_session.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class ArtifactStoreSession(StoreSession):
    """The base class for all implementations of the MLTE artifact store session."""

    model_mapper: ModelMapper
    """Mapper for the model resource."""

    version_mapper: VersionMapper
    """Mapper for the version resource."""

    artifact_mapper: ArtifactMapper
    """Mapper for the artifact resource."""

    def create_parents(self, model_id: str, version_id: str) -> None:
        """
        Make sure the model and version entries exist in this store.
        An entry whose creation reports that it already exists is left as is.
        :param model_id: The model identifier
        :param version_id: The version identifier
        """
        # Steps are deferred callables so they run strictly in order: the
        # version is only built and created after the model step has finished.
        creation_steps = (
            lambda: self.model_mapper.create(Model(identifier=model_id)),
            lambda: self.version_mapper.create(
                Version(identifier=version_id), model_id
            ),
        )
        for create_step in creation_steps:
            try:
                create_step()
            except errors.ErrorAlreadyExists:
                # Already present: nothing to do for this level.
                continue

artifact_mapper instance-attribute

Mapper for the artifact resource.

model_mapper instance-attribute

Mapper for the model resource.

version_mapper instance-attribute

Mapper for the version resource.

create_parents(model_id, version_id)

Create organizational elements within this store. If they exist, this operation is a noop.

Parameters:

Name Type Description Default
model_id str

The model identifier

required
version_id str

The version identifier

required
Source code in mlte/store/artifact/store_session.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def create_parents(self, model_id: str, version_id: str) -> None:
    """
    Make sure the model and version entries exist in this store.
    An entry whose creation reports that it already exists is left as is.
    :param model_id: The model identifier
    :param version_id: The version identifier
    """
    # Steps are deferred callables so they run strictly in order: the
    # version is only built and created after the model step has finished.
    creation_steps = (
        lambda: self.model_mapper.create(Model(identifier=model_id)),
        lambda: self.version_mapper.create(
            Version(identifier=version_id), model_id
        ),
    )
    for create_step in creation_steps:
        try:
            create_step()
        except errors.ErrorAlreadyExists:
            # Already present: nothing to do for this level.
            continue

ManagedArtifactSession

Bases: ManagedSession

A simple context manager for store sessions.

Source code in mlte/store/artifact/store_session.py
43
44
45
46
47
class ManagedArtifactSession(ManagedSession):
    """Context manager whose ``with`` target is typed as an artifact store session."""

    def __enter__(self) -> ArtifactStoreSession:
        """Return ``self.session``, cast to ``ArtifactStoreSession`` for type checkers."""
        managed = self.session
        return cast(ArtifactStoreSession, managed)

ModelMapper

Bases: ResourceMapper

An interface for mapping CRUD actions to models.

Source code in mlte/store/artifact/store_session.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
class ModelMapper(ResourceMapper):
    """An interface for mapping CRUD actions to models.

    Every method in this class raises ``NotImplementedError``; presumably
    concrete stores override them (confirm against the store implementations).
    """

    def create(self, new_model: Model, context: Any = None) -> Model:
        """Create a model; not implemented in this interface (``context`` is unused here)."""
        raise NotImplementedError(ResourceMapper.NOT_IMPLEMENTED_ERROR_MSG)

    def read(self, model_id: str, context: Any = None) -> Model:
        """Read a model by identifier; not implemented in this interface."""
        raise NotImplementedError(ResourceMapper.NOT_IMPLEMENTED_ERROR_MSG)

    def list(self, context: Any = None) -> list[str]:
        """List models (as strings, presumably identifiers); not implemented in this interface."""
        raise NotImplementedError(ResourceMapper.NOT_IMPLEMENTED_ERROR_MSG)

    def edit(self, model: Model, context: Any = None) -> Model:
        """Edit a model; not implemented in this interface."""
        raise NotImplementedError(ResourceMapper.NOT_IMPLEMENTED_ERROR_MSG)

    def delete(self, model_id: str, context: Any = None) -> Model:
        """Delete a model by identifier; not implemented in this interface."""
        raise NotImplementedError(ResourceMapper.NOT_IMPLEMENTED_ERROR_MSG)

VersionMapper

Bases: ResourceMapper

An interface for mapping CRUD actions to versions.

Source code in mlte/store/artifact/store_session.py
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
class VersionMapper(ResourceMapper):
    """An interface for mapping CRUD actions to versions.

    Every method in this class raises ``NotImplementedError``; presumably
    concrete stores override them (confirm against the store implementations).
    Each method takes the identifier of a model as ``model_id``.
    """

    def create(self, new_version: Version, model_id: str) -> Version:
        """Create a version for the given model; not implemented in this interface."""
        raise NotImplementedError(ResourceMapper.NOT_IMPLEMENTED_ERROR_MSG)

    def read(self, version_id: str, model_id: str) -> Version:
        """Read a version by identifier; not implemented in this interface."""
        raise NotImplementedError(ResourceMapper.NOT_IMPLEMENTED_ERROR_MSG)

    def list(self, model_id: str) -> list[str]:
        """List versions (as strings, presumably identifiers) for a model; not implemented in this interface."""
        raise NotImplementedError(ResourceMapper.NOT_IMPLEMENTED_ERROR_MSG)

    def edit(self, version: Version, model_id: str) -> Version:
        """Edit a version; not implemented in this interface."""
        raise NotImplementedError(ResourceMapper.NOT_IMPLEMENTED_ERROR_MSG)

    def delete(self, version_id: str, model_id: str) -> Version:
        """Delete a version by identifier; not implemented in this interface."""
        raise NotImplementedError(ResourceMapper.NOT_IMPLEMENTED_ERROR_MSG)