本書(shū)的前幾張,我們討論了在網(wǎng)絡(luò)上進(jìn)行操作的服務(wù)。其中兩個(gè)例子是數(shù)據(jù)庫(kù)客戶端/服務(wù)器和 web 服務(wù)。當(dāng)需要設(shè)計(jì)新的協(xié)議,或者使用沒(méi)有現(xiàn)成 Haskell 庫(kù)的協(xié)議通信時(shí),將需要使用 Haskell 庫(kù)函數(shù)提供的底層網(wǎng)絡(luò)工具。
本章中,我們將討論這些底層工具。網(wǎng)絡(luò)通訊是個(gè)大題目,可以用一整本書(shū)來(lái)討論。本章中,我們將展示如何使用 Haskell 應(yīng)用你已經(jīng)掌握的底層網(wǎng)絡(luò)知識(shí)。
Haskell 的網(wǎng)絡(luò)函數(shù)幾乎始終與常見(jiàn)的 C 函數(shù)調(diào)用相符。像其他在 C 上層的語(yǔ)言一樣,你將發(fā)現(xiàn)其接口很眼熟。
UDP 將數(shù)據(jù)拆散為數(shù)據(jù)包。其不保證數(shù)據(jù)到達(dá)目的地,也不確保同一個(gè)數(shù)據(jù)包到達(dá)的次數(shù)。其用校驗(yàn)和的方式確保到達(dá)的數(shù)據(jù)包沒(méi)有損壞。 UDP 適合用在對(duì)性能和延遲敏感的應(yīng)用中,此類場(chǎng)景中系統(tǒng)的整體性能比單個(gè)數(shù)據(jù)包更重要。也可以用在 TCP 表現(xiàn)性能不高的場(chǎng)景,比如發(fā)送互不相關(guān)的短消息。適合使用 UDP 的系統(tǒng)的例子包括音頻和視頻會(huì)議、時(shí)間同步、網(wǎng)絡(luò)文件系統(tǒng)、以及日志系統(tǒng)。
傳統(tǒng) Unix syslog 服務(wù)允許程序通過(guò)網(wǎng)絡(luò)向某個(gè)負(fù)責(zé)記錄的中央服務(wù)器發(fā)送日志信息。某些程序?qū)π阅芊浅C舾校铱赡軙?huì)生成大量日志消息。這樣的程序,將日志的開(kāi)銷最小化比確保每條日志被記錄更重要。此外,在日志服務(wù)器無(wú)法訪問(wèn)時(shí),使程序依舊可以操作或許是一種可取的設(shè)計(jì)。因此,UDP 是一種 syslog 支持的日志傳輸協(xié)議。這種協(xié)議比較簡(jiǎn)單,這里有一個(gè) Haskell 實(shí)現(xiàn)的客戶端:
-- file: ch27/syslogclient.hs
import Data.Bits
import Network.Socket
import Network.BSD
import Data.List
import SyslogTypes
data SyslogHandle =
SyslogHandle {slSocket :: Socket,
slProgram :: String,
slAddress :: SockAddr}
openlog :: HostName -- ^ Remote hostname, or localhost
-> String -- ^ Port number or name; 514 is default
-> String -- ^ Name to log under
-> IO SyslogHandle -- ^ Handle to use for logging
openlog hostname port progname =
do -- Look up the hostname and port. Either raises an exception
-- or returns a nonempty list. First element in that list
-- is supposed to be the best option.
addrinfos <- getAddrInfo Nothing (Just hostname) (Just port)
let serveraddr = head addrinfos
-- Establish a socket for communication
sock <- socket (addrFamily serveraddr) Datagram defaultProtocol
-- Save off the socket, program name, and server address in a handle
return $ SyslogHandle sock progname (addrAddress serveraddr)
syslog :: SyslogHandle -> Facility -> Priority -> String -> IO ()
syslog syslogh fac pri msg =
sendstr sendmsg
where code = makeCode fac pri
sendmsg = "<" ++ show code ++ ">" ++ (slProgram syslogh) ++
": " ++ msg
-- Send until everything is done
sendstr :: String -> IO ()
sendstr [] = return ()
sendstr omsg = do sent <- sendTo (slSocket syslogh) omsg
(slAddress syslogh)
sendstr (genericDrop sent omsg)
closelog :: SyslogHandle -> IO ()
closelog syslogh = sClose (slSocket syslogh)
{- | Convert a facility and a priority into a syslog code -}
makeCode :: Facility -> Priority -> Int
makeCode fac pri =
let faccode = codeOfFac fac
pricode = fromEnum pri
in
(faccode `shiftL` 3) .|. pricode
這段程序需要 SyslogTypes.hs ,代碼如下:
-- file: ch27/SyslogTypes.hs
module SyslogTypes where
{- | Priorities define how important a log message is. -}
data Priority =
DEBUG -- ^ Debug messages
| INFO -- ^ Information
| NOTICE -- ^ Normal runtime conditions
| WARNING -- ^ General Warnings
| ERROR -- ^ General Errors
| CRITICAL -- ^ Severe situations
| ALERT -- ^ Take immediate action
| EMERGENCY -- ^ System is unusable
deriving (Eq, Ord, Show, Read, Enum)
{- | Facilities are used by the system to determine where messages
are sent. -}
data Facility =
KERN -- ^ Kernel messages
| USER -- ^ General userland messages
| MAIL -- ^ E-Mail system
| DAEMON -- ^ Daemon (server process) messages
| AUTH -- ^ Authentication or security messages
| SYSLOG -- ^ Internal syslog messages
| LPR -- ^ Printer messages
| NEWS -- ^ Usenet news
| UUCP -- ^ UUCP messages
| CRON -- ^ Cron messages
| AUTHPRIV -- ^ Private authentication messages
| FTP -- ^ FTP messages
| LOCAL0
| LOCAL1
| LOCAL2
| LOCAL3
| LOCAL4
| LOCAL5
| LOCAL6
| LOCAL7
deriving (Eq, Show, Read)
facToCode = [
(KERN, 0),
(USER, 1),
(MAIL, 2),
(DAEMON, 3),
(AUTH, 4),
(SYSLOG, 5),
(LPR, 6),
(NEWS, 7),
(UUCP, 8),
(CRON, 9),
(AUTHPRIV, 10),
(FTP, 11),
(LOCAL0, 16),
(LOCAL1, 17),
(LOCAL2, 18),
(LOCAL3, 19),
(LOCAL4, 20),
(LOCAL5, 21),
(LOCAL6, 22),
(LOCAL7, 23)
]
codeToFac = map (\(x, y) -> (y, x)) facToCode
{- | We can't use enum here because the numbering is discontiguous -}
codeOfFac :: Facility -> Int
codeOfFac f = case lookup f facToCode of
Just x -> x
_ -> error $ "Internal error in codeOfFac"
facOfCode :: Int -> Facility
facOfCode f = case lookup f codeToFac of
Just x -> x
_ -> error $ "Invalid code in facOfCode"
可以用 ghci 向本地的 syslog 服務(wù)器發(fā)送消息。服務(wù)器可以使用本章實(shí)現(xiàn)的例子,也可以使用其它的在 Linux 或者 POSIX 系統(tǒng)中的 syslog 服務(wù)器。注意,這些服務(wù)器默認(rèn)禁用了 UDP 端口,你需要啟用 UDP 以使 syslog 接收 UDP 消息。
可以使用下面這樣的命令向本地 syslog 服務(wù)器發(fā)送一條消息:
ghci> :load syslogclient.hs
[1 of 2] Compiling SyslogTypes ( SyslogTypes.hs, interpreted )
[2 of 2] Compiling Main ( syslogclient.hs, interpreted )
Ok, modules loaded: SyslogTypes, Main.
ghci> h <- openlog "localhost" "514" "testprog"
Loading package parsec-2.1.0.0 ... linking ... done.
Loading package network-2.1.0.0 ... linking ... done.
ghci> syslog h USER INFO "This is my message"
ghci> closelog h
UDP 服務(wù)器會(huì)在服務(wù)器上綁定某個(gè)端口。其接收直接發(fā)到這個(gè)端口的包,并處理它們。UDP 是無(wú)狀態(tài)的,面向包的協(xié)議,程序員通常使用 recvFrom 這個(gè)調(diào)用接收消息和發(fā)送機(jī)信息,在發(fā)送響應(yīng)時(shí)會(huì)用到發(fā)送機(jī)信息。
-- file: ch27/syslogserver.hs
import Data.Bits
import Network.Socket
import Network.BSD
import Data.List
type HandlerFunc = SockAddr -> String -> IO ()
serveLog :: String -- ^ Port number or name; 514 is default
-> HandlerFunc -- ^ Function to handle incoming messages
-> IO ()
serveLog port handlerfunc = withSocketsDo $
do -- Look up the port. Either raises an exception or returns
-- a nonempty list.
addrinfos <- getAddrInfo
(Just (defaultHints {addrFlags = [AI_PASSIVE]}))
Nothing (Just port)
let serveraddr = head addrinfos
-- Create a socket
sock <- socket (addrFamily serveraddr) Datagram defaultProtocol
-- Bind it to the address we're listening to
bindSocket sock (addrAddress serveraddr)
-- Loop forever processing incoming data. Ctrl-C to abort.
procMessages sock
where procMessages sock =
do -- Receive one UDP packet, maximum length 1024 bytes,
-- and save its content into msg and its source
-- IP and port into addr
(msg, _, addr) <- recvFrom sock 1024
-- Handle it
handlerfunc addr msg
-- And process more messages
procMessages sock
-- A simple handler that prints incoming packets
plainHandler :: HandlerFunc
plainHandler addr msg =
putStrLn $ "From " ++ show addr ++ ": " ++ msg
這段程序可以在 ghci 中執(zhí)行。執(zhí)行 serveLog"1514"plainHandler 將建立一個(gè)監(jiān)聽(tīng) 1514 端口的 UDP 服務(wù)器。其使用 plainHandler 將每條收到的 UDP 包打印出來(lái)。按下 Ctrl-C 可以終止這個(gè)程序。
Note
處理錯(cuò)誤。執(zhí)行時(shí)收到了 bind:permissiondenied 消息?要確保端口值比 1024 大。某些操作系統(tǒng)不允許 root 之外的用戶使用小于 1024 的端口。
TCP 被設(shè)計(jì)為確?;ヂ?lián)網(wǎng)上的數(shù)據(jù)盡可能可靠地傳輸。 TCP 是數(shù)據(jù)流傳輸。雖然流在傳輸時(shí)會(huì)被操作系統(tǒng)拆散為一個(gè)個(gè)單獨(dú)的包,但是應(yīng)用程序并不需要關(guān)心包的邊界。TCP 負(fù)責(zé)確保如果流被傳送到應(yīng)用程序,它就是完整的、無(wú)改動(dòng)、僅傳輸一次且保證順序。顯然,如果線纜被破壞會(huì)導(dǎo)致流量無(wú)法送達(dá),任何協(xié)議都無(wú)法克服這類限制。
與 UDP 相比,這帶來(lái)一些折衷。首先,在 TCP 會(huì)話開(kāi)始必須傳遞一些包以建立連接。其次,對(duì)于每個(gè)短會(huì)話,UDP 將有性能優(yōu)勢(shì)。另外,TCP 會(huì)努力確保數(shù)據(jù)到達(dá)。如果會(huì)話的一端嘗試向遠(yuǎn)端發(fā)送數(shù)據(jù),但是沒(méi)有收到響應(yīng),它將周期性的嘗試重新傳輸數(shù)據(jù)直至放棄。這使得 TCP 面對(duì)丟包時(shí)比較健壯可靠??墒?,它同樣意味著 TCP 不是實(shí)時(shí)傳輸協(xié)議(如實(shí)況音頻或視頻傳輸)的最佳選擇。
TCP 的連接是有狀態(tài)的。這意味著每個(gè)客戶機(jī)和服務(wù)器之間都有一條專用的邏輯“頻道”,而不是像 UDP 一樣只是處理一次性的數(shù)據(jù)包。這簡(jiǎn)化了客戶端開(kāi)發(fā)者的工作。服務(wù)器端程序幾乎總是需要同時(shí)處理多條 TCP 連接。如何做到這一點(diǎn)呢?
在服務(wù)器端,首先需要?jiǎng)?chuàng)建一個(gè) socket 并綁定到某個(gè)端口,就像 UDP 一樣。但這回不是重復(fù)監(jiān)聽(tīng)從任意地址發(fā)來(lái)的數(shù)據(jù),取而代之,你的主循環(huán)將圍繞 accept 調(diào)用編寫(xiě)。每當(dāng)有一個(gè)客戶機(jī)連接,服務(wù)器操作系統(tǒng)為其分配一個(gè)新的 socket 。所以我們的主 socket 只用來(lái)監(jiān)聽(tīng)進(jìn)來(lái)的連接,但從不發(fā)送數(shù)據(jù)。我們也獲得了多個(gè)子 socket 可以同時(shí)使用,每個(gè)子 socket 從屬于一個(gè)邏輯上的 TCP 會(huì)話。
在 Haskell 中,通常使用 forkIO 創(chuàng)建一個(gè)單獨(dú)的輕量級(jí)線程以處理與子 socket 的通信。對(duì)此, Haskell 擁有一個(gè)高效的內(nèi)部實(shí)現(xiàn),執(zhí)行得非常好。
讓我們使用 TCP 的實(shí)現(xiàn)來(lái)替換 UDP 的 syslog 服務(wù)器。假設(shè)一條消息并不是定義為單獨(dú)的包,而是以一個(gè)尾部的字符 ‘n' 結(jié)束。任意客戶端可以使用 TCP 連接向服務(wù)器發(fā)送 0 或多條消息。我們可以像下面這樣實(shí)現(xiàn):
-- file: ch27/syslogtcpserver.hs
import Data.Bits
import Network.Socket
import Network.BSD
import Data.List
import Control.Concurrent
import Control.Concurrent.MVar
import System.IO
type HandlerFunc = SockAddr -> String -> IO ()
serveLog :: String -- ^ Port number or name; 514 is default
-> HandlerFunc -- ^ Function to handle incoming messages
-> IO ()
serveLog port handlerfunc = withSocketsDo $
do -- Look up the port. Either raises an exception or returns
-- a nonempty list.
addrinfos <- getAddrInfo
(Just (defaultHints {addrFlags = [AI_PASSIVE]}))
Nothing (Just port)
let serveraddr = head addrinfos
-- Create a socket
sock <- socket (addrFamily serveraddr) Stream defaultProtocol
-- Bind it to the address we're listening to
bindSocket sock (addrAddress serveraddr)
-- Start listening for connection requests. Maximum queue size
-- of 5 connection requests waiting to be accepted.
listen sock 5
-- Create a lock to use for synchronizing access to the handler
lock <- newMVar ()
-- Loop forever waiting for connections. Ctrl-C to abort.
procRequests lock sock
where
-- | Process incoming connection requests
procRequests :: MVar () -> Socket -> IO ()
procRequests lock mastersock =
do (connsock, clientaddr) <- accept mastersock
handle lock clientaddr
"syslogtcpserver.hs: client connnected"
forkIO $ procMessages lock connsock clientaddr
procRequests lock mastersock
-- | Process incoming messages
procMessages :: MVar () -> Socket -> SockAddr -> IO ()
procMessages lock connsock clientaddr =
do connhdl <- socketToHandle connsock ReadMode
hSetBuffering connhdl LineBuffering
messages <- hGetContents connhdl
mapM_ (handle lock clientaddr) (lines messages)
hClose connhdl
handle lock clientaddr
"syslogtcpserver.hs: client disconnected"
-- Lock the handler before passing data to it.
handle :: MVar () -> HandlerFunc
-- This type is the same as
-- handle :: MVar () -> SockAddr -> String -> IO ()
handle lock clientaddr msg =
withMVar lock
(\a -> handlerfunc clientaddr msg >> return a)
-- A simple handler that prints incoming packets
plainHandler :: HandlerFunc
plainHandler addr msg =
putStrLn $ "From " ++ show addr ++ ": " ++ msg
SyslogTypes 的實(shí)現(xiàn),見(jiàn) UDP 客戶端例子:syslog 。
讓我們讀一下源碼。主循環(huán)是 procRequests ,這是一個(gè)死循環(huán),用于等待來(lái)自客戶端的新連接。 accept 調(diào)用將一直阻塞,直到一個(gè)客戶端來(lái)連接。當(dāng)有客戶端連接,我們獲得一個(gè)新 socket 和客戶機(jī)地址。我們向處理函數(shù)發(fā)送一條關(guān)于新連接的消息,接著使用 forkIO 建立一個(gè)線程處理來(lái)自客戶機(jī)的數(shù)據(jù)。這條線程執(zhí)行 procMessages 。
處理 TCP 數(shù)據(jù)時(shí),為了方便,通常將 socket 轉(zhuǎn)換為 Haskell 句柄。我們也同樣處理,并明確設(shè)置了緩沖 – 一個(gè) TCP 通信的要點(diǎn)。接著,設(shè)置惰性讀取 socket 句柄。對(duì)每個(gè)傳入的行,我們都將其傳給 handle 。當(dāng)沒(méi)有更多數(shù)據(jù)時(shí) – 遠(yuǎn)端已經(jīng)關(guān)閉了 socket – 我們輸出一條會(huì)話結(jié)束的消息。
因?yàn)榭赡芡瑫r(shí)收到多條消息,我們需要確保沒(méi)有將多條消息同時(shí)寫(xiě)入一個(gè)處理函數(shù)。那將導(dǎo)致混亂的輸出。我們使用了一個(gè)簡(jiǎn)單的鎖以序列化對(duì)處理函數(shù)的訪問(wèn),并且編寫(xiě)了一個(gè)簡(jiǎn)單的 handle 函數(shù)處理它。
你可以使用下面我們將展示的客戶機(jī)代碼測(cè)試,或者直接使用 telnet 程序來(lái)連接這個(gè)服務(wù)器。你向其發(fā)送的每一行輸入都將被服務(wù)器原樣返回。我們來(lái)試一下:
ghci> :load syslogtcpserver.hs
[1 of 1] Compiling Main ( syslogtcpserver.hs, interpreted )
Ok, modules loaded: Main.
ghci> serveLog "10514" plainHandler
Loading package parsec-2.1.0.0 ... linking ... done.
Loading package network-2.1.0.0 ... linking ... done.
此處,服務(wù)器從 10514 端口監(jiān)聽(tīng)新連接。在有某個(gè)客戶機(jī)過(guò)來(lái)連接之前,它什么事兒都不做。我們可以使用 telnet 來(lái)連接這個(gè)服務(wù)器:
~$ telnet localhost 10514
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Test message
^]
telnet> quit
Connection closed.
于此同時(shí),在我們運(yùn)行 TCP 服務(wù)器的終端上,你將看到如下輸出:
From 127.0.0.1:38790: syslogtcpserver.hs: client connnected
From 127.0.0.1:38790: Test message
From 127.0.0.1:38790: syslogtcpserver.hs: client disconnected
其顯示一個(gè)客戶端從本機(jī) (127.0.0.1) 的 38790 端口連上了主機(jī)。連接之后,它發(fā)送了一條消息,然后斷開(kāi)。當(dāng)你扮演一個(gè) TCP 客戶端時(shí),操作系統(tǒng)將分配一個(gè)未被使用的端口給你。通常這個(gè)端口在你每次運(yùn)行程序時(shí)都不一樣。
現(xiàn)在,為我們的 TCP syslog 協(xié)議編寫(xiě)一個(gè)客戶端。這個(gè)客戶端與 UDP 客戶端類似,但是有一些變化。首先,因?yàn)?TCP 是流式協(xié)議,我們可以使用句柄傳輸數(shù)據(jù)而不需要使用底層的 socket 操作。其次,不在需要在 SyslogHandle 中保存目的地址,因?yàn)槲覀儗⑹褂?connect 建立 TCP 連接。最后,我們需要一個(gè)途徑,以區(qū)分不同的消息。UDP 中,這很容易,因?yàn)槊織l消息都是不相關(guān)的邏輯包。TCP 中,我們將僅使用換行符 ‘n' 來(lái)作為消息結(jié)尾的標(biāo)識(shí),盡管這意味著不能在單條消息中發(fā)送多行信息。這是代碼:
-- file: ch27/syslogtcpclient.hs
import Data.Bits
import Network.Socket
import Network.BSD
import Data.List
import SyslogTypes
import System.IO
data SyslogHandle =
SyslogHandle {slHandle :: Handle,
slProgram :: String}
openlog :: HostName -- ^ Remote hostname, or localhost
-> String -- ^ Port number or name; 514 is default
-> String -- ^ Name to log under
-> IO SyslogHandle -- ^ Handle to use for logging
openlog hostname port progname =
do -- Look up the hostname and port. Either raises an exception
-- or returns a nonempty list. First element in that list
-- is supposed to be the best option.
addrinfos <- getAddrInfo Nothing (Just hostname) (Just port)
let serveraddr = head addrinfos
-- Establish a socket for communication
sock <- socket (addrFamily serveraddr) Stream defaultProtocol
-- Mark the socket for keep-alive handling since it may be idle
-- for long periods of time
setSocketOption sock KeepAlive 1
-- Connect to server
connect sock (addrAddress serveraddr)
-- Make a Handle out of it for convenience
h <- socketToHandle sock WriteMode
-- We're going to set buffering to BlockBuffering and then
-- explicitly call hFlush after each message, below, so that
-- messages get logged immediately
hSetBuffering h (BlockBuffering Nothing)
-- Save off the socket, program name, and server address in a handle
return $ SyslogHandle h progname
syslog :: SyslogHandle -> Facility -> Priority -> String -> IO ()
syslog syslogh fac pri msg =
do hPutStrLn (slHandle syslogh) sendmsg
-- Make sure that we send data immediately
hFlush (slHandle syslogh)
where code = makeCode fac pri
sendmsg = "<" ++ show code ++ ">" ++ (slProgram syslogh) ++
": " ++ msg
closelog :: SyslogHandle -> IO ()
closelog syslogh = hClose (slHandle syslogh)
{- | Convert a facility and a priority into a syslog code -}
makeCode :: Facility -> Priority -> Int
makeCode fac pri =
let faccode = codeOfFac fac
pricode = fromEnum pri
in
(faccode `shiftL` 3) .|. pricode
可以在 ghci 中試著運(yùn)行它。如果還沒(méi)有關(guān)閉之前的 TCP 服務(wù)器,你的會(huì)話看上去可能會(huì)像是這樣:
ghci> :load syslogtcpclient.hs
Loading package base ... linking ... done.
[1 of 2] Compiling SyslogTypes ( SyslogTypes.hs, interpreted )
[2 of 2] Compiling Main ( syslogtcpclient.hs, interpreted )
Ok, modules loaded: Main, SyslogTypes.
ghci> openlog "localhost" "10514" "tcptest"
Loading package parsec-2.1.0.0 ... linking ... done.
Loading package network-2.1.0.0 ... linking ... done.
ghci> sl <- openlog "localhost" "10514" "tcptest"
ghci> syslog sl USER INFO "This is my TCP message"
ghci> syslog sl USER INFO "This is my TCP message again"
ghci> closelog sl
結(jié)束時(shí),服務(wù)器上將看到這樣的輸出:
From 127.0.0.1:46319: syslogtcpserver.hs: client connnected
From 127.0.0.1:46319: <9>tcptest: This is my TCP message
From 127.0.0.1:46319: <9>tcptest: This is my TCP message again
From 127.0.0.1:46319: syslogtcpserver.hs: client disconnected
<9> 是優(yōu)先級(jí)和設(shè)施代碼,和之前 UDP 例子中的意思一樣。
更多建議: