vscode 源码解析 - 事件模块
在进一步深入学习 vscode 的各种机制之前,我们先对 vscode 当中的一些基础工具做一些探索,因为核心机制大量地用到了这些基础模块,这篇文章将会介绍事件(event)模块,相关代码在 vs/base/common/event.ts 文件中。
Event 模块实现
Event 接口
Event 接口规定了一个函数,当调用了这个函数,就表示监听了这个函数所对应的事件流。
listener
参数是事件派发时将会被调用的回调函数,参数e
为单个事件,换句话说,listener
就是事件的消费者thisArgs
参数是回调函数中this
所指向的对象disposables
export interface Event<T> {
(
listener: (e: T) => any,
thisArgs?: any,
disposables?: IDisposable[] | DisposableStore
): IDisposable
}
返回的 IDisposable
对象用于解除这个监听的(通过调用它的 dispose
方法)。
另外一种解除监听的方式就是 disposable
了,Event 函数在执行的过程中会将 IDisposable
插入 disposables
,方便调用方决定在什么时候解除监听。
Emitter
有了 Event
接口,我们可以规定事件如何被消费,那么事件是如何产生的呢?一种方法就是通过 Emitter
。
Emitter
类型暴露了两个重要方法:
fire
,从这个方法的函数签名就能看出它就是用来派发一个事件的,该方法的主要逻辑就是将 this._listeners
当中的保存的 listener
全部调用一遍(省略了部分分支逻辑和性能监控相关代码)
fire(event: T): void {
if (this._listeners) {
for (let listener of this._listeners) {
this._deliveryQueue.push([listener, event]);
}
while (this._deliveryQueue.size > 0) {
const [listener, event] = this._deliveryQueue.shift()!;
try {
if (typeof listener === 'function') {
listener.call(undefined, event);
} else {
listener[0].call(listener[1], event);
}
} catch (e) {
onUnexpectedError(e);
}
}
}
}
get event()
,这个方法会在 Emitter
中创建一个 Event
,其主要逻辑就是将 listener
添加到 this._listeners
当中
const remove = this._listeners.push(!thisArgs ? listener : [listener, thisArgs])
Emitter
类型还提供了一些特殊的回调接口:
export interface EmitterOptions {
onFirstListenerAdd?: Function
onFirstListenerDidAdd?: Function
onListenerDidAdd?: Function
onLastListenerRemove?: Function
}
这使得 Emitter
在注册消费者的时候执行一些额外的逻辑。我们将会在下文中看到其中一些回调所扮演的重要角色。
Event 辅助方法
vscode 还提供了一系列工具方法用于组合 Event
,得到更加丰富的事件处理能力。下面我们一一进行说明。
once
export function once<T>(event: Event<T>): Event<T> {
return (listener, thisArgs = null, disposables?) => {
// we need this, in case the event fires during the listener call
let didFire = false
let result: IDisposable
result = event(
/* A */ (e) => {
if (didFire) {
return
} else if (result) {
result.dispose()
} else {
didFire = true
}
return listener.call(thisArgs, e)
},
null,
disposables
)
if (didFire) {
result.dispose()
}
return result
}
}
这个方法用于将一个 Event
变为只能派发一次的,事件类型相同的 Event
。
每一个事件到达时,会从 A 处开始执行,可以看到这段代码通过 didFire
作为锁,保证 listener.call(thisArgs, e)
只会被执行一次。
很明显, once
的执行过程中有两个 Event
,那么消息如何在 Event 之间传递的呢?我们注意到 A 处的匿名函数调用了一个 Event
的 listener
,而 A 本身又是另一个 Event 的 listener
,所以答案是很明显的:消息沿着 Event
链传递的过程,就是 Event
的 listener
们递归调用的过程。
snapshot
export function snapshot<T>(event: Event<T> /* B */): Event<T> {
let listener: IDisposable
const emitter = new Emitter<T>({
onFirstListenerAdd() {
listener = event(emitter.fire, emitter)
},
onLastListenerRemove() {
listener.dispose()
}
})
/* C */
return emitter.event
}
这个工具方法用于生成 map
等操作,我们把它和 map
一起分析。
map
将一种类型的事件转换成另一种类型的事件,看起来和 Array
的 map
非常相似。
export function map<I, O>(event: Event<I> /* A */, map: (i: I) => O): Event<O> {
return snapshot(
/* B */
(listener, thisArgs = null, disposables?) =>
event((i) => listener.call(thisArgs, map(i)), null, disposables)
)
}
从代码可以看出:这里的 Event
链的顺序是
- 被
map
装饰的Event
A snapshot
的参数,匿名的Event
BEmitter
暴露出的Event
C
当用户调用这个 map
转换出的 Event
的时候,实际上订阅的是 C,然后 C 在第一次被订阅时,会调用 B,而 B 又去订阅了 A。这里我们看到了 Emitter
的参数钩子起到了什么作用:B 是一个很特殊的 Event
它在 onFirstListenerAdd
中被订阅了 ,并且之后它并不会参与到 listener 的调用链中来,而是帮助 A 和 C 的 listener 之间创建了调用链,同时调用 map
对事件做了处理。
当有事件传递过来的时候,则是调用 A 的 listener i => listener.call(thisArgs, map(i))
,而这里的 listener
很明显可以看出是 C 的 listener,也就是 Emitter
的 fire
方法,通过上文中对 Emitter 的学习,我们知道,fire 方法会触发用户调用 C 时所传递来的 listener,这样整个传递链条就完整了。
forEach / filter / reduce
了解了 map
的工作原理之后,这三个函数就容易理解了,大家可以自行阅读代码。
signal
这个函数仅仅是做了一下类型转换,让订阅者忽略事件所携带的数据,比较简单。
any
export function any<T>(...events: Event<T>[]): Event<T> {
return (listener, thisArgs = null, disposables?) =>
combinedDisposable(
...events.map((event) =>
event((e) => listener.call(thisArgs, e), null, disposables)
)
)
}
这个方法会在 events 中任意一个 Event
派发事件的时候派发一个事件。
debounce
对 Event 链条上的事件做防抖处理。
export function debounce<T>(
event: Event<T>,
merge: (last: T | undefined, event: T) => T,
delay?: number,
leading?: boolean,
leakWarningThreshold?: number
): Event<T>
export function debounce<I, O>(
event: Event<I>,
merge: (last: O | undefined, event: I) => O,
delay?: number,
leading?: boolean,
leakWarningThreshold?: number
): Event<O>
export function debounce<I, O>(
event: Event<I>,
merge: (last: O | undefined, event: I) => O,
delay: number = 100,
leading = false,
leakWarningThreshold?: number
): Event<O> {
let subscription: IDisposable
let output: O | undefined = undefined
let handle: any = undefined
let numDebouncedCalls = 0
const emitter = new Emitter<O>({
leakWarningThreshold,
onFirstListenerAdd() {
subscription = event(
/* A */ (cur) => {
numDebouncedCalls++
output = merge(output, cur)
if (leading && !handle) {
emitter.fire(output)
output = undefined
}
clearTimeout(handle)
handle = setTimeout(() => {
const _output = output
output = undefined
handle = undefined
if (!leading || numDebouncedCalls > 1) {
emitter.fire(_output!)
}
numDebouncedCalls = 0
}, delay)
}
)
},
onLastListenerRemove() {
subscription.dispose()
}
})
return emitter.event
}
不难看出这段代码的核心逻辑就是 A 处的 listener,它会对 debounce 时间内对数据做归并处理,并设置定时器,当收到新事件时就取消定时器,而定时器到期时就调用 emitter.fire (opens in a new tab) 向下游继续发送事件。
stopWatch
这是一个记录耗时的 Event
,当它收到第一个事件时,会把这个事件转换为它从创建到收到该事件的耗时。
latch
这个 Event
仅有当事件确实发生变化时,才会向下游发送事件。原理也很简单,就是在 filter
的基础上,利用闭包来缓存上一次事件的数据,然后用新数据和它做比较,新老数据不同或者是第一次接收数据才放通。
buffer
这个 Event
在没有人订阅它时,会缓存所有收到的事件,并在收到订阅时将已经缓存的事件全部发送出去。
export function buffer<T>(
event: Event<T>,
nextTick = false,
_buffer: T[] = []
): Event<T> {
let buffer: T[] | null = _buffer.slice()
let listener: IDisposable | null = event((e) => {
if (buffer) {
buffer.push(e)
} else {
emitter.fire(e)
}
})
const flush = () => {
if (buffer) {
buffer.forEach((e) => emitter.fire(e))
}
buffer = null
}
const emitter = new Emitter<T>({
onFirstListenerAdd() {
if (!listener) {
listener = event((e) => emitter.fire(e))
}
},
onFirstListenerDidAdd() {
if (buffer) {
if (nextTick) {
setTimeout(flush)
} else {
flush()
}
}
},
onLastListenerRemove() {
if (listener) {
listener.dispose()
}
listener = null
}
})
return emitter.event
}
如果调用 buffer 时传入了 nextTick = True
,则发送缓存事件的操作会易步进行,所以如果你第一次订阅时同步添加了很多 listener,则它们都会收到这些缓存的事件。
ChainableEvent
如果要多次使用 map
, filter
等函数,一个比较优雅的写法是链式调用,例如 Event.map.filter.xxx
, ChainableEvent
就是为此准备的,通过调用 chain
方法,一个 Event
会转换成 ChainableEvent
,然后就可以进行链式调用:
export function chain<T>(event: Event<T>): IChainableEvent<T> {
return new ChainableEvent(event)
}
ChainableEvent
的实现很简单,就是对上面的方法进行了一次包裹,这里就不再赘述了。
除了上面提到的对 Event
的转换方法之外,还有一些生成 Event
的方法。
fromNodeEventEmitter
export function fromNodeEventEmitter<T>(
emitter: NodeEventEmitter,
eventName: string,
map: (...args: any[]) => T = (id) => id
): Event<T> {
const fn = (...args: any[]) => result.fire(map(...args))
const onFirstListenerAdd = () => emitter.on(eventName, fn)
const onLastListenerRemove = () => emitter.removeListener(eventName, fn)
const result = new Emitter<T>({ onFirstListenerAdd, onLastListenerRemove })
return result.event
}
该方法是对 node.js 原生事件的包裹,在原生事件的回调中调用 Emitter.fire
(opens in a new tab)。
fromDOMEventEmitter
对 DOM 事件进行包装,和上面的非常相似,这里就不赘述了。
fromPromise
export function fromPromise<T = any>(promise: Promise<T>): Event<undefined> {
const emitter = new Emitter<undefined>()
let shouldEmit = false
promise
.then(undefined, () => null)
.then(() => {
if (!shouldEmit) {
setTimeout(() => emitter.fire(undefined), 0)
} else {
emitter.fire(undefined)
}
})
shouldEmit = true
return emitter.event
}
将 Promise 转换为事件。通过 shouldEmit
确保 Promise 不会因为已经 resolve 而在订阅发生之前就开始派发事件(这样会导致错过事件)。
奇怪的是这里丢失了 Promise 返回的结果,不知道为什么这么设计,可能是 vscode 自己用不着吧。
toPromise
将事件转换为 Event,这个也比较简单。
另外,还有一些工具类提供更多的事件管理能力。
PauseableEmitter
类似于 Emitter
,但是能通过 pause
和 resume
方法暂停一条 Event 链上事件的传播,比较简单。
EventMultiplexer
这个类可以订阅多个事件,并在任意一个事件派发的时候,将该事件转发给自己所有的订阅者,它的核心就是它的 hook
方法:
private hook(e: { event: Event<T>; listener: IDisposable | null; }): void {
e.listener = e.event(r => this.emitter.fire(r));
}
也较为简单,这里不再赘述。
EventBufferer
这是一个非常有趣的类,它提供了一个 wrapEvent
方法包裹一个 Event
,并提供了一个 bufferEvents
方法,在这个方法的回调内所有经过它 wrapEvent
包裹的 Event
,都先不会被传播给订阅者。
/**
* The EventBufferer is useful in situations in which you want
* to delay firing your events during some code.
* You can wrap that code and be sure that the event will not
* be fired during that wrap.
*
* ```
* const emitter: Emitter;
* const delayer = new EventDelayer();
* const delayedEvent = delayer.wrapEvent(emitter.event);
*
* delayedEvent(console.log);
*
* delayer.bufferEvents(() => {
* emitter.fire(); // event will not be fired yet
* });
*
* // event will only be fired at this point
* ```
*/
export class EventBufferer {
private buffers: Function[][] = []
wrapEvent<T>(event: Event<T>): Event<T> {
return (listener, thisArgs?, disposables?) => {
return event(
(i) => {
const buffer = this.buffers[this.buffers.length - 1]
if (buffer) {
buffer.push(() => listener.call(thisArgs, i))
} else {
listener.call(thisArgs, i)
}
},
undefined,
disposables
)
}
}
bufferEvents<R = void>(fn: () => R): R {
const buffer: Array<() => R> = []
this.buffers.push(buffer)
const r = fn()
this.buffers.pop()
buffer.forEach((flush) => flush())
return r
}
}
当 bufferEvents
被调用的时候,会往 this.buffers
中压入一个新 buffer,在 fn 执行过程中派发的事件,就会因为 if (buffer)
判断为 true
而被缓存, fn
执行完毕之后, buffer
被弹出,其中包含的事件全部被派发。
Relay
这个类提供了切换上游 Event
的方法。当设置 Relay
的 input
属性时,就会切换监听的 Event
,而下游的 Event
监听的是 Relay
的 Emitter
,因此无需重新设置监听。
export class Relay<T> implements IDisposable {
// ...
set input(event: Event<T>) {
this.inputEvent = event
if (this.listening) {
this.nputEventListener.dispose()
this.inputEventListener = event(this.emitter.fire, this.emitter)
}
}
// ...
}
总结
至此我们已经学习了 vscode 事件模块的主要内容(其他的性能分析和泄漏监测等这篇文章就不分析了,感兴趣的读者可以自行阅读)。
vscode 事件模块是所谓响应式编程的一种实现,如果想要继续学习响应式编程,非常推荐以下两个项目:
- RxJS (opens in a new tab),前端最强大的响应式编程库,很多 RxJS 的概念都可以在 vscode 的事件模块中找到对应,例如 Emitter 十分类似于 Subject,
Event
类似于Observable
,map
reduce
filter
等函数,在 RxJS 中都有同名的操作符,但是 RxJS 更加强大,除了传递事件外,还能够传递异常以及事件流结束信息、支持事件调度等等,操作符更是要多得多 - callbag (opens in a new tab),是一套响应式编程规范(注意,不是库),vscode 和 RxJS 的事件链都是“推”机制的,而 callbag 同时支持推拉机制,而且实现上完全基于 JavaScript 强大的闭包机制,喜欢闭包体操的读者可以好好研究作者对几个操作符的实现