Using RxSwift with StreamChat

LAST EDIT Mar 03 2021
The new v3 SDK does not currently have RxSwift integration. Stay tuned!

Rx is a generic abstraction of computation expressed through the Observable interface. RxSwift is the Swift implementation of Rx.

Stream uses RxSwift to enable easy composition of asynchronous operations and event/data streams. If you're not familiar with RxSwift, check the RxSwift quick start guide first.

Using RxSwift with StreamChat Swift SDK

By default, the official Swift API client handles I/O operations in the traditional callback style. RxSwift support is provided by our Core library, StreamChatCore. When you use StreamChatCore, you can access Rx-style methods from the client through its rx property. For instance, the Rx counterpart of Client.shared.addDevice is Client.shared.rx.addDevice.

To use RxSwift, you need the StreamChatCore dependency installed. If you're using StreamChat, it's already available, since StreamChat depends on it. If you're only using StreamChatClient, add StreamChatCore to your Podfile:

pod 'StreamChatCore'

In the file where you want to use RxSwift, import the module:

import StreamChatCore

You can then access the Rx methods through the client's rx property:

Client.shared.rx.someFunction(arguments)

For example, to add a device, you can use either the callback style or the Rx style:

// Callback way 
Client.shared.addDevice(deviceToken: deviceToken) 
 
// Rx way 
Client.shared.rx.addDevice(deviceToken: deviceToken).subscribe().disposed(by: disposeBag) 
 
 
// With completions: 
// Callback way 
Client.shared.addDevice(deviceToken: deviceToken) { (result) in 
    // handle result 
} 
 
// Rx way 
Client.shared.rx.addDevice(deviceToken: deviceToken).subscribe(onError: { error in 
    // handle error 
}, onCompleted: { 
    // handle completion 
}).disposed(by: disposeBag)
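
The Rx snippets above pass a disposeBag that is never declared on this page. A minimal sketch of where such a bag typically lives, assuming a hypothetical DeviceRegistrar class (not part of the SDK) and using Observable<Void>.empty() as a stand-in for the real Client.shared.rx.addDevice call, whose exact element type is not shown here:

```swift
import RxSwift

// Hypothetical owner type, used only for illustration; it is not part of StreamChat.
final class DeviceRegistrar {
    // Subscriptions added to the bag are disposed when the bag is deallocated,
    // which happens when this object goes away.
    private let disposeBag = DisposeBag()
    private(set) var didComplete = false

    func register(using request: Observable<Void>) {
        request
            .subscribe(
                onError: { error in
                    print("Registration failed: \(error)")
                },
                onCompleted: { [weak self] in
                    self?.didComplete = true
                }
            )
            .disposed(by: disposeBag)
    }
}

// Stand-in request (an assumption): an Observable that completes immediately.
let registrar = DeviceRegistrar()
registrar.register(using: Observable<Void>.empty())
print(registrar.didComplete)
```

Tying the bag to the object that starts the request means the subscription is cleaned up with that object, so no explicit dispose() call is needed.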

As another example, let's look at creating a channel:

let channel = Client.shared.channel(type: .messaging, id: "general") 
 
// using callbacks 
Client.shared.create(channel: channel) { (result) in 
    // handle result 
} 
 
// using Rx 
Client.shared.rx.create(channel: channel).subscribe(onError: { (error) in 
    // handle error 
}, onCompleted: { 
    // handle completed 
}).disposed(by: disposeBag)
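
To make the relationship between the two styles concrete, here is a rough sketch of how a callback-style method can be bridged to an Observable with Observable.create. It is not how StreamChatCore is implemented; registerDevice and rxRegisterDevice are hypothetical names invented for this illustration.

```swift
import RxSwift

// Hypothetical callback-style API, standing in for a method such as addDevice.
func registerDevice(token: String, completion: @escaping (Result<Void, Error>) -> Void) {
    completion(.success(()))
}

// Hypothetical Rx wrapper: translates the completion callback into Observable events.
func rxRegisterDevice(token: String) -> Observable<Void> {
    return Observable.create { observer in
        registerDevice(token: token) { result in
            switch result {
            case .success:
                observer.onNext(())
                observer.onCompleted()
            case .failure(let error):
                observer.onError(error)
            }
        }
        // Nothing to cancel in this sketch.
        return Disposables.create()
    }
}

let disposeBag = DisposeBag()
var events: [String] = []

rxRegisterDevice(token: "example-token")
    .subscribe(
        onNext: { _ in events.append("next") },
        onError: { _ in events.append("error") },
        onCompleted: { events.append("completed") }
    )
    .disposed(by: disposeBag)

print(events)
```

A success is reported as one element followed by completion, and a failure as an error event. That is why the Rx snippets handle results in onError and onCompleted rather than in a single result closure.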

Debugging RxSwift

Here's the official Debugging guide.

You can append .debug("my debug label") to any Observable to enable debug logging for that point in the chain.

Example

let subscription = myInterval(.milliseconds(100)) 
    .debug("my probe") 
    .map { e in 
        return "This is simply \(e)" 
    } 
    .subscribe(onNext: { n in 
        print(n) 
    }) 
 
Thread.sleep(forTimeInterval: 0.5)
 
subscription.dispose()

This will print:

[my probe] subscribed 
Subscribed 
[my probe] -> Event next(Box(0)) 
This is simply 0 
[my probe] -> Event next(Box(1)) 
This is simply 1 
[my probe] -> Event next(Box(2)) 
This is simply 2 
[my probe] -> Event next(Box(3)) 
This is simply 3 
[my probe] -> Event next(Box(4)) 
This is simply 4 
[my probe] dispose 
Disposed
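
The myInterval helper used in the example is not defined on this page. As a sketch under that assumption, the same probe can be attached to RxSwift's built-in interval operator. This variant takes only three elements and waits on a semaphore instead of sleeping, so the run is deterministic. The exact log format printed by debug may differ between RxSwift versions.

```swift
import Foundation
import RxSwift

// Background scheduler so the timer fires off the current thread.
let scheduler = SerialDispatchQueueScheduler(qos: .default)
let finished = DispatchSemaphore(value: 0)
var lines: [String] = []

let subscription = Observable<Int>
    .interval(.milliseconds(100), scheduler: scheduler)
    .take(3)                      // stop after the elements 0, 1, 2
    .debug("my probe")            // logs subscription, each event, and disposal
    .map { e in "This is simply \(e)" }
    .subscribe(
        onNext: { line in lines.append(line) },
        onCompleted: { finished.signal() }
    )

// Block until the sequence completes, then release the subscription.
finished.wait()
subscription.dispose()

lines.forEach { print($0) }
```

Placing debug before map means the probe logs the raw integer elements, while the subscriber receives the mapped strings.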