1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
object Main1 extends App {
  val system = ActorSystem("HelloSystem")
  val jazzListener = system.actorOf(Props[Listener])
  val musicListener = system.actorOf(Props[Listener])
  system.eventStream.subscribe(jazzListener, classOf[Jazz]) // jazzListener 订阅 Jazz 事件
  system.eventStream.subscribe(musicListener, classOf[AllKindsOfMusic]) // musicListener 订阅 AllKindsOfMusic 以及它的子类 事件

  // 只有 musicListener 接收到这个事件
  system.eventStream.publish(Electronic("Parov Stelar"))

  // jazzListener 和 musicListener 都会收到这个事件
  system.eventStream.publish(Jazz("Sonny Rollins"))
}

subscribe 逻辑

同步地将 subcriber 和 to 加入到 subscriptions 中,diff 应该是和之前的一次比较保证不会重复发送?

1
2
3
4
5
def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscriptions.synchronized {
  val diff = subscriptions.addValue(to, subscriber)
  addToCache(diff)
  diff.nonEmpty
}

image-20200109114040999

image-20200109131215939

addValue 中有个比较重要的方法,就是从 subkeys 也就是 subscribe 中到找对应的类。

可以将subkeys 想象为一个多叉树中的一个节点,节点的key为订阅源类型,value为所对应的订阅者 Actor

然后这个节点也有自己的subkeys 这些subkeys 为的key为上层类型的子类,同时订阅者是与是上层订阅者的拓展

image-20200109140449787

对于重复的订阅,他会做一次去重,类似于arc diff

对于 system.eventStream.subscribe(jazzListener, classOf[Jazz])

image-20200109120145086

会产生一个这样的diff 然后加入到cache 中

cache 的数据结构是一个 private var cache = Map.empty[Classifier, Set[Subscriber]] Map 分别是订阅源和订阅者

对于 system.eventStream.subscribe(musicListener, classOf[AllKindsOfMusic])

image-20200109120406852

publish 逻辑

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
def publish(event: Event): Unit = {
    val c = classify(event)
    val recv =
      if (cache contains c) cache(c) // c will never be removed from cache
      else
        subscriptions.synchronized {
          if (cache contains c) cache(c)
          else {
            addToCache(subscriptions.addKey(c))
            cache(c)
          }
        }
    recv.foreach(publish(event, _))
  }

publish 逻辑较为简单,首先会从event中找出对应 className

然后走缓存逻辑,如果不在缓存中存在,就将对应的 key 更新到subkeys 多叉树中,找到对应的订阅者,并且更新到cache 中。

最后遍历 recv 调用publish函数 。

1
2
3
4
  protected def publish(event: Any, subscriber: ActorRef) = {
    if (sys == null && subscriber.isTerminated) unsubscribe(subscriber)
    else subscriber ! event
  }

Actor 初始化

1
2
val pinger = system.actorOf(Props[Pinger], "pinger")
val ponger = system.actorOf(Props(classOf[Ponger], pinger), "ponger")

会调用 ActorSystem 中的actorOf方法

1
2
3
4
5
def actorOf(props: Props): ActorRef =
if (guardianProps.isEmpty) guardian.underlying.attachChild(props, systemService = false)
else
throw new UnsupportedOperationException(
  "cannot create top-level actor from the outside on ActorSystem with custom user guardian")

会从守卫Actor 下面创建一个新的Child Actor

会调用下边的makeChild 方法:

Children.scala

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
val actor =
        try {
          val childPath = new ChildActorPath(cell.self.path, name, ActorCell.newUid())
          cell.provider.actorOf(
            cell.systemImpl,
            props,
            cell.self,
            childPath,
            systemService = systemService,
            deploy = None,
            lookupDeploy = true,
            async = async)
        } 

initChild(actor)
actor.start() // 绑定 actor 到 dispatcher 
actor  // 返回 actor ref

Tell 实现

1
2
final def sendMessage(message: Any, sender: ActorRef): Unit =
  sendMessage(Envelope(message, sender, system))

将message 包装为信封,调用Cell 的 sendMessage 方法

是因为 Cell 实现了

image-20200109195143660

Dispatch 特质

其实是执行的 Dispatch 特质中的 sendMessage 方法

1
2
3
4
5
6
7
8
def sendMessage(msg: Envelope): Unit =
    try {
      val msgToDispatch =
        if (system.settings.SerializeAllMessages) serializeAndDeserialize(msg)
        else msg

      dispatcher.dispatch(this, msgToDispatch)
    } catch handleException

但是我仍然有个问题,dispatcher 是我自己规定的dispather ?

再调用这个Actor 所对应的 dispatcher 的 dispatch 函数

1
2
3
4
5
protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
    val mbox = receiver.mailbox
    mbox.enqueue(receiver.self, invocation)
    registerForExecution(mbox, true, false)
  }

将信封丢入对应接收者的 mailbox 中,然后将 mbox 作为参数传入 registerForExecution 注册到线程池中。

而这个线程池就是我预设的线程池, dispacher 只是对这个线程池做一层封装。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
protected[akka] override def registerForExecution(
      mbox: Mailbox,
      hasMessageHint: Boolean,
      hasSystemMessageHint: Boolean): Boolean = {
    if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races
      if (mbox.setAsScheduled()) {
        try {
          //!!!!
          executorService.execute(mbox)
          true
        } catch {
       		...
        }
      } else false
    } else false
  }

使用内部的线程池来执行这个 MailBox 对象

既然 MailBox 可以被执行 它一定实现了 Runnable 方法 来看看他的实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
override final def run(): Unit = {
    try {
      if (!isClosed) { //Volatile read, needed here
        processAllSystemMessages() //First, deal with any system messages
        processMailbox() //Then deal with messages
      }
    } finally {
      setAsIdle() //Volatile write, needed here
      dispatcher.registerForExecution(this, false, false)
    }
  }

来主要看一下 processMailbox 方法的实现吧

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
@tailrec private final def processMailbox(
      left: Int = java.lang.Math.max(dispatcher.throughput, 1),
      deadlineNs: Long =
        if (dispatcher.isThroughputDeadlineTimeDefined)
          System.nanoTime + dispatcher.throughputDeadlineTime.toNanos
        else 0L): Unit =
    if (shouldProcessMessage) {
      val next = dequeue()
      if (next ne null) {
        if (Mailbox.debug) println(actor.self + " processing message " + next)
        actor.invoke(next)
        if (Thread.interrupted())
          throw new InterruptedException("Interrupted while processing actor messages")
        processAllSystemMessages()
        if ((left > 1) && (!dispatcher.isThroughputDeadlineTimeDefined || (System.nanoTime - deadlineNs) < 0))
          processMailbox(left - 1, deadlineNs)
      }
    }

很简单的实现,使用了尾递归的方式,

  1. 首先计算出 left 也就是分发器的吞吐量
  2. 然后从队列里面出队一个元素
  3. 调用actor 的invoke 方法
  4. 继续向下调用直到 left <1 或者

有两个关键的参数,

throughput 也就是单次执行 executorService.execute(mbox) 所消费消息的数量。超出这个数量的消息将会交给下次执行这个 mbox 的时候执行。

deadlineNs 发送throughput数量消息的截止时间,保证throughput的消息要在截止时间内完成。

调用 actor 的 invoke 方法下面会调用 ActorCell 的 receiveMessage 方法

actor.aroundReceive(behaviorStack.head, msg)

获得栈顶的 Receive 偏函数,调用 aroundReceive 来执行操作

1
2
3
4
5
6
protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = {
  // optimization: avoid allocation of lambda
  if (receive.applyOrElse(msg, Actor.notHandledFun).asInstanceOf[AnyRef] eq Actor.NotHandled) {
    unhandled(msg)
  }
}

如果receive 没有match对应的message,使用了偏函数的 applyOrElse 捕获剩下的值域,判断返回值是否和 NotHandled 相等。

1
2
3
4
5
6
def unhandled(message: Any): Unit = {
  message match {
    case Terminated(dead) => throw DeathPactException(dead)
    case _                => context.system.eventStream.publish(UnhandledMessage(message, sender(), self))
  }
}

对message 做一次模式匹配,如果是没有handle的message 就将它作为订阅发出。

这里我们可以使用一个订阅者 system.eventStream.subscribe(listener, classOf[UnhandledMessage]) 来订阅这些消息,进行日志输出。