引言
在Akka中, 一个Future
是用来获取某个并发操作的结果的数据结构。这个操作一般是由Actor运行或由Dispatcher直接运行的. 这个结果能够以同步(堵塞)或异步(非堵塞)的方式訪问。
Future直接使用
Future中的一个常见用例是在不须要使用Actor的情况下并发地运行计算。
Future有两种使用方式:
- 堵塞方式(Blocking):该方式下,父actor或主程序停止运行知道全部future完毕各自任务。通过
scala.concurrent.Await
使用。- 非堵塞方式(Non-Blocking),也称为回调方式(Callback):父actor或主程序在运行期间启动future,future任务和父actor并行运行,当每一个future完毕任务,将通知父actor。通过
onComplete
、onSuccess
、onFailure
方式使用。
运行上下文(ExecutionContext)
为了运行回调和操作,Futures须要有一个ExecutionContext
。
假设你在作用域内有一个
ActorSystem
。它会它自己派发器用作ExecutionContext,你也能够用ExecutionContext伴生对象提供的工厂方法来将Executors和ExecutorServices进行包裹。或者甚至创建自己的实例。 通过导入ExecutionContext.Implicits.global
来导入默认的全局运行上下文。 你能够把该运行上下文看做是一个线程池,ExecutionContext是在某个线程池运行任务的抽象。
假设在代码中没有导入该运行上下文,代码将无法编译。
堵塞方式
第一个样例展示怎样创建一个future,然后通过堵塞方式等待其计算结果。尽管堵塞方式不是一个非常好的使用方法,可是能够说明问题。
这个样例中。通过在未来某个时间计算1+1,当计算结果后再返回。
import scala.concurrent.{Await, Future}import scala.concurrent.duration._import scala.concurrent.ExecutionContext.Implicits.globalobject FutureBlockDemo extends App{ implicit val baseTime = System.currentTimeMillis // create a Future val f = Future { Thread.sleep(500) 1+1 } // this is blocking(blocking is bad) val result = Await.result(f, 1 second) // 假设Future没有在Await规定的时间里返回, // 将抛出java.util.concurrent.TimeoutException println(result) Thread.sleep(1000)}
代码解释:
- 在上面的代码中。被传递给Future的代码块会被缺省的
Dispatcher
所运行。代码块的返回结果会被用来完毕Future
。 与从Actor返回的Future不同,这个Future拥有正确的类型, 我们还避免了管理Actor的开销。Await.result
方法将堵塞1秒时间来等待Future结果返回。假设Future在规定时间内没有返回,将抛出java.util.concurrent.TimeoutException
异常。- 通过导入
scala.concurrent.duration._
,能够用一种方便的方式来声明时间间隔,如100 nanos
,500 millis
,5 seconds
、1 minute
、1 hour
,3 days
。还能够通过
Duration(100, MILLISECONDS)
,Duration(200, "millis")
来创建时间间隔。
非堵塞方式(回调方式)
有时你只须要监听Future
的完毕事件,对其进行响应,不是创建新的Future,而不过产生副作用。
通过
onComplete
,onSuccess
,onFailure
三个回调函数来异步运行Future任务,而后两者不过第一项的特例。 使用onComplete的代码演示样例:
import scala.concurrent.{Future}import scala.concurrent.ExecutionContext.Implicits.globalimport scala.util.{Failure, Success}import scala.util.Randomobject FutureNotBlock extends App{ println("starting calculation ...") val f = Future { Thread.sleep(Random.nextInt(500)) 42 } println("before onComplete") f.onComplete{ case Success(value) => println(s"Got the callback, meaning = $value") case Failure(e) => e.printStackTrace } // do the rest of your work println("A ...") Thread.sleep(100) println("B ....") Thread.sleep(100) println("C ....") Thread.sleep(100) println("D ....") Thread.sleep(100) println("E ....") Thread.sleep(100) Thread.sleep(2000)}
使用onSuccess、onFailure的代码演示样例:
import scala.concurrent.{Future}import scala.concurrent.ExecutionContext.Implicits.globalimport scala.util.{Failure, Success}import scala.util.Randomobject Test12_FutureOnSuccessAndFailure extends App{ val f = Future { Thread.sleep(Random.nextInt(500)) if (Random.nextInt(500) > 250) throw new Exception("Tikes!") else 42 } f onSuccess { case result => println(s"Success: $result") } f onFailure { case t => println(s"Exception: ${t.getMessage}") } // do the rest of your work println("A ...") Thread.sleep(100) println("B ....") Thread.sleep(100) println("C ....") Thread.sleep(100) println("D ....") Thread.sleep(100) println("E ....") Thread.sleep(100) Thread.sleep(1000)}
代码解释:
上面两段样例中,Future结构中随机延迟一段时间,然后返回结果或者抛出异常。 然后在回调函数中进行相关处理。创建返回Future[T]的方法
先看一下演示样例:
import scala.concurrent.{Await, Future, future}import scala.concurrent.ExecutionContext.Implicits.globalimport scala.util.{Failure, Success}object ReturnFuture extends App{ implicit val baseTime = System.currentTimeMillis // `future` method is another way to create a future // It starts the computation asynchronously and retures a Future[Int] that // will hold the result of the computation. def longRunningComputation(i: Int): Future[Int] = future { Thread.sleep(100) i + 1 } // this does not block longRunningComputation(11).onComplete { case Success(result) => println(s"result = $result") case Failure(e) => e.printStackTrace } // keep the jvm from shutting down Thread.sleep(1000)}
代码解释:
上面代码中的longRunningComputation返回一个Future[Int]
,然后进行相关的异步操作。 当中future
方法是创建一个future的还有一种方法。它将启动一个异步计算而且返回包括计算结果的Future[T]
。 Future用于Actor
通常有两种方法来从一个Actor获取回应: 第一种是发送一个消息actor ! msg
。这样的方法只在发送者是一个Actor时有效;另外一种是通过一个Future。
使用Actor的
?
方法来发送消息会返回一个Future。 要等待并获取结果的最简单方法是: import scala.concurrent.Awaitimport akka.pattern.askimport scala.concurrent.duration._import akka.util.Timeoutimplicit val timeout = Timeout(5 seconds)val future = actor ? msgval result = Await.result(future, timeout.duration).asInstanceOf[String]
以下是使用?
发送消息给actor,并等待回应的代码演示样例:
import akka.actor._import akka.pattern.askimport akka.util.Timeoutimport scala.concurrent.{Await, Future}import scala.language.postfixOpsimport scala.concurrent.duration._case object AskNameMessageclass TestActor extends Actor { def receive = { case AskNameMessage => // respond to the 'ask' request sender ! "Fred" case _ => println("that was unexpected") }}object AskDemo extends App{ //create the system and actor val system = ActorSystem("AskDemoSystem") val myActor = system.actorOf(Props[TestActor], name="myActor") // (1) this is one way to "ask" another actor for information implicit val timeout = Timeout(5 seconds) val future = myActor ? AskNameMessage val result = Await.result(future, timeout.duration).asInstanceOf[String] println(result) // (2) a slightly different way to ask another actor for information val future2: Future[String] = ask(myActor, AskNameMessage).mapTo[String] val result2 = Await.result(future2, 1 second) println(result2) system.shutdown}
代码解释:
Await.result(future, timeout.duration).asInstanceOf[String]
会导致当前线程被堵塞,并等待actor通过它的应答来完毕Future
。可是堵塞会导致性能问题。所以是不推荐的。
致堵塞的操作位于
Await.result
和Await.ready
中,这样就方便定位堵塞的位置。- 还要注意actor返回的Future的类型是
Future[Any]
,这是由于actor是动态的。 这也是为什么上例中凝视(1)使用了asInstanceOf
。- 在使用非堵塞方式时,最好使用
mapTo
方法来将Future转换到期望的类型。假设转换成功。mapTo
方法会返回一个包括结果的新的 Future。假设不成功,则返回ClassCastException
异常。
转载请注明作者Jason Ding及其出处
Google搜索jasonding1354进入我的博客主页