抽象基于 callback 的异步操作

异步操作的一种最常见形式就是基于 callback,如早期的 Javascript 就是完全基于 callback 风格设计的。
抽象这种风格的异步操作,可以说是一个 IO Monad 的必备功能。
一个 callback 风格的异步操作通常可以定义为

1
def triggerEvent(cb: Either[Throwable, Event] => Unit): Unit

即,触发一个操作,并在完成时调用回调函数 cb 其对应的类型是 (Either[Throwable, A] => Unit) => Unit

注意,Java 中通常会将成功和失败的回调分开作为一个回调接口,如 Channel 的异步回调:

1
2
3
4
interface CompletionHandler {
void completed(V result, A attachment);
void failed(Throwable exc, A attachment);
}

这和上述形式并无区别。

我们可能无法第一时间就给出一个抽象,那么还是回归前文的老套路,使用一种新的 ADT 来表示:

1
2
3
4
5
6
7
8
9
10
11
12
sealed trait IO
object IO {
private case class Pure[A](value: A) extends IO[A]
private case class FlatMap[A, B](fa: IO[A], f: A => IO[B]) extends IO[B]
private case class Effect[A](run: () => A) extends IO[A]
private case class Failure[A](ex: Throwable) extends IO[A]
private case class Recover[A](fa: IO[A], handler: Throwable => IO[A]) extends IO[A]
private case class Async[A](register: (Either[Throwable, A] => Unit) => Unit) extends IO[A]
...

def async[A](register: (Either[Throwable, A], Unit) => Unit): IO[A]
}

现在假设有一个回调风格的 httpclient

1
2
3
trait HttpClient {
def get(url: String, cb: Either[Throwable, Response] => Unit)
}

我们可以将其转换为 IO

1
2
3
4
5
6
7
8
val cli: HttpClient = ???
def httpGet() = {
def get(url: String): IO[Response] = {
IO.async[Response] { cb =>
cli.get(url, cb)
}
}
}

如此一来,我们就可以像前文一样使用 flatMap 等函数对这些操作进行组合,很大程度上避免了回调地狱带来的代码无法阅读的问题。例如:

1
2
3
4
for {
fisrtName <- httpGet(urlA)
lastName <- httpGet(urlB)
} yield s"${firstName}${lastName}"

实现新的解释器

至此为止一切看起来都很简单,我们只要能针对新实现的 ADT 编写解释器就万事俱备了。
考虑到新增了异步操作,那么解释器理论上应该以回调的形式实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private def unsafeRunFlatMap[A, B](io: FlatMap[A, B], cb: Either[Throwable, B] => Unit): Unit = {
unsafeRunAsync[A](io.fa, {
case Left(e) =>
cb(Left(e))
case Right(a) =>
unsafeRunAsync[B](io.f(a), {
case Left(e) => cb(Left(e))
case Right(b) => cb(Right(b))
})
})
}

def unsafeRunAsync[A](io: IO[A], cb: Either[Throwable, A] => Unit): Unit = io match {
case IO.Pure(a) => cb(Right(a))
case io: IO.FlatMap[_, _] => unsafeRunFlatMap(io, cb)
case IO.Effect(run) => cb(Right(run()))
case IO.Failure(ex) => cb(Left(ex))
case IO.Async(register) =>
register(cb)
case IO.Recover(fa, h) =>
unsafeRunAsync[A](fa, {
case Left(e) => unsafeRunAsync(h(e), cb)
case Right(v) => cb(Right(v))
})
}

这个实现看起来仍旧非常的直观,目前为止,我们已经实现了一个朴素的,能隔离异步操作的 IO Monad。
通常,触发并注册回调的动作也会产生副作用,这个时候我们可以将 Async 调整为

1
private case class AsyncF[A](register: (Either[Throwable, A] => Unit) => IO[Unit]) extends IO[A]

这样触发的动作也是一个受管理的 IO Monad。

Stack safety

然而,当我重新实现并运行 MonadError 相关的 Law 校验时,却发现运行 tailRecM 定义的校验时出现了 StackOverflow
这是预料中的情况,前面的解释器存在递归调用,并且无法被优化为 tail call 形式。
一种简单的解决方案是将调用栈手工存储到一个 Stack 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
private type Callback = Either[Throwable, Any] => Unit
private type IOLoop = Stack[(() => IO[_], Callback)]

private def runAsyncFlatMap[A, B](loop: IOLoop, io: FlatMap[A, B], fb: Either[Throwable, B] => Unit): Unit = {
loop.push((() => io.fa) -> { eoa: Either[Throwable, Any] =>
eoa match {
case Left(e) =>
fb(Left(e))
case Right(a) =>
loop.push((() => {
io.f(a.asInstanceOf[A])
}) -> { eoa: Either[Throwable, Any] =>
loop.push((() => eoa.pure[IO]) ->{ eeob: Either[Throwable, Any] =>
eeob match {
case Left(e) => throw new Exception("unreachable")
case Right(v) => fb(v.asInstanceOf[Either[Throwable, B]])
}
})
})
}
})
}

def unsafeRunAsync[A](io: IO[A])(cb: Either[Throwable, A] => Unit) = {
val loop = Stack[(() => IO[_], Callback)]()
start(() => io, loop)(cb.asInstanceOf[Callback])
while(!loop.isEmpty) {
val (io, cb) = loop.pop()
start(io, loop)(cb)
}
}

private def start[_]( io: () => IO[_], loop: IOLoop )(cb: Callback): Unit = io() match {
case IO.Pure(a) => cb(Right(a))
case io: IO.FlatMap[_, _] => runAsyncFlatMap(loop, io, cb)
case IO.Effect(run) => cb(Right(run()))
case IO.Failure(ex) => cb(Left(ex))
case IO.AsyncF(register) =>
loop.push(() => register(cb), _ => ())
case IO.Recover(fa, h) =>
loop.push((() => fa) -> { eoa: Either[Throwable, Any] =>
eoa match {
case Left(e) =>
loop.push((() => h(e)) -> { ee: Either[Throwable, Any] =>
cb(ee)
})
case Right(v) => cb(Right(v))
}
})
}

如此一来,一个不会 StackOverflow 的解释器就算实现好了。

手工消除递归的问题

上述手工消除递归的方式存在线程安全的问题,当然,我们可以使用线程安全的数据结构来规避这个问题。
同时,这种方式默认了回调处理发生的线程是不可控的,取决于具体框架。例如 NettyChannelFuture,默认情况回调会在其自身线程池里处理,这可能会影响 Netty 的性能。

为了避免这些问题,我们还可以将回调以任务形式提交给线程池处理。当然,如果任意一个简单的操作都作为任务提交到线程池的话,会导致更多上下文切换。
这里的问题较为复杂,后续我们在介绍 cats-effect 或者 zio 相关内容时在详细介绍。

小结

本文简单介绍了IO Monad 如何抽象回调形式的异步操作,可以一定程度上消除回调地狱。
实际上这和 Contiuation Monad 的套路非常相似,不同的时过程中涉及到副作用隔离。

本文相关代码可以在 github仓库 中查看