mirror of
https://github.com/LukeHagar/plexpy.git
synced 2025-12-10 20:57:45 +00:00
160 lines
8.3 KiB
Python
160 lines
8.3 KiB
Python
"""Code generated by Speakeasy (https://speakeasyapi.dev). DO NOT EDIT."""
|
|
|
|
import requests as requests_http
|
|
from .sdkconfiguration import SDKConfiguration
|
|
from plex_api import utils
|
|
from plex_api._hooks import AfterErrorContext, AfterSuccessContext, BeforeRequestContext, HookContext
|
|
from plex_api.models import errors, operations
|
|
from typing import Optional
|
|
|
|
class Hubs:
|
|
r"""Hubs are a structured two-dimensional container for media, generally represented by multiple horizontal rows."""
|
|
sdk_configuration: SDKConfiguration
|
|
|
|
def __init__(self, sdk_config: SDKConfiguration) -> None:
|
|
self.sdk_configuration = sdk_config
|
|
|
|
|
|
|
|
def get_global_hubs(self, count: Optional[float] = None, only_transient: Optional[operations.OnlyTransient] = None) -> operations.GetGlobalHubsResponse:
|
|
r"""Get Global Hubs
|
|
Get Global Hubs filtered by the parameters provided.
|
|
"""
|
|
hook_ctx = HookContext(operation_id='getGlobalHubs', oauth2_scopes=[], security_source=self.sdk_configuration.security)
|
|
request = operations.GetGlobalHubsRequest(
|
|
count=count,
|
|
only_transient=only_transient,
|
|
)
|
|
|
|
base_url = utils.template_url(*self.sdk_configuration.get_server_details())
|
|
|
|
url = base_url + '/hubs'
|
|
|
|
if callable(self.sdk_configuration.security):
|
|
headers, query_params = utils.get_security(self.sdk_configuration.security())
|
|
else:
|
|
headers, query_params = utils.get_security(self.sdk_configuration.security)
|
|
|
|
query_params = { **utils.get_query_params(operations.GetGlobalHubsRequest, request, self.sdk_configuration.globals), **query_params }
|
|
headers['Accept'] = 'application/json'
|
|
headers['user-agent'] = self.sdk_configuration.user_agent
|
|
client = self.sdk_configuration.client
|
|
|
|
try:
|
|
req = client.prepare_request(requests_http.Request('GET', url, params=query_params, headers=headers))
|
|
req = self.sdk_configuration.get_hooks().before_request(BeforeRequestContext(hook_ctx), req)
|
|
http_res = client.send(req)
|
|
except Exception as e:
|
|
_, e = self.sdk_configuration.get_hooks().after_error(AfterErrorContext(hook_ctx), None, e)
|
|
if e is not None:
|
|
raise e
|
|
|
|
if utils.match_status_codes(['400','401','4XX','5XX'], http_res.status_code):
|
|
result, e = self.sdk_configuration.get_hooks().after_error(AfterErrorContext(hook_ctx), http_res, None)
|
|
if e is not None:
|
|
raise e
|
|
if result is not None:
|
|
http_res = result
|
|
else:
|
|
http_res = self.sdk_configuration.get_hooks().after_success(AfterSuccessContext(hook_ctx), http_res)
|
|
|
|
|
|
|
|
res = operations.GetGlobalHubsResponse(status_code=http_res.status_code, content_type=http_res.headers.get('Content-Type') or '', raw_response=http_res)
|
|
|
|
if http_res.status_code == 200:
|
|
if utils.match_content_type(http_res.headers.get('Content-Type') or '', 'application/json'):
|
|
out = utils.unmarshal_json(http_res.text, Optional[operations.GetGlobalHubsResponseBody])
|
|
res.object = out
|
|
else:
|
|
content_type = http_res.headers.get('Content-Type')
|
|
raise errors.SDKError(f'unknown content-type received: {content_type}', http_res.status_code, http_res.text, http_res)
|
|
elif http_res.status_code == 400 or http_res.status_code >= 400 and http_res.status_code < 500 or http_res.status_code >= 500 and http_res.status_code < 600:
|
|
raise errors.SDKError('API error occurred', http_res.status_code, http_res.text, http_res)
|
|
elif http_res.status_code == 401:
|
|
if utils.match_content_type(http_res.headers.get('Content-Type') or '', 'application/json'):
|
|
out = utils.unmarshal_json(http_res.text, errors.GetGlobalHubsResponseBody)
|
|
out.raw_response = http_res
|
|
raise out
|
|
else:
|
|
content_type = http_res.headers.get('Content-Type')
|
|
raise errors.SDKError(f'unknown content-type received: {content_type}', http_res.status_code, http_res.text, http_res)
|
|
else:
|
|
raise errors.SDKError('unknown status code received', http_res.status_code, http_res.text, http_res)
|
|
|
|
return res
|
|
|
|
|
|
|
|
def get_library_hubs(self, section_id: float, count: Optional[float] = None, only_transient: Optional[operations.QueryParamOnlyTransient] = None) -> operations.GetLibraryHubsResponse:
|
|
r"""Get library specific hubs
|
|
This endpoint will return a list of library specific hubs
|
|
"""
|
|
hook_ctx = HookContext(operation_id='getLibraryHubs', oauth2_scopes=[], security_source=self.sdk_configuration.security)
|
|
request = operations.GetLibraryHubsRequest(
|
|
section_id=section_id,
|
|
count=count,
|
|
only_transient=only_transient,
|
|
)
|
|
|
|
base_url = utils.template_url(*self.sdk_configuration.get_server_details())
|
|
|
|
url = utils.generate_url(operations.GetLibraryHubsRequest, base_url, '/hubs/sections/{sectionId}', request, self.sdk_configuration.globals)
|
|
|
|
if callable(self.sdk_configuration.security):
|
|
headers, query_params = utils.get_security(self.sdk_configuration.security())
|
|
else:
|
|
headers, query_params = utils.get_security(self.sdk_configuration.security)
|
|
|
|
query_params = { **utils.get_query_params(operations.GetLibraryHubsRequest, request, self.sdk_configuration.globals), **query_params }
|
|
headers['Accept'] = 'application/json'
|
|
headers['user-agent'] = self.sdk_configuration.user_agent
|
|
client = self.sdk_configuration.client
|
|
|
|
try:
|
|
req = client.prepare_request(requests_http.Request('GET', url, params=query_params, headers=headers))
|
|
req = self.sdk_configuration.get_hooks().before_request(BeforeRequestContext(hook_ctx), req)
|
|
http_res = client.send(req)
|
|
except Exception as e:
|
|
_, e = self.sdk_configuration.get_hooks().after_error(AfterErrorContext(hook_ctx), None, e)
|
|
if e is not None:
|
|
raise e
|
|
|
|
if utils.match_status_codes(['400','401','4XX','5XX'], http_res.status_code):
|
|
result, e = self.sdk_configuration.get_hooks().after_error(AfterErrorContext(hook_ctx), http_res, None)
|
|
if e is not None:
|
|
raise e
|
|
if result is not None:
|
|
http_res = result
|
|
else:
|
|
http_res = self.sdk_configuration.get_hooks().after_success(AfterSuccessContext(hook_ctx), http_res)
|
|
|
|
|
|
|
|
res = operations.GetLibraryHubsResponse(status_code=http_res.status_code, content_type=http_res.headers.get('Content-Type') or '', raw_response=http_res)
|
|
|
|
if http_res.status_code == 200:
|
|
if utils.match_content_type(http_res.headers.get('Content-Type') or '', 'application/json'):
|
|
out = utils.unmarshal_json(http_res.text, Optional[operations.GetLibraryHubsResponseBody])
|
|
res.object = out
|
|
else:
|
|
content_type = http_res.headers.get('Content-Type')
|
|
raise errors.SDKError(f'unknown content-type received: {content_type}', http_res.status_code, http_res.text, http_res)
|
|
elif http_res.status_code == 400 or http_res.status_code >= 400 and http_res.status_code < 500 or http_res.status_code >= 500 and http_res.status_code < 600:
|
|
raise errors.SDKError('API error occurred', http_res.status_code, http_res.text, http_res)
|
|
elif http_res.status_code == 401:
|
|
if utils.match_content_type(http_res.headers.get('Content-Type') or '', 'application/json'):
|
|
out = utils.unmarshal_json(http_res.text, errors.GetLibraryHubsResponseBody)
|
|
out.raw_response = http_res
|
|
raise out
|
|
else:
|
|
content_type = http_res.headers.get('Content-Type')
|
|
raise errors.SDKError(f'unknown content-type received: {content_type}', http_res.status_code, http_res.text, http_res)
|
|
else:
|
|
raise errors.SDKError('unknown status code received', http_res.status_code, http_res.text, http_res)
|
|
|
|
return res
|
|
|
|
|
|
|