Regenerated with pythonv2 and newest spec

This commit is contained in:
Luke Hagar
2024-09-09 18:08:47 +00:00
parent 02226c3294
commit 6565d702c6
911 changed files with 49000 additions and 17210 deletions

View File

@@ -0,0 +1,392 @@
"""Code generated by Speakeasy (https://speakeasy.com). DO NOT EDIT."""
from .basesdk import BaseSDK
from plex_api_client import utils
from plex_api_client._hooks import HookContext
from plex_api_client.models import errors, operations
from plex_api_client.types import OptionalNullable, UNSET
from typing import Any, Optional
class Activities(BaseSDK):
r"""Activities are awesome. They provide a way to monitor and control asynchronous operations on the server. In order to receive real-time updates for activities, a client would normally subscribe via either EventSource or Websocket endpoints.
Activities are associated with HTTP replies via a special `X-Plex-Activity` header which contains the UUID of the activity.
Activities are optional cancellable. If cancellable, they may be cancelled via the `DELETE` endpoint. Other details:
- They can contain a `progress` (from 0 to 100) marking the percent completion of the activity.
- They must contain an `type` which is used by clients to distinguish the specific activity.
- They may contain a `Context` object with attributes which associate the activity with various specific entities (items, libraries, etc.)
- The may contain a `Response` object which attributes which represent the result of the asynchronous operation.
"""
def get_server_activities(
self,
*,
retries: OptionalNullable[utils.RetryConfig] = UNSET,
server_url: Optional[str] = None,
timeout_ms: Optional[int] = None,
) -> operations.GetServerActivitiesResponse:
r"""Get Server Activities
Get Server Activities
:param retries: Override the default retry configuration for this method
:param server_url: Override the default server URL for this method
:param timeout_ms: Override the default request timeout configuration for this method in milliseconds
"""
base_url = None
url_variables = None
if timeout_ms is None:
timeout_ms = self.sdk_configuration.timeout_ms
if server_url is not None:
base_url = server_url
req = self.build_request(
method="GET",
path="/activities",
base_url=base_url,
url_variables=url_variables,
request=None,
request_body_required=False,
request_has_path_params=False,
request_has_query_params=True,
user_agent_header="user-agent",
accept_header_value="application/json",
security=self.sdk_configuration.security,
timeout_ms=timeout_ms,
)
if retries == UNSET:
if self.sdk_configuration.retry_config is not UNSET:
retries = self.sdk_configuration.retry_config
retry_config = None
if isinstance(retries, utils.RetryConfig):
retry_config = (retries, ["429", "500", "502", "503", "504"])
http_res = self.do_request(
hook_ctx=HookContext(
operation_id="getServerActivities",
oauth2_scopes=[],
security_source=self.sdk_configuration.security,
),
request=req,
error_status_codes=["400", "401", "4XX", "5XX"],
retry_config=retry_config,
)
data: Any = None
if utils.match_response(http_res, "200", "application/json"):
return operations.GetServerActivitiesResponse(
object=utils.unmarshal_json(
http_res.text, Optional[operations.GetServerActivitiesResponseBody]
),
status_code=http_res.status_code,
content_type=http_res.headers.get("Content-Type") or "",
raw_response=http_res,
)
if utils.match_response(http_res, "400", "application/json"):
data = utils.unmarshal_json(
http_res.text, errors.GetServerActivitiesResponseBodyData
)
data.raw_response = http_res
raise errors.GetServerActivitiesResponseBody(data=data)
if utils.match_response(http_res, "401", "application/json"):
data = utils.unmarshal_json(
http_res.text, errors.GetServerActivitiesActivitiesResponseBodyData
)
data.raw_response = http_res
raise errors.GetServerActivitiesActivitiesResponseBody(data=data)
if utils.match_response(http_res, ["4XX", "5XX"], "*"):
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res.text, http_res
)
content_type = http_res.headers.get("Content-Type")
raise errors.SDKError(
f"Unexpected response received (code: {http_res.status_code}, type: {content_type})",
http_res.status_code,
http_res.text,
http_res,
)
async def get_server_activities_async(
self,
*,
retries: OptionalNullable[utils.RetryConfig] = UNSET,
server_url: Optional[str] = None,
timeout_ms: Optional[int] = None,
) -> operations.GetServerActivitiesResponse:
r"""Get Server Activities
Get Server Activities
:param retries: Override the default retry configuration for this method
:param server_url: Override the default server URL for this method
:param timeout_ms: Override the default request timeout configuration for this method in milliseconds
"""
base_url = None
url_variables = None
if timeout_ms is None:
timeout_ms = self.sdk_configuration.timeout_ms
if server_url is not None:
base_url = server_url
req = self.build_request_async(
method="GET",
path="/activities",
base_url=base_url,
url_variables=url_variables,
request=None,
request_body_required=False,
request_has_path_params=False,
request_has_query_params=True,
user_agent_header="user-agent",
accept_header_value="application/json",
security=self.sdk_configuration.security,
timeout_ms=timeout_ms,
)
if retries == UNSET:
if self.sdk_configuration.retry_config is not UNSET:
retries = self.sdk_configuration.retry_config
retry_config = None
if isinstance(retries, utils.RetryConfig):
retry_config = (retries, ["429", "500", "502", "503", "504"])
http_res = await self.do_request_async(
hook_ctx=HookContext(
operation_id="getServerActivities",
oauth2_scopes=[],
security_source=self.sdk_configuration.security,
),
request=req,
error_status_codes=["400", "401", "4XX", "5XX"],
retry_config=retry_config,
)
data: Any = None
if utils.match_response(http_res, "200", "application/json"):
return operations.GetServerActivitiesResponse(
object=utils.unmarshal_json(
http_res.text, Optional[operations.GetServerActivitiesResponseBody]
),
status_code=http_res.status_code,
content_type=http_res.headers.get("Content-Type") or "",
raw_response=http_res,
)
if utils.match_response(http_res, "400", "application/json"):
data = utils.unmarshal_json(
http_res.text, errors.GetServerActivitiesResponseBodyData
)
data.raw_response = http_res
raise errors.GetServerActivitiesResponseBody(data=data)
if utils.match_response(http_res, "401", "application/json"):
data = utils.unmarshal_json(
http_res.text, errors.GetServerActivitiesActivitiesResponseBodyData
)
data.raw_response = http_res
raise errors.GetServerActivitiesActivitiesResponseBody(data=data)
if utils.match_response(http_res, ["4XX", "5XX"], "*"):
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res.text, http_res
)
content_type = http_res.headers.get("Content-Type")
raise errors.SDKError(
f"Unexpected response received (code: {http_res.status_code}, type: {content_type})",
http_res.status_code,
http_res.text,
http_res,
)
def cancel_server_activities(
self,
*,
activity_uuid: str,
retries: OptionalNullable[utils.RetryConfig] = UNSET,
server_url: Optional[str] = None,
timeout_ms: Optional[int] = None,
) -> operations.CancelServerActivitiesResponse:
r"""Cancel Server Activities
Cancel Server Activities
:param activity_uuid: The UUID of the activity to cancel.
:param retries: Override the default retry configuration for this method
:param server_url: Override the default server URL for this method
:param timeout_ms: Override the default request timeout configuration for this method in milliseconds
"""
base_url = None
url_variables = None
if timeout_ms is None:
timeout_ms = self.sdk_configuration.timeout_ms
if server_url is not None:
base_url = server_url
request = operations.CancelServerActivitiesRequest(
activity_uuid=activity_uuid,
)
req = self.build_request(
method="DELETE",
path="/activities/{activityUUID}",
base_url=base_url,
url_variables=url_variables,
request=request,
request_body_required=False,
request_has_path_params=True,
request_has_query_params=True,
user_agent_header="user-agent",
accept_header_value="application/json",
security=self.sdk_configuration.security,
timeout_ms=timeout_ms,
)
if retries == UNSET:
if self.sdk_configuration.retry_config is not UNSET:
retries = self.sdk_configuration.retry_config
retry_config = None
if isinstance(retries, utils.RetryConfig):
retry_config = (retries, ["429", "500", "502", "503", "504"])
http_res = self.do_request(
hook_ctx=HookContext(
operation_id="cancelServerActivities",
oauth2_scopes=[],
security_source=self.sdk_configuration.security,
),
request=req,
error_status_codes=["400", "401", "4XX", "5XX"],
retry_config=retry_config,
)
data: Any = None
if utils.match_response(http_res, "200", "*"):
return operations.CancelServerActivitiesResponse(
status_code=http_res.status_code,
content_type=http_res.headers.get("Content-Type") or "",
raw_response=http_res,
)
if utils.match_response(http_res, "400", "application/json"):
data = utils.unmarshal_json(
http_res.text, errors.CancelServerActivitiesResponseBodyData
)
data.raw_response = http_res
raise errors.CancelServerActivitiesResponseBody(data=data)
if utils.match_response(http_res, "401", "application/json"):
data = utils.unmarshal_json(
http_res.text, errors.CancelServerActivitiesActivitiesResponseBodyData
)
data.raw_response = http_res
raise errors.CancelServerActivitiesActivitiesResponseBody(data=data)
if utils.match_response(http_res, ["4XX", "5XX"], "*"):
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res.text, http_res
)
content_type = http_res.headers.get("Content-Type")
raise errors.SDKError(
f"Unexpected response received (code: {http_res.status_code}, type: {content_type})",
http_res.status_code,
http_res.text,
http_res,
)
async def cancel_server_activities_async(
self,
*,
activity_uuid: str,
retries: OptionalNullable[utils.RetryConfig] = UNSET,
server_url: Optional[str] = None,
timeout_ms: Optional[int] = None,
) -> operations.CancelServerActivitiesResponse:
r"""Cancel Server Activities
Cancel Server Activities
:param activity_uuid: The UUID of the activity to cancel.
:param retries: Override the default retry configuration for this method
:param server_url: Override the default server URL for this method
:param timeout_ms: Override the default request timeout configuration for this method in milliseconds
"""
base_url = None
url_variables = None
if timeout_ms is None:
timeout_ms = self.sdk_configuration.timeout_ms
if server_url is not None:
base_url = server_url
request = operations.CancelServerActivitiesRequest(
activity_uuid=activity_uuid,
)
req = self.build_request_async(
method="DELETE",
path="/activities/{activityUUID}",
base_url=base_url,
url_variables=url_variables,
request=request,
request_body_required=False,
request_has_path_params=True,
request_has_query_params=True,
user_agent_header="user-agent",
accept_header_value="application/json",
security=self.sdk_configuration.security,
timeout_ms=timeout_ms,
)
if retries == UNSET:
if self.sdk_configuration.retry_config is not UNSET:
retries = self.sdk_configuration.retry_config
retry_config = None
if isinstance(retries, utils.RetryConfig):
retry_config = (retries, ["429", "500", "502", "503", "504"])
http_res = await self.do_request_async(
hook_ctx=HookContext(
operation_id="cancelServerActivities",
oauth2_scopes=[],
security_source=self.sdk_configuration.security,
),
request=req,
error_status_codes=["400", "401", "4XX", "5XX"],
retry_config=retry_config,
)
data: Any = None
if utils.match_response(http_res, "200", "*"):
return operations.CancelServerActivitiesResponse(
status_code=http_res.status_code,
content_type=http_res.headers.get("Content-Type") or "",
raw_response=http_res,
)
if utils.match_response(http_res, "400", "application/json"):
data = utils.unmarshal_json(
http_res.text, errors.CancelServerActivitiesResponseBodyData
)
data.raw_response = http_res
raise errors.CancelServerActivitiesResponseBody(data=data)
if utils.match_response(http_res, "401", "application/json"):
data = utils.unmarshal_json(
http_res.text, errors.CancelServerActivitiesActivitiesResponseBodyData
)
data.raw_response = http_res
raise errors.CancelServerActivitiesActivitiesResponseBody(data=data)
if utils.match_response(http_res, ["4XX", "5XX"], "*"):
raise errors.SDKError(
"API error occurred", http_res.status_code, http_res.text, http_res
)
content_type = http_res.headers.get("Content-Type")
raise errors.SDKError(
f"Unexpected response received (code: {http_res.status_code}, type: {content_type})",
http_res.status_code,
http_res.text,
http_res,
)