Skip to content

Commit f5b9557

Browse files
JordonPhillipsnateprewitt
authored andcommitted
Implement event stream methods fro restJson1
1 parent 3a2dff9 commit f5b9557

File tree

1 file changed

+108
-4
lines changed
  • packages/smithy-aws-core/src/smithy_aws_core/aio

1 file changed

+108
-4
lines changed

packages/smithy-aws-core/src/smithy_aws_core/aio/protocols.py

Lines changed: 108 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,59 @@
11
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
# SPDX-License-Identifier: Apache-2.0
3-
from typing import Any, Final
4-
3+
from collections.abc import Callable
4+
from inspect import iscoroutinefunction
5+
from typing import TYPE_CHECKING, Any, Final
6+
7+
from smithy_core.aio.interfaces import AsyncWriter
8+
from smithy_core.aio.interfaces.auth import AuthScheme
9+
from smithy_core.aio.interfaces.eventstream import EventPublisher, EventReceiver
10+
from smithy_core.aio.types import AsyncBytesReader
511
from smithy_core.codecs import Codec
6-
from smithy_core.exceptions import DiscriminatorError
12+
from smithy_core.deserializers import DeserializeableShape, ShapeDeserializer
13+
from smithy_core.exceptions import (
14+
DiscriminatorError,
15+
MissingDependencyError,
16+
UnsupportedStreamError,
17+
)
18+
from smithy_core.interfaces import TypedProperties
719
from smithy_core.schemas import APIOperation, Schema
20+
from smithy_core.serializers import SerializeableShape
821
from smithy_core.shapes import ShapeID, ShapeType
922
from smithy_core.types import TimestampFormat
10-
from smithy_http.aio.interfaces import HTTPErrorIdentifier, HTTPResponse
23+
from smithy_http.aio.interfaces import HTTPErrorIdentifier, HTTPRequest, HTTPResponse
1124
from smithy_http.aio.protocols import HttpBindingClientProtocol
1225
from smithy_json import JSONCodec, JSONDocument
1326

1427
from ..traits import RestJson1Trait
1528
from ..utils import parse_document_discriminator, parse_error_code
1629

30+
try:
31+
from smithy_aws_event_stream.aio import (
32+
AWSEventPublisher,
33+
AWSEventReceiver,
34+
SigningConfig,
35+
)
36+
37+
_HAS_EVENT_STREAM = True
38+
except ImportError:
39+
_HAS_EVENT_STREAM = False # type: ignore
40+
41+
if TYPE_CHECKING:
42+
from smithy_aws_event_stream.aio import (
43+
AWSEventPublisher,
44+
AWSEventReceiver,
45+
SigningConfig,
46+
)
47+
from typing_extensions import TypeForm
48+
49+
50+
def _assert_event_stream_capable() -> None:
51+
if not _HAS_EVENT_STREAM:
52+
raise MissingDependencyError(
53+
"Attempted to use event streams, but smithy-aws-event-stream "
54+
"is not installed."
55+
)
56+
1757

1858
class AWSErrorIdentifier(HTTPErrorIdentifier):
1959
_HEADER_KEY: Final = "x-amzn-errortype"
@@ -80,3 +120,67 @@ def content_type(self) -> str:
80120
@property
81121
def error_identifier(self) -> HTTPErrorIdentifier:
82122
return self._error_identifier
123+
124+
def create_event_publisher[
125+
OperationInput: SerializeableShape,
126+
OperationOutput: DeserializeableShape,
127+
Event: SerializeableShape,
128+
](
129+
self,
130+
*,
131+
operation: "APIOperation[OperationInput, OperationOutput]",
132+
request: HTTPRequest,
133+
event_type: "TypeForm[Event]",
134+
context: TypedProperties,
135+
auth_scheme: AuthScheme[Any, Any, Any, Any] | None = None,
136+
) -> EventPublisher[Event]:
137+
_assert_event_stream_capable()
138+
signing_config: SigningConfig | None = None
139+
if auth_scheme is not None:
140+
event_signer = auth_scheme.event_signer(request=request)
141+
if event_signer is not None:
142+
signing_config = SigningConfig(
143+
signer=event_signer,
144+
signing_properties=auth_scheme.signer_properties(context=context),
145+
identity_resolver=auth_scheme.identity_resolver(context=context),
146+
identity_properties=auth_scheme.identity_properties(
147+
context=context
148+
),
149+
)
150+
151+
# The HTTP body must be an async writeable. The HTTP client bindings are
152+
# responsible for ensuring this is the case. The CRT bindings, for example,
153+
# will set the body to an instance of BufferableByteStream.
154+
body = request.body
155+
if not isinstance(body, AsyncWriter) or not iscoroutinefunction(body.write):
156+
raise UnsupportedStreamError(
157+
"Input streams require an async write function, but none was present "
158+
"on the serialized HTTP request."
159+
)
160+
161+
return AWSEventPublisher[Event](
162+
payload_codec=self.payload_codec,
163+
async_writer=body,
164+
signing_config=signing_config,
165+
)
166+
167+
def create_event_receiver[
168+
OperationInput: SerializeableShape,
169+
OperationOutput: DeserializeableShape,
170+
Event: DeserializeableShape,
171+
](
172+
self,
173+
*,
174+
operation: "APIOperation[OperationInput, OperationOutput]",
175+
request: HTTPRequest,
176+
response: HTTPResponse,
177+
event_type: "TypeForm[Event]",
178+
event_deserializer: Callable[[ShapeDeserializer], Event],
179+
context: TypedProperties,
180+
) -> EventReceiver[Event]:
181+
_assert_event_stream_capable()
182+
return AWSEventReceiver(
183+
payload_codec=self.payload_codec,
184+
source=AsyncBytesReader(response.body),
185+
deserializer=event_deserializer,
186+
)

0 commit comments

Comments
 (0)