[python] support Sanic >=21 and python >= 3.10 (#8045)

### Related Issues

1. exception from python 3.10:
```
TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary`
```

Remove the deprecated argument `loop` from `Queue`, which can also be omitted in python version < 3.10

2. exception from Sanic > 21.3:
```
File "C:\Users\Kevin\AppData\Local\Temp\zeit-fun-03f18b2d2c7d7\sanic\signals.py", line 93, in get
    group, param_basket = self.find_route(
TypeError: 'NoneType' object is not callable
```
As of Sanic > 21.3, it cannot serve requests immediately after initializing, instead, we need implement the [ASGI lifespan protocol](https://asgi.readthedocs.io/en/latest/specs/lifespan.html) and wait for the startup event completed.  

here I complemented the protocol copied from (same source of the previous HTTP procotol): <https://github.com/jordaneremieff/mangum/blob/main/mangum/protocols/lifespan.py>


### Related link:
https://github.com/encode/uvicorn/pull/498

### 📋 Checklist

<!--
  Please keep your PR as a Draft until the checklist is complete
-->

#### Tests

- [x] The code changed/added as part of this PR has been covered with tests
- [x] All tests pass locally with `yarn test-unit`

#### Code Review

- [x] This PR has a concise title and thorough description useful to a reviewer
- [x] Issue from task tracker has a link to this PR


Co-authored-by: Steven <steven@ceriously.com>
This commit is contained in:
Kevin Tan
2022-07-20 21:50:53 +08:00
committed by GitHub
parent 0140db38fa
commit 66c8544e8f
6 changed files with 432 additions and 7 deletions

View File

@@ -167,13 +167,23 @@ elif 'app' in __vc_variables:
else:
print('using Asynchronous Server Gateway Interface (ASGI)')
# Originally authored by Jordan Eremieff and included under MIT license:
# https://github.com/erm/mangum/blob/b4d21c8f5e304a3e17b88bc9fa345106acc50ad7/mangum/__init__.py
# https://github.com/erm/mangum/blob/b4d21c8f5e304a3e17b88bc9fa345106acc50ad7/LICENSE
# https://github.com/erm/mangum/blob/07ce20a0e2f67c5c2593258a92c03fdc66d9edda/mangum/__init__.py
# https://github.com/erm/mangum/blob/07ce20a0e2f67c5c2593258a92c03fdc66d9edda/LICENSE
import asyncio
import enum
import logging
from contextlib import ExitStack
from urllib.parse import urlparse
from werkzeug.datastructures import Headers
def get_event_loop():
try:
return asyncio.get_running_loop()
except:
if sys.version_info < (3, 10):
return asyncio.get_event_loop()
else:
return asyncio.get_event_loop_policy().get_event_loop()
class ASGICycleState(enum.Enum):
REQUEST = enum.auto()
@@ -194,8 +204,8 @@ elif 'app' in __vc_variables:
ASGI instance using the connection scope.
Runs until the response is completely read from the application.
"""
loop = asyncio.new_event_loop()
self.app_queue = asyncio.Queue(loop=loop)
loop = get_event_loop()
self.app_queue = asyncio.Queue()
self.put_message({'type': 'http.request', 'body': body, 'more_body': False})
asgi_instance = app(self.scope, self.receive, self.send)
@@ -257,6 +267,156 @@ elif 'app' in __vc_variables:
self.response['body'] = base64.b64encode(self.body).decode('utf-8')
self.response['encoding'] = 'base64'
class LifespanFailure(Exception):
"""Raise when a lifespan failure event is sent by an application."""
class LifespanUnsupported(Exception):
"""Raise when lifespan events are not supported by an application."""
class UnexpectedMessage(Exception):
"""Raise when an unexpected message type is received during an ASGI cycle."""
class LifespanCycleState(enum.Enum):
"""
The state of the ASGI `lifespan` connection.
* **CONNECTING** - Initial state. The ASGI application instance will be run with
the connection scope containing the `lifespan` type.
* **STARTUP** - The lifespan startup event has been pushed to the queue to be
received by the application.
* **SHUTDOWN** - The lifespan shutdown event has been pushed to the queue to be
received by the application.
* **FAILED** - A lifespan failure has been detected, and the connection will be
closed with an error.
* **UNSUPPORTED** - An application attempted to send a message before receiving
the lifepan startup event. If the lifespan argument is "on", then the connection
will be closed with an error.
"""
CONNECTING = enum.auto()
STARTUP = enum.auto()
SHUTDOWN = enum.auto()
FAILED = enum.auto()
UNSUPPORTED = enum.auto()
class Lifespan:
def __init__(self, app):
self.app = app
self.state = LifespanCycleState.CONNECTING
self.exception = None
self.logger = logging.getLogger('lifespan')
self.loop = get_event_loop()
self.app_queue = asyncio.Queue()
self.startup_event = asyncio.Event()
self.shutdown_event = asyncio.Event()
def __enter__(self) -> None:
"""Runs the event loop for application startup."""
self.loop.create_task(self.run())
self.loop.run_until_complete(self.startup())
def __exit__(
self,
exc_type,
exc_value,
traceback,
) -> None:
"""Runs the event loop for application shutdown."""
self.loop.run_until_complete(self.shutdown())
async def run(self):
"""Calls the application with the `lifespan` connection scope."""
try:
await self.app(
{"type": "lifespan", "asgi": {"spec_version": "2.0", "version": "3.0"}},
self.receive,
self.send,
)
except LifespanUnsupported:
self.logger.info("ASGI 'lifespan' protocol appears unsupported.")
except (LifespanFailure, UnexpectedMessage) as exc:
self.exception = exc
except BaseException as exc:
self.logger.error("Exception in 'lifespan' protocol.", exc_info=exc)
finally:
self.startup_event.set()
self.shutdown_event.set()
async def send(self, message):
"""Awaited by the application to send ASGI `lifespan` events."""
message_type = message["type"]
if self.state is LifespanCycleState.CONNECTING:
# If a message is sent before the startup event is received by the
# application, then assume that lifespan is unsupported.
self.state = LifespanCycleState.UNSUPPORTED
raise LifespanUnsupported("Lifespan protocol appears unsupported.")
if message_type not in (
"lifespan.startup.complete",
"lifespan.shutdown.complete",
"lifespan.startup.failed",
"lifespan.shutdown.failed",
):
self.state = LifespanCycleState.FAILED
raise UnexpectedMessage(f"Unexpected '{message_type}' event received.")
if self.state is LifespanCycleState.STARTUP:
if message_type == "lifespan.startup.complete":
self.startup_event.set()
elif message_type == "lifespan.startup.failed":
self.state = LifespanCycleState.FAILED
self.startup_event.set()
message_value = message.get("message", "")
raise LifespanFailure(f"Lifespan startup failure. {message_value}")
elif self.state is LifespanCycleState.SHUTDOWN:
if message_type == "lifespan.shutdown.complete":
self.shutdown_event.set()
elif message_type == "lifespan.shutdown.failed":
self.state = LifespanCycleState.FAILED
self.shutdown_event.set()
message_value = message.get("message", "")
raise LifespanFailure(f"Lifespan shutdown failure. {message_value}")
async def receive(self):
"""Awaited by the application to receive ASGI `lifespan` events."""
if self.state is LifespanCycleState.CONNECTING:
# Connection established. The next event returned by the queue will be
# `lifespan.startup` to inform the application that the connection is
# ready to receive lfiespan messages.
self.state = LifespanCycleState.STARTUP
elif self.state is LifespanCycleState.STARTUP:
# Connection shutting down. The next event returned by the queue will be
# `lifespan.shutdown` to inform the application that the connection is now
# closing so that it may perform cleanup.
self.state = LifespanCycleState.SHUTDOWN
return await self.app_queue.get()
async def startup(self) -> None:
"""Pushes the `lifespan` startup event to the queue and handles errors."""
await self.app_queue.put({"type": "lifespan.startup"})
await self.startup_event.wait()
if self.state is LifespanCycleState.FAILED:
raise LifespanFailure(self.exception)
if not self.exception:
self.logger.info("Application startup complete.")
else:
self.logger.info("Application startup failed.")
async def shutdown(self) -> None:
"""Pushes the `lifespan` shutdown event to the queue and handles errors."""
await self.app_queue.put({"type": "lifespan.shutdown"})
await self.shutdown_event.wait()
if self.state is LifespanCycleState.FAILED:
raise LifespanFailure(self.exception)
def vc_handler(event, context):
payload = json.loads(event['body'])
@@ -289,9 +449,13 @@ elif 'app' in __vc_variables:
'raw_path': path.encode(),
}
asgi_cycle = ASGICycle(scope)
response = asgi_cycle(__vc_module.app, body)
return response
with ExitStack() as stack:
lifespan = Lifespan(__vc_module.app)
stack.enter_context(lifespan)
asgi_cycle = ASGICycle(scope)
response = asgi_cycle(__vc_module.app, body)
return response
else:
print('Missing variable `handler` or `app` in file "__VC_HANDLER_ENTRYPOINT".')