Skip to content

http_clients

HTTP clients that can be used by underlying stores to access remote HTTP stores.

HttpResponse = Union[requests.Response, httpx.Response] module-attribute

Standard HTTP response, both have same implicit interface.

HttpClient

Bases: ABC

Interface for an HTTP client.

Source code in mlte/store/common/http_clients.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
class HttpClient(ABC):
    """Interface for an HTTP client."""

    def __init__(self) -> None:
        self.headers: dict[str, str] = {}

    @abstractmethod
    def get(self, url: str, **kwargs) -> HttpResponse:
        pass

    @abstractmethod
    def post(
        self, url: str, data: Any = None, json: Any = None, **kwargs
    ) -> HttpResponse:
        pass

    @abstractmethod
    def put(
        self, url: str, data: Any = None, json: Any = None, **kwargs
    ) -> HttpResponse:
        pass

    @abstractmethod
    def delete(self, url: str, **kwargs) -> HttpResponse:
        pass

    @staticmethod
    def raise_for_response(response: HttpResponse) -> None:
        """
        Raise an error from from a response.
        :param response: The response object
        """
        if response.status_code == codes.OK:
            return
        if response.status_code == codes.NOT_FOUND:
            raise errors.ErrorNotFound(f"{response.json()}")
        if response.status_code == codes.ALREADY_EXISTS:
            raise errors.ErrorAlreadyExists(f"{response.json()}")
        if response.status_code == codes.UNAUTHORIZED:
            raise errors.UnauthenticatedError(f"{response.json()}")
        if response.status_code == codes.FORBIDDEN:
            raise errors.ForbiddenError(f"{response.json()}")
        else:
            raise errors.InternalError(f"{response.json()}")

raise_for_response(response) staticmethod

Raise an error from from a response.

Parameters:

Name Type Description Default
response HttpResponse

The response object

required
Source code in mlte/store/common/http_clients.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@staticmethod
def raise_for_response(response: HttpResponse) -> None:
    """
    Raise an error from from a response.
    :param response: The response object
    """
    if response.status_code == codes.OK:
        return
    if response.status_code == codes.NOT_FOUND:
        raise errors.ErrorNotFound(f"{response.json()}")
    if response.status_code == codes.ALREADY_EXISTS:
        raise errors.ErrorAlreadyExists(f"{response.json()}")
    if response.status_code == codes.UNAUTHORIZED:
        raise errors.UnauthenticatedError(f"{response.json()}")
    if response.status_code == codes.FORBIDDEN:
        raise errors.ForbiddenError(f"{response.json()}")
    else:
        raise errors.InternalError(f"{response.json()}")

OAuthHttpClient

Bases: HttpClient

A base HTTP client type for an server needing OAuth authentication. Works as an abstract base class for actual client implementations.

Source code in mlte/store/common/http_clients.py
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
class OAuthHttpClient(HttpClient):
    """
    A base HTTP client type for an server needing OAuth authentication. Works as an abstract base class for actual client
    implementations.
    """

    TOKEN_REQ_PASS_PAYLOAD = {"grant_type": "password"}
    TOKEN_REQ_HEADERS = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Accept": "application/json",
    }
    TOKEN_ENDPOINT = "/token"

    def __init__(
        self,
        username: Optional[str] = None,
        password: Optional[str] = None,
    ) -> None:
        super().__init__()

        self.access_token: Optional[str] = None
        """The access token."""

        self.username = username
        """The username to use when authenticating."""

        self.password = password
        """The password to use when authenticating."""

    def _format_oauth_password_payload(
        self, username: str, password: str
    ) -> dict[str, str]:
        """Returns a properly structured payload with credentials to be sent to get a token."""
        payload = dict(self.TOKEN_REQ_PASS_PAYLOAD)
        payload.update({"username": username, "password": password})
        return payload

    def _store_token(self, access_token: str):
        """Stores the token and sets proper headers."""
        if access_token is not None:
            self.access_token = access_token
            self.headers = {"Authorization": f"Bearer {self.access_token}"}

    def authenticate(
        self,
        api_url: str,
        username: Optional[str] = None,
        password: Optional[str] = None,
    ):
        """Sends an authentication request and retrieves and stores the token."""
        # Validate we have a user and password.
        if username is None:
            username = self.username
            if username is None:
                raise Exception(
                    "Can't authenticate without user, no internal or argument username received."
                )
        if password is None:
            password = self.password
            if password is None:
                raise Exception(
                    "Can't authenticate without password, no internal or argument password received."
                )

        # Send authentication request to get token.
        self.headers = self.TOKEN_REQ_HEADERS
        url = f"{api_url}{self.TOKEN_ENDPOINT}"
        response = self.post(
            url,
            data=self._format_oauth_password_payload(username, password),
        )
        self.headers = {}
        if response.status_code != codes.OK:
            reply = response.content.decode("utf-8")
            raise Exception(
                f"Token request was unsuccessful - code: {response.status_code}, reply: {reply}"
            )

        # Process reply and store token.
        response_data = response.json()
        if response_data is None:
            raise Exception(
                "Did not receive any valid response for token request."
            )
        if "access_token" not in response_data:
            raise Exception("Access token was not contained in response.")
        self._store_token(response_data["access_token"])

    def process_credentials(self, uri: str) -> str:
        """Obtains user and password from uri for client auth, and returns cleaned up uri."""
        # Parse URI for user and password.
        uri, username, password = url_utils.remove_url_username_password(uri)
        if username is not None and password is not None:
            # If URI had user and password, get them for client auth.
            self.set_credentials(username, password)

        return uri

    def set_credentials(self, username: str, password: str) -> None:
        """Sets the client's credentials, used to authenticate."""
        self.username = username
        self.password = password

access_token = None instance-attribute

The access token.

password = password instance-attribute

The password to use when authenticating.

username = username instance-attribute

The username to use when authenticating.

authenticate(api_url, username=None, password=None)

Sends an authentication request and retrieves and stores the token.

Source code in mlte/store/common/http_clients.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def authenticate(
    self,
    api_url: str,
    username: Optional[str] = None,
    password: Optional[str] = None,
):
    """Sends an authentication request and retrieves and stores the token."""
    # Validate we have a user and password.
    if username is None:
        username = self.username
        if username is None:
            raise Exception(
                "Can't authenticate without user, no internal or argument username received."
            )
    if password is None:
        password = self.password
        if password is None:
            raise Exception(
                "Can't authenticate without password, no internal or argument password received."
            )

    # Send authentication request to get token.
    self.headers = self.TOKEN_REQ_HEADERS
    url = f"{api_url}{self.TOKEN_ENDPOINT}"
    response = self.post(
        url,
        data=self._format_oauth_password_payload(username, password),
    )
    self.headers = {}
    if response.status_code != codes.OK:
        reply = response.content.decode("utf-8")
        raise Exception(
            f"Token request was unsuccessful - code: {response.status_code}, reply: {reply}"
        )

    # Process reply and store token.
    response_data = response.json()
    if response_data is None:
        raise Exception(
            "Did not receive any valid response for token request."
        )
    if "access_token" not in response_data:
        raise Exception("Access token was not contained in response.")
    self._store_token(response_data["access_token"])

process_credentials(uri)

Obtains user and password from uri for client auth, and returns cleaned up uri.

Source code in mlte/store/common/http_clients.py
153
154
155
156
157
158
159
160
161
def process_credentials(self, uri: str) -> str:
    """Obtains user and password from uri for client auth, and returns cleaned up uri."""
    # Parse URI for user and password.
    uri, username, password = url_utils.remove_url_username_password(uri)
    if username is not None and password is not None:
        # If URI had user and password, get them for client auth.
        self.set_credentials(username, password)

    return uri

set_credentials(username, password)

Sets the client's credentials, used to authenticate.

Source code in mlte/store/common/http_clients.py
163
164
165
166
def set_credentials(self, username: str, password: str) -> None:
    """Sets the client's credentials, used to authenticate."""
    self.username = username
    self.password = password

RequestsClient

Bases: OAuthHttpClient

Client implementation using requests library.

Source code in mlte/store/common/http_clients.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
class RequestsClient(OAuthHttpClient):
    """Client implementation using requests library."""

    def __init__(
        self, username: Optional[str] = None, password: Optional[str] = None
    ) -> None:
        super().__init__(username, password)

    def get(self, url: str, **kwargs) -> requests.Response:
        return requests.get(url, headers=self.headers, **kwargs)

    def post(
        self, url: str, data: Any = None, json: Any = None, **kwargs
    ) -> requests.Response:
        return requests.post(
            url,
            headers=self.headers,
            data=data,
            json=json,
            **kwargs,
        )

    def put(
        self, url: str, data: Any = None, json: Any = None, **kwargs
    ) -> requests.Response:
        return requests.put(
            url,
            headers=self.headers,
            data=data,
            json=json,
            **kwargs,
        )

    def delete(self, url: str, **kwargs) -> requests.Response:
        return requests.delete(url, headers=self.headers, **kwargs)