Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 1.11.0 (unreleased)

* Add support for [sync streams](https://docs.powersync.com/sync/streams/overview).

## 1.10.0

* Fix "Linking a static library that was built with `-gmodules`, but the module cache was not found.` build warnings.
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import AVFoundation
import IdentifiedCollections
import SwiftUI
import SwiftUINavigation
import PowerSync

struct TodoListView: View {
@Environment(SystemManager.self) private var system
Expand All @@ -11,6 +12,7 @@ struct TodoListView: View {
@State private var error: Error?
@State private var newTodo: NewTodo?
@State private var editing: Bool = false
@State private var loadingListItems: Bool = false

#if os(iOS)
// Called when a photo has been captured. Individual widgets should register the listener
Expand All @@ -33,6 +35,10 @@ struct TodoListView: View {
}
}
}

if (loadingListItems) {
ProgressView()
}

ForEach(todos) { todo in
#if os(iOS)
Expand Down Expand Up @@ -142,6 +148,22 @@ struct TodoListView: View {
}
}
}
.task {
if (Secrets.previewSyncStreams) {
// With sync streams, todo items are not loaded by default. We have to request them while we need them.
// Thanks to builtin caching, navingating to the same list multiple times does not have to fetch items again.
loadingListItems = true
do {
// This will make the sync client request items from this list as long as we keep a reference to the stream subscription,
// and a default TTL of one day afterwards.
let streamSubscription = try await system.db.syncStream(name: "todos", params: ["list": JsonValue.string(listId)]).subscribe()
try await streamSubscription.waitForFirstSync()
} catch {
print("Error subscribing to list stream \(error)")
}
loadingListItems = false
}
}
}

func toggleCompletion(of todo: Todo) async {
Expand Down
18 changes: 18 additions & 0 deletions Demos/PowerSyncExample/PowerSyncExample/Secrets.template.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,22 @@ extension Secrets {
static var supabaseStorageBucket: String? {
return nil
}

static var previewSyncStreams: Bool {
/*
Set to true to preview https://docs.powersync.com/sync/streams/overview.
When enabling this, also set your sync rules to the following:

config:
edition: 2

streams:
lists:
query: SELECT * FROM lists WHERE owner_id = auth.user_id()
auto_subscribe: true
todos:
query: SELECT * FROM todos WHERE list_id = subscription.parameter('list') AND list_id IN (SELECT id FROM lists WHERE owner_id = auth.user_id())
*/
false
}
}
1 change: 1 addition & 0 deletions Demos/PowerSyncExample/PowerSyncExample/_Secrets.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ protocol SecretsProvider {
static var supabaseURL: URL { get }
static var supabaseAnonKey: String { get }
static var supabaseStorageBucket: String? { get }
static var previewSyncStreams: Bool { get }
}

// Default conforming type
Expand Down
5 changes: 5 additions & 0 deletions Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol,
)
}

func syncStream(name: String, params: JsonParam?) -> any SyncStream {
let rawStream = kotlinDatabase.syncStream(name: name, parameters: params?.mapValues { $0.toKotlinMap() })
return KotlinSyncStream(kotlinStream: rawStream)
}

func connect(
connector: PowerSyncBackendConnectorProtocol,
options: ConnectOptions?
Expand Down
20 changes: 20 additions & 0 deletions Sources/PowerSync/Kotlin/db/KotlinJsonParam.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,24 @@ extension JsonValue {
return PowerSyncKotlin.JsonParam.Map(value: anyDict)
}
}

static func kotlinValueToJsonParam(raw: Any?) -> JsonValue {
if let string = raw as? String {
return Self.string(string)
} else if let bool = raw as? KotlinBoolean {
return Self.bool(bool.boolValue)
} else if let int = raw as? KotlinInt {
return Self.int(int.intValue)
} else if let double = raw as? KotlinDouble {
return Self.double(double.doubleValue)
} else if let array = raw as? [Any?] {
return Self.array(array.map(kotlinValueToJsonParam))
} else if let object = raw as? [String: Any?] {
return Self.object(object.mapValues(kotlinValueToJsonParam))
} else {
// fatalError is fine here because this function is internal, so this being reached
// is an SDK bug.
fatalError("fromValue must only be called on outputs of JsonValue.toValue()");
}
}
}
18 changes: 18 additions & 0 deletions Sources/PowerSync/Kotlin/sync/KotlinSyncStatusData.swift
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,24 @@ extension KotlinSyncStatusDataProtocol {
)
)
}

var syncStreams: [SyncStreamStatus]? {
return base.syncStreams?.map(mapSyncStreamStatus)
}

func forStream(stream: SyncStreamDescription) -> SyncStreamStatus? {
var rawStatus: Optional<PowerSyncKotlin.SyncStreamStatus>
if let kotlinStream = stream as? any HasKotlinStreamDescription {
// Fast path: Reuse Kotlin stream object for lookup.
rawStatus = base.forStream(stream: kotlinStream.kotlinDescription)
} else {
// Custom stream description, we have to convert parameters to a Kotlin map.
let parameters = stream.parameters?.mapValues { $0.toValue() }
rawStatus = syncStatusForStream(status: base, name: stream.name, parameters: parameters)
}

return rawStatus.map(mapSyncStreamStatus)
}

private func mapPriorityStatus(_ status: PowerSyncKotlin.PriorityStatusEntry) -> PriorityStatusEntry {
var lastSyncedAt: Date?
Expand Down
125 changes: 125 additions & 0 deletions Sources/PowerSync/Kotlin/sync/KotlinSyncStreams.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import Foundation
import PowerSyncKotlin

class KotlinStreamDescription<T: PowerSyncKotlin.SyncStreamDescription> {
let inner: T
let name: String
let parameters: JsonParam?
let kotlinParameters: [String: Any?]?

init(inner: T) {
self.inner = inner
self.name = inner.name
self.kotlinParameters = inner.parameters
self.parameters = inner.parameters?.mapValues { JsonValue.kotlinValueToJsonParam(raw: $0) }
}
}

protocol HasKotlinStreamDescription {
associatedtype Description: PowerSyncKotlin.SyncStreamDescription

var stream: KotlinStreamDescription<Description> { get }
}

extension HasKotlinStreamDescription {
var kotlinDescription: any PowerSyncKotlin.SyncStreamDescription {
self.stream.inner
}
}

class KotlinSyncStream: SyncStream, HasKotlinStreamDescription,
// `PowerSyncKotlin.SyncStream` cannot be marked as Sendable, but is thread-safe.
@unchecked Sendable
{
let stream: KotlinStreamDescription<PowerSyncKotlin.SyncStream>

init(kotlinStream: PowerSyncKotlin.SyncStream) {
self.stream = KotlinStreamDescription(inner: kotlinStream);
}

var name: String {
stream.name
}

var parameters: JsonParam? {
stream.parameters
}

func subscribe(ttl: TimeInterval?, priority: BucketPriority?) async throws -> any SyncStreamSubscription {
let kotlinTtl: Optional<KotlinDouble> = if let ttl {
KotlinDouble(value: ttl)
} else {
nil
}
let kotlinPriority: Optional<KotlinInt> = if let priority {
KotlinInt(value: priority.priorityCode)
} else {
nil
}

let kotlinSubscription = try await syncStreamSubscribeSwift(
stream: stream.inner,
ttl: kotlinTtl,
priority: kotlinPriority,
);
return KotlinSyncStreamSubscription(kotlinStream: kotlinSubscription)
}

func unsubscribeAll() async throws {
try await stream.inner.unsubscribeAll()
}
}

class KotlinSyncStreamSubscription: SyncStreamSubscription, HasKotlinStreamDescription,
// `PowerSyncKotlin.SyncStreamSubscription` cannot be marked as Sendable, but is thread-safe.
@unchecked Sendable
{
let stream: KotlinStreamDescription<PowerSyncKotlin.SyncStreamSubscription>

init(kotlinStream: PowerSyncKotlin.SyncStreamSubscription) {
self.stream = KotlinStreamDescription(inner: kotlinStream)
}

var name: String {
stream.name
}
var parameters: JsonParam? {
stream.parameters
}

func waitForFirstSync() async throws {
try await stream.inner.waitForFirstSync()
}

func unsubscribe() async throws {
try await stream.inner.unsubscribe()
}
}

func mapSyncStreamStatus(_ status: PowerSyncKotlin.SyncStreamStatus) -> SyncStreamStatus {
let progress = status.progress.map { ProgressNumbers(source: $0) }
let subscription = status.subscription

return SyncStreamStatus(
progress: progress,
subscription: SyncSubscriptionDescription(
name: subscription.name,
parameters: subscription.parameters?.mapValues { JsonValue.kotlinValueToJsonParam(raw: $0) },
active: subscription.active,
isDefault: subscription.isDefault,
hasExplicitSubscription: subscription.hasExplicitSubscription,
expiresAt: subscription.expiresAt.map { Double($0.epochSeconds) },
lastSyncedAt: subscription.lastSyncedAt.map { Double($0.epochSeconds) }
)
)
}

struct ProgressNumbers: ProgressWithOperations {
let totalOperations: Int32
let downloadedOperations: Int32

init(source: PowerSyncKotlin.ProgressWithOperations) {
self.totalOperations = source.totalOperations
self.downloadedOperations = source.downloadedOperations
}
}
10 changes: 7 additions & 3 deletions Sources/PowerSync/Protocol/PowerSyncDatabaseProtocol.swift
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,8 @@ public struct ConnectOptions: Sendable {

/// Uses a new sync client implemented in Rust instead of the one implemented in Kotlin.
///
/// The new client is more efficient and will become the default in the future, but is still marked as experimental for now.
/// We encourage interested users to try the new client.
@_spi(PowerSyncExperimental)
/// This option is enabled by default and recommended for all apps. The old Kotlin-based implementation
/// will be removed in a future version of the SDK.
public var newClientImplementation: Bool

/// Configuration for the sync client used for PowerSync requests.
Expand Down Expand Up @@ -250,6 +249,11 @@ public protocol PowerSyncDatabaseProtocol: Queries, Sendable {
/// the database.
func disconnectAndClear(clearLocal: Bool, soft: Bool) async throws

/// Create a ``SyncStream`` instance for the given name and parameters.
///
/// Use ``SyncStream/subscribe`` on the returned instance to subscribe to the stream.
func syncStream(name: String, params: JsonParam?) -> any SyncStream

/// Close the database, releasing resources.
/// Also disconnects any active connection.
///
Expand Down
2 changes: 1 addition & 1 deletion Sources/PowerSync/Protocol/db/JsonParam.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
///
/// Supports all standard JSON types: string, number (integer and double),
/// boolean, null, arrays, and nested objects.
public enum JsonValue: Codable, Sendable {
public enum JsonValue: Codable, Sendable, Equatable {
/// A JSON string value.
case string(String)

Expand Down
17 changes: 17 additions & 0 deletions Sources/PowerSync/Protocol/sync/SyncStatusData.swift
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ public protocol SyncStatusData: Sendable {
/// - Parameter priority: The priority for which the status is requested.
/// - Returns: A `PriorityStatusEntry` representing the synchronization status for the given priority.
func statusForPriority(_ priority: BucketPriority) -> PriorityStatusEntry

/// All sync streams currently being tracked in the database.
///
/// This returns null when the database is currently being opened and we don't have reliable information about
/// included streams yet.
var syncStreams: [SyncStreamStatus]? { get }

/// Status information for the given stream, if it's a stream that is currently tracked by the sync client.
func forStream(stream: SyncStreamDescription) -> SyncStreamStatus?
}

/// A protocol extending `SyncStatusData` to include flow-based updates for synchronization status.
Expand All @@ -55,3 +64,11 @@ public protocol SyncStatus: SyncStatusData, Sendable {
/// - Returns: An `AsyncStream` that emits updates whenever the synchronization status changes.
func asFlow() -> AsyncStream<SyncStatusData>
}

/// Current information about a ``SyncStreamSubscription``.
public struct SyncStreamStatus {
/// If the sync status is currently downloading, information about download progress related to this stream.
let progress: ProgressWithOperations?
/// The ``SyncSubscriptionDescription`` providing information about the subscription.
let subscription: SyncSubscriptionDescription
}
Loading