Skip to content

reader

DB utilities for reading catalog-related data from the database.

DBReader

Class encapsulating functions that read catalog-related data from the DB.

Source code in mlte/store/catalog/underlying/rdbs/reader.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
class DBReader:
    """Static helpers that translate between catalog models and their DB rows."""

    @staticmethod
    def get_entry(
        entry_id: str, session: Session
    ) -> Tuple[CatalogEntry, DBCatalogEntry]:
        """Look up a single catalog entry by its header identifier.

        Args:
            entry_id: Identifier stored on the entry's header row.
            session: Session the query is executed on.

        Returns:
            A pair with the built CatalogEntry and the DBCatalogEntry it
            was built from.

        Raises:
            errors.ErrorNotFound: If the query yields no entry.
        """
        # The two filters tie the entry to its header row and then select
        # the header by identifier.
        query = (
            select(DBCatalogEntry)
            .where(DBCatalogEntry.entry_header_id == DBCatalogEntryHeader.id)
            .where(DBCatalogEntryHeader.identifier == entry_id)
        )
        found_orm = session.scalar(query)

        if entry_missing := found_orm is None:
            raise errors.ErrorNotFound(
                f"Entry with identifier {entry_id} was not found in the catalog store."
            )

        return (DBReader._build_entry(found_orm), found_orm)

    @staticmethod
    def _build_entry(
        entry_orm: DBCatalogEntry,
    ) -> CatalogEntry:
        """Convert a DB catalog entry (and its header) into a CatalogEntry.

        Args:
            entry_orm: DB object to convert; its ``tags`` field is decoded
                with ``json.loads``.

        Returns:
            The CatalogEntry populated from ``entry_orm``.
        """
        header_orm = entry_orm.entry_header
        header = CatalogEntryHeader(
            identifier=header_orm.identifier,
            creator=header_orm.creator,
            created=header_orm.created,
            updated=header_orm.updated,
            catalog_id=header_orm.catalog_identifier,
        )

        decoded_tags = json.loads(entry_orm.tags)
        return CatalogEntry(
            tags=decoded_tags,
            quality_attribute=entry_orm.quality_attribute,
            code=entry_orm.code,
            description=entry_orm.description,
            inputs=entry_orm.inputs,
            # The model field is singular while the DB column is plural.
            output=entry_orm.outputs,
            header=header,
        )

    @staticmethod
    def _build_entry_orm(
        entry: CatalogEntry,
        session: Session,
        entry_orm: Optional[DBCatalogEntry] = None,
    ) -> DBCatalogEntry:
        """Copy a CatalogEntry into a DB object, creating one if needed.

        Args:
            entry: Model whose fields are written to the DB objects.
            session: Not used by this method.
            entry_orm: Existing DB object to update in place; when None, a
                new DBCatalogEntry and DBCatalogEntryHeader are created.

        Returns:
            The new or updated DBCatalogEntry, with its header attached.
        """
        if entry_orm is not None:
            # Update path: reuse the header already attached to the entry.
            header_orm = entry_orm.entry_header
        else:
            entry_orm = DBCatalogEntry()
            header_orm = DBCatalogEntryHeader()

        model_header = entry.header
        header_orm.identifier = model_header.identifier
        header_orm.created = typing.cast(int, model_header.created)
        header_orm.updated = typing.cast(int, model_header.updated)
        header_orm.creator = model_header.creator
        header_orm.catalog_identifier = model_header.catalog_id

        # Tags are stored as a JSON-encoded string.
        entry_orm.tags = json.dumps(entry.tags)
        entry_orm.quality_attribute = entry.quality_attribute
        entry_orm.code = entry.code
        entry_orm.description = entry.description
        entry_orm.inputs = entry.inputs
        entry_orm.outputs = entry.output
        entry_orm.entry_header = header_orm

        return entry_orm

    @staticmethod
    def get_entries(
        session: Session,
    ) -> Tuple[List[CatalogEntry], List[DBCatalogEntry]]:
        """Load every catalog entry available through the given session.

        Args:
            session: Session the query is executed on.

        Returns:
            Two lists in matching order: the built CatalogEntry objects and
            the DBCatalogEntry objects they were built from.
        """
        query = select(DBCatalogEntry)
        orm_rows = list(session.execute(query).scalars().all())
        models: List[CatalogEntry] = [
            DBReader._build_entry(row) for row in orm_rows
        ]

        return models, orm_rows

get_entries(session) staticmethod

Reads all catalog entries in the DB, and returns two lists: the CatalogEntry objects and the corresponding DBCatalogEntry objects.

Source code in mlte/store/catalog/underlying/rdbs/reader.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
@staticmethod
def get_entries(
    session: Session,
) -> Tuple[List[CatalogEntry], List[DBCatalogEntry]]:
    """Load every catalog entry available through the given session.

    Args:
        session: Session the query is executed on.

    Returns:
        Two lists in matching order: the built CatalogEntry objects and
        the DBCatalogEntry objects they were built from.
    """
    query = select(DBCatalogEntry)
    orm_rows = list(session.execute(query).scalars().all())
    models: List[CatalogEntry] = [
        DBReader._build_entry(row) for row in orm_rows
    ]

    return models, orm_rows

get_entry(entry_id, session) staticmethod

Reads the entry with the given id using the provided session, and returns both the CatalogEntry and its DBCatalogEntry object.

Source code in mlte/store/catalog/underlying/rdbs/reader.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@staticmethod
def get_entry(
    entry_id: str, session: Session
) -> Tuple[CatalogEntry, DBCatalogEntry]:
    """Look up a single catalog entry by its header identifier.

    Args:
        entry_id: Identifier stored on the entry's header row.
        session: Session the query is executed on.

    Returns:
        A pair with the built CatalogEntry and the DBCatalogEntry it was
        built from.

    Raises:
        errors.ErrorNotFound: If the query yields no entry.
    """
    # The two filters tie the entry to its header row and then select the
    # header by identifier.
    query = (
        select(DBCatalogEntry)
        .where(DBCatalogEntry.entry_header_id == DBCatalogEntryHeader.id)
        .where(DBCatalogEntryHeader.identifier == entry_id)
    )
    found_orm = session.scalar(query)

    if found_orm is None:
        raise errors.ErrorNotFound(
            f"Entry with identifier {entry_id} was not found in the catalog store."
        )

    return (DBReader._build_entry(found_orm), found_orm)