Runnable 接口只有一個(gè)沒(méi)有返回值的方法。
trait Runnable {
def run(): Unit
}
Callable與之類(lèi)似,除了它有一個(gè)返回值
trait Callable[V] {
def call(): V
}
Scala 并發(fā)是建立在 Java 并發(fā)模型基礎(chǔ)上的。
在 Sun JVM 上,對(duì) IO 密集的任務(wù),我們可以在一臺(tái)機(jī)器運(yùn)行成千上萬(wàn)個(gè)線程。
一個(gè)線程需要一個(gè) Runnable。你必須調(diào)用線程的 start 方法來(lái)運(yùn)行 Runnable。
scala> val hello = new Thread(new Runnable {
def run() {
println("hello world")
}
})
hello: java.lang.Thread = Thread[Thread-3,5,main]
scala> hello.start
hello world
當(dāng)你看到一個(gè)類(lèi)實(shí)現(xiàn)了 Runnable 接口,你就知道它的目的是運(yùn)行在一個(gè)線程中。
這里有一個(gè)可以工作但有問(wèn)題的代碼片斷。
import java.net.{Socket, ServerSocket}
import java.util.concurrent.{Executors, ExecutorService}
import java.util.Date
class NetworkService(port: Int, poolSize: Int) extends Runnable {
val serverSocket = new ServerSocket(port)
def run() {
while (true) {
// This will block until a connection comes in.
val socket = serverSocket.accept()
(new Handler(socket)).run()
}
}
}
class Handler(socket: Socket) extends Runnable {
def message = (Thread.currentThread.getName() + "\n").getBytes
def run() {
socket.getOutputStream.write(message)
socket.getOutputStream.close()
}
}
(new NetworkService(2020, 2)).run
每個(gè)請(qǐng)求都會(huì)回應(yīng)當(dāng)前線程的名稱(chēng),所以結(jié)果始終是 main 。
這段代碼的主要缺點(diǎn)是在同一時(shí)間,只有一個(gè)請(qǐng)求可以被相應(yīng)!
你可以把每個(gè)請(qǐng)求放入一個(gè)線程中處理。只要簡(jiǎn)單改變
(new Handler(socket)).run()
為
(new Thread(new Handler(socket))).start()
但如果你想重用線程或者對(duì)線程的行為有其他策略呢?
隨著 Java 5 的發(fā)布,它決定提供一個(gè)針對(duì)線程的更抽象的接口。
你可以通過(guò) Executors 對(duì)象的靜態(tài)方法得到一個(gè) ExecutorService 對(duì)象。這些方法為你提供了可以通過(guò)各種政策配置的 ExecutorService ,如線程池。
下面改寫(xiě)我們之前的阻塞式網(wǎng)絡(luò)服務(wù)器來(lái)允許并發(fā)請(qǐng)求。
import java.net.{Socket, ServerSocket}
import java.util.concurrent.{Executors, ExecutorService}
import java.util.Date
class NetworkService(port: Int, poolSize: Int) extends Runnable {
val serverSocket = new ServerSocket(port)
val pool: ExecutorService = Executors.newFixedThreadPool(poolSize)
def run() {
try {
while (true) {
// This will block until a connection comes in.
val socket = serverSocket.accept()
pool.execute(new Handler(socket))
}
} finally {
pool.shutdown()
}
}
}
class Handler(socket: Socket) extends Runnable {
def message = (Thread.currentThread.getName() + "\n").getBytes
def run() {
socket.getOutputStream.write(message)
socket.getOutputStream.close()
}
}
(new NetworkService(2020, 2)).run
這里有一個(gè)連接腳本展示了內(nèi)部線程是如何重用的。
$ nc localhost 2020
pool-1-thread-1
$ nc localhost 2020
pool-1-thread-2
$ nc localhost 2020
pool-1-thread-1
$ nc localhost 2020
pool-1-thread-2
Future 代表異步計(jì)算。你可以把你的計(jì)算包裝在 Future 中,當(dāng)你需要計(jì)算結(jié)果的時(shí)候,你只需調(diào)用一個(gè)阻塞的 get() 方法就可以了。一個(gè) Executor 返回一個(gè) Future 。如果使用 Finagle RPC 系統(tǒng),你可以使用 Future 實(shí)例持有可能尚未到達(dá)的結(jié)果。
一個(gè) FutureTask 是一個(gè) Runnable 實(shí)現(xiàn),就是被設(shè)計(jì)為由 Executor 運(yùn)行的
val future = new FutureTask[String](new Callable[String]() {
def call(): String = {
searcher.search(target);
}})
executor.execute(future)
現(xiàn)在我需要結(jié)果,所以阻塞直到其完成。
val blockingResult = future.get()
參考 Scala School 的 Finagle 介紹中大量使用了 Future,包括一些把它們結(jié)合起來(lái)的不錯(cuò)的方法。以及 Effective Scala 對(duì) [Futures]( standard libraries-Futures) 的意見(jiàn)。
class Person(var name: String) {
def set(changedName: String) {
name = changedName
}
}
這個(gè)程序在多線程環(huán)境中是不安全的。如果有兩個(gè)線程有引用到同一個(gè) Person 實(shí)例,并調(diào)用 set ,你不能預(yù)測(cè)兩個(gè)調(diào)用結(jié)束后 name 的結(jié)果。
在 Java 內(nèi)存模型中,允許每個(gè)處理器把值緩存在 L1 或 L2 緩存中,所以在不同處理器上運(yùn)行的兩個(gè)線程都可以有自己的數(shù)據(jù)視圖。
讓我們來(lái)討論一些工具,來(lái)使線程保持一致的數(shù)據(jù)視圖。
互斥鎖(Mutex)提供所有權(quán)語(yǔ)義。當(dāng)你進(jìn)入一個(gè)互斥體,你擁有它。同步是 JVM 中使用互斥鎖最常見(jiàn)的方式。在這個(gè)例子中,我們會(huì)同步 Person。
在 JVM 中,你可以同步任何不為 null 的實(shí)例。
class Person(var name: String) {
def set(changedName: String) {
this.synchronized {
name = changedName
}
}
}
隨著 Java 5 內(nèi)存模型的變化,volatile 和 synchronized 基本上是相同的,除了 volatile 允許空值。
synchronized 允許更細(xì)粒度的鎖。 而 volatile 則對(duì)每次訪問(wèn)同步。
class Person(@volatile var name: String) {
def set(changedName: String) {
name = changedName
}
}
此外,在 Java 5 中還添加了一系列低級(jí)別的并發(fā)原語(yǔ)。 AtomicReference 類(lèi)是其中之一
import java.util.concurrent.atomic.AtomicReference
class Person(val name: AtomicReference[String]) {
def set(changedName: String) {
name.set(changedName)
}
}
AtomicReference 是這兩種選擇中最昂貴的,因?yàn)槟惚仨毴ネㄟ^(guò)方法調(diào)度(method dispatch)來(lái)訪問(wèn)值。
volatile 和 synchronized 是建立在 Java 的內(nèi)置監(jiān)視器基礎(chǔ)上的。如果沒(méi)有資源爭(zhēng)用,監(jiān)視器的成本很小。由于 synchronized 允許你進(jìn)行更細(xì)粒度的控制權(quán),從而會(huì)有更少的爭(zhēng)奪,所以 synchronized 往往是最好的選擇。
當(dāng)你進(jìn)入同步點(diǎn),訪問(wèn) volatile 引用,或去掉 AtomicReferences 引用時(shí), Java 會(huì)強(qiáng)制處理器刷新其緩存線從而提供了一致的數(shù)據(jù)視圖。
如果我錯(cuò)了,請(qǐng)大家指正。這是一個(gè)復(fù)雜的課題,我敢肯定要弄清楚這一點(diǎn)需要一個(gè)漫長(zhǎng)的課堂討論。
正如前面提到的 AtomicReference ,Java 5 帶來(lái)了許多很棒的工具。
CountDownLatch 是一個(gè)簡(jiǎn)單的多線程互相通信的機(jī)制。
val doneSignal = new CountDownLatch(2)
doAsyncWork(1)
doAsyncWork(2)
doneSignal.await()
println("both workers finished!")
先不說(shuō)別的,這是一個(gè)優(yōu)秀的單元測(cè)試。比方說(shuō),你正在做一些異步工作,并要確保功能完成。你的函數(shù)只需要 倒數(shù)計(jì)數(shù)(countDown) 并在測(cè)試中 等待(await) 就可以了。
由于對(duì) Int 和 Long 遞增是一個(gè)經(jīng)常用到的任務(wù),所以增加了 AtomicInteger 和 AtomicLong 。
我可能不需要解釋這是什么。
讀寫(xiě)鎖(ReadWriteLock) 使你擁有了讀線程和寫(xiě)線程的鎖控制。當(dāng)寫(xiě)線程獲取鎖的時(shí)候讀線程只能等待。
下面是一個(gè)簡(jiǎn)單的倒排索引,它不是線程安全的。我們的倒排索引按名字映射到一個(gè)給定的用戶(hù)。
這里的代碼天真地假設(shè)只有單個(gè)線程來(lái)訪問(wèn)。
注意使用了 mutable.HashMap 替代了默認(rèn)的構(gòu)造函數(shù) this()
import scala.collection.mutable
case class User(name: String, id: Int)
class InvertedIndex(val userMap: mutable.Map[String, User]) {
def this() = this(new mutable.HashMap[String, User])
def tokenizeName(name: String): Seq[String] = {
name.split(" ").map(_.toLowerCase)
}
def add(term: String, user: User) {
userMap += term -> user
}
def add(user: User) {
tokenizeName(user.name).foreach { term =>
add(term, user)
}
}
}
這里沒(méi)有寫(xiě)如何從索引中獲取用戶(hù)。稍后我們會(huì)補(bǔ)充。
在上面的倒排索引例子中,userMap 不能保證是線程安全的。多個(gè)客戶(hù)端可以同時(shí)嘗試添加項(xiàng)目,并有可能出現(xiàn)前面 Person 例子中的視圖錯(cuò)誤。
由于 userMap 不是線程安全的,那我們?cè)鯓颖3衷谕粋€(gè)時(shí)間只有一個(gè)線程能改變它呢?
你可能會(huì)考慮在做添加操作時(shí)鎖定 userMap。
def add(user: User) {
userMap.synchronized {
tokenizeName(user.name).foreach { term =>
add(term, user)
}
}
}
不幸的是,這個(gè)粒度太粗了。一定要試圖在互斥鎖以外做盡可能多的耗時(shí)的工作。還記得我說(shuō)過(guò)如果不存在資源爭(zhēng)奪,鎖開(kāi)銷(xiāo)就會(huì)很小嗎。如果在鎖代碼塊里面做的工作越少,爭(zhēng)奪就會(huì)越少。
def add(user: User) {
// tokenizeName was measured to be the most expensive operation.
val tokens = tokenizeName(user.name)
tokens.foreach { term =>
userMap.synchronized {
add(term, user)
}
}
}
我們可以通過(guò) SynchronizedMap 特質(zhì)將同步混入一個(gè)可變的 HashMap。
我們可以擴(kuò)展現(xiàn)有的 InvertedIndex,提供給用戶(hù)一個(gè)簡(jiǎn)單的方式來(lái)構(gòu)建同步索引。
import scala.collection.mutable.SynchronizedMap
class SynchronizedInvertedIndex(userMap: mutable.Map[String, User]) extends InvertedIndex(userMap) {
def this() = this(new mutable.HashMap[String, User] with SynchronizedMap[String, User])
}
如果你看一下其實(shí)現(xiàn),你就會(huì)意識(shí)到,它只是在每個(gè)方法上加同步鎖來(lái)保證其安全性,所以它很可能沒(méi)有你希望的性能。
Java 有一個(gè)很好的線程安全的 ConcurrentHashMap。值得慶幸的是,我們可以通過(guò) JavaConverters 獲得不錯(cuò)的 Scala 語(yǔ)義。
事實(shí)上,我們可以通過(guò)擴(kuò)展老的不安全的代碼,來(lái)無(wú)縫地接入新的線程安全 InvertedIndex。
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
class ConcurrentInvertedIndex(userMap: collection.mutable.ConcurrentMap[String, User])
extends InvertedIndex(userMap) {
def this() = this(new ConcurrentHashMap[String, User] asScala)
}
trait UserMaker {
def makeUser(line: String) = line.split(",") match {
case Array(name, userid) => User(name, userid.trim().toInt)
}
}
class FileRecordProducer(path: String) extends UserMaker {
def run() {
Source.fromFile(path, "utf-8").getLines.foreach { line =>
index.add(makeUser(line))
}
}
}
對(duì)于文件中的每一行,我們可以調(diào)用 makeUser 然后 add 到 InvertedIndex中。如果我們使用并發(fā) InvertedIndex,我們可以并行調(diào)用 add 因?yàn)?makeUser 沒(méi)有副作用,所以我們的代碼已經(jīng)是線程安全的了。
我們不能并行讀取文件,但我們可以并行構(gòu)造用戶(hù)并且把它添加到索引中。
異步計(jì)算的一個(gè)常見(jiàn)模式是把消費(fèi)者和生產(chǎn)者分開(kāi),讓他們只能通過(guò)隊(duì)列(Queue) 溝通。讓我們看看如何將這個(gè)模式應(yīng)用在我們的搜索引擎索引中。
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
// Concrete producer
class Producer[T](path: String, queue: BlockingQueue[T]) extends Runnable {
def run() {
Source.fromFile(path, "utf-8").getLines.foreach { line =>
queue.put(line)
}
}
}
// Abstract consumer
abstract class Consumer[T](queue: BlockingQueue[T]) extends Runnable {
def run() {
while (true) {
val item = queue.take()
consume(item)
}
}
def consume(x: T)
}
val queue = new LinkedBlockingQueue[String]()
// One thread for the producer
val producer = new Producer[String]("users.txt", q)
new Thread(producer).start()
trait UserMaker {
def makeUser(line: String) = line.split(",") match {
case Array(name, userid) => User(name, userid.trim().toInt)
}
}
class IndexerConsumer(index: InvertedIndex, queue: BlockingQueue[String]) extends Consumer[String](queue) with UserMaker {
def consume(t: String) = index.add(makeUser(t))
}
// Let's pretend we have 8 cores on this machine.
val cores = 8
val pool = Executors.newFixedThreadPool(cores)
// Submit one consumer per core.
for (i <- i to cores) {
pool.submit(new IndexerConsumer[String](index, q))
}
更多建議: