Files
plexpy/src/plex_api_client/server.py

1849 lines
74 KiB
Python

"""Code generated by Speakeasy (https://speakeasy.com). DO NOT EDIT."""
from .basesdk import BaseSDK
from plex_api_client import utils
from plex_api_client._hooks import HookContext
from plex_api_client.models import errors, operations
from plex_api_client.types import BaseModel, OptionalNullable, UNSET
from typing import Any, Mapping, Optional, Union, cast
class Server(BaseSDK):
r"""Operations against the Plex Media Server System."""
def get_server_capabilities(
self,
*,
retries: OptionalNullable[utils.RetryConfig] = UNSET,
server_url: Optional[str] = None,
timeout_ms: Optional[int] = None,
http_headers: Optional[Mapping[str, str]] = None,
) -> operations.GetServerCapabilitiesResponse:
r"""Get Server Capabilities
Get Server Capabilities
:param retries: Override the default retry configuration for this method
:param server_url: Override the default server URL for this method
:param timeout_ms: Override the default request timeout configuration for this method in milliseconds
:param http_headers: Additional headers to set or replace on requests.
"""
base_url = None
url_variables = None
if timeout_ms is None:
timeout_ms = self.sdk_configuration.timeout_ms
if server_url is not None:
base_url = server_url
req = self._build_request(
method="GET",
path="/",
base_url=base_url,
url_variables=url_variables,
request=None,
request_body_required=False,
request_has_path_params=False,
request_has_query_params=True,
user_agent_header="user-agent",
accept_header_value="application/json",
http_headers=http_headers,
security=self.sdk_configuration.security,
timeout_ms=timeout_ms,
)
if retries == UNSET:
if self.sdk_configuration.retry_config is not UNSET:
retries = self.sdk_configuration.retry_config
retry_config = None
if isinstance(retries, utils.RetryConfig):
retry_config = (retries, ["429", "500", "502", "503", "504"])
http_res = self.do_request(
hook_ctx=HookContext(
operation_id="getServerCapabilities",
oauth2_scopes=[],
security_source=self.sdk_configuration.security,
),
request=req,
error_status_codes=["400", "401", "4XX", "5XX"],
retry_config=retry_config,
)
response_data: Any = None
if utils.match_response(http_res, "200", "application/json"):
return operations.GetServerCapabilitiesResponse(
object=utils.unmarshal_json(
http_res.text,
Optional[operations.GetServerCapabilitiesResponseBody],
),
status_code=http_res.status_code,
content_type=http_res.headers.get("Content-Type") or "",
raw_response=http_res,
)
if utils.match_response(http_res, "400", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetServerCapabilitiesBadRequestData
)
response_data.raw_response = http_res
raise errors.GetServerCapabilitiesBadRequest(data=response_data)
if utils.match_response(http_res, "401", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetServerCapabilitiesUnauthorizedData
)
response_data.raw_response = http_res
raise errors.GetServerCapabilitiesUnauthorized(data=response_data)
if utils.match_response(http_res, "4XX", "*"):
http_res_text = utils.stream_to_text(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
if utils.match_response(http_res, "5XX", "*"):
http_res_text = utils.stream_to_text(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
content_type = http_res.headers.get("Content-Type")
http_res_text = utils.stream_to_text(http_res)
raise errors.SDKError(
f"Unexpected response received (code: {http_res.status_code}, type: {content_type})",
http_res.status_code,
http_res_text,
http_res,
)
async def get_server_capabilities_async(
self,
*,
retries: OptionalNullable[utils.RetryConfig] = UNSET,
server_url: Optional[str] = None,
timeout_ms: Optional[int] = None,
http_headers: Optional[Mapping[str, str]] = None,
) -> operations.GetServerCapabilitiesResponse:
r"""Get Server Capabilities
Get Server Capabilities
:param retries: Override the default retry configuration for this method
:param server_url: Override the default server URL for this method
:param timeout_ms: Override the default request timeout configuration for this method in milliseconds
:param http_headers: Additional headers to set or replace on requests.
"""
base_url = None
url_variables = None
if timeout_ms is None:
timeout_ms = self.sdk_configuration.timeout_ms
if server_url is not None:
base_url = server_url
req = self._build_request_async(
method="GET",
path="/",
base_url=base_url,
url_variables=url_variables,
request=None,
request_body_required=False,
request_has_path_params=False,
request_has_query_params=True,
user_agent_header="user-agent",
accept_header_value="application/json",
http_headers=http_headers,
security=self.sdk_configuration.security,
timeout_ms=timeout_ms,
)
if retries == UNSET:
if self.sdk_configuration.retry_config is not UNSET:
retries = self.sdk_configuration.retry_config
retry_config = None
if isinstance(retries, utils.RetryConfig):
retry_config = (retries, ["429", "500", "502", "503", "504"])
http_res = await self.do_request_async(
hook_ctx=HookContext(
operation_id="getServerCapabilities",
oauth2_scopes=[],
security_source=self.sdk_configuration.security,
),
request=req,
error_status_codes=["400", "401", "4XX", "5XX"],
retry_config=retry_config,
)
response_data: Any = None
if utils.match_response(http_res, "200", "application/json"):
return operations.GetServerCapabilitiesResponse(
object=utils.unmarshal_json(
http_res.text,
Optional[operations.GetServerCapabilitiesResponseBody],
),
status_code=http_res.status_code,
content_type=http_res.headers.get("Content-Type") or "",
raw_response=http_res,
)
if utils.match_response(http_res, "400", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetServerCapabilitiesBadRequestData
)
response_data.raw_response = http_res
raise errors.GetServerCapabilitiesBadRequest(data=response_data)
if utils.match_response(http_res, "401", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetServerCapabilitiesUnauthorizedData
)
response_data.raw_response = http_res
raise errors.GetServerCapabilitiesUnauthorized(data=response_data)
if utils.match_response(http_res, "4XX", "*"):
http_res_text = await utils.stream_to_text_async(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
if utils.match_response(http_res, "5XX", "*"):
http_res_text = await utils.stream_to_text_async(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
content_type = http_res.headers.get("Content-Type")
http_res_text = await utils.stream_to_text_async(http_res)
raise errors.SDKError(
f"Unexpected response received (code: {http_res.status_code}, type: {content_type})",
http_res.status_code,
http_res_text,
http_res,
)
def get_server_preferences(
self,
*,
retries: OptionalNullable[utils.RetryConfig] = UNSET,
server_url: Optional[str] = None,
timeout_ms: Optional[int] = None,
http_headers: Optional[Mapping[str, str]] = None,
) -> operations.GetServerPreferencesResponse:
r"""Get Server Preferences
Get Server Preferences
:param retries: Override the default retry configuration for this method
:param server_url: Override the default server URL for this method
:param timeout_ms: Override the default request timeout configuration for this method in milliseconds
:param http_headers: Additional headers to set or replace on requests.
"""
base_url = None
url_variables = None
if timeout_ms is None:
timeout_ms = self.sdk_configuration.timeout_ms
if server_url is not None:
base_url = server_url
req = self._build_request(
method="GET",
path="/:/prefs",
base_url=base_url,
url_variables=url_variables,
request=None,
request_body_required=False,
request_has_path_params=False,
request_has_query_params=True,
user_agent_header="user-agent",
accept_header_value="application/json",
http_headers=http_headers,
security=self.sdk_configuration.security,
timeout_ms=timeout_ms,
)
if retries == UNSET:
if self.sdk_configuration.retry_config is not UNSET:
retries = self.sdk_configuration.retry_config
retry_config = None
if isinstance(retries, utils.RetryConfig):
retry_config = (retries, ["429", "500", "502", "503", "504"])
http_res = self.do_request(
hook_ctx=HookContext(
operation_id="getServerPreferences",
oauth2_scopes=[],
security_source=self.sdk_configuration.security,
),
request=req,
error_status_codes=["400", "401", "4XX", "5XX"],
retry_config=retry_config,
)
response_data: Any = None
if utils.match_response(http_res, "200", "application/json"):
return operations.GetServerPreferencesResponse(
object=utils.unmarshal_json(
http_res.text, Optional[operations.GetServerPreferencesResponseBody]
),
status_code=http_res.status_code,
content_type=http_res.headers.get("Content-Type") or "",
raw_response=http_res,
)
if utils.match_response(http_res, "400", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetServerPreferencesBadRequestData
)
response_data.raw_response = http_res
raise errors.GetServerPreferencesBadRequest(data=response_data)
if utils.match_response(http_res, "401", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetServerPreferencesUnauthorizedData
)
response_data.raw_response = http_res
raise errors.GetServerPreferencesUnauthorized(data=response_data)
if utils.match_response(http_res, "4XX", "*"):
http_res_text = utils.stream_to_text(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
if utils.match_response(http_res, "5XX", "*"):
http_res_text = utils.stream_to_text(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
content_type = http_res.headers.get("Content-Type")
http_res_text = utils.stream_to_text(http_res)
raise errors.SDKError(
f"Unexpected response received (code: {http_res.status_code}, type: {content_type})",
http_res.status_code,
http_res_text,
http_res,
)
async def get_server_preferences_async(
self,
*,
retries: OptionalNullable[utils.RetryConfig] = UNSET,
server_url: Optional[str] = None,
timeout_ms: Optional[int] = None,
http_headers: Optional[Mapping[str, str]] = None,
) -> operations.GetServerPreferencesResponse:
r"""Get Server Preferences
Get Server Preferences
:param retries: Override the default retry configuration for this method
:param server_url: Override the default server URL for this method
:param timeout_ms: Override the default request timeout configuration for this method in milliseconds
:param http_headers: Additional headers to set or replace on requests.
"""
base_url = None
url_variables = None
if timeout_ms is None:
timeout_ms = self.sdk_configuration.timeout_ms
if server_url is not None:
base_url = server_url
req = self._build_request_async(
method="GET",
path="/:/prefs",
base_url=base_url,
url_variables=url_variables,
request=None,
request_body_required=False,
request_has_path_params=False,
request_has_query_params=True,
user_agent_header="user-agent",
accept_header_value="application/json",
http_headers=http_headers,
security=self.sdk_configuration.security,
timeout_ms=timeout_ms,
)
if retries == UNSET:
if self.sdk_configuration.retry_config is not UNSET:
retries = self.sdk_configuration.retry_config
retry_config = None
if isinstance(retries, utils.RetryConfig):
retry_config = (retries, ["429", "500", "502", "503", "504"])
http_res = await self.do_request_async(
hook_ctx=HookContext(
operation_id="getServerPreferences",
oauth2_scopes=[],
security_source=self.sdk_configuration.security,
),
request=req,
error_status_codes=["400", "401", "4XX", "5XX"],
retry_config=retry_config,
)
response_data: Any = None
if utils.match_response(http_res, "200", "application/json"):
return operations.GetServerPreferencesResponse(
object=utils.unmarshal_json(
http_res.text, Optional[operations.GetServerPreferencesResponseBody]
),
status_code=http_res.status_code,
content_type=http_res.headers.get("Content-Type") or "",
raw_response=http_res,
)
if utils.match_response(http_res, "400", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetServerPreferencesBadRequestData
)
response_data.raw_response = http_res
raise errors.GetServerPreferencesBadRequest(data=response_data)
if utils.match_response(http_res, "401", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetServerPreferencesUnauthorizedData
)
response_data.raw_response = http_res
raise errors.GetServerPreferencesUnauthorized(data=response_data)
if utils.match_response(http_res, "4XX", "*"):
http_res_text = await utils.stream_to_text_async(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
if utils.match_response(http_res, "5XX", "*"):
http_res_text = await utils.stream_to_text_async(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
content_type = http_res.headers.get("Content-Type")
http_res_text = await utils.stream_to_text_async(http_res)
raise errors.SDKError(
f"Unexpected response received (code: {http_res.status_code}, type: {content_type})",
http_res.status_code,
http_res_text,
http_res,
)
def get_available_clients(
self,
*,
retries: OptionalNullable[utils.RetryConfig] = UNSET,
server_url: Optional[str] = None,
timeout_ms: Optional[int] = None,
http_headers: Optional[Mapping[str, str]] = None,
) -> operations.GetAvailableClientsResponse:
r"""Get Available Clients
Get Available Clients
:param retries: Override the default retry configuration for this method
:param server_url: Override the default server URL for this method
:param timeout_ms: Override the default request timeout configuration for this method in milliseconds
:param http_headers: Additional headers to set or replace on requests.
"""
base_url = None
url_variables = None
if timeout_ms is None:
timeout_ms = self.sdk_configuration.timeout_ms
if server_url is not None:
base_url = server_url
req = self._build_request(
method="GET",
path="/clients",
base_url=base_url,
url_variables=url_variables,
request=None,
request_body_required=False,
request_has_path_params=False,
request_has_query_params=True,
user_agent_header="user-agent",
accept_header_value="application/json",
http_headers=http_headers,
security=self.sdk_configuration.security,
timeout_ms=timeout_ms,
)
if retries == UNSET:
if self.sdk_configuration.retry_config is not UNSET:
retries = self.sdk_configuration.retry_config
retry_config = None
if isinstance(retries, utils.RetryConfig):
retry_config = (retries, ["429", "500", "502", "503", "504"])
http_res = self.do_request(
hook_ctx=HookContext(
operation_id="getAvailableClients",
oauth2_scopes=[],
security_source=self.sdk_configuration.security,
),
request=req,
error_status_codes=["400", "401", "4XX", "5XX"],
retry_config=retry_config,
)
response_data: Any = None
if utils.match_response(http_res, "200", "application/json"):
return operations.GetAvailableClientsResponse(
object=utils.unmarshal_json(
http_res.text, Optional[operations.GetAvailableClientsResponseBody]
),
status_code=http_res.status_code,
content_type=http_res.headers.get("Content-Type") or "",
raw_response=http_res,
)
if utils.match_response(http_res, "400", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetAvailableClientsBadRequestData
)
response_data.raw_response = http_res
raise errors.GetAvailableClientsBadRequest(data=response_data)
if utils.match_response(http_res, "401", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetAvailableClientsUnauthorizedData
)
response_data.raw_response = http_res
raise errors.GetAvailableClientsUnauthorized(data=response_data)
if utils.match_response(http_res, "4XX", "*"):
http_res_text = utils.stream_to_text(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
if utils.match_response(http_res, "5XX", "*"):
http_res_text = utils.stream_to_text(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
content_type = http_res.headers.get("Content-Type")
http_res_text = utils.stream_to_text(http_res)
raise errors.SDKError(
f"Unexpected response received (code: {http_res.status_code}, type: {content_type})",
http_res.status_code,
http_res_text,
http_res,
)
async def get_available_clients_async(
self,
*,
retries: OptionalNullable[utils.RetryConfig] = UNSET,
server_url: Optional[str] = None,
timeout_ms: Optional[int] = None,
http_headers: Optional[Mapping[str, str]] = None,
) -> operations.GetAvailableClientsResponse:
r"""Get Available Clients
Get Available Clients
:param retries: Override the default retry configuration for this method
:param server_url: Override the default server URL for this method
:param timeout_ms: Override the default request timeout configuration for this method in milliseconds
:param http_headers: Additional headers to set or replace on requests.
"""
base_url = None
url_variables = None
if timeout_ms is None:
timeout_ms = self.sdk_configuration.timeout_ms
if server_url is not None:
base_url = server_url
req = self._build_request_async(
method="GET",
path="/clients",
base_url=base_url,
url_variables=url_variables,
request=None,
request_body_required=False,
request_has_path_params=False,
request_has_query_params=True,
user_agent_header="user-agent",
accept_header_value="application/json",
http_headers=http_headers,
security=self.sdk_configuration.security,
timeout_ms=timeout_ms,
)
if retries == UNSET:
if self.sdk_configuration.retry_config is not UNSET:
retries = self.sdk_configuration.retry_config
retry_config = None
if isinstance(retries, utils.RetryConfig):
retry_config = (retries, ["429", "500", "502", "503", "504"])
http_res = await self.do_request_async(
hook_ctx=HookContext(
operation_id="getAvailableClients",
oauth2_scopes=[],
security_source=self.sdk_configuration.security,
),
request=req,
error_status_codes=["400", "401", "4XX", "5XX"],
retry_config=retry_config,
)
response_data: Any = None
if utils.match_response(http_res, "200", "application/json"):
return operations.GetAvailableClientsResponse(
object=utils.unmarshal_json(
http_res.text, Optional[operations.GetAvailableClientsResponseBody]
),
status_code=http_res.status_code,
content_type=http_res.headers.get("Content-Type") or "",
raw_response=http_res,
)
if utils.match_response(http_res, "400", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetAvailableClientsBadRequestData
)
response_data.raw_response = http_res
raise errors.GetAvailableClientsBadRequest(data=response_data)
if utils.match_response(http_res, "401", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetAvailableClientsUnauthorizedData
)
response_data.raw_response = http_res
raise errors.GetAvailableClientsUnauthorized(data=response_data)
if utils.match_response(http_res, "4XX", "*"):
http_res_text = await utils.stream_to_text_async(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
if utils.match_response(http_res, "5XX", "*"):
http_res_text = await utils.stream_to_text_async(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
content_type = http_res.headers.get("Content-Type")
http_res_text = await utils.stream_to_text_async(http_res)
raise errors.SDKError(
f"Unexpected response received (code: {http_res.status_code}, type: {content_type})",
http_res.status_code,
http_res_text,
http_res,
)
def get_devices(
self,
*,
retries: OptionalNullable[utils.RetryConfig] = UNSET,
server_url: Optional[str] = None,
timeout_ms: Optional[int] = None,
http_headers: Optional[Mapping[str, str]] = None,
) -> operations.GetDevicesResponse:
r"""Get Devices
Get Devices
:param retries: Override the default retry configuration for this method
:param server_url: Override the default server URL for this method
:param timeout_ms: Override the default request timeout configuration for this method in milliseconds
:param http_headers: Additional headers to set or replace on requests.
"""
base_url = None
url_variables = None
if timeout_ms is None:
timeout_ms = self.sdk_configuration.timeout_ms
if server_url is not None:
base_url = server_url
req = self._build_request(
method="GET",
path="/devices",
base_url=base_url,
url_variables=url_variables,
request=None,
request_body_required=False,
request_has_path_params=False,
request_has_query_params=True,
user_agent_header="user-agent",
accept_header_value="application/json",
http_headers=http_headers,
security=self.sdk_configuration.security,
timeout_ms=timeout_ms,
)
if retries == UNSET:
if self.sdk_configuration.retry_config is not UNSET:
retries = self.sdk_configuration.retry_config
retry_config = None
if isinstance(retries, utils.RetryConfig):
retry_config = (retries, ["429", "500", "502", "503", "504"])
http_res = self.do_request(
hook_ctx=HookContext(
operation_id="getDevices",
oauth2_scopes=[],
security_source=self.sdk_configuration.security,
),
request=req,
error_status_codes=["400", "401", "4XX", "5XX"],
retry_config=retry_config,
)
response_data: Any = None
if utils.match_response(http_res, "200", "application/json"):
return operations.GetDevicesResponse(
object=utils.unmarshal_json(
http_res.text, Optional[operations.GetDevicesResponseBody]
),
status_code=http_res.status_code,
content_type=http_res.headers.get("Content-Type") or "",
raw_response=http_res,
)
if utils.match_response(http_res, "400", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetDevicesBadRequestData
)
response_data.raw_response = http_res
raise errors.GetDevicesBadRequest(data=response_data)
if utils.match_response(http_res, "401", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetDevicesUnauthorizedData
)
response_data.raw_response = http_res
raise errors.GetDevicesUnauthorized(data=response_data)
if utils.match_response(http_res, "4XX", "*"):
http_res_text = utils.stream_to_text(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
if utils.match_response(http_res, "5XX", "*"):
http_res_text = utils.stream_to_text(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
content_type = http_res.headers.get("Content-Type")
http_res_text = utils.stream_to_text(http_res)
raise errors.SDKError(
f"Unexpected response received (code: {http_res.status_code}, type: {content_type})",
http_res.status_code,
http_res_text,
http_res,
)
async def get_devices_async(
self,
*,
retries: OptionalNullable[utils.RetryConfig] = UNSET,
server_url: Optional[str] = None,
timeout_ms: Optional[int] = None,
http_headers: Optional[Mapping[str, str]] = None,
) -> operations.GetDevicesResponse:
r"""Get Devices
Get Devices
:param retries: Override the default retry configuration for this method
:param server_url: Override the default server URL for this method
:param timeout_ms: Override the default request timeout configuration for this method in milliseconds
:param http_headers: Additional headers to set or replace on requests.
"""
base_url = None
url_variables = None
if timeout_ms is None:
timeout_ms = self.sdk_configuration.timeout_ms
if server_url is not None:
base_url = server_url
req = self._build_request_async(
method="GET",
path="/devices",
base_url=base_url,
url_variables=url_variables,
request=None,
request_body_required=False,
request_has_path_params=False,
request_has_query_params=True,
user_agent_header="user-agent",
accept_header_value="application/json",
http_headers=http_headers,
security=self.sdk_configuration.security,
timeout_ms=timeout_ms,
)
if retries == UNSET:
if self.sdk_configuration.retry_config is not UNSET:
retries = self.sdk_configuration.retry_config
retry_config = None
if isinstance(retries, utils.RetryConfig):
retry_config = (retries, ["429", "500", "502", "503", "504"])
http_res = await self.do_request_async(
hook_ctx=HookContext(
operation_id="getDevices",
oauth2_scopes=[],
security_source=self.sdk_configuration.security,
),
request=req,
error_status_codes=["400", "401", "4XX", "5XX"],
retry_config=retry_config,
)
response_data: Any = None
if utils.match_response(http_res, "200", "application/json"):
return operations.GetDevicesResponse(
object=utils.unmarshal_json(
http_res.text, Optional[operations.GetDevicesResponseBody]
),
status_code=http_res.status_code,
content_type=http_res.headers.get("Content-Type") or "",
raw_response=http_res,
)
if utils.match_response(http_res, "400", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetDevicesBadRequestData
)
response_data.raw_response = http_res
raise errors.GetDevicesBadRequest(data=response_data)
if utils.match_response(http_res, "401", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetDevicesUnauthorizedData
)
response_data.raw_response = http_res
raise errors.GetDevicesUnauthorized(data=response_data)
if utils.match_response(http_res, "4XX", "*"):
http_res_text = await utils.stream_to_text_async(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
if utils.match_response(http_res, "5XX", "*"):
http_res_text = await utils.stream_to_text_async(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
content_type = http_res.headers.get("Content-Type")
http_res_text = await utils.stream_to_text_async(http_res)
raise errors.SDKError(
f"Unexpected response received (code: {http_res.status_code}, type: {content_type})",
http_res.status_code,
http_res_text,
http_res,
)
def get_server_identity(
self,
*,
retries: OptionalNullable[utils.RetryConfig] = UNSET,
server_url: Optional[str] = None,
timeout_ms: Optional[int] = None,
http_headers: Optional[Mapping[str, str]] = None,
) -> operations.GetServerIdentityResponse:
r"""Get Server Identity
This request is useful to determine if the server is online or offline
:param retries: Override the default retry configuration for this method
:param server_url: Override the default server URL for this method
:param timeout_ms: Override the default request timeout configuration for this method in milliseconds
:param http_headers: Additional headers to set or replace on requests.
"""
base_url = None
url_variables = None
if timeout_ms is None:
timeout_ms = self.sdk_configuration.timeout_ms
if server_url is not None:
base_url = server_url
req = self._build_request(
method="GET",
path="/identity",
base_url=base_url,
url_variables=url_variables,
request=None,
request_body_required=False,
request_has_path_params=False,
request_has_query_params=False,
user_agent_header="user-agent",
accept_header_value="application/json",
http_headers=http_headers,
timeout_ms=timeout_ms,
)
if retries == UNSET:
if self.sdk_configuration.retry_config is not UNSET:
retries = self.sdk_configuration.retry_config
retry_config = None
if isinstance(retries, utils.RetryConfig):
retry_config = (retries, ["429", "500", "502", "503", "504"])
http_res = self.do_request(
hook_ctx=HookContext(
operation_id="get-server-identity",
oauth2_scopes=[],
security_source=None,
),
request=req,
error_status_codes=["408", "4XX", "5XX"],
retry_config=retry_config,
)
response_data: Any = None
if utils.match_response(http_res, "200", "application/json"):
return operations.GetServerIdentityResponse(
object=utils.unmarshal_json(
http_res.text, Optional[operations.GetServerIdentityResponseBody]
),
status_code=http_res.status_code,
content_type=http_res.headers.get("Content-Type") or "",
raw_response=http_res,
)
if utils.match_response(http_res, "408", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetServerIdentityRequestTimeoutData
)
response_data.raw_response = http_res
raise errors.GetServerIdentityRequestTimeout(data=response_data)
if utils.match_response(http_res, "4XX", "*"):
http_res_text = utils.stream_to_text(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
if utils.match_response(http_res, "5XX", "*"):
http_res_text = utils.stream_to_text(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
content_type = http_res.headers.get("Content-Type")
http_res_text = utils.stream_to_text(http_res)
raise errors.SDKError(
f"Unexpected response received (code: {http_res.status_code}, type: {content_type})",
http_res.status_code,
http_res_text,
http_res,
)
async def get_server_identity_async(
self,
*,
retries: OptionalNullable[utils.RetryConfig] = UNSET,
server_url: Optional[str] = None,
timeout_ms: Optional[int] = None,
http_headers: Optional[Mapping[str, str]] = None,
) -> operations.GetServerIdentityResponse:
r"""Get Server Identity
This request is useful to determine if the server is online or offline
:param retries: Override the default retry configuration for this method
:param server_url: Override the default server URL for this method
:param timeout_ms: Override the default request timeout configuration for this method in milliseconds
:param http_headers: Additional headers to set or replace on requests.
"""
base_url = None
url_variables = None
if timeout_ms is None:
timeout_ms = self.sdk_configuration.timeout_ms
if server_url is not None:
base_url = server_url
req = self._build_request_async(
method="GET",
path="/identity",
base_url=base_url,
url_variables=url_variables,
request=None,
request_body_required=False,
request_has_path_params=False,
request_has_query_params=False,
user_agent_header="user-agent",
accept_header_value="application/json",
http_headers=http_headers,
timeout_ms=timeout_ms,
)
if retries == UNSET:
if self.sdk_configuration.retry_config is not UNSET:
retries = self.sdk_configuration.retry_config
retry_config = None
if isinstance(retries, utils.RetryConfig):
retry_config = (retries, ["429", "500", "502", "503", "504"])
http_res = await self.do_request_async(
hook_ctx=HookContext(
operation_id="get-server-identity",
oauth2_scopes=[],
security_source=None,
),
request=req,
error_status_codes=["408", "4XX", "5XX"],
retry_config=retry_config,
)
response_data: Any = None
if utils.match_response(http_res, "200", "application/json"):
return operations.GetServerIdentityResponse(
object=utils.unmarshal_json(
http_res.text, Optional[operations.GetServerIdentityResponseBody]
),
status_code=http_res.status_code,
content_type=http_res.headers.get("Content-Type") or "",
raw_response=http_res,
)
if utils.match_response(http_res, "408", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetServerIdentityRequestTimeoutData
)
response_data.raw_response = http_res
raise errors.GetServerIdentityRequestTimeout(data=response_data)
if utils.match_response(http_res, "4XX", "*"):
http_res_text = await utils.stream_to_text_async(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
if utils.match_response(http_res, "5XX", "*"):
http_res_text = await utils.stream_to_text_async(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
content_type = http_res.headers.get("Content-Type")
http_res_text = await utils.stream_to_text_async(http_res)
raise errors.SDKError(
f"Unexpected response received (code: {http_res.status_code}, type: {content_type})",
http_res.status_code,
http_res_text,
http_res,
)
def get_my_plex_account(
self,
*,
retries: OptionalNullable[utils.RetryConfig] = UNSET,
server_url: Optional[str] = None,
timeout_ms: Optional[int] = None,
http_headers: Optional[Mapping[str, str]] = None,
) -> operations.GetMyPlexAccountResponse:
r"""Get MyPlex Account
Returns MyPlex Account Information
:param retries: Override the default retry configuration for this method
:param server_url: Override the default server URL for this method
:param timeout_ms: Override the default request timeout configuration for this method in milliseconds
:param http_headers: Additional headers to set or replace on requests.
"""
base_url = None
url_variables = None
if timeout_ms is None:
timeout_ms = self.sdk_configuration.timeout_ms
if server_url is not None:
base_url = server_url
req = self._build_request(
method="GET",
path="/myplex/account",
base_url=base_url,
url_variables=url_variables,
request=None,
request_body_required=False,
request_has_path_params=False,
request_has_query_params=True,
user_agent_header="user-agent",
accept_header_value="application/json",
http_headers=http_headers,
security=self.sdk_configuration.security,
timeout_ms=timeout_ms,
)
if retries == UNSET:
if self.sdk_configuration.retry_config is not UNSET:
retries = self.sdk_configuration.retry_config
retry_config = None
if isinstance(retries, utils.RetryConfig):
retry_config = (retries, ["429", "500", "502", "503", "504"])
http_res = self.do_request(
hook_ctx=HookContext(
operation_id="getMyPlexAccount",
oauth2_scopes=[],
security_source=self.sdk_configuration.security,
),
request=req,
error_status_codes=["400", "401", "4XX", "5XX"],
retry_config=retry_config,
)
response_data: Any = None
if utils.match_response(http_res, "200", "application/json"):
return operations.GetMyPlexAccountResponse(
object=utils.unmarshal_json(
http_res.text, Optional[operations.GetMyPlexAccountResponseBody]
),
status_code=http_res.status_code,
content_type=http_res.headers.get("Content-Type") or "",
raw_response=http_res,
)
if utils.match_response(http_res, "400", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetMyPlexAccountBadRequestData
)
response_data.raw_response = http_res
raise errors.GetMyPlexAccountBadRequest(data=response_data)
if utils.match_response(http_res, "401", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetMyPlexAccountUnauthorizedData
)
response_data.raw_response = http_res
raise errors.GetMyPlexAccountUnauthorized(data=response_data)
if utils.match_response(http_res, "4XX", "*"):
http_res_text = utils.stream_to_text(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
if utils.match_response(http_res, "5XX", "*"):
http_res_text = utils.stream_to_text(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
content_type = http_res.headers.get("Content-Type")
http_res_text = utils.stream_to_text(http_res)
raise errors.SDKError(
f"Unexpected response received (code: {http_res.status_code}, type: {content_type})",
http_res.status_code,
http_res_text,
http_res,
)
async def get_my_plex_account_async(
self,
*,
retries: OptionalNullable[utils.RetryConfig] = UNSET,
server_url: Optional[str] = None,
timeout_ms: Optional[int] = None,
http_headers: Optional[Mapping[str, str]] = None,
) -> operations.GetMyPlexAccountResponse:
r"""Get MyPlex Account
Returns MyPlex Account Information
:param retries: Override the default retry configuration for this method
:param server_url: Override the default server URL for this method
:param timeout_ms: Override the default request timeout configuration for this method in milliseconds
:param http_headers: Additional headers to set or replace on requests.
"""
base_url = None
url_variables = None
if timeout_ms is None:
timeout_ms = self.sdk_configuration.timeout_ms
if server_url is not None:
base_url = server_url
req = self._build_request_async(
method="GET",
path="/myplex/account",
base_url=base_url,
url_variables=url_variables,
request=None,
request_body_required=False,
request_has_path_params=False,
request_has_query_params=True,
user_agent_header="user-agent",
accept_header_value="application/json",
http_headers=http_headers,
security=self.sdk_configuration.security,
timeout_ms=timeout_ms,
)
if retries == UNSET:
if self.sdk_configuration.retry_config is not UNSET:
retries = self.sdk_configuration.retry_config
retry_config = None
if isinstance(retries, utils.RetryConfig):
retry_config = (retries, ["429", "500", "502", "503", "504"])
http_res = await self.do_request_async(
hook_ctx=HookContext(
operation_id="getMyPlexAccount",
oauth2_scopes=[],
security_source=self.sdk_configuration.security,
),
request=req,
error_status_codes=["400", "401", "4XX", "5XX"],
retry_config=retry_config,
)
response_data: Any = None
if utils.match_response(http_res, "200", "application/json"):
return operations.GetMyPlexAccountResponse(
object=utils.unmarshal_json(
http_res.text, Optional[operations.GetMyPlexAccountResponseBody]
),
status_code=http_res.status_code,
content_type=http_res.headers.get("Content-Type") or "",
raw_response=http_res,
)
if utils.match_response(http_res, "400", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetMyPlexAccountBadRequestData
)
response_data.raw_response = http_res
raise errors.GetMyPlexAccountBadRequest(data=response_data)
if utils.match_response(http_res, "401", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetMyPlexAccountUnauthorizedData
)
response_data.raw_response = http_res
raise errors.GetMyPlexAccountUnauthorized(data=response_data)
if utils.match_response(http_res, "4XX", "*"):
http_res_text = await utils.stream_to_text_async(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
if utils.match_response(http_res, "5XX", "*"):
http_res_text = await utils.stream_to_text_async(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
content_type = http_res.headers.get("Content-Type")
http_res_text = await utils.stream_to_text_async(http_res)
raise errors.SDKError(
f"Unexpected response received (code: {http_res.status_code}, type: {content_type})",
http_res.status_code,
http_res_text,
http_res,
)
def get_resized_photo(
self,
*,
request: Union[
operations.GetResizedPhotoRequest,
operations.GetResizedPhotoRequestTypedDict,
],
retries: OptionalNullable[utils.RetryConfig] = UNSET,
server_url: Optional[str] = None,
timeout_ms: Optional[int] = None,
http_headers: Optional[Mapping[str, str]] = None,
) -> operations.GetResizedPhotoResponse:
r"""Get a Resized Photo
Plex's Photo transcoder is used throughout the service to serve images at specified sizes.
:param request: The request object to send.
:param retries: Override the default retry configuration for this method
:param server_url: Override the default server URL for this method
:param timeout_ms: Override the default request timeout configuration for this method in milliseconds
:param http_headers: Additional headers to set or replace on requests.
"""
base_url = None
url_variables = None
if timeout_ms is None:
timeout_ms = self.sdk_configuration.timeout_ms
if server_url is not None:
base_url = server_url
if not isinstance(request, BaseModel):
request = utils.unmarshal(request, operations.GetResizedPhotoRequest)
request = cast(operations.GetResizedPhotoRequest, request)
req = self._build_request(
method="GET",
path="/photo/:/transcode",
base_url=base_url,
url_variables=url_variables,
request=request,
request_body_required=False,
request_has_path_params=False,
request_has_query_params=True,
user_agent_header="user-agent",
accept_header_value="application/json",
http_headers=http_headers,
security=self.sdk_configuration.security,
timeout_ms=timeout_ms,
)
if retries == UNSET:
if self.sdk_configuration.retry_config is not UNSET:
retries = self.sdk_configuration.retry_config
retry_config = None
if isinstance(retries, utils.RetryConfig):
retry_config = (retries, ["429", "500", "502", "503", "504"])
http_res = self.do_request(
hook_ctx=HookContext(
operation_id="getResizedPhoto",
oauth2_scopes=[],
security_source=self.sdk_configuration.security,
),
request=req,
error_status_codes=["400", "401", "4XX", "5XX"],
retry_config=retry_config,
)
response_data: Any = None
if utils.match_response(http_res, "200", "*"):
return operations.GetResizedPhotoResponse(
status_code=http_res.status_code,
content_type=http_res.headers.get("Content-Type") or "",
raw_response=http_res,
)
if utils.match_response(http_res, "400", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetResizedPhotoBadRequestData
)
response_data.raw_response = http_res
raise errors.GetResizedPhotoBadRequest(data=response_data)
if utils.match_response(http_res, "401", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetResizedPhotoUnauthorizedData
)
response_data.raw_response = http_res
raise errors.GetResizedPhotoUnauthorized(data=response_data)
if utils.match_response(http_res, "4XX", "*"):
http_res_text = utils.stream_to_text(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
if utils.match_response(http_res, "5XX", "*"):
http_res_text = utils.stream_to_text(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
content_type = http_res.headers.get("Content-Type")
http_res_text = utils.stream_to_text(http_res)
raise errors.SDKError(
f"Unexpected response received (code: {http_res.status_code}, type: {content_type})",
http_res.status_code,
http_res_text,
http_res,
)
async def get_resized_photo_async(
self,
*,
request: Union[
operations.GetResizedPhotoRequest,
operations.GetResizedPhotoRequestTypedDict,
],
retries: OptionalNullable[utils.RetryConfig] = UNSET,
server_url: Optional[str] = None,
timeout_ms: Optional[int] = None,
http_headers: Optional[Mapping[str, str]] = None,
) -> operations.GetResizedPhotoResponse:
r"""Get a Resized Photo
Plex's Photo transcoder is used throughout the service to serve images at specified sizes.
:param request: The request object to send.
:param retries: Override the default retry configuration for this method
:param server_url: Override the default server URL for this method
:param timeout_ms: Override the default request timeout configuration for this method in milliseconds
:param http_headers: Additional headers to set or replace on requests.
"""
base_url = None
url_variables = None
if timeout_ms is None:
timeout_ms = self.sdk_configuration.timeout_ms
if server_url is not None:
base_url = server_url
if not isinstance(request, BaseModel):
request = utils.unmarshal(request, operations.GetResizedPhotoRequest)
request = cast(operations.GetResizedPhotoRequest, request)
req = self._build_request_async(
method="GET",
path="/photo/:/transcode",
base_url=base_url,
url_variables=url_variables,
request=request,
request_body_required=False,
request_has_path_params=False,
request_has_query_params=True,
user_agent_header="user-agent",
accept_header_value="application/json",
http_headers=http_headers,
security=self.sdk_configuration.security,
timeout_ms=timeout_ms,
)
if retries == UNSET:
if self.sdk_configuration.retry_config is not UNSET:
retries = self.sdk_configuration.retry_config
retry_config = None
if isinstance(retries, utils.RetryConfig):
retry_config = (retries, ["429", "500", "502", "503", "504"])
http_res = await self.do_request_async(
hook_ctx=HookContext(
operation_id="getResizedPhoto",
oauth2_scopes=[],
security_source=self.sdk_configuration.security,
),
request=req,
error_status_codes=["400", "401", "4XX", "5XX"],
retry_config=retry_config,
)
response_data: Any = None
if utils.match_response(http_res, "200", "*"):
return operations.GetResizedPhotoResponse(
status_code=http_res.status_code,
content_type=http_res.headers.get("Content-Type") or "",
raw_response=http_res,
)
if utils.match_response(http_res, "400", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetResizedPhotoBadRequestData
)
response_data.raw_response = http_res
raise errors.GetResizedPhotoBadRequest(data=response_data)
if utils.match_response(http_res, "401", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetResizedPhotoUnauthorizedData
)
response_data.raw_response = http_res
raise errors.GetResizedPhotoUnauthorized(data=response_data)
if utils.match_response(http_res, "4XX", "*"):
http_res_text = await utils.stream_to_text_async(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
if utils.match_response(http_res, "5XX", "*"):
http_res_text = await utils.stream_to_text_async(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
content_type = http_res.headers.get("Content-Type")
http_res_text = await utils.stream_to_text_async(http_res)
raise errors.SDKError(
f"Unexpected response received (code: {http_res.status_code}, type: {content_type})",
http_res.status_code,
http_res_text,
http_res,
)
def get_media_providers(
self,
*,
x_plex_token: str,
retries: OptionalNullable[utils.RetryConfig] = UNSET,
server_url: Optional[str] = None,
timeout_ms: Optional[int] = None,
http_headers: Optional[Mapping[str, str]] = None,
) -> operations.GetMediaProvidersResponse:
r"""Get Media Providers
Retrieves media providers and their features from the Plex server.
:param x_plex_token: An authentication token, obtained from plex.tv
:param retries: Override the default retry configuration for this method
:param server_url: Override the default server URL for this method
:param timeout_ms: Override the default request timeout configuration for this method in milliseconds
:param http_headers: Additional headers to set or replace on requests.
"""
base_url = None
url_variables = None
if timeout_ms is None:
timeout_ms = self.sdk_configuration.timeout_ms
if server_url is not None:
base_url = server_url
request = operations.GetMediaProvidersRequest(
x_plex_token=x_plex_token,
)
req = self._build_request(
method="GET",
path="/media/providers",
base_url=base_url,
url_variables=url_variables,
request=request,
request_body_required=False,
request_has_path_params=False,
request_has_query_params=True,
user_agent_header="user-agent",
accept_header_value="application/json",
http_headers=http_headers,
security=self.sdk_configuration.security,
timeout_ms=timeout_ms,
)
if retries == UNSET:
if self.sdk_configuration.retry_config is not UNSET:
retries = self.sdk_configuration.retry_config
retry_config = None
if isinstance(retries, utils.RetryConfig):
retry_config = (retries, ["429", "500", "502", "503", "504"])
http_res = self.do_request(
hook_ctx=HookContext(
operation_id="get-media-providers",
oauth2_scopes=[],
security_source=self.sdk_configuration.security,
),
request=req,
error_status_codes=["400", "401", "4XX", "5XX"],
retry_config=retry_config,
)
response_data: Any = None
if utils.match_response(http_res, "200", "application/json"):
return operations.GetMediaProvidersResponse(
object=utils.unmarshal_json(
http_res.text, Optional[operations.GetMediaProvidersResponseBody]
),
status_code=http_res.status_code,
content_type=http_res.headers.get("Content-Type") or "",
raw_response=http_res,
)
if utils.match_response(http_res, "400", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetMediaProvidersBadRequestData
)
response_data.raw_response = http_res
raise errors.GetMediaProvidersBadRequest(data=response_data)
if utils.match_response(http_res, "401", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetMediaProvidersUnauthorizedData
)
response_data.raw_response = http_res
raise errors.GetMediaProvidersUnauthorized(data=response_data)
if utils.match_response(http_res, "4XX", "*"):
http_res_text = utils.stream_to_text(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
if utils.match_response(http_res, "5XX", "*"):
http_res_text = utils.stream_to_text(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
content_type = http_res.headers.get("Content-Type")
http_res_text = utils.stream_to_text(http_res)
raise errors.SDKError(
f"Unexpected response received (code: {http_res.status_code}, type: {content_type})",
http_res.status_code,
http_res_text,
http_res,
)
async def get_media_providers_async(
self,
*,
x_plex_token: str,
retries: OptionalNullable[utils.RetryConfig] = UNSET,
server_url: Optional[str] = None,
timeout_ms: Optional[int] = None,
http_headers: Optional[Mapping[str, str]] = None,
) -> operations.GetMediaProvidersResponse:
r"""Get Media Providers
Retrieves media providers and their features from the Plex server.
:param x_plex_token: An authentication token, obtained from plex.tv
:param retries: Override the default retry configuration for this method
:param server_url: Override the default server URL for this method
:param timeout_ms: Override the default request timeout configuration for this method in milliseconds
:param http_headers: Additional headers to set or replace on requests.
"""
base_url = None
url_variables = None
if timeout_ms is None:
timeout_ms = self.sdk_configuration.timeout_ms
if server_url is not None:
base_url = server_url
request = operations.GetMediaProvidersRequest(
x_plex_token=x_plex_token,
)
req = self._build_request_async(
method="GET",
path="/media/providers",
base_url=base_url,
url_variables=url_variables,
request=request,
request_body_required=False,
request_has_path_params=False,
request_has_query_params=True,
user_agent_header="user-agent",
accept_header_value="application/json",
http_headers=http_headers,
security=self.sdk_configuration.security,
timeout_ms=timeout_ms,
)
if retries == UNSET:
if self.sdk_configuration.retry_config is not UNSET:
retries = self.sdk_configuration.retry_config
retry_config = None
if isinstance(retries, utils.RetryConfig):
retry_config = (retries, ["429", "500", "502", "503", "504"])
http_res = await self.do_request_async(
hook_ctx=HookContext(
operation_id="get-media-providers",
oauth2_scopes=[],
security_source=self.sdk_configuration.security,
),
request=req,
error_status_codes=["400", "401", "4XX", "5XX"],
retry_config=retry_config,
)
response_data: Any = None
if utils.match_response(http_res, "200", "application/json"):
return operations.GetMediaProvidersResponse(
object=utils.unmarshal_json(
http_res.text, Optional[operations.GetMediaProvidersResponseBody]
),
status_code=http_res.status_code,
content_type=http_res.headers.get("Content-Type") or "",
raw_response=http_res,
)
if utils.match_response(http_res, "400", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetMediaProvidersBadRequestData
)
response_data.raw_response = http_res
raise errors.GetMediaProvidersBadRequest(data=response_data)
if utils.match_response(http_res, "401", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetMediaProvidersUnauthorizedData
)
response_data.raw_response = http_res
raise errors.GetMediaProvidersUnauthorized(data=response_data)
if utils.match_response(http_res, "4XX", "*"):
http_res_text = await utils.stream_to_text_async(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
if utils.match_response(http_res, "5XX", "*"):
http_res_text = await utils.stream_to_text_async(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
content_type = http_res.headers.get("Content-Type")
http_res_text = await utils.stream_to_text_async(http_res)
raise errors.SDKError(
f"Unexpected response received (code: {http_res.status_code}, type: {content_type})",
http_res.status_code,
http_res_text,
http_res,
)
def get_server_list(
self,
*,
retries: OptionalNullable[utils.RetryConfig] = UNSET,
server_url: Optional[str] = None,
timeout_ms: Optional[int] = None,
http_headers: Optional[Mapping[str, str]] = None,
) -> operations.GetServerListResponse:
r"""Get Server List
Get Server List
:param retries: Override the default retry configuration for this method
:param server_url: Override the default server URL for this method
:param timeout_ms: Override the default request timeout configuration for this method in milliseconds
:param http_headers: Additional headers to set or replace on requests.
"""
base_url = None
url_variables = None
if timeout_ms is None:
timeout_ms = self.sdk_configuration.timeout_ms
if server_url is not None:
base_url = server_url
req = self._build_request(
method="GET",
path="/servers",
base_url=base_url,
url_variables=url_variables,
request=None,
request_body_required=False,
request_has_path_params=False,
request_has_query_params=True,
user_agent_header="user-agent",
accept_header_value="application/json",
http_headers=http_headers,
security=self.sdk_configuration.security,
timeout_ms=timeout_ms,
)
if retries == UNSET:
if self.sdk_configuration.retry_config is not UNSET:
retries = self.sdk_configuration.retry_config
retry_config = None
if isinstance(retries, utils.RetryConfig):
retry_config = (retries, ["429", "500", "502", "503", "504"])
http_res = self.do_request(
hook_ctx=HookContext(
operation_id="getServerList",
oauth2_scopes=[],
security_source=self.sdk_configuration.security,
),
request=req,
error_status_codes=["400", "401", "4XX", "5XX"],
retry_config=retry_config,
)
response_data: Any = None
if utils.match_response(http_res, "200", "application/json"):
return operations.GetServerListResponse(
object=utils.unmarshal_json(
http_res.text, Optional[operations.GetServerListResponseBody]
),
status_code=http_res.status_code,
content_type=http_res.headers.get("Content-Type") or "",
raw_response=http_res,
)
if utils.match_response(http_res, "400", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetServerListBadRequestData
)
response_data.raw_response = http_res
raise errors.GetServerListBadRequest(data=response_data)
if utils.match_response(http_res, "401", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetServerListUnauthorizedData
)
response_data.raw_response = http_res
raise errors.GetServerListUnauthorized(data=response_data)
if utils.match_response(http_res, "4XX", "*"):
http_res_text = utils.stream_to_text(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
if utils.match_response(http_res, "5XX", "*"):
http_res_text = utils.stream_to_text(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
content_type = http_res.headers.get("Content-Type")
http_res_text = utils.stream_to_text(http_res)
raise errors.SDKError(
f"Unexpected response received (code: {http_res.status_code}, type: {content_type})",
http_res.status_code,
http_res_text,
http_res,
)
async def get_server_list_async(
self,
*,
retries: OptionalNullable[utils.RetryConfig] = UNSET,
server_url: Optional[str] = None,
timeout_ms: Optional[int] = None,
http_headers: Optional[Mapping[str, str]] = None,
) -> operations.GetServerListResponse:
r"""Get Server List
Get Server List
:param retries: Override the default retry configuration for this method
:param server_url: Override the default server URL for this method
:param timeout_ms: Override the default request timeout configuration for this method in milliseconds
:param http_headers: Additional headers to set or replace on requests.
"""
base_url = None
url_variables = None
if timeout_ms is None:
timeout_ms = self.sdk_configuration.timeout_ms
if server_url is not None:
base_url = server_url
req = self._build_request_async(
method="GET",
path="/servers",
base_url=base_url,
url_variables=url_variables,
request=None,
request_body_required=False,
request_has_path_params=False,
request_has_query_params=True,
user_agent_header="user-agent",
accept_header_value="application/json",
http_headers=http_headers,
security=self.sdk_configuration.security,
timeout_ms=timeout_ms,
)
if retries == UNSET:
if self.sdk_configuration.retry_config is not UNSET:
retries = self.sdk_configuration.retry_config
retry_config = None
if isinstance(retries, utils.RetryConfig):
retry_config = (retries, ["429", "500", "502", "503", "504"])
http_res = await self.do_request_async(
hook_ctx=HookContext(
operation_id="getServerList",
oauth2_scopes=[],
security_source=self.sdk_configuration.security,
),
request=req,
error_status_codes=["400", "401", "4XX", "5XX"],
retry_config=retry_config,
)
response_data: Any = None
if utils.match_response(http_res, "200", "application/json"):
return operations.GetServerListResponse(
object=utils.unmarshal_json(
http_res.text, Optional[operations.GetServerListResponseBody]
),
status_code=http_res.status_code,
content_type=http_res.headers.get("Content-Type") or "",
raw_response=http_res,
)
if utils.match_response(http_res, "400", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetServerListBadRequestData
)
response_data.raw_response = http_res
raise errors.GetServerListBadRequest(data=response_data)
if utils.match_response(http_res, "401", "application/json"):
response_data = utils.unmarshal_json(
http_res.text, errors.GetServerListUnauthorizedData
)
response_data.raw_response = http_res
raise errors.GetServerListUnauthorized(data=response_data)
if utils.match_response(http_res, "4XX", "*"):
http_res_text = await utils.stream_to_text_async(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
if utils.match_response(http_res, "5XX", "*"):
http_res_text = await utils.stream_to_text_async(http_res)
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res_text, http_res
)
content_type = http_res.headers.get("Content-Type")
http_res_text = await utils.stream_to_text_async(http_res)
raise errors.SDKError(
f"Unexpected response received (code: {http_res.status_code}, type: {content_type})",
http_res.status_code,
http_res_text,
http_res,
)