|
14 | 14 |
|
15 | 15 | import openfga_sdk |
16 | 16 | from openfga_sdk.credentials import CredentialConfiguration, Credentials |
| 17 | +from openfga_sdk.exceptions import ApiValueError |
17 | 18 |
|
18 | 19 |
|
19 | 20 | class TestCredentials(IsolatedAsyncioTestCase): |
@@ -172,3 +173,58 @@ def test_configuration_client_credentials_missing_api_audience(self): |
172 | 173 | ) |
173 | 174 | with self.assertRaises(openfga_sdk.ApiValueError): |
174 | 175 | credential.validate_credentials_config() |
| 176 | + |
| 177 | + |
| 178 | +class TestCredentialsIssuer(IsolatedAsyncioTestCase): |
| 179 | + def setUp(self): |
| 180 | + # Setup a basic configuration that can be modified per test case |
| 181 | + self.configuration = CredentialConfiguration(api_issuer="https://example.com") |
| 182 | + self.credentials = Credentials( |
| 183 | + method="client_credentials", configuration=self.configuration |
| 184 | + ) |
| 185 | + |
| 186 | + def test_valid_issuer_https(self): |
| 187 | + # Test a valid HTTPS URL |
| 188 | + self.configuration.api_issuer = "issuer.fga.example " |
| 189 | + result = self.credentials._parse_issuer(self.configuration.api_issuer) |
| 190 | + self.assertEqual(result, "https://issuer.fga.example/oauth/token") |
| 191 | + |
| 192 | + def test_valid_issuer_with_oauth_endpoint_https(self): |
| 193 | + # Test a valid HTTPS URL |
| 194 | + self.configuration.api_issuer = "https://example.com/oauth/token" |
| 195 | + result = self.credentials._parse_issuer(self.configuration.api_issuer) |
| 196 | + self.assertEqual(result, "https://example.com/oauth/token") |
| 197 | + |
| 198 | + def test_valid_issuer_with_some_endpoint_https(self): |
| 199 | + # Test a valid HTTPS URL |
| 200 | + self.configuration.api_issuer = "https://example.com/oauth/some/endpoint" |
| 201 | + result = self.credentials._parse_issuer(self.configuration.api_issuer) |
| 202 | + self.assertEqual(result, "https://example.com/oauth/some/endpoint") |
| 203 | + |
| 204 | + def test_valid_issuer_http(self): |
| 205 | + # Test a valid HTTP URL |
| 206 | + self.configuration.api_issuer = "fga.example/some_endpoint" |
| 207 | + result = self.credentials._parse_issuer(self.configuration.api_issuer) |
| 208 | + self.assertEqual(result, "https://fga.example/some_endpoint") |
| 209 | + |
| 210 | + def test_invalid_issuer_no_scheme(self): |
| 211 | + # Test an issuer URL without a scheme |
| 212 | + self.configuration.api_issuer = "https://issuer.fga.example:8080/some_endpoint " |
| 213 | + result = self.credentials._parse_issuer(self.configuration.api_issuer) |
| 214 | + self.assertEqual(result, "https://issuer.fga.example:8080/some_endpoint") |
| 215 | + |
| 216 | + def test_invalid_issuer_bad_scheme(self): |
| 217 | + # Test an issuer with an unsupported scheme |
| 218 | + self.configuration.api_issuer = "ftp://example.com" |
| 219 | + with self.assertRaises(ApiValueError): |
| 220 | + self.credentials._parse_issuer(self.configuration.api_issuer) |
| 221 | + |
| 222 | + def test_invalid_issuer_with_port(self): |
| 223 | + # Test an issuer with an unsupported scheme |
| 224 | + self.configuration.api_issuer = "https://issuer.fga.example:8080 " |
| 225 | + result = self.credentials._parse_issuer(self.configuration.api_issuer) |
| 226 | + self.assertEqual(result, "https://issuer.fga.example:8080/oauth/token") |
| 227 | + |
| 228 | + |
| 229 | +if __name__ == "__main__": |
| 230 | + unittest.main() |
0 commit comments