mirror of
https://github.com/LukeHagar/plexpy.git
synced 2025-12-07 04:20:54 +00:00
ci: regenerated with OpenAPI Doc 0.0.3, Speakeasy CLI 1.204.1
This commit is contained in:
@@ -4,7 +4,7 @@ import base64
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
from dataclasses import Field, dataclass, fields, is_dataclass, make_dataclass
|
||||
from dataclasses import Field, fields, is_dataclass, make_dataclass
|
||||
from datetime import date, datetime
|
||||
from decimal import Decimal
|
||||
from email.message import Message
|
||||
@@ -14,30 +14,15 @@ from typing import (Any, Callable, Dict, List, Optional, Tuple, Union,
|
||||
from xmlrpc.client import boolean
|
||||
from typing_inspect import is_optional_type
|
||||
import dateutil.parser
|
||||
import requests
|
||||
from dataclasses_json import DataClassJsonMixin
|
||||
|
||||
|
||||
class SecurityClient:
|
||||
client: requests.Session
|
||||
query_params: Dict[str, str] = {}
|
||||
def get_security(security: Any) -> Tuple[Dict[str, str], Dict[str, str]]:
|
||||
headers: Dict[str, str] = {}
|
||||
|
||||
def __init__(self, client: requests.Session):
|
||||
self.client = client
|
||||
|
||||
def send(self, request: requests.PreparedRequest, **kwargs):
|
||||
request.prepare_url(url=request.url, params=self.query_params)
|
||||
request.headers.update(self.headers)
|
||||
|
||||
return self.client.send(request, **kwargs)
|
||||
|
||||
|
||||
def configure_security_client(client: requests.Session, security: dataclass):
|
||||
client = SecurityClient(client)
|
||||
query_params: Dict[str, str] = {}
|
||||
|
||||
if security is None:
|
||||
return client
|
||||
return headers, query_params
|
||||
|
||||
sec_fields: Tuple[Field, ...] = fields(security)
|
||||
for sec_field in sec_fields:
|
||||
@@ -49,35 +34,35 @@ def configure_security_client(client: requests.Session, security: dataclass):
|
||||
if metadata is None:
|
||||
continue
|
||||
if metadata.get('option'):
|
||||
_parse_security_option(client, value)
|
||||
return client
|
||||
_parse_security_option(headers, query_params, value)
|
||||
return headers, query_params
|
||||
if metadata.get('scheme'):
|
||||
# Special case for basic auth which could be a flattened struct
|
||||
if metadata.get("sub_type") == "basic" and not is_dataclass(value):
|
||||
_parse_security_scheme(client, metadata, security)
|
||||
_parse_security_scheme(headers, query_params, metadata, security)
|
||||
else:
|
||||
_parse_security_scheme(client, metadata, value)
|
||||
_parse_security_scheme(headers, query_params, metadata, value)
|
||||
|
||||
return client
|
||||
return headers, query_params
|
||||
|
||||
|
||||
def _parse_security_option(client: SecurityClient, option: dataclass):
|
||||
def _parse_security_option(headers: Dict[str, str], query_params: Dict[str, str], option: Any):
|
||||
opt_fields: Tuple[Field, ...] = fields(option)
|
||||
for opt_field in opt_fields:
|
||||
metadata = opt_field.metadata.get('security')
|
||||
if metadata is None or metadata.get('scheme') is None:
|
||||
continue
|
||||
_parse_security_scheme(
|
||||
client, metadata, getattr(option, opt_field.name))
|
||||
headers, query_params, metadata, getattr(option, opt_field.name))
|
||||
|
||||
|
||||
def _parse_security_scheme(client: SecurityClient, scheme_metadata: Dict, scheme: any):
|
||||
def _parse_security_scheme(headers: Dict[str, str], query_params: Dict[str, str], scheme_metadata: Dict, scheme: Any):
|
||||
scheme_type = scheme_metadata.get('type')
|
||||
sub_type = scheme_metadata.get('sub_type')
|
||||
|
||||
if is_dataclass(scheme):
|
||||
if scheme_type == 'http' and sub_type == 'basic':
|
||||
_parse_basic_auth_scheme(client, scheme)
|
||||
_parse_basic_auth_scheme(headers, scheme)
|
||||
return
|
||||
|
||||
scheme_fields: Tuple[Field, ...] = fields(scheme)
|
||||
@@ -89,33 +74,33 @@ def _parse_security_scheme(client: SecurityClient, scheme_metadata: Dict, scheme
|
||||
value = getattr(scheme, scheme_field.name)
|
||||
|
||||
_parse_security_scheme_value(
|
||||
client, scheme_metadata, metadata, value)
|
||||
headers, query_params, scheme_metadata, metadata, value)
|
||||
else:
|
||||
_parse_security_scheme_value(
|
||||
client, scheme_metadata, scheme_metadata, scheme)
|
||||
headers, query_params, scheme_metadata, scheme_metadata, scheme)
|
||||
|
||||
|
||||
def _parse_security_scheme_value(client: SecurityClient, scheme_metadata: Dict, security_metadata: Dict, value: any):
|
||||
def _parse_security_scheme_value(headers: Dict[str, str], query_params: Dict[str, str], scheme_metadata: Dict, security_metadata: Dict, value: Any):
|
||||
scheme_type = scheme_metadata.get('type')
|
||||
sub_type = scheme_metadata.get('sub_type')
|
||||
|
||||
header_name = security_metadata.get('field_name')
|
||||
header_name = str(security_metadata.get('field_name'))
|
||||
|
||||
if scheme_type == "apiKey":
|
||||
if sub_type == 'header':
|
||||
client.headers[header_name] = value
|
||||
headers[header_name] = value
|
||||
elif sub_type == 'query':
|
||||
client.query_params[header_name] = value
|
||||
query_params[header_name] = value
|
||||
else:
|
||||
raise Exception('not supported')
|
||||
elif scheme_type == "openIdConnect":
|
||||
client.headers[header_name] = _apply_bearer(value)
|
||||
headers[header_name] = _apply_bearer(value)
|
||||
elif scheme_type == 'oauth2':
|
||||
if sub_type != 'client_credentials':
|
||||
client.headers[header_name] = _apply_bearer(value)
|
||||
headers[header_name] = _apply_bearer(value)
|
||||
elif scheme_type == 'http':
|
||||
if sub_type == 'bearer':
|
||||
client.headers[header_name] = _apply_bearer(value)
|
||||
headers[header_name] = _apply_bearer(value)
|
||||
else:
|
||||
raise Exception('not supported')
|
||||
else:
|
||||
@@ -126,7 +111,7 @@ def _apply_bearer(token: str) -> str:
|
||||
return token.lower().startswith('bearer ') and token or f'Bearer {token}'
|
||||
|
||||
|
||||
def _parse_basic_auth_scheme(client: SecurityClient, scheme: dataclass):
|
||||
def _parse_basic_auth_scheme(headers: Dict[str, str], scheme: Any):
|
||||
username = ""
|
||||
password = ""
|
||||
|
||||
@@ -145,11 +130,11 @@ def _parse_basic_auth_scheme(client: SecurityClient, scheme: dataclass):
|
||||
password = value
|
||||
|
||||
data = f'{username}:{password}'.encode()
|
||||
client.headers['Authorization'] = f'Basic {base64.b64encode(data).decode()}'
|
||||
headers['Authorization'] = f'Basic {base64.b64encode(data).decode()}'
|
||||
|
||||
|
||||
def generate_url(clazz: type, server_url: str, path: str, path_params: dataclass,
|
||||
gbls: Dict[str, Dict[str, Dict[str, Any]]] = None) -> str:
|
||||
def generate_url(clazz: type, server_url: str, path: str, path_params: Any,
|
||||
gbls: Optional[Dict[str, Dict[str, Dict[str, Any]]]] = None) -> str:
|
||||
path_param_fields: Tuple[Field, ...] = fields(clazz)
|
||||
for field in path_param_fields:
|
||||
request_metadata = field.metadata.get('request')
|
||||
@@ -241,7 +226,7 @@ def template_url(url_with_params: str, params: Dict[str, str]) -> str:
|
||||
return url_with_params
|
||||
|
||||
|
||||
def get_query_params(clazz: type, query_params: dataclass, gbls: Dict[str, Dict[str, Dict[str, Any]]] = None) -> Dict[
|
||||
def get_query_params(clazz: type, query_params: Any, gbls: Optional[Dict[str, Dict[str, Dict[str, Any]]]] = None) -> Dict[
|
||||
str, List[str]]:
|
||||
params: Dict[str, List[str]] = {}
|
||||
|
||||
@@ -287,7 +272,7 @@ def get_query_params(clazz: type, query_params: dataclass, gbls: Dict[str, Dict[
|
||||
return params
|
||||
|
||||
|
||||
def get_headers(headers_params: dataclass) -> Dict[str, str]:
|
||||
def get_headers(headers_params: Any) -> Dict[str, str]:
|
||||
if headers_params is None:
|
||||
return {}
|
||||
|
||||
@@ -308,7 +293,7 @@ def get_headers(headers_params: dataclass) -> Dict[str, str]:
|
||||
return headers
|
||||
|
||||
|
||||
def _get_serialized_params(metadata: Dict, field_type: type, field_name: str, obj: any) -> Dict[str, str]:
|
||||
def _get_serialized_params(metadata: Dict, field_type: type, field_name: str, obj: Any) -> Dict[str, str]:
|
||||
params: Dict[str, str] = {}
|
||||
|
||||
serialization = metadata.get('serialization', '')
|
||||
@@ -319,7 +304,7 @@ def _get_serialized_params(metadata: Dict, field_type: type, field_name: str, ob
|
||||
return params
|
||||
|
||||
|
||||
def _get_deep_object_query_params(metadata: Dict, field_name: str, obj: any) -> Dict[str, List[str]]:
|
||||
def _get_deep_object_query_params(metadata: Dict, field_name: str, obj: Any) -> Dict[str, List[str]]:
|
||||
params: Dict[str, List[str]] = {}
|
||||
|
||||
if obj is None:
|
||||
@@ -385,7 +370,7 @@ def _get_query_param_field_name(obj_field: Field) -> str:
|
||||
return obj_param_metadata.get("field_name", obj_field.name)
|
||||
|
||||
|
||||
def _get_delimited_query_params(metadata: Dict, field_name: str, obj: any, delimiter: str) -> Dict[
|
||||
def _get_delimited_query_params(metadata: Dict, field_name: str, obj: Any, delimiter: str) -> Dict[
|
||||
str, List[str]]:
|
||||
return _populate_form(field_name, metadata.get("explode", True), obj, _get_query_param_field_name, delimiter)
|
||||
|
||||
@@ -399,8 +384,8 @@ SERIALIZATION_METHOD_TO_CONTENT_TYPE = {
|
||||
}
|
||||
|
||||
|
||||
def serialize_request_body(request: dataclass, request_type: type, request_field_name: str, nullable: bool, optional: bool, serialization_method: str, encoder=None) -> Tuple[
|
||||
str, any, any]:
|
||||
def serialize_request_body(request: Any, request_type: type, request_field_name: str, nullable: bool, optional: bool, serialization_method: str, encoder=None) -> Tuple[
|
||||
Optional[str], Optional[Any], Optional[Any]]:
|
||||
if request is None:
|
||||
if not nullable and optional:
|
||||
return None, None, None
|
||||
@@ -430,7 +415,7 @@ def serialize_request_body(request: dataclass, request_type: type, request_field
|
||||
request_val)
|
||||
|
||||
|
||||
def serialize_content_type(field_name: str, request_type: any, media_type: str, request: dataclass, encoder=None) -> Tuple[str, any, List[List[any]]]:
|
||||
def serialize_content_type(field_name: str, request_type: Any, media_type: str, request: Any, encoder=None) -> Tuple[Optional[str], Optional[Any], Optional[List[List[Any]]]]:
|
||||
if re.match(r'(application|text)\/.*?\+*json.*', media_type) is not None:
|
||||
return media_type, marshal_json(request, request_type, encoder), None
|
||||
if re.match(r'multipart\/.*', media_type) is not None:
|
||||
@@ -446,8 +431,8 @@ def serialize_content_type(field_name: str, request_type: any, media_type: str,
|
||||
f"invalid request body type {type(request)} for mediaType {media_type}")
|
||||
|
||||
|
||||
def serialize_multipart_form(media_type: str, request: dataclass) -> Tuple[str, any, List[List[any]]]:
|
||||
form: List[List[any]] = []
|
||||
def serialize_multipart_form(media_type: str, request: Any) -> Tuple[str, Any, List[List[Any]]]:
|
||||
form: List[List[Any]] = []
|
||||
request_fields = fields(request)
|
||||
|
||||
for field in request_fields:
|
||||
@@ -502,7 +487,7 @@ def serialize_multipart_form(media_type: str, request: dataclass) -> Tuple[str,
|
||||
def serialize_dict(original: Dict, explode: bool, field_name, existing: Optional[Dict[str, List[str]]]) -> Dict[
|
||||
str, List[str]]:
|
||||
if existing is None:
|
||||
existing = []
|
||||
existing = {}
|
||||
|
||||
if explode is True:
|
||||
for key, val in original.items():
|
||||
@@ -520,7 +505,7 @@ def serialize_dict(original: Dict, explode: bool, field_name, existing: Optional
|
||||
return existing
|
||||
|
||||
|
||||
def serialize_form_data(field_name: str, data: dataclass) -> Dict[str, any]:
|
||||
def serialize_form_data(field_name: str, data: Any) -> Dict[str, Any]:
|
||||
form: Dict[str, List[str]] = {}
|
||||
|
||||
if is_dataclass(data):
|
||||
@@ -562,7 +547,7 @@ def _get_form_field_name(obj_field: Field) -> str:
|
||||
return obj_param_metadata.get("field_name", obj_field.name)
|
||||
|
||||
|
||||
def _populate_form(field_name: str, explode: boolean, obj: any, get_field_name_func: Callable, delimiter: str) -> \
|
||||
def _populate_form(field_name: str, explode: boolean, obj: Any, get_field_name_func: Callable, delimiter: str) -> \
|
||||
Dict[str, List[str]]:
|
||||
params: Dict[str, List[str]] = {}
|
||||
|
||||
@@ -597,7 +582,7 @@ def _populate_form(field_name: str, explode: boolean, obj: any, get_field_name_f
|
||||
continue
|
||||
|
||||
if explode:
|
||||
params[key] = _val_to_string(value)
|
||||
params[key] = [_val_to_string(value)]
|
||||
else:
|
||||
items.append(f'{key}{delimiter}{_val_to_string(value)}')
|
||||
|
||||
@@ -626,7 +611,7 @@ def _populate_form(field_name: str, explode: boolean, obj: any, get_field_name_f
|
||||
return params
|
||||
|
||||
|
||||
def _serialize_header(explode: bool, obj: any) -> str:
|
||||
def _serialize_header(explode: bool, obj: Any) -> str:
|
||||
if obj is None:
|
||||
return ''
|
||||
|
||||
@@ -850,7 +835,7 @@ def list_decoder(value_decoder: Callable):
|
||||
|
||||
|
||||
def union_encoder(all_encoders: Dict[str, Callable]):
|
||||
def selective_encoder(val: any):
|
||||
def selective_encoder(val: Any):
|
||||
if type(val) in all_encoders:
|
||||
return all_encoders[type(val)](val)
|
||||
return val
|
||||
@@ -858,7 +843,7 @@ def union_encoder(all_encoders: Dict[str, Callable]):
|
||||
|
||||
|
||||
def union_decoder(all_decoders: List[Callable]):
|
||||
def selective_decoder(val: any):
|
||||
def selective_decoder(val: Any):
|
||||
decoded = val
|
||||
for decoder in all_decoders:
|
||||
try:
|
||||
@@ -877,18 +862,18 @@ def get_field_name(name):
|
||||
return override
|
||||
|
||||
|
||||
def _val_to_string(val):
|
||||
def _val_to_string(val) -> str:
|
||||
if isinstance(val, bool):
|
||||
return str(val).lower()
|
||||
if isinstance(val, datetime):
|
||||
return val.isoformat().replace('+00:00', 'Z')
|
||||
return str(val.isoformat().replace('+00:00', 'Z'))
|
||||
if isinstance(val, Enum):
|
||||
return str(val.value)
|
||||
|
||||
return str(val)
|
||||
|
||||
|
||||
def _populate_from_globals(param_name: str, value: any, param_type: str, gbls: Dict[str, Dict[str, Dict[str, Any]]]):
|
||||
def _populate_from_globals(param_name: str, value: Any, param_type: str, gbls: Optional[Dict[str, Dict[str, Dict[str, Any]]]]):
|
||||
if value is None and gbls is not None:
|
||||
if 'parameters' in gbls:
|
||||
if param_type in gbls['parameters']:
|
||||
|
||||
Reference in New Issue
Block a user