vercel/packages/python/test/fixtures/30-asgi-lifespan/api/raw.py
Kevin Tan 66c8544e8f [python] support Sanic >=21 and python >= 3.10 (#8045)
### Related Issues

1. Exception from Python 3.10:
```
TypeError: As of 3.10, the *loop* parameter was removed from Queue() since it is no longer necessary
```

Remove the deprecated `loop` argument from `Queue`. It can safely be omitted on Python versions below 3.10 as well.
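As a minimal sketch of the fix described above (the `main` coroutine and the queued value are illustrative, not taken from the runtime code), constructing `asyncio.Queue()` without `loop` inside a running coroutine behaves the same on Python versions before and after 3.10:

```python
import asyncio


async def main():
    # No `loop=` argument: passing one raises TypeError on Python >= 3.10,
    # and it is not needed on older versions either.
    queue = asyncio.Queue()
    await queue.put("ping")
    return await queue.get()


result = asyncio.run(main())
```

Creating the queue inside the coroutine keeps it bound to the loop that `asyncio.run` starts, which is what the removed argument used to express explicitly.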

2. Exception from Sanic > 21.3:
```
File "C:\Users\Kevin\AppData\Local\Temp\zeit-fun-03f18b2d2c7d7\sanic\signals.py", line 93, in get
    group, param_basket = self.find_route(
TypeError: 'NoneType' object is not callable
```
As of Sanic > 21.3, the app cannot serve requests immediately after initialization. Instead, we need to implement the [ASGI lifespan protocol](https://asgi.readthedocs.io/en/latest/specs/lifespan.html) and wait for the startup event to complete.

Here I added the lifespan protocol, copied from the same source as the existing HTTP protocol: <https://github.com/jordaneremieff/mangum/blob/main/mangum/protocols/lifespan.py>
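The startup handshake described above can be sketched roughly as follows. This is not the code from this PR or from mangum; `demo_app`, `run_lifespan`, and the two queues are hypothetical names used only for illustration, and the sketch ignores the `lifespan.startup.failed` and `lifespan.shutdown.failed` messages for brevity:

```python
import asyncio


async def demo_app(scope, receive, send):
    # Hypothetical ASGI app that only answers lifespan events.
    if scope["type"] == "lifespan":
        while True:
            message = await receive()
            if message["type"] == "lifespan.startup":
                await send({"type": "lifespan.startup.complete"})
            elif message["type"] == "lifespan.shutdown":
                await send({"type": "lifespan.shutdown.complete"})
                return


async def run_lifespan(app):
    # Queues carry messages between the "server" side and the app.
    to_app = asyncio.Queue()
    from_app = asyncio.Queue()
    events = []

    async def receive():
        return await to_app.get()

    async def send(message):
        events.append(message["type"])
        await from_app.put(message)

    scope = {"type": "lifespan", "asgi": {"version": "3.0"}}
    task = asyncio.create_task(app(scope, receive, send))

    # Startup: send the event, then block until the app acknowledges it.
    await to_app.put({"type": "lifespan.startup"})
    await from_app.get()

    # HTTP requests would be handled here, after startup has completed.

    # Shutdown: same handshake in reverse.
    await to_app.put({"type": "lifespan.shutdown"})
    await from_app.get()
    await task
    return events


events = asyncio.run(run_lifespan(demo_app))
```

The point of the sketch is the ordering: no request is dispatched until `lifespan.startup.complete` has been received, which is the step Sanic > 21.3 depends on.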


### Related link:
https://github.com/encode/uvicorn/pull/498

### 📋 Checklist


#### Tests

- [x] The code changed/added as part of this PR has been covered with tests
- [x] All tests pass locally with `yarn test-unit`

#### Code Review

- [x] This PR has a concise title and thorough description useful to a reviewer
- [x] Issue from task tracker has a link to this PR


Co-authored-by: Steven <steven@ceriously.com>
2022-07-20 09:50:53 -04:00

26 lines
835 B
Python

async def app(scope, receive, send):
    if scope['type'] == 'http':
        await send(
            {
                "type": "http.response.start",
                "status": 200,
                "headers": [[b"content-type", b"text/html"]],
            }
        )
        await send(
            {
                "type": "http.response.body",
                "body": b"asgi:RANDOMNESS_PLACEHOLDER"
            }
        )
    elif scope['type'] == 'lifespan':
        while True:
            message = await receive()
            if message['type'] == 'lifespan.startup':
                await send({'type': 'lifespan.startup.complete'})
            elif message['type'] == 'lifespan.shutdown':
                await send({'type': 'lifespan.shutdown.complete'})
                return
    else:
        assert False, "unreachable"