Skip to content

Added event streamer featuring a callback to handle received events.#295

Open
Quenos wants to merge 5 commits intotastyware:masterfrom
Quenos:feat/mijn-wijzigingen
Open

Added event streamer featuring a callback to handle received events.#295
Quenos wants to merge 5 commits intotastyware:masterfrom
Quenos:feat/mijn-wijzigingen

Conversation

@Quenos
Copy link
Contributor

@Quenos Quenos commented Dec 19, 2025

Description

In streamer.py an event streamer has been added that instantiates a DXLinkStreamer starts listening to events of the given event type. When a new event is received a callback function is called with that event.

The reason to add this is that this function is a common pattern to work with the DXLinkStreamer and therefore deserves to be part of this SDK IMO.

Als an examples folder has been added showcasing the use of the event streamer as an application reference. This example has been used to test the implementation of the event streamer.

Related issue(s)

None

Pre-merge checklist

  • Code formatted correctly (check with make lint)
  • Code implemented for both sync and async - due to it's nature it's only async
  • Passing tests locally (check with make test, make sure you have TT_REFRESH, TT_SECRET, and TT_ACCOUNT environment variables set) - See description to check how it has been tested
  • New tests added (if applicable) - NA

Please note that, in order to pass the tests, you'll need to set up your Tastytrade credentials as repository secrets on your local fork. Read more at CONTRIBUTING.md.

@Graeme22
Copy link
Member

It seems like this would be better added to DXLinkStreamer directly no? Just add the option for a callback to the subscribe function?

@Quenos
Copy link
Contributor Author

Quenos commented Dec 19, 2025

@Graeme22
Answering your question made me realize that this was already totally possible as is.

Or am I missing something? If not the PR can be closed W/O merge :)

async def handle_quotes(streamer: DXLinkStreamer) -> None:
    while True:
        quote = await streamer.get_event(Quote)
        do_something_with(quote)


async def handle_trades(streamer: DXLinkStreamer) -> None:
    while True:
        trade = await streamer.get_event(Trade)
        do_something_with(trade)


async def main() -> None:
    subs_list = ["SPY"]

    streamer = await DXLinkStreamer(session)
    quote_task = trade_task = None
    try:
        # subscriptions owned by main
        await streamer.subscribe(Quote, subs_list)
        await streamer.subscribe(Trade, subs_list)

        quote_task = asyncio.create_task(handle_quotes(streamer))
        trade_task = asyncio.create_task(handle_trades(streamer))

        await asyncio.gather(quote_task, trade_task)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants