Skip to content

http_clients

mlte/store/underlying/http_clients.py

HTTP clients that can be used by underlying stores to access remote HTTP stores.

HttpResponse = Union[requests.Response, httpx.Response] module-attribute

Standard HTTP response type: `requests.Response` and `httpx.Response` share the same implicit interface.

HttpClient

Interface for an HTTP client.

Source code in mlte/store/common/http_clients.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class HttpClient:
    """
    Interface for an HTTP client.

    The verb methods (get/post/put/delete) are placeholders that raise
    NotImplementedError; concrete clients are expected to override them.
    """

    def __init__(self, type: HttpClientType) -> None:
        """
        Set up the client.
        :param type: The kind of HTTP client this instance represents
        """
        # Which kind of client this is.
        self.type = type
        # Headers kept on the client; starts empty.
        self.headers: dict[str, str] = {}

    def get(self, url: str, **kwargs) -> HttpResponse:
        """Send a GET request; must be overridden by subclasses."""
        raise NotImplementedError("get()")

    def post(
        self, url: str, data: Any = None, json: Any = None, **kwargs
    ) -> HttpResponse:
        """Send a POST request; must be overridden by subclasses."""
        raise NotImplementedError("post()")

    def put(
        self, url: str, data: Any = None, json: Any = None, **kwargs
    ) -> HttpResponse:
        """Send a PUT request; must be overridden by subclasses."""
        raise NotImplementedError("put()")

    def delete(self, url: str, **kwargs) -> HttpResponse:
        """Send a DELETE request; must be overridden by subclasses."""
        raise NotImplementedError("delete()")

    @staticmethod
    def raise_for_response(response: HttpResponse) -> None:
        """
        Raise an error from a response.

        Returns silently when the status code is OK; otherwise raises the
        error type matching the status code, falling back to InternalError.
        :param response: The response object
        """
        if response.status_code == codes.OK:
            return

        # Error bodies are not guaranteed to be JSON (e.g. an HTML error page
        # from an intermediary); fall back to the raw text so that the
        # intended error type is raised instead of a JSON decode error.
        try:
            detail = response.json()
        except ValueError:
            detail = response.text
        message = f"{detail}"

        if response.status_code == codes.NOT_FOUND:
            raise errors.ErrorNotFound(message)
        if response.status_code == codes.ALREADY_EXISTS:
            raise errors.ErrorAlreadyExists(message)
        if response.status_code == codes.UNAUTHORIZED:
            raise errors.UnauthenticatedError(message)
        if response.status_code == codes.FORBIDDEN:
            raise errors.ForbiddenError(message)
        # Any other non-OK status is reported as an internal error.
        raise errors.InternalError(message)

raise_for_response(response) staticmethod

Raise an error from a response.

Parameters:

Name Type Description Default
response HttpResponse

The response object

required
Source code in mlte/store/common/http_clients.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@staticmethod
def raise_for_response(response: HttpResponse) -> None:
    """
    Raise an error from a response.

    Returns silently when the status code is OK; otherwise raises the
    error type matching the status code, falling back to InternalError.
    :param response: The response object
    """
    if response.status_code == codes.OK:
        return

    # Error bodies are not guaranteed to be JSON (e.g. an HTML error page
    # from an intermediary); fall back to the raw text so that the
    # intended error type is raised instead of a JSON decode error.
    try:
        detail = response.json()
    except ValueError:
        detail = response.text
    message = f"{detail}"

    if response.status_code == codes.NOT_FOUND:
        raise errors.ErrorNotFound(message)
    if response.status_code == codes.ALREADY_EXISTS:
        raise errors.ErrorAlreadyExists(message)
    if response.status_code == codes.UNAUTHORIZED:
        raise errors.UnauthenticatedError(message)
    if response.status_code == codes.FORBIDDEN:
        raise errors.ForbiddenError(message)
    # Any other non-OK status is reported as an internal error.
    raise errors.InternalError(message)

HttpClientType

Bases: Enum

An enumeration over HTTP client types.

Source code in mlte/store/common/http_clients.py
20
21
22
23
24
25
26
27
class HttpClientType(Enum):
    """Enumerates the kinds of HTTP client that are available."""

    REQUESTS = "requests"
    """HTTP client backed by the requests library."""

    TESTCLIENT = "testclient"
    """HTTP client backed by the fastapi TestClient."""

REQUESTS = 'requests' class-attribute instance-attribute

The requests-based HTTP client.

TESTCLIENT = 'testclient' class-attribute instance-attribute

The fastapi TestClient HTTP client.

OAuthHttpClient

Bases: HttpClient

A base HTTP client type for a server needing OAuth authentication.

Source code in mlte/store/common/http_clients.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
class OAuthHttpClient(HttpClient):
    """
    A base HTTP client type for a server needing OAuth authentication.

    Holds optional credentials, requests a token with the password grant
    from the token endpoint, and keeps the resulting bearer token in the
    client headers.
    """

    TOKEN_REQ_PASS_PAYLOAD = {"grant_type": "password"}
    TOKEN_REQ_HEADERS = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Accept": "application/json",
    }
    TOKEN_ENDPOINT = "/token"

    def __init__(
        self,
        type: HttpClientType,
        username: Optional[str] = None,
        password: Optional[str] = None,
    ) -> None:
        """
        Set up the client with optional credentials.
        :param type: The kind of HTTP client this instance represents
        :param username: The username to use when authenticating
        :param password: The password to use when authenticating
        """
        super().__init__(type)

        self.access_token: Optional[str] = None
        """The access token."""

        self.username = username
        """The username to use when authenticating."""

        self.password = password
        """The password to use when authenticating."""

    def _format_oauth_password_payload(
        self, username: str, password: str
    ) -> dict[str, str]:
        """Returns a properly structured payload with credentials to be sent to get a token."""
        # Copy the class-level template so the shared dict is never mutated.
        payload = dict(self.TOKEN_REQ_PASS_PAYLOAD)
        payload.update({"username": username, "password": password})
        return payload

    def _store_token(self, access_token: str):
        """Stores the token and sets proper headers."""
        if access_token is not None:
            self.access_token = access_token
            self.headers = {"Authorization": f"Bearer {self.access_token}"}

    def authenticate(
        self,
        api_url: str,
        username: Optional[str] = None,
        password: Optional[str] = None,
    ):
        """
        Sends an authentication request and retrieves and stores the token.

        Arguments take precedence over the credentials stored on the client.
        :param api_url: Base URL of the API; the token endpoint is appended to it
        :param username: Username to use, defaults to the stored one
        :param password: Password to use, defaults to the stored one
        :raises Exception: If a credential is missing, the token request is
            not answered with an OK status, or the reply has no access token
        """
        # Validate we have a user and password.
        if username is None:
            username = self.username
            if username is None:
                raise Exception(
                    "Can't authenticate without user, no internal or argument username received."
                )
        if password is None:
            password = self.password
            if password is None:
                raise Exception(
                    "Can't authenticate without password, no internal or argument password received."
                )

        # Send authentication request to get token.
        # Use a copy so the class-level header template is never aliased by
        # the mutable per-instance headers.
        self.headers = dict(self.TOKEN_REQ_HEADERS)
        url = f"{api_url}{self.TOKEN_ENDPOINT}"
        try:
            response = self.post(
                url,
                data=self._format_oauth_password_payload(username, password),
            )
        finally:
            # Always drop the token-request headers, even if the request
            # itself raised, so they do not leak into later requests.
            self.headers = {}
        if response.status_code != codes.OK:
            reply = response.content.decode("utf-8")
            raise Exception(
                f"Token request was unsuccessful - code: {response.status_code}, reply: {reply}"
            )

        # Process reply and store token.
        response_data = response.json()
        if response_data is None:
            raise Exception(
                "Did not receive any valid response for token request."
            )
        if "access_token" not in response_data:
            raise Exception("Access token was not contained in response.")
        self._store_token(response_data["access_token"])

    def process_credentials(self, uri: str) -> str:
        """Obtains user and password from uri for client auth, and returns cleaned up uri."""
        # Parse URI for user and password.
        uri, username, password = url_utils.remove_url_username_password(uri)
        if username is not None and password is not None:
            # If URI had user and password, get them for client auth.
            self.username = username
            self.password = password

        return uri

access_token = None instance-attribute

The access token.

password = password instance-attribute

The password to use when authenticating.

username = username instance-attribute

The username to use when authenticating.

authenticate(api_url, username=None, password=None)

Sends an authentication request and retrieves and stores the token.

Source code in mlte/store/common/http_clients.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
def authenticate(
    self,
    api_url: str,
    username: Optional[str] = None,
    password: Optional[str] = None,
):
    """
    Sends an authentication request and retrieves and stores the token.

    Arguments take precedence over the credentials stored on the client.
    :param api_url: Base URL of the API; the token endpoint is appended to it
    :param username: Username to use, defaults to the stored one
    :param password: Password to use, defaults to the stored one
    :raises Exception: If a credential is missing, the token request is
        not answered with an OK status, or the reply has no access token
    """
    # Validate we have a user and password.
    if username is None:
        username = self.username
        if username is None:
            raise Exception(
                "Can't authenticate without user, no internal or argument username received."
            )
    if password is None:
        password = self.password
        if password is None:
            raise Exception(
                "Can't authenticate without password, no internal or argument password received."
            )

    # Send authentication request to get token.
    # Use a copy so the class-level header template is never aliased by
    # the mutable per-instance headers.
    self.headers = dict(self.TOKEN_REQ_HEADERS)
    url = f"{api_url}{self.TOKEN_ENDPOINT}"
    try:
        response = self.post(
            url,
            data=self._format_oauth_password_payload(username, password),
        )
    finally:
        # Always drop the token-request headers, even if the request itself
        # raised, so they do not leak into later requests.
        self.headers = {}
    if response.status_code != codes.OK:
        reply = response.content.decode("utf-8")
        raise Exception(
            f"Token request was unsuccessful - code: {response.status_code}, reply: {reply}"
        )

    # Process reply and store token.
    response_data = response.json()
    if response_data is None:
        raise Exception(
            "Did not receive any valid response for token request."
        )
    if "access_token" not in response_data:
        raise Exception("Access token was not contained in response.")
    self._store_token(response_data["access_token"])

process_credentials(uri)

Obtains user and password from uri for client auth, and returns cleaned up uri.

Source code in mlte/store/common/http_clients.py
163
164
165
166
167
168
169
170
171
172
def process_credentials(self, uri: str) -> str:
    """
    Extract credentials embedded in a URI and return the URI without them.

    The stored username and password are replaced only when the URI carried
    both; otherwise the stored credentials are left untouched.
    :param uri: The URI, possibly containing a username and password
    :return: The URI as returned by the URL helper, without credentials
    """
    cleaned_uri, uri_user, uri_password = (
        url_utils.remove_url_username_password(uri)
    )

    has_full_credentials = uri_user is not None and uri_password is not None
    if has_full_credentials:
        # Adopt the URI credentials for later client authentication.
        self.username = uri_user
        self.password = uri_password

    return cleaned_uri

RequestsClient

Bases: OAuthHttpClient

Client implementation using requests library.

Source code in mlte/store/common/http_clients.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
class RequestsClient(OAuthHttpClient):
    """
    OAuth-capable HTTP client that sends its requests through the requests library.

    Every call passes the client's current headers along with the request.
    """

    # NOTE(review): no default timeout is passed to requests here; callers
    # can supply one through **kwargs.

    def __init__(
        self, username: Optional[str] = None, password: Optional[str] = None
    ) -> None:
        """
        Create a requests-based client.
        :param username: The username to use when authenticating
        :param password: The password to use when authenticating
        """
        client_type = HttpClientType.REQUESTS
        super().__init__(client_type, username, password)

    def get(self, url: str, **kwargs) -> requests.Response:
        """Send a GET request to the given URL using the current headers."""
        reply = requests.get(url, headers=self.headers, **kwargs)
        return reply

    def post(
        self, url: str, data: Any = None, json: Any = None, **kwargs
    ) -> requests.Response:
        """Send a POST request with optional form data or JSON body."""
        reply = requests.post(
            url, headers=self.headers, data=data, json=json, **kwargs
        )
        return reply

    def put(
        self, url: str, data: Any = None, json: Any = None, **kwargs
    ) -> requests.Response:
        """Send a PUT request with optional form data or JSON body."""
        reply = requests.put(
            url, headers=self.headers, data=data, json=json, **kwargs
        )
        return reply

    def delete(self, url: str, **kwargs) -> requests.Response:
        """Send a DELETE request to the given URL using the current headers."""
        reply = requests.delete(url, headers=self.headers, **kwargs)
        return reply