抽象基于 callback 的异步操作
异步操作的一种最常见形式就是基于 callback,如早期的 Javascript 就是完全基于 callback 风格设计的。
抽象这种风格的异步操作,可以说是一个 IO Monad 的必备功能。
一个 callback 风格的异步操作通常可以定义为
1
| def triggerEvent(cb: Either[Throwable, Event] => Unit): Unit
|
即,触发一个操作,并在完成时调用回调函数 cb
其对应的类型是 (Either[Throwable, A] => Unit) => Unit
注意,Java 中通常会将成功和失败的回调分开作为一个回调接口,如 Channel
的异步回调:
1 2 3 4
| interface CompletionHandler { void completed(V result, A attachment); void failed(Throwable exc, A attachment); }
|
这和上述形式并无区别。
我们可能无法第一时间就给出一个抽象,那么还是回归前文的老套路,使用一种新的 ADT 来表示:
1 2 3 4 5 6 7 8 9 10 11 12
| sealed trait IO object IO { private case class Pure[A](value: A) extends IO[A] private case class FlatMap[A, B](fa: IO[A], f: A => IO[B]) extends IO[B] private case class Effect[A](run: () => A) extends IO[A] private case class Failure[A](ex: Throwable) extends IO[A] private case class Recover[A](fa: IO[A], handler: Throwable => IO[A]) extends IO[A] private case class Async[A](register: (Either[Throwable, A] => Unit) => Unit) extends IO[A] ...
def async[A](register: (Either[Throwable, A], Unit) => Unit): IO[A] }
|
现在假设有一个回调风格的 httpclient
1 2 3
| trait HttpClient { def get(url: String, cb: Either[Throwable, Response] => Unit) }
|
我们可以将其转换为 IO
1 2 3 4 5 6 7 8
| val cli: HttpClient = ??? def httpGet() = { def get(url: String): IO[Response] = { IO.async[Response] { cb => cli.get(url, cb) } } }
|
如此一来,我们就可以像前文一样使用 flatMap 等函数对这些操作进行组合,很大程度上避免了回调地狱带来的代码无法阅读的问题。例如:
1 2 3 4
| for { fisrtName <- httpGet(urlA) lastName <- httpGet(urlB) } yield s"${firstName}${lastName}"
|
实现新的解释器
至此为止一切看起来都很简单,我们只要能针对新实现的 ADT 编写解释器就万事俱备了。
考虑到新增了异步操作,那么解释器理论上应该以回调的形式实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| private def unsafeRunFlatMap[A, B](io: FlatMap[A, B], cb: Either[Throwable, B] => Unit): Unit = { unsafeRunAsync[A](io.fa, { case Left(e) => cb(Left(e)) case Right(a) => unsafeRunAsync[B](io.f(a), { case Left(e) => cb(Left(e)) case Right(b) => cb(Right(b)) }) }) }
def unsafeRunAsync[A](io: IO[A], cb: Either[Throwable, A] => Unit): Unit = io match { case IO.Pure(a) => cb(Right(a)) case io: IO.FlatMap[_, _] => unsafeRunFlatMap(io, cb) case IO.Effect(run) => cb(Right(run())) case IO.Failure(ex) => cb(Left(ex)) case IO.Async(register) => register(cb) case IO.Recover(fa, h) => unsafeRunAsync[A](fa, { case Left(e) => unsafeRunAsync(h(e), cb) case Right(v) => cb(Right(v)) }) }
|
这个实现看起来仍旧非常的直观,目前为止,我们已经实现了一个朴素的,能隔离异步操作的 IO Monad。
通常,触发并注册回调的动作也会产生副作用,这个时候我们可以将 Async
调整为
1
| private case class AsyncF[A](register: (Either[Throwable, A] => Unit) => IO[Unit]) extends IO[A]
|
这样触发的动作也是一个受管理的 IO Monad。
Stack safety
然而,当我重新实现并运行 MonadError
相关的 Law 校验时,却发现运行 tailRecM
定义的校验时出现了 StackOverflow
。
这是预料中的情况,前面的解释器存在递归调用,并且无法被优化为 tail call 形式。
一种简单的解决方案是将调用栈手工存储到一个 Stack
中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| private type Callback = Either[Throwable, Any] => Unit private type IOLoop = Stack[(() => IO[_], Callback)]
private def runAsyncFlatMap[A, B](loop: IOLoop, io: FlatMap[A, B], fb: Either[Throwable, B] => Unit): Unit = { loop.push((() => io.fa) -> { eoa: Either[Throwable, Any] => eoa match { case Left(e) => fb(Left(e)) case Right(a) => loop.push((() => { io.f(a.asInstanceOf[A]) }) -> { eoa: Either[Throwable, Any] => loop.push((() => eoa.pure[IO]) ->{ eeob: Either[Throwable, Any] => eeob match { case Left(e) => throw new Exception("unreachable") case Right(v) => fb(v.asInstanceOf[Either[Throwable, B]]) } }) }) } }) }
def unsafeRunAsync[A](io: IO[A])(cb: Either[Throwable, A] => Unit) = { val loop = Stack[(() => IO[_], Callback)]() start(() => io, loop)(cb.asInstanceOf[Callback]) while(!loop.isEmpty) { val (io, cb) = loop.pop() start(io, loop)(cb) } }
private def start[_]( io: () => IO[_], loop: IOLoop )(cb: Callback): Unit = io() match { case IO.Pure(a) => cb(Right(a)) case io: IO.FlatMap[_, _] => runAsyncFlatMap(loop, io, cb) case IO.Effect(run) => cb(Right(run())) case IO.Failure(ex) => cb(Left(ex)) case IO.AsyncF(register) => loop.push(() => register(cb), _ => ()) case IO.Recover(fa, h) => loop.push((() => fa) -> { eoa: Either[Throwable, Any] => eoa match { case Left(e) => loop.push((() => h(e)) -> { ee: Either[Throwable, Any] => cb(ee) }) case Right(v) => cb(Right(v)) } }) }
|
如此一来,一个不会 StackOverflow 的解释器就算实现好了。
手工消除递归的问题
上述手工消除递归的方式存在线程安全的问题,当然,我们可以使用线程安全的数据结构来规避这个问题。
同时,这种方式默认了回调处理发生的线程是不可控的,取决于具体框架。例如 Netty
的 ChannelFuture
,默认情况回调会在其自身线程池里处理,这可能会影响 Netty
的性能。
为了避免这些问题,我们还可以将回调以任务形式提交给线程池处理。当然,如果任意一个简单的操作都作为任务提交到线程池的话,会导致更多上下文切换。
这里的问题较为复杂,后续我们在介绍 cats-effect
或者 zio
相关内容时在详细介绍。
小结
本文简单介绍了IO
Monad 如何抽象回调形式的异步操作,可以一定程度上消除回调地狱。
实际上这和 Contiuation
Monad 的套路非常相似,不同的时过程中涉及到副作用隔离。
本文相关代码可以在 github仓库 中查看