多个订阅者的默认行为并不总是可取的。在本文中,我们将介绍如何更改此行为并以适当的方式处理多个订阅者。
但首先,让我们来看看多个订阅者的默认行为。
默认行为
假设我们有以下Observable:
private static Observable getObservable() { return Observable.create(subscriber -> { subscriber.onNext(gettingValue(1)); subscriber.onNext(gettingValue(2)); subscriber.add(Subscriptions.create(() -> { LOGGER.info("Clear resources"); })); });}
订阅者订阅后会立即发出两个元素。
在我们的示例中,我们有两个订阅者:
LOGGER.info("Subscribing"); Subscription s1 = obs.subscribe(i -> LOGGER.info("subscriber#1 is printing " + i));Subscription s2 = obs.subscribe(i -> LOGGER.info("subscriber#2 is printing " + i)); s1.unsubscribe();s2.unsubscribe();
想象一下,获取每个元素是一项代价高昂的操作 - 例如,它可能包括密集计算或打开URL连接。
为了简单起见,我们只返回一个数字:
private static Integer gettingValue(int i) { LOGGER.info("Getting " + i); return i;}
这是输出:
SubscribingGetting 1subscriber#1 is printing 1Getting 2subscriber#1 is printing 2Getting 1subscriber#2 is printing 1Getting 2subscriber#2 is printing 2Clear resourcesClear resources
我们可以看到,在默认情况下,获取每个元素和清除资源都要执行两次-对于每个订阅服务器一次。这不是我们想要的。ConnectableObservable类有助于解决这个问题。
ConnectableObservable
类允许与多个订阅者共享订阅,而不允许多次执行底层操作。
但首先,让我们创建一个ConnectableObservable。
publish()
publish()方法是从Observable创建一个ConnectableObservable:
ConnectableObservable obs = Observable.create(subscriber -> { subscriber.onNext(gettingValue(1)); subscriber.onNext(gettingValue(2)); subscriber.add(Subscriptions.create(() -> { LOGGER.info("Clear resources"); }));}).publish();
但就目前而言,它什么都不做。它的工作原理是connect()方法。
connect()
在调用ConnectableObservable的connect()方法之前,即使有一些订阅者,也不会触发Observable的onSubcribe()回调。
让我们来证明一下:
LOGGER.info("Subscribing");obs.subscribe(i -> LOGGER.info("subscriber #1 is printing " + i));obs.subscribe(i -> LOGGER.info("subscriber #2 is printing " + i));Thread.sleep(1000);LOGGER.info("Connecting");Subscription s = obs.connect();s.unsubscribe();
我们订阅,然后等待一秒钟再连接输出是:
SubscribingConnectingGetting 1subscriber #1 is printing 1subscriber #2 is printing 1Getting 2subscriber #1 is printing 2subscriber #2 is printing 2Clear resources
我们可以看到:
- 获取元素只出现一次我们想要的
- 清算资源也只出现一次
- 订阅后获取元素开始一秒钟
- 订阅不再触发元素的发射。只有connect()才能这样做
这种延迟可能是有益的 - 有时我们需要为所有订阅者提供相同的元素序列,即使其中一个订阅者比另一个订阅者更早。
可观察的一致视图 - 在subscribe()之后的connect()
这个用例无法在我们之前的Observable上进行演示,因为它运行很冷,而且两个订阅者都可以获得整个元素序列。
相反,想象一下,元素发射不依赖于订阅的时刻,例如,鼠标点击发出的事件。现在还想象第二个订阅者在第一个订阅者之后订阅第二个订阅者。
第一个订阅者将获得此示例中发出的所有元素,而第二个订阅者将只接收一些元素。
另一方面,在正确的位置使用connect()方法可以为两个订阅者提供Observable序列上的相同视图。
让我们创建一个Observable。它将在JFrame上点击鼠标时发出元素。
每个元素都是点击的x坐标:
private static Observable getObservable() { return Observable.create(subscriber -> { frame.addMouseListener(new MouseAdapter() { @Override public void mouseClicked(MouseEvent e) { subscriber.onNext(e.getX()); } }); subscriber.add(Subscriptions.create(() { LOGGER.info("Clear resources"); for (MouseListener listener : frame.getListeners(MouseListener.class)) { frame.removeMouseListener(listener); } })); });}
现在,如果我们以第二个间隔一个接一个地订阅两个订阅者,运行程序并开始单击,我们将看到第一个订阅者将获得更多元素:
public static void defaultBehaviour() throws InterruptedException { Observable obs = getObservable(); LOGGER.info("subscribing #1"); Subscription subscription1 = obs.subscribe((i) -> LOGGER.info("subscriber#1 is printing x-coordinate " + i)); Thread.sleep(1000); LOGGER.info("subscribing #2"); Subscription subscription2 = obs.subscribe((i) -> LOGGER.info("subscriber#2 is printing x-coordinate " + i)); Thread.sleep(1000); LOGGER.info("unsubscribe#1"); subscription1.unsubscribe(); Thread.sleep(1000); LOGGER.info("unsubscribe#2"); subscription2.unsubscribe();}
subscribing #1subscriber#1 is printing x-coordinate 280subscriber#1 is printing x-coordinate 242subscribing #2subscriber#1 is printing x-coordinate 343subscriber#2 is printing x-coordinate 343unsubscribe#1clearing resourcesunsubscribe#2clearing resources
connect() After subscribe()
为了使两个订阅者获得相同的序列,我们将Observable转换为ConnectableObservable并在订阅者之后调用connect():
public static void subscribeBeforeConnect() throws InterruptedException { ConnectableObservable obs = getObservable().publish(); LOGGER.info("subscribing #1"); Subscription subscription1 = obs.subscribe( i -> LOGGER.info("subscriber#1 is printing x-coordinate " + i)); Thread.sleep(1000); LOGGER.info("subscribing #2"); Subscription subscription2 = obs.subscribe( i -> LOGGER.info("subscriber#2 is printing x-coordinate " + i)); Thread.sleep(1000); LOGGER.info("connecting:"); Subscription s = obs.connect(); Thread.sleep(1000); LOGGER.info("unsubscribe connected"); s.unsubscribe();}
现在他们将得到相同的序列:
subscribing #1subscribing #2connecting:subscriber#1 is printing x-coordinate 317subscriber#2 is printing x-coordinate 317subscriber#1 is printing x-coordinate 364subscriber#2 is printing x-coordinate 364unsubscribe connectedclearing resources
所以重点是等待所有用户准备就绪然后调用connect()。
在Spring应用程序中,我们可以在应用程序启动期间订阅所有组件,例如在onApplicationEvent()中调用connect()。
让我们回到我们的例子;注意,connect()方法之前的所有单击操作都失败了。如果我们不想遗漏元素,但相反,我们可以在代码中更早地放置connect(),并强制可观察到的元素在没有任何订阅服务器的情况下生成事件。
在没有任何订阅者的情况下强制订阅 - connect()在subscribe()之前
为了证明这一点,让我们更正我们的例子:
public static void connectBeforeSubscribe() throws InterruptedException { ConnectableObservable obs = getObservable() .doOnNext(x -> LOGGER.info("saving " + x)).publish(); LOGGER.info("connecting:"); Subscription s = obs.connect(); Thread.sleep(1000); LOGGER.info("subscribing #1"); obs.subscribe((i) -> LOGGER.info("subscriber#1 is printing x-coordinate " + i)); Thread.sleep(1000); LOGGER.info("subscribing #2"); obs.subscribe((i) -> LOGGER.info("subscriber#2 is printing x-coordinate " + i)); Thread.sleep(1000); s.unsubscribe();}
步骤相对简单:
- 首先,我们连接
- 然后我们等待一秒钟并订阅第一个订阅者
- 最后,我们等待另一秒钟并订阅第二个订阅者
请注意,我们添加了doOnNext()运算符。这里我们可以在数据库中存储元素,例如在我们的代码中,我们只打印“save...”。
如果我们启动代码并开始点击,我们将看到在connect()调用之后立即发出和处理元素:
connecting:saving 306saving 248subscribing #1saving 377subscriber#1 is printing x-coordinate 377saving 295subscriber#1 is printing x-coordinate 295saving 206subscriber#1 is printing x-coordinate 206subscribing #2saving 347subscriber#1 is printing x-coordinate 347subscriber#2 is printing x-coordinate 347clearing resources
如果没有订阅者,则仍会处理这些元素。
因此,不管是否有人订阅,connect()方法都会开始发出和处理元素,就好像有一个使用了元素的空操作的人工订阅器一样。
如果有一些真正的订阅者订阅,这个人工中介只向他们传播元素。
若要取消订阅,我们会执行以下步骤:
s.unsubscribe();
然后:
Subscription s = obs.connect();
autoConnect()
此方法意味着在订阅之前或之后不会调用connect(),而是在第一个订阅者订阅时自动调用。
使用此方法,我们不能自己调用connect(),因为返回的对象是通常的Observable,它没有此方法但使用底层的ConnectableObservable:
public static void autoConnectAndSubscribe() throws InterruptedException { Observable obs = getObservable() .doOnNext(x -> LOGGER.info("saving " + x)).publish().autoConnect(); LOGGER.info("autoconnect()"); Thread.sleep(1000); LOGGER.info("subscribing #1"); Subscription s1 = obs.subscribe((i) -> LOGGER.info("subscriber#1 is printing x-coordinate " + i)); Thread.sleep(1000); LOGGER.info("subscribing #2"); Subscription s2 = obs.subscribe((i) -> LOGGER.info("subscriber#2 is printing x-coordinate " + i)); Thread.sleep(1000); LOGGER.info("unsubscribe 1"); s1.unsubscribe(); Thread.sleep(1000); LOGGER.info("unsubscribe 2"); s2.unsubscribe();}
请注意,我们也不能取消订阅人工订阅者。我们可以取消订阅所有真正的订阅者,但人工订阅者仍将处理事件。
为了理解这一点,让我们看一下最后一个订阅者取消订阅后最后发生的事情:
subscribing #1saving 296subscriber#1 is printing x-coordinate 296saving 329subscriber#1 is printing x-coordinate 329subscribing #2saving 226subscriber#1 is printing x-coordinate 226subscriber#2 is printing x-coordinate 226unsubscribe 1saving 268subscriber#2 is printing x-coordinate 268saving 234subscriber#2 is printing x-coordinate 234unsubscribe 2saving 278saving 268
正如我们所看到的,在第二次取消订阅后,不会出现清除资源的情况,并继续使用doOnNext()保存元素。这意味着人工订阅服务器不会取消订阅,而是继续使用元素。
refCount()
refCount()类似于autoConnect(),因为只要第一个订阅者订阅,连接也会自动发生。
与autoconnect()不同,当最后一个订阅者取消订阅时,也会自动断开连接:
public static void refCountAndSubscribe() throws InterruptedException { Observable obs = getObservable() .doOnNext(x -> LOGGER.info("saving " + x)).publish().refCount(); LOGGER.info("refcount()"); Thread.sleep(1000); LOGGER.info("subscribing #1"); Subscription subscription1 = obs.subscribe( i -> LOGGER.info("subscriber#1 is printing x-coordinate " + i)); Thread.sleep(1000); LOGGER.info("subscribing #2"); Subscription subscription2 = obs.subscribe( i -> LOGGER.info("subscriber#2 is printing x-coordinate " + i)); Thread.sleep(1000); LOGGER.info("unsubscribe#1"); subscription1.unsubscribe(); Thread.sleep(1000); LOGGER.info("unsubscribe#2"); subscription2.unsubscribe();}
refcount()subscribing #1saving 265subscriber#1 is printing x-coordinate 265saving 338subscriber#1 is printing x-coordinate 338subscribing #2saving 203subscriber#1 is printing x-coordinate 203subscriber#2 is printing x-coordinate 203unsubscribe#1saving 294subscriber#2 is printing x-coordinate 294unsubscribe#2clearing resources
结论
ConnectableObservable类可以轻松地处理多个订阅者。
它的方法看起来很相似,但由于实现上的细微差别(甚至方法的顺序也很重要),用户的行为发生了很大的变化。