Source code for aiotus.creation
"""Implementation of the creation extension.
The
`creation extension <https://tus.io/protocols/resumable-upload.html#creation>`_
defines how to reserve space on the server for uploading data to.
"""
from __future__ import annotations
import asyncio
import base64
import io
from typing import BinaryIO, Mapping, Optional
import aiohttp
import yarl
from . import common
from .log import logger
def _check_metadata_keys(metadata: common.Metadata) -> None:
"""Check if the metadata keys are valid.
Raises a 'ValueError' exception if a key is invalid.
"""
for k in metadata:
if not k.isascii():
raise ValueError("Metadata keys must only contain ASCII characters.")
if " " in k:
raise ValueError("Metadata keys must not contain spaces.")
if "," in k:
raise ValueError("Metadata keys must not contain commas.")
[docs]
async def create(
session: aiohttp.ClientSession,
url: yarl.URL,
file: Optional[BinaryIO],
metadata: common.Metadata,
ssl: Optional[common.SSLArgument] = None,
headers: Optional[Mapping[str, str]] = None,
) -> yarl.URL:
"""Create an upload.
:param session: HTTP session to use for connections.
:param url: The creation endpoint of the server.
:param file: The file object to upload.
Used to determine the length of data to be uploaded. If not given, the
corresponding HTTP header is not included in the request.
:param metadata: Additional metadata for the upload.
:param ssl: SSL validation mode, passed on to aiohttp.
:param headers: Optional headers used in the request.
:return: The URL to upload the data to.
:raises ProtocolError: When the server does not comply to the tus protocol.
"""
tus_headers = dict(headers or {})
tus_headers["Tus-Resumable"] = common.TUS_PROTOCOL_VERSION
if file is not None:
loop = asyncio.get_event_loop()
total_size = await loop.run_in_executor(None, file.seek, 0, io.SEEK_END)
tus_headers["Upload-Length"] = str(total_size)
if metadata_header := encode_metadata(metadata):
tus_headers["Upload-Metadata"] = metadata_header
logger.debug("Creating upload...")
async with await session.post(url, headers=tus_headers, ssl=ssl) as response:
response.raise_for_status()
if response.status != 201:
raise common.ProtocolError(
f"Wrong status code {response.status}, expected 201."
)
if "Location" not in response.headers:
raise common.ProtocolError(
'Upload created, but no "Location" header in response.'
)
return yarl.URL(response.headers["Location"])