demonstrate verison header getting transformed to timestamp

This commit is contained in:
Mike Lueders
2024-04-05 13:36:11 -05:00
parent 23f333f1fa
commit 6b07cb0620
22 changed files with 251 additions and 174 deletions

View File

@@ -1,21 +1,23 @@
lockVersion: 2.0.0
id: f973dc21-d1bd-4e48-971a-a3918272099b
management:
docChecksum: 3851061474ad0b751f638e537d98fd25
docChecksum: 2e336baab28c334d1912043cb6deaa64
docVersion: "1"
speakeasyVersion: internal
generationVersion: 2.275.2
releaseVersion: 0.0.1
configChecksum: 5be955b039369e57b218570e7f64d06f
speakeasyVersion: 1.235.2
generationVersion: 2.298.2
releaseVersion: 0.1.0
configChecksum: 6124ffbb29f4c376978d74c5e711c4ef
features:
python:
constsAndDefaults: 0.1.2
core: 4.5.0
core: 4.6.2
flattening: 2.81.1
globalServerURLs: 2.82.1
getRequestBodies: 2.81.1
globalServerURLs: 2.82.2
responseFormat: 0.1.0
generatedFiles:
- src/shippo/sdkconfiguration.py
- src/shippo/sdk.py
- py.typed
- pylintrc
- setup.py
- src/shippo/__init__.py
@@ -25,11 +27,14 @@ generatedFiles:
- src/shippo/models/errors/sdkerror.py
- tests/helpers.py
- src/shippo/models/operations/example.py
- src/shippo/models/components/examplebody.py
- src/shippo/models/__init__.py
- src/shippo/models/errors/__init__.py
- src/shippo/models/operations/__init__.py
- src/shippo/models/components/__init__.py
- docs/models/operations/examplerequest.md
- docs/models/operations/exampleresponse.md
- docs/models/components/examplebody.md
- docs/sdks/shippo/README.md
- USAGE.md
- .gitattributes

View File

@@ -12,13 +12,13 @@ generation:
auth:
oAuth2ClientCredentialsEnabled: false
python:
version: 0.0.1
version: 0.1.0
additionalDependencies:
dependencies: {}
extraDependencies:
dev:
pytest: ==8.1.1
httpretty: ==1.1.4
pytest: ==8.1.1
author: Speakeasy
clientServerStatusCodesAsErrors: true
description: Python Client SDK Generated by Speakeasy
@@ -35,3 +35,5 @@ python:
maxMethodParams: 4
outputModelSuffix: output
packageName: shippo-api-client
projectUrls: {}
responseFormat: envelope

View File

@@ -31,15 +31,17 @@ pip install git+<UNSET>.git
```python
import shippo
from shippo.models import components
s = shippo.Shippo()
res = s.example(results_per_page=904965)
res = s.example(header_param='<value>', shippo_api_version='2018-02-08T00:00:00Z', example_body=components.ExampleBody())
if res.status_code == 200:
if res is not None:
# handle response
pass
```
<!-- End SDK Example Usage [usage] -->
@@ -58,27 +60,28 @@ Handling errors in this SDK should largely match your expectations. All operati
| Error Object | Status Code | Content Type |
| --------------- | --------------- | --------------- |
| errors.SDKError | 4x-5xx | */* |
| errors.SDKError | 4xx-5xx | */* |
### Example
```python
import shippo
from shippo.models import errors
from shippo.models import components, errors
s = shippo.Shippo()
res = None
try:
res = s.example(results_per_page=904965)
res = s.example(header_param='<value>', shippo_api_version='2018-02-08T00:00:00Z', example_body=components.ExampleBody())
except errors.SDKError as e:
# handle exception
raise(e)
if res.status_code == 200:
if res is not None:
# handle response
pass
```
<!-- End Error Handling [errors] -->
@@ -97,17 +100,19 @@ You can override the default server globally by passing a server index to the `s
```python
import shippo
from shippo.models import components
s = shippo.Shippo(
server_idx=0,
)
res = s.example(results_per_page=904965)
res = s.example(header_param='<value>', shippo_api_version='2018-02-08T00:00:00Z', example_body=components.ExampleBody())
if res.status_code == 200:
if res is not None:
# handle response
pass
```
@@ -116,17 +121,19 @@ if res.status_code == 200:
The default server can also be overridden globally by passing a URL to the `server_url: str` optional parameter when initializing the SDK client instance. For example:
```python
import shippo
from shippo.models import components
s = shippo.Shippo(
server_url="https://example.com",
)
res = s.example(results_per_page=904965)
res = s.example(header_param='<value>', shippo_api_version='2018-02-08T00:00:00Z', example_body=components.ExampleBody())
if res.status_code == 200:
if res is not None:
# handle response
pass
```
<!-- End Server Selection [server] -->
@@ -142,7 +149,7 @@ import requests
http_client = requests.Session()
http_client.headers.update({'x-custom-header': 'someValue'})
s = shippo.Shippo(client: http_client)
s = shippo.Shippo(client=http_client)
```
<!-- End Custom HTTP Client [http-client] -->

View File

@@ -1,14 +1,16 @@
<!-- Start SDK Example Usage [usage] -->
```python
import shippo
from shippo.models import components
s = shippo.Shippo()
res = s.example(results_per_page=904965)
res = s.example(header_param='<value>', shippo_api_version='2018-02-08T00:00:00Z', example_body=components.ExampleBody())
if res.status_code == 200:
if res is not None:
# handle response
pass
```
<!-- End SDK Example Usage [usage] -->

View File

@@ -0,0 +1,8 @@
# ExampleBody
## Fields
| Field | Type | Required | Description |
| ------------------ | ------------------ | ------------------ | ------------------ |
| `field` | *Optional[str]* | :heavy_minus_sign: | N/A |

View File

@@ -3,6 +3,8 @@
## Fields
| Field | Type | Required | Description |
| -------------------------------------------------- | -------------------------------------------------- | -------------------------------------------------- | -------------------------------------------------- |
| `results_per_page` | *Optional[int]* | :heavy_minus_sign: | The number of results to return per page (max 100) |
| Field | Type | Required | Description | Example |
| -------------------------------------------------------------------------- | -------------------------------------------------------------------------- | -------------------------------------------------------------------------- | -------------------------------------------------------------------------- | -------------------------------------------------------------------------- |
| `header_param` | *Optional[str]* | :heavy_minus_sign: | The number of results to return per page (max 100) | |
| `shippo_api_version` | *Optional[str]* | :heavy_minus_sign: | String used to pick a non-default API version to use | 2018-02-08 00:00:00 +0000 UTC |
| `example_body` | [Optional[components.ExampleBody]](../../models/components/examplebody.md) | :heavy_minus_sign: | N/A | |

View File

@@ -13,22 +13,26 @@
```python
import shippo
from shippo.models import components
s = shippo.Shippo()
res = s.example(results_per_page=904965)
res = s.example(header_param='<value>', shippo_api_version='2018-02-08T00:00:00Z', example_body=components.ExampleBody())
if res.status_code == 200:
if res is not None:
# handle response
pass
```
### Parameters
| Parameter | Type | Required | Description |
| -------------------------------------------------- | -------------------------------------------------- | -------------------------------------------------- | -------------------------------------------------- |
| `results_per_page` | *Optional[int]* | :heavy_minus_sign: | The number of results to return per page (max 100) |
| Parameter | Type | Required | Description | Example |
| -------------------------------------------------------------------------- | -------------------------------------------------------------------------- | -------------------------------------------------------------------------- | -------------------------------------------------------------------------- | -------------------------------------------------------------------------- |
| `header_param` | *Optional[str]* | :heavy_minus_sign: | The number of results to return per page (max 100) | |
| `shippo_api_version` | *Optional[str]* | :heavy_minus_sign: | String used to pick a non-default API version to use | 2018-02-08 00:00:00 +0000 UTC |
| `example_body` | [Optional[components.ExampleBody]](../../models/components/examplebody.md) | :heavy_minus_sign: | N/A | |
### Response
@@ -38,4 +42,4 @@ if res.status_code == 200:
| Error Object | Status Code | Content Type |
| --------------- | --------------- | --------------- |
| errors.SDKError | 4x-5xx | */* |
| errors.SDKError | 4xx-5xx | */* |

View File

@@ -12,6 +12,7 @@ paths:
operationId: Example
parameters:
- $ref: "#/components/parameters/HeaderParam"
- $ref: '#/components/parameters/ShippoApiVersionHeader'
requestBody:
content:
application/json:
@@ -28,6 +29,13 @@ components:
name: header_param
schema:
type: string
ShippoApiVersionHeader:
description: String used to pick a non-default API version to use
in: header
name: SHIPPO-API-VERSION
schema:
type: string
example: 2018-02-08
responses: {}
schemas:
ExampleBody:

1
py.typed Normal file
View File

@@ -0,0 +1 @@
# Marker file for PEP 561. The package enables type hints.

View File

@@ -441,7 +441,8 @@ disable=raw-checker-failed,
too-many-nested-blocks,
too-many-boolean-expressions,
no-else-raise,
bare-except
bare-except,
broad-exception-caught
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option

View File

@@ -3,19 +3,19 @@
import setuptools
try:
with open("README.md", "r") as fh:
with open('README.md', 'r') as fh:
long_description = fh.read()
except FileNotFoundError:
long_description = ""
long_description = ''
setuptools.setup(
name="shippo-api-client",
version="0.0.1",
author="Speakeasy",
description="Python Client SDK Generated by Speakeasy",
name='shippo-api-client',
version='0.1.0',
author='Speakeasy',
description='Python Client SDK Generated by Speakeasy',
long_description=long_description,
long_description_content_type="text/markdown",
packages=setuptools.find_packages(where="src"),
long_description_content_type='text/markdown',
packages=setuptools.find_packages(where='src'),
install_requires=[
"certifi>=2023.7.22",
"charset-normalizer>=3.2.0",
@@ -34,10 +34,12 @@ setuptools.setup(
],
extras_require={
"dev": [
"httpretty==1.1.4",
"pylint==2.16.2",
"pytest==8.1.1",
],
},
package_dir={'': 'src'},
python_requires='>=3.8',
package_data={"shippo-api-client": ["py.typed"]},
package_data={'shippo-api-client': ['py.typed']},
)

View File

@@ -2,3 +2,4 @@
from .sdkhooks import *
from .types import *
from .registration import *

View File

@@ -0,0 +1,13 @@
from .types import Hooks
# This file is only ever generated once on the first generation and then is free to be modified.
# Any hooks you wish to add should be registered in the init_hooks function. Feel free to define them
# in this file or in separate files in the hooks folder.
def init_hooks(hooks: Hooks):
# pylint: disable=unused-argument
"""Add hooks by calling hooks.register{sdk_init/before_request/after_success/after_error}Hook
with an instance of a hook that implements that specific Hook interface
Hooks are registered per SDK instance, and are valid for the lifetime of the SDK instance"""

View File

@@ -2,17 +2,17 @@
import requests
from .types import SDKInitHook, BeforeRequestContext, BeforeRequestHook, AfterSuccessContext, AfterSuccessHook, AfterErrorContext, AfterErrorHook, Hooks
from typing import List, Optional, Tuple, Union
from .registration import init_hooks
from typing import List, Optional, Tuple
class SDKHooks(Hooks):
sdk_init_hooks: List[SDKInitHook] = []
before_request_hooks: List[BeforeRequestHook] = []
after_success_hooks: List[AfterSuccessHook] = []
after_error_hooks: List[AfterErrorHook] = []
def __init__(self):
pass
self.sdk_init_hooks: List[SDKInitHook] = []
self.before_request_hooks: List[BeforeRequestHook] = []
self.after_success_hooks: List[AfterSuccessHook] = []
self.after_error_hooks: List[AfterErrorHook] = []
init_hooks(self)
def register_sdk_init_hook(self, hook: SDKInitHook) -> None:
self.sdk_init_hooks.append(hook)
@@ -31,19 +31,21 @@ class SDKHooks(Hooks):
base_url, client = hook.sdk_init(base_url, client)
return base_url, client
def before_request(self, hook_ctx: BeforeRequestContext, request: requests.PreparedRequest) -> Union[requests.PreparedRequest, Exception]:
def before_request(self, hook_ctx: BeforeRequestContext, request: requests.PreparedRequest) -> requests.PreparedRequest:
for hook in self.before_request_hooks:
request = hook.before_request(hook_ctx, request)
if isinstance(request, Exception):
raise request
out = hook.before_request(hook_ctx, request)
if isinstance(out, Exception):
raise out
request = out
return request
def after_success(self, hook_ctx: AfterSuccessContext, response: requests.Response) -> requests.Response:
for hook in self.after_success_hooks:
response = hook.after_success(hook_ctx, response)
if isinstance(response, Exception):
raise response
out = hook.after_success(hook_ctx, response)
if isinstance(out, Exception):
raise out
response = out
return response
def after_error(self, hook_ctx: AfterErrorContext, response: Optional[requests.Response], error: Optional[Exception]) -> Tuple[Optional[requests.Response], Optional[Exception]]:

View File

@@ -17,15 +17,19 @@ class HookContext:
class BeforeRequestContext(HookContext):
pass
def __init__(self, hook_ctx: HookContext):
super().__init__(hook_ctx.operation_id, hook_ctx.oauth2_scopes, hook_ctx.security_source)
class AfterSuccessContext(HookContext):
pass
def __init__(self, hook_ctx: HookContext):
super().__init__(hook_ctx.operation_id, hook_ctx.oauth2_scopes, hook_ctx.security_source)
class AfterErrorContext(HookContext):
pass
def __init__(self, hook_ctx: HookContext):
super().__init__(hook_ctx.operation_id, hook_ctx.oauth2_scopes, hook_ctx.security_source)
class SDKInitHook(ABC):
@@ -42,13 +46,13 @@ class BeforeRequestHook(ABC):
class AfterSuccessHook(ABC):
@abstractmethod
def after_success(self, hook_ctx: AfterSuccessContext, response: requests_http.Response) -> Union[requests_http.PreparedRequest, Exception]:
def after_success(self, hook_ctx: AfterSuccessContext, response: requests_http.Response) -> Union[requests_http.Response, Exception]:
pass
class AfterErrorHook(ABC):
@abstractmethod
def after_error(self, hook_ctx: AfterErrorContext, response: Optional[requests_http.Response], error: Optional[Exception]) -> Union[Tuple[Optional[requests_http.PreparedRequest], Optional[Exception]], Exception]:
def after_error(self, hook_ctx: AfterErrorContext, response: Optional[requests_http.Response], error: Optional[Exception]) -> Union[Tuple[Optional[requests_http.Response], Optional[Exception]], Exception]:
pass

View File

@@ -0,0 +1,5 @@
"""Code generated by Speakeasy (https://speakeasyapi.dev). DO NOT EDIT."""
from .examplebody import *
__all__ = ["ExampleBody"]

View File

@@ -0,0 +1,15 @@
"""Code generated by Speakeasy (https://speakeasyapi.dev). DO NOT EDIT."""
from __future__ import annotations
import dataclasses
from dataclasses_json import Undefined, dataclass_json
from shippo import utils
from typing import Optional
@dataclass_json(undefined=Undefined.EXCLUDE)
@dataclasses.dataclass
class ExampleBody:
field: Optional[str] = dataclasses.field(default=None, metadata={'dataclasses_json': { 'letter_case': utils.get_field_name('field'), 'exclude': lambda f: f is None }})

View File

@@ -3,13 +3,17 @@
from __future__ import annotations
import dataclasses
import requests as requests_http
from ...models.components import examplebody as components_examplebody
from typing import Optional
@dataclasses.dataclass
class ExampleRequest:
results_per_page: Optional[int] = dataclasses.field(default=25, metadata={'query_param': { 'field_name': 'results_per_page', 'style': 'form', 'explode': True }})
header_param: Optional[str] = dataclasses.field(default=None, metadata={'header': { 'field_name': 'header_param', 'style': 'simple', 'explode': False }})
r"""The number of results to return per page (max 100)"""
shippo_api_version: Optional[str] = dataclasses.field(default=None, metadata={'header': { 'field_name': 'SHIPPO-API-VERSION', 'style': 'simple', 'explode': False }})
r"""String used to pick a non-default API version to use"""
example_body: Optional[components_examplebody.ExampleBody] = dataclasses.field(default=None, metadata={'request': { 'media_type': 'application/json' }})

View File

@@ -2,9 +2,10 @@
import requests as requests_http
from .sdkconfiguration import SDKConfiguration
from .utils.retries import RetryConfig
from shippo import utils
from shippo._hooks import HookContext, SDKHooks
from shippo.models import errors, operations
from shippo._hooks import AfterErrorContext, AfterSuccessContext, BeforeRequestContext, HookContext, SDKHooks
from shippo.models import components, errors, operations
from typing import Dict, Optional
class Shippo:
@@ -12,11 +13,11 @@ class Shippo:
sdk_configuration: SDKConfiguration
def __init__(self,
server_idx: int = None,
server_url: str = None,
url_params: Dict[str, str] = None,
client: requests_http.Session = None,
retry_config: utils.RetryConfig = None
server_idx: Optional[int] = None,
server_url: Optional[str] = None,
url_params: Optional[Dict[str, str]] = None,
client: Optional[requests_http.Session] = None,
retry_config: Optional[RetryConfig] = None
) -> None:
"""Instantiates the SDK configuring it with the provided parameters.
@@ -29,7 +30,7 @@ class Shippo:
:param client: The requests.Session HTTP client to use for all operations
:type client: requests_http.Session
:param retry_config: The utils.RetryConfig to use globally
:type retry_config: utils.RetryConfig
:type retry_config: RetryConfig
"""
if client is None:
client = requests_http.Session()
@@ -38,7 +39,12 @@ class Shippo:
if url_params is not None:
server_url = utils.template_url(server_url, url_params)
self.sdk_configuration = SDKConfiguration(client, None, server_url, server_idx, retry_config=retry_config)
self.sdk_configuration = SDKConfiguration(
client,
server_url,
server_idx,
retry_config=retry_config
)
hooks = SDKHooks()
@@ -48,59 +54,59 @@ class Shippo:
self.sdk_configuration.server_url = server_url
# pylint: disable=protected-access
self.sdk_configuration._hooks=hooks
self.sdk_configuration._hooks = hooks
def example(self, results_per_page: Optional[int] = None) -> operations.ExampleResponse:
def example(self, header_param: Optional[str] = None, shippo_api_version: Optional[str] = None, example_body: Optional[components.ExampleBody] = None) -> operations.ExampleResponse:
hook_ctx = HookContext(operation_id='Example', oauth2_scopes=[], security_source=None)
request = operations.ExampleRequest(
results_per_page=results_per_page,
header_param=header_param,
shippo_api_version=shippo_api_version,
example_body=example_body,
)
base_url = utils.template_url(*self.sdk_configuration.get_server_details())
url = base_url + '/example'
headers = {}
query_params = utils.get_query_params(operations.ExampleRequest, request)
headers = { **utils.get_headers(request), **headers }
req_content_type, data, form = utils.serialize_request_body(request, operations.ExampleRequest, "example_body", False, True, 'json')
if req_content_type is not None and req_content_type not in ('multipart/form-data', 'multipart/mixed'):
headers['content-type'] = req_content_type
headers['Accept'] = '*/*'
headers['user-agent'] = self.sdk_configuration.user_agent
client = self.sdk_configuration.client
try:
req = self.sdk_configuration.get_hooks().before_request(
hook_ctx,
requests_http.Request('GET', url, params=query_params, headers=headers).prepare(),
)
req = client.prepare_request(requests_http.Request('GET', url, data=data, files=form, headers=headers))
req = self.sdk_configuration.get_hooks().before_request(BeforeRequestContext(hook_ctx), req)
http_res = client.send(req)
except Exception as e:
_, e = self.sdk_configuration.get_hooks().after_error(hook_ctx, None, e)
_, e = self.sdk_configuration.get_hooks().after_error(AfterErrorContext(hook_ctx), None, e)
if e is not None:
raise e
if utils.match_status_codes(['4XX','5XX'], http_res.status_code):
http_res, e = self.sdk_configuration.get_hooks().after_error(hook_ctx, http_res, None)
if e:
result, e = self.sdk_configuration.get_hooks().after_error(AfterErrorContext(hook_ctx), http_res, None)
if e is not None:
raise e
else:
result = self.sdk_configuration.get_hooks().after_success(hook_ctx, http_res)
if isinstance(result, Exception):
raise result
if result is not None:
http_res = result
else:
http_res = self.sdk_configuration.get_hooks().after_success(AfterSuccessContext(hook_ctx), http_res)
content_type = http_res.headers.get('Content-Type')
res = operations.ExampleResponse(status_code=http_res.status_code, content_type=content_type, raw_response=http_res)
res = operations.ExampleResponse(status_code=http_res.status_code, content_type=http_res.headers.get('Content-Type') or '', raw_response=http_res)
if http_res.status_code == 200:
pass
elif http_res.status_code >= 400 and http_res.status_code < 500 or http_res.status_code >= 500 and http_res.status_code < 600:
raise errors.SDKError('API error occurred', http_res.status_code, http_res.text, http_res)
else:
raise errors.SDKError('unknown status code received', http_res.status_code, http_res.text, http_res)
return res

View File

@@ -6,7 +6,7 @@ from ._hooks import SDKHooks
from .utils import utils
from .utils.retries import RetryConfig
from dataclasses import dataclass
from typing import Dict, Tuple
from typing import Dict, Optional, Tuple
SERVERS = [
@@ -17,18 +17,18 @@ SERVERS = [
@dataclass
class SDKConfiguration:
client: requests_http.Session
server_url: str = ''
server_idx: int = 0
server_url: Optional[str] = ''
server_idx: Optional[int] = 0
language: str = 'python'
openapi_doc_version: str = '1'
sdk_version: str = '0.0.1'
gen_version: str = '2.275.2'
user_agent: str = 'speakeasy-sdk/python 0.0.1 2.275.2 1 shippo-api-client'
retry_config: RetryConfig = None
_hooks: SDKHooks = None
sdk_version: str = '0.1.0'
gen_version: str = '2.298.2'
user_agent: str = 'speakeasy-sdk/python 0.1.0 2.298.2 1 shippo-api-client'
retry_config: Optional[RetryConfig] = None
_hooks: Optional[SDKHooks] = None
def get_server_details(self) -> Tuple[str, Dict[str, str]]:
if self.server_url:
if self.server_url is not None and self.server_url != '':
return utils.remove_suffix(self.server_url, '/'), {}
if self.server_idx is None:
self.server_idx = 0

View File

@@ -75,12 +75,12 @@ def retry(func, retries: Retries):
if res.status_code == parsed_code:
raise TemporaryError(res)
except requests.exceptions.ConnectionError as exception:
if retries.config.config.retry_connection_errors:
if retries.config.retry_connection_errors:
raise
raise PermanentError(exception) from exception
except requests.exceptions.Timeout as exception:
if retries.config.config.retry_connection_errors:
if retries.config.retry_connection_errors:
raise
raise PermanentError(exception) from exception

View File

@@ -4,7 +4,7 @@ import base64
import json
import re
import sys
from dataclasses import Field, dataclass, fields, is_dataclass, make_dataclass
from dataclasses import Field, fields, is_dataclass, make_dataclass
from datetime import date, datetime
from decimal import Decimal
from email.message import Message
@@ -14,30 +14,15 @@ from typing import (Any, Callable, Dict, List, Optional, Tuple, Union,
from xmlrpc.client import boolean
from typing_inspect import is_optional_type
import dateutil.parser
import requests
from dataclasses_json import DataClassJsonMixin
class SecurityClient:
client: requests.Session
query_params: Dict[str, str] = {}
def get_security(security: Any) -> Tuple[Dict[str, str], Dict[str, str]]:
headers: Dict[str, str] = {}
def __init__(self, client: requests.Session):
self.client = client
def send(self, request: requests.PreparedRequest, **kwargs):
request.prepare_url(url=request.url, params=self.query_params)
request.headers.update(self.headers)
return self.client.send(request, **kwargs)
def configure_security_client(client: requests.Session, security: dataclass):
client = SecurityClient(client)
query_params: Dict[str, str] = {}
if security is None:
return client
return headers, query_params
sec_fields: Tuple[Field, ...] = fields(security)
for sec_field in sec_fields:
@@ -49,35 +34,35 @@ def configure_security_client(client: requests.Session, security: dataclass):
if metadata is None:
continue
if metadata.get('option'):
_parse_security_option(client, value)
return client
_parse_security_option(headers, query_params, value)
return headers, query_params
if metadata.get('scheme'):
# Special case for basic auth which could be a flattened struct
if metadata.get("sub_type") == "basic" and not is_dataclass(value):
_parse_security_scheme(client, metadata, security)
_parse_security_scheme(headers, query_params, metadata, security)
else:
_parse_security_scheme(client, metadata, value)
_parse_security_scheme(headers, query_params, metadata, value)
return client
return headers, query_params
def _parse_security_option(client: SecurityClient, option: dataclass):
def _parse_security_option(headers: Dict[str, str], query_params: Dict[str, str], option: Any):
opt_fields: Tuple[Field, ...] = fields(option)
for opt_field in opt_fields:
metadata = opt_field.metadata.get('security')
if metadata is None or metadata.get('scheme') is None:
continue
_parse_security_scheme(
client, metadata, getattr(option, opt_field.name))
headers, query_params, metadata, getattr(option, opt_field.name))
def _parse_security_scheme(client: SecurityClient, scheme_metadata: Dict, scheme: any):
def _parse_security_scheme(headers: Dict[str, str], query_params: Dict[str, str], scheme_metadata: Dict, scheme: Any):
scheme_type = scheme_metadata.get('type')
sub_type = scheme_metadata.get('sub_type')
if is_dataclass(scheme):
if scheme_type == 'http' and sub_type == 'basic':
_parse_basic_auth_scheme(client, scheme)
_parse_basic_auth_scheme(headers, scheme)
return
scheme_fields: Tuple[Field, ...] = fields(scheme)
@@ -89,33 +74,33 @@ def _parse_security_scheme(client: SecurityClient, scheme_metadata: Dict, scheme
value = getattr(scheme, scheme_field.name)
_parse_security_scheme_value(
client, scheme_metadata, metadata, value)
headers, query_params, scheme_metadata, metadata, value)
else:
_parse_security_scheme_value(
client, scheme_metadata, scheme_metadata, scheme)
headers, query_params, scheme_metadata, scheme_metadata, scheme)
def _parse_security_scheme_value(client: SecurityClient, scheme_metadata: Dict, security_metadata: Dict, value: any):
def _parse_security_scheme_value(headers: Dict[str, str], query_params: Dict[str, str], scheme_metadata: Dict, security_metadata: Dict, value: Any):
scheme_type = scheme_metadata.get('type')
sub_type = scheme_metadata.get('sub_type')
header_name = security_metadata.get('field_name')
header_name = str(security_metadata.get('field_name'))
if scheme_type == "apiKey":
if sub_type == 'header':
client.headers[header_name] = value
headers[header_name] = value
elif sub_type == 'query':
client.query_params[header_name] = value
query_params[header_name] = value
else:
raise Exception('not supported')
elif scheme_type == "openIdConnect":
client.headers[header_name] = _apply_bearer(value)
headers[header_name] = _apply_bearer(value)
elif scheme_type == 'oauth2':
if sub_type != 'client_credentials':
client.headers[header_name] = _apply_bearer(value)
headers[header_name] = _apply_bearer(value)
elif scheme_type == 'http':
if sub_type == 'bearer':
client.headers[header_name] = _apply_bearer(value)
headers[header_name] = _apply_bearer(value)
else:
raise Exception('not supported')
else:
@@ -126,7 +111,7 @@ def _apply_bearer(token: str) -> str:
return token.lower().startswith('bearer ') and token or f'Bearer {token}'
def _parse_basic_auth_scheme(client: SecurityClient, scheme: dataclass):
def _parse_basic_auth_scheme(headers: Dict[str, str], scheme: Any):
username = ""
password = ""
@@ -145,11 +130,11 @@ def _parse_basic_auth_scheme(client: SecurityClient, scheme: dataclass):
password = value
data = f'{username}:{password}'.encode()
client.headers['Authorization'] = f'Basic {base64.b64encode(data).decode()}'
headers['Authorization'] = f'Basic {base64.b64encode(data).decode()}'
def generate_url(clazz: type, server_url: str, path: str, path_params: dataclass,
gbls: Dict[str, Dict[str, Dict[str, Any]]] = None) -> str:
def generate_url(clazz: type, server_url: str, path: str, path_params: Any,
gbls: Optional[Dict[str, Dict[str, Dict[str, Any]]]] = None) -> str:
path_param_fields: Tuple[Field, ...] = fields(clazz)
for field in path_param_fields:
request_metadata = field.metadata.get('request')
@@ -241,7 +226,7 @@ def template_url(url_with_params: str, params: Dict[str, str]) -> str:
return url_with_params
def get_query_params(clazz: type, query_params: dataclass, gbls: Dict[str, Dict[str, Dict[str, Any]]] = None) -> Dict[
def get_query_params(clazz: type, query_params: Any, gbls: Optional[Dict[str, Dict[str, Dict[str, Any]]]] = None) -> Dict[
str, List[str]]:
params: Dict[str, List[str]] = {}
@@ -287,7 +272,7 @@ def get_query_params(clazz: type, query_params: dataclass, gbls: Dict[str, Dict[
return params
def get_headers(headers_params: dataclass) -> Dict[str, str]:
def get_headers(headers_params: Any, gbls: Optional[Dict[str, Dict[str, Dict[str, Any]]]] = None) -> Dict[str, str]:
if headers_params is None:
return {}
@@ -299,8 +284,8 @@ def get_headers(headers_params: dataclass) -> Dict[str, str]:
if not metadata:
continue
value = _serialize_header(metadata.get(
'explode', False), getattr(headers_params, field.name))
value = _populate_from_globals(field.name, getattr(headers_params, field.name), 'header', gbls)
value = _serialize_header(metadata.get('explode', False), value)
if value != '':
headers[metadata.get('field_name', field.name)] = value
@@ -308,7 +293,7 @@ def get_headers(headers_params: dataclass) -> Dict[str, str]:
return headers
def _get_serialized_params(metadata: Dict, field_type: type, field_name: str, obj: any) -> Dict[str, str]:
def _get_serialized_params(metadata: Dict, field_type: type, field_name: str, obj: Any) -> Dict[str, str]:
params: Dict[str, str] = {}
serialization = metadata.get('serialization', '')
@@ -319,7 +304,7 @@ def _get_serialized_params(metadata: Dict, field_type: type, field_name: str, ob
return params
def _get_deep_object_query_params(metadata: Dict, field_name: str, obj: any) -> Dict[str, List[str]]:
def _get_deep_object_query_params(metadata: Dict, field_name: str, obj: Any) -> Dict[str, List[str]]:
params: Dict[str, List[str]] = {}
if obj is None:
@@ -385,7 +370,7 @@ def _get_query_param_field_name(obj_field: Field) -> str:
return obj_param_metadata.get("field_name", obj_field.name)
def _get_delimited_query_params(metadata: Dict, field_name: str, obj: any, delimiter: str) -> Dict[
def _get_delimited_query_params(metadata: Dict, field_name: str, obj: Any, delimiter: str) -> Dict[
str, List[str]]:
return _populate_form(field_name, metadata.get("explode", True), obj, _get_query_param_field_name, delimiter)
@@ -399,8 +384,8 @@ SERIALIZATION_METHOD_TO_CONTENT_TYPE = {
}
def serialize_request_body(request: dataclass, request_type: type, request_field_name: str, nullable: bool, optional: bool, serialization_method: str, encoder=None) -> Tuple[
str, any, any]:
def serialize_request_body(request: Any, request_type: type, request_field_name: str, nullable: bool, optional: bool, serialization_method: str, encoder=None) -> Tuple[
Optional[str], Optional[Any], Optional[Any]]:
if request is None:
if not nullable and optional:
return None, None, None
@@ -430,7 +415,7 @@ def serialize_request_body(request: dataclass, request_type: type, request_field
request_val)
def serialize_content_type(field_name: str, request_type: any, media_type: str, request: dataclass, encoder=None) -> Tuple[str, any, List[List[any]]]:
def serialize_content_type(field_name: str, request_type: Any, media_type: str, request: Any, encoder=None) -> Tuple[Optional[str], Optional[Any], Optional[List[List[Any]]]]:
if re.match(r'(application|text)\/.*?\+*json.*', media_type) is not None:
return media_type, marshal_json(request, request_type, encoder), None
if re.match(r'multipart\/.*', media_type) is not None:
@@ -446,8 +431,8 @@ def serialize_content_type(field_name: str, request_type: any, media_type: str,
f"invalid request body type {type(request)} for mediaType {media_type}")
def serialize_multipart_form(media_type: str, request: dataclass) -> Tuple[str, any, List[List[any]]]:
form: List[List[any]] = []
def serialize_multipart_form(media_type: str, request: Any) -> Tuple[str, Any, List[List[Any]]]:
form: List[List[Any]] = []
request_fields = fields(request)
for field in request_fields:
@@ -502,7 +487,7 @@ def serialize_multipart_form(media_type: str, request: dataclass) -> Tuple[str,
def serialize_dict(original: Dict, explode: bool, field_name, existing: Optional[Dict[str, List[str]]]) -> Dict[
str, List[str]]:
if existing is None:
existing = []
existing = {}
if explode is True:
for key, val in original.items():
@@ -520,7 +505,7 @@ def serialize_dict(original: Dict, explode: bool, field_name, existing: Optional
return existing
def serialize_form_data(field_name: str, data: dataclass) -> Dict[str, any]:
def serialize_form_data(field_name: str, data: Any) -> Dict[str, Any]:
form: Dict[str, List[str]] = {}
if is_dataclass(data):
@@ -562,7 +547,7 @@ def _get_form_field_name(obj_field: Field) -> str:
return obj_param_metadata.get("field_name", obj_field.name)
def _populate_form(field_name: str, explode: boolean, obj: any, get_field_name_func: Callable, delimiter: str) -> \
def _populate_form(field_name: str, explode: boolean, obj: Any, get_field_name_func: Callable, delimiter: str) -> \
Dict[str, List[str]]:
params: Dict[str, List[str]] = {}
@@ -597,7 +582,7 @@ def _populate_form(field_name: str, explode: boolean, obj: any, get_field_name_f
continue
if explode:
params[key] = _val_to_string(value)
params[key] = [_val_to_string(value)]
else:
items.append(f'{key}{delimiter}{_val_to_string(value)}')
@@ -626,7 +611,7 @@ def _populate_form(field_name: str, explode: boolean, obj: any, get_field_name_f
return params
def _serialize_header(explode: bool, obj: any) -> str:
def _serialize_header(explode: bool, obj: Any) -> str:
if obj is None:
return ''
@@ -850,7 +835,7 @@ def list_decoder(value_decoder: Callable):
def union_encoder(all_encoders: Dict[str, Callable]):
def selective_encoder(val: any):
def selective_encoder(val: Any):
if type(val) in all_encoders:
return all_encoders[type(val)](val)
return val
@@ -858,7 +843,7 @@ def union_encoder(all_encoders: Dict[str, Callable]):
def union_decoder(all_decoders: List[Callable]):
def selective_decoder(val: any):
def selective_decoder(val: Any):
decoded = val
for decoder in all_decoders:
try:
@@ -877,18 +862,18 @@ def get_field_name(name):
return override
def _val_to_string(val):
def _val_to_string(val) -> str:
if isinstance(val, bool):
return str(val).lower()
if isinstance(val, datetime):
return val.isoformat().replace('+00:00', 'Z')
return str(val.isoformat().replace('+00:00', 'Z'))
if isinstance(val, Enum):
return str(val.value)
return str(val)
def _populate_from_globals(param_name: str, value: any, param_type: str, gbls: Dict[str, Dict[str, Dict[str, Any]]]):
def _populate_from_globals(param_name: str, value: Any, param_type: str, gbls: Optional[Dict[str, Dict[str, Dict[str, Any]]]]):
if value is None and gbls is not None:
if 'parameters' in gbls:
if param_type in gbls['parameters']: