标签归档:socket

即时通信IRC服务Oragono

前一篇讲了XMPP通信,还有一种比它还早的协议IRC(Internet Relay Chat),是一种简单的文本聊天协议,支持一对一聊天和群聊(频道)。通常IRC聊天只需要昵称而不需要密码(也可以使用密码),这与XMML不同,使得它可以作为公开的聊天频道,比如游戏直播平台Twitch就使用类似的功能来实现聊天。IRC同样有许多服务端,比如InspIRCd和客户端软件,比如Irssi,也可以直接连接公开的IRC服务器,比如Freenode。事实上IRC协议非常简单,只要能对相应的命令作出响应就可以在各个客户端之间通信,因此有许多不同编程语言的服务端/客户端实现。Oragono便是IRC 服务器的Golang实现,支持IRCv3,支持SASL/LDAP认证,支持服务端保存消息历史记录。
Oragono运行很简单,克隆代码,编辑配置文件就可以了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
➜  oragono24 ls
CHANGELOG.md     README           default.yaml     docs             languages        oragono          oragono.motd     traditional.yaml
➜  oragono24 cp default.yaml ircd.yaml
➜  oragono24 ./oragono mkcerts
2020/11/11 12:01:04 making self-signed certificates
2020/11/11 12:01:04  making cert for :6697 listener
2020/11/11 12:01:04   Certificate created at fullchain.pem : privkey.pem
➜  oragono24 ls
CHANGELOG.md     README           default.yaml     docs             fullchain.pem    ircd.yaml        languages        oragono          oragono.motd     privkey.pem      traditional.yaml
➜  oragono24 ./oragono run
2020-11-13T03:09:29.320Z : info  : server     : oragono-2.4.0 starting
2020-11-13T03:09:29.320Z : info  : server     : Using config file : ircd.yaml
2020-11-13T03:09:29.320Z : info  : server     : Using datastore : ircd.db
2020-11-13T03:09:29.344Z : info  : server     : Proxied IPs will be accepted from : localhost
2020-11-13T03:09:29.345Z : info  : listeners  : now listening on [::1]:6667, tls=false, tlsproxy=false, tor=false, websocket=false.
2020-11-13T03:09:29.345Z : info  : listeners  : now listening on :6697, tls=true, tlsproxy=false, tor=false, websocket=false.
2020-11-13T03:09:29.345Z : info  : listeners  : now listening on :8097, tls=true, tlsproxy=false, tor=false, websocket=true.
2020-11-13T03:09:29.345Z : info  : listeners  : now listening on 127.0.0.1:6667, tls=false, tlsproxy=false, tor=false, websocket=false.
2020-11-13T03:09:29.345Z : info  : server     : Server running
2020-11-13T03:09:38.386Z : info  : connect-ip : Client connecting: real IP 192.168.33.1, proxied IP <nil>
2020-11-13T03:09:38.439Z : info  : accounts   : client : * : logged into account : kiwi-n30
2020-11-13T03:09:38.442Z : info  : connect    : Client connected [kiwi-n30] [u:~u] [r:https://kiwiirc.com/]
2020-11-13T03:09:40.955Z : info  : connect-ip : Client connecting: real IP 192.168.33.1, proxied IP <nil>
2020-11-13T03:09:40.963Z : info  : connect    : Client connected [kiwi-n28] [u:~u] [r:https://kiwiirc.com/]
2020-11-13T03:09:56.198Z : info  : connect-ip : Client connecting: real IP 192.168.33.1, proxied IP <nil>
2020-11-13T03:09:56.209Z : info  : connect    : Client connected [kiwi-n29] [u:~u] [r:https://kiwiirc.com/]

编辑ircd.yml,设置管理密码,启用一下WebSocket,保存历史消息到MySQL。opers/admin管理密码在认证管理员的时候使用的,可以使用命令oragono genpasswd生成。ircd.db用来保存注册用户,使用的并不是SQLite,而是Golang实现的一个嵌入式的内存Key/Value存储BuntDB,可以持久化到硬盘。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# network configuration
network:
    # name of the network
    name: OragonoTest
 
# server configuration
server:
    # server name
    name: oragono.test
 
    # addresses to listen on
    listeners:
        # The standard plaintext port for IRC is 6667. Allowing plaintext over the
        # public Internet poses serious security and privacy issues. Accordingly,
        # we recommend using plaintext only on local (loopback) interfaces:
        "127.0.0.1:6667": # (loopback ipv4, localhost-only)
        "[::1]:6667":     # (loopback ipv6, localhost-only)
        # If you need to serve plaintext on public interfaces, comment out the above
        # two lines and uncomment the line below (which listens on all interfaces):
        # ":6667":
        # Alternately, if you have a TLS certificate issued by a recognized CA,
        # you can configure port 6667 as an STS-only listener that only serves
        # "redirects" to the TLS port, but doesn't allow chat. See the manual
        # for details.
 
        # The standard SSL/TLS port for IRC is 6697. This will listen on all interfaces:
        ":6697":
            tls:
                cert: fullchain.pem
                key: privkey.pem
                # 'proxy' should typically be false. It's only for Kubernetes-style load
                # balancing that does not terminate TLS, but sends an initial PROXY line
                # in plaintext.
                proxy: false
 
        # Example of a Unix domain socket for proxying:
        # "/tmp/oragono_sock":
 
        # Example of a Tor listener: any connection that comes in on this listener will
        # be considered a Tor connection. It is strongly recommended that this listener
        # *not* be on a public interface --- it should be on 127.0.0.0/8 or unix domain:
        # "/hidden_service_sockets/oragono_tor_sock":
        #     tor: true
 
        # Example of a WebSocket listener:
        ":8097":
            websocket: true
            tls:
                cert: fullchain.pem
                key: privkey.pem
# ircd operators
opers:
    # operator named 'admin'; log in with /OPER admin [password]
    admin:
        # which capabilities this oper has access to
        class: "server-admin"
 
        # custom whois line
        whois-line: is a server admin
 
        # custom hostname
        vhost: "staff"
 
        # normally, operator status is visible to unprivileged users in WHO and WHOIS
        # responses. this can be disabled with 'hidden'. ('hidden' also causes the
        # 'vhost' line above to be ignored.)
        hidden: false
 
        # modes are modes to auto-set upon opering-up. uncomment this to automatically
        # enable snomasks ("server notification masks" that alert you to server events;
        # see `/quote help snomasks` while opered-up for more information):
        #modes: +is acjknoqtuxv
 
        # operators can be authenticated either by password (with the /OPER command),
        # or by certificate fingerprint, or both. if a password hash is set, then a
        # password is required to oper up (e.g., /OPER dan mypassword). to generate
        # the hash, use `oragono genpasswd`.
        password: "$2a$04$I2Yhr7UF4p3iyEZcKTPJgukLA9mm00B1wQgicJvGZP/gf0u0tbQQy"
# datastore configuration
datastore:
    # path to the datastore
    path: ircd.db
 
    # if the database schema requires an upgrade, `autoupgrade` will attempt to
    # perform it automatically on startup. the database will be backed
    # up, and if the upgrade fails, the original database will be restored.
    autoupgrade: true
 
    # connection information for MySQL (currently only used for persistent history):
    mysql:
        enabled: true
        host: "127.0.0.1"
        port: 3306
        # if socket-path is set, it will be used instead of host:port
        #socket-path: "/var/run/mysqld/mysqld.sock"
        user: "root"
        password: "root"
        history-database: "oragono_history"
        timeout: 30s

IRC服务端设置好了,使用客户端连上去就可以聊天了。IRC协议类似HTTP协议,服务器地址类似irc://irc.freenode.net,但是也可以使用web来连接,前面配置了监听来自8097端口的websocket请求。这里使用web的IRC客户端Kiwi IRC,这是Vue实现的一个web客户端,各个组件之间通过触发/订阅Vue实例状态/事件变化进行通信,支持自定义组件/中间件,这也是Freenode使用的客户端。Kiwiirc主要聚焦连接和消息的处理,比如websocket连接/发送/监听,消息的格式化/通知/声音/状态。它能够连接websocket的IRC服务器,甚至还提供在线生成器,只需要添加websocket url就好了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
{
    "windowTitle": "Kiwi IRC with Oragono Test",
    "startupScreen": "welcome",
    "kiwiServer": "http://oragono.test",
    "restricted": false,
    "theme": "Default",
    "themes": [
        { "name": "Default", "url": "static/themes/default" },
        { "name": "Dark", "url": "static/themes/dark" },
        { "name": "Coffee", "url": "static/themes/coffee" },
        { "name": "GrayFox", "url": "static/themes/grayfox" },
        { "name": "Nightswatch", "url": "static/themes/nightswatch" },
        { "name": "Osprey", "url": "static/themes/osprey" },
        { "name": "Radioactive", "url": "static/themes/radioactive" },
        { "name": "Sky", "url": "static/themes/sky" },
        { "name": "Elite", "url": "static/themes/elite" }
    ],
    "startupOptions" : {
        "websocket": "wss://127.0.0.1:8097",
        "channel": "#kiwiirc-default",
        "nick": "kiwi-n?"
    },
    "embedly": {
        "key": ""
    },
    "plugins": [
        { "name": "customise", "url": "static/plugins/customise.html" }
    ]
}

运行登录后的效果


其实IRC是基于TCP连接的文本交互的,也可以使用telnet来模拟

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
➜  ~ telnet 127.0.0.1 6667
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
CAP LS 302
:oragono.test CAP * LS * :account-notify account-tag away-notify batch cap-notify chghost draft/channel-rename draft/chathistory draft/event-playback draft/languages=16,en,~bs,~de,~el,~en-AU,~es,~fi,~fr-FR,~it,~nl,~no,~pl,~pt-BR,~ro,~tr-TR,~zh-CN draft/multiline=max-bytes=4096,max-lines=100 draft/register=before-connect draft/relaymsg=/ draft/resume-0.5 echo-message extended-join invite-notify labeled-response message-tags multi-prefix oragono.io/nope sasl=PLAIN,EXTERNAL server-time setname
:oragono.test CAP * LS :userhost-in-names znc.in/playback znc.in/self-message
NICK kiwi-n26
USER kiwi-n26 0 * https://kiwiirc.com/
CAP REQ :account-notify account-tag away-notify batch cap-notify draft/chathistory extended-join invite-notify message-tags multi-prefix server-time userhost-in-names znc.in/self-message
@time=2020-11-13T01:55:02.201Z :oragono.test CAP * ACK :account-notify account-tag away-notify batch cap-notify draft/chathistory extended-join invite-notify message-tags multi-prefix server-time userhost-in-names znc.in/self-message
CAP END
@time=2020-11-13T01:55:07.714Z :oragono.test 001 kiwi-n26 :Welcome to the Internet Relay Network kiwi-n26
@time=2020-11-13T01:55:07.714Z :oragono.test 002 kiwi-n26 :Your host is oragono.test, running version oragono-2.4.0
@time=2020-11-13T01:55:07.714Z :oragono.test 003 kiwi-n26 :This server was created Fri, 13 Nov 2020 00:35:48 UTC
@time=2020-11-13T01:55:07.714Z :oragono.test 004 kiwi-n26 oragono.test oragono-2.4.0 BERTZios CEIMRUabehiklmnoqstuv Iabehkloqv
@time=2020-11-13T01:55:07.714Z :oragono.test 005 kiwi-n26 AWAYLEN=390 BOT=B CASEMAPPING=ascii CHANLIMIT=#:100 CHANMODES=Ibe,k,l,CEMRUimnstu CHANNELLEN=64 CHANTYPES=# ELIST=U EXCEPTS EXTBAN=,m INVEX KICKLEN=390 MAXLIST=beI:60 :are supported by this server
@time=2020-11-13T01:55:07.714Z :oragono.test 005 kiwi-n26 MAXTARGETS=4 MODES MONITOR=100 NETWORK=OragonoTest NICKLEN=32 PREFIX=(qaohv)~&@%+ STATUSMSG=~&@%+ TARGMAX=NAMES:1,LIST:1,KICK:1,WHOIS:1,USERHOST:10,PRIVMSG:4,TAGMSG:4,NOTICE:4,MONITOR:100 TOPICLEN=390 UTF8MAPPING=rfc8265 WHOX draft/CHATHISTORY=100 :are supported by this server
@time=2020-11-13T01:55:07.714Z :oragono.test 251 kiwi-n26 :There are 0 users and 4 invisible on 1 server(s)
@time=2020-11-13T01:55:07.714Z :oragono.test 252 kiwi-n26 0 :IRC Operators online
@time=2020-11-13T01:55:07.714Z :oragono.test 253 kiwi-n26 0 :unregistered connections
@time=2020-11-13T01:55:07.714Z :oragono.test 254 kiwi-n26 2 :channels formed
@time=2020-11-13T01:55:07.714Z :oragono.test 255 kiwi-n26 :I have 4 clients and 0 servers
@time=2020-11-13T01:55:07.714Z :oragono.test 265 kiwi-n26 4 4 :Current local users 4, max 4
@time=2020-11-13T01:55:07.714Z :oragono.test 266 kiwi-n26 4 4 :Current global users 4, max 4
@time=2020-11-13T01:55:07.714Z :oragono.test 375 kiwi-n26 :- oragono.test Message of the day -
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :-
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :-       ▄▄▄   ▄▄▄·  ▄▄ •        ▐ ▄
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :- <img draggable="false" role="img" class="emoji" alt="▪" src="https://s.w.org/images/core/emoji/13.1.0/svg/25aa.svg">     ▀▄ █·▐█ ▀█ ▐█ ▀ <img draggable="false" role="img" class="emoji" alt="▪" src="https://s.w.org/images/core/emoji/13.1.0/svg/25aa.svg"><img draggable="false" role="img" class="emoji" alt="▪" src="https://s.w.org/images/core/emoji/13.1.0/svg/25aa.svg">     •█▌▐█<img draggable="false" role="img" class="emoji" alt="▪" src="https://s.w.org/images/core/emoji/13.1.0/svg/25aa.svg">
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :-  ▄█▀▄ ▐▀▀▄ ▄█▀▀█ ▄█ ▀█▄ ▄█▀▄<img draggable="false" role="img" class="emoji" alt="▪" src="https://s.w.org/images/core/emoji/13.1.0/svg/25aa.svg">▐█▐▐▌ ▄█▀▄
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :- ▐█▌.▐▌▐█•█▌▐█ <img draggable="false" role="img" class="emoji" alt="▪" src="https://s.w.org/images/core/emoji/13.1.0/svg/25aa.svg">▐▌▐█▄<img draggable="false" role="img" class="emoji" alt="▪" src="https://s.w.org/images/core/emoji/13.1.0/svg/25aa.svg">▐█▐█▌ ▐▌██▐█▌▐█▌.▐▌
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :-  ▀█▄▀<img draggable="false" role="img" class="emoji" alt="▪" src="https://s.w.org/images/core/emoji/13.1.0/svg/25aa.svg">.▀  ▀ ▀  ▀ ·▀▀▀▀  ▀█▄▀ ▀▀ █<img draggable="false" role="img" class="emoji" alt="▪" src="https://s.w.org/images/core/emoji/13.1.0/svg/25aa.svg"> ▀█▄▀<img draggable="false" role="img" class="emoji" alt="▪" src="https://s.w.org/images/core/emoji/13.1.0/svg/25aa.svg">
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :-
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :- This is the default Oragono MOTD.
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :-
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :-
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :- If motd-formatting is enabled in the config file, you can use the dollarsign character to
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :- create special formatting such as bold, italics and color codes.
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :-
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :- For example, here are a few formatted lines (enable motd-formatting to see these in action):
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :-
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :- - this is bold text.
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :- - this is italics text.
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :- - this is 4red and 2blue text.
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :- - this is 4,12red text with a light blue background.
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :- - this is a normal escaped dollarsign: $
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :-
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :- And now a few fun colour charts!
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :-
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :- 1,0 00 0,1 01 0,2 02 0,3 03 1,4 04 0,5 05 0,6 06 1,7 07
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :- 1,8 08 1,9 09 0,10 10 1,11 11 0,12 12 1,13 13 1,14 14 1,15 15
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :-
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :- 0,16 16 0,17 17 0,18 18 0,19 19 0,20 20 0,21 21 0,22 22 0,23 23 0,24 24 0,25 25 0,26 26 0,27 27
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :- 0,28 28 0,29 29 0,30 30 0,31 31 0,32 32 0,33 33 0,34 34 0,35 35 0,36 36 0,37 37 0,38 38 0,39 39
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :- 0,40 40 0,41 41 0,42 42 0,43 43 0,44 44 0,45 45 0,46 46 0,47 47 0,48 48 0,49 49 0,50 50 0,51 51
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :- 0,52 52 0,53 53 1,54 54 1,55 55 1,56 56 1,57 57 1,58 58 0,59 59 0,60 60 0,61 61 0,62 62 0,63 63
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :- 0,64 64 1,65 65 1,66 66 1,67 67 1,68 68 1,69 69 1,70 70 1,71 71 0,72 72 0,73 73 0,74 74 0,75 75
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :- 1,76 76 1,77 77 1,78 78 1,79 79 1,80 80 1,81 81 1,82 82 1,83 83 1,84 84 1,85 85 1,86 86 1,87 87
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :- 0,88 88 0,89 89 0,90 90 0,91 91 0,92 92 0,93 93 0,94 94 0,95 95 1,96 96 1,97 97 1,98 98 99,99 99
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :-
@time=2020-11-13T01:55:07.714Z :oragono.test 372 kiwi-n26 :- For more information on using these, see MOTDFORMATTING.md
@time=2020-11-13T01:55:07.714Z :oragono.test 376 kiwi-n26 :End of MOTD command
@time=2020-11-13T01:55:07.715Z :oragono.test 221 kiwi-n26 +Zi
WHO kiwi-n26
@time=2020-11-13T01:55:15.649Z :oragono.test 352 kiwi-n26 * ~u gcjc79gmtbe42.irc oragono.test kiwi-n26 H :0 https://kiwiirc.com/
@time=2020-11-13T01:55:15.649Z :oragono.test 315 kiwi-n26 kiwi-n26!*@* :End of WHO list
JOIN #kiwiirc-default
@msgid=bidkvpja5njjtgchub8u65pxgn;time=2020-11-13T01:55:26.706Z :kiwi-n26!~u@gcjc79gmtbe42.irc JOIN #kiwiirc-default * https://kiwiirc.com/
@time=2020-11-13T01:55:26.706Z :oragono.test 353 kiwi-n26 = #kiwiirc-default :@kiwi-n28!~u@5g96q93tqw3vn.irc kiwi-n30!~u@5g96q93tqw3vn.irc kiwi-n29!~u@5g96q93tqw3vn.irc kiwi-n26!~u@gcjc79gmtbe42.irc
@time=2020-11-13T01:55:26.706Z :oragono.test 366 kiwi-n26 #kiwiirc-default :End of NAMES list
JOIN #ops
@msgid=hftqmpu4ieyjybx9bbgf4vpbia;time=2020-11-13T01:55:32.145Z :kiwi-n26!~u@gcjc79gmtbe42.irc JOIN #ops * https://kiwiirc.com/
@time=2020-11-13T01:55:32.145Z :oragono.test 353 kiwi-n26 = #ops :@kiwi-n28!~u@5g96q93tqw3vn.irc kiwi-n30!~u@5g96q93tqw3vn.irc kiwi-n29!~u@5g96q93tqw3vn.irc kiwi-n26!~u@gcjc79gmtbe42.irc
@time=2020-11-13T01:55:32.145Z :oragono.test 366 kiwi-n26 #ops :End of NAMES list
PRIVMSG #ops hello
PRIVMSG #ops :kiwi-n28 good morning
PRIVMSG kiwi-n28 =D
PING kiwitime-1605232555315
@time=2020-11-13T01:57:14.150Z :oragono.test PONG oragono.test kiwitime-1605232555315
CHATHISTORY BEFORE kiwi-n28 * 50
@time=2020-11-13T01:57:38.462Z :oragono.test BATCH +1 chathistory kiwi-n28
@msgid=rqdbvmytzppssc5ymp7ffhmkga;time=2020-11-13T01:56:05.705Z;batch=1 :kiwi-n26!~u@gcjc79gmtbe42.irc PRIVMSG kiwi-n28 :=D
@time=2020-11-13T01:57:38.462Z :oragono.test BATCH -1
@time=2020-11-13T01:59:08.466Z PING kiwi-n26
:kiwi-n26!~u@gcjc79gmtbe42.irc QUIT :Ping timeout: 2m30s
ERROR :Ping timeout: 2m30s
Connection closed by foreign host.

可以看到这些消息都带有一定的格式,客户端根据特地的格式解析就可以个性化显示来。带有符号@/:前缀的消息都是服务器推送下来的消息,反之都是客户端请求的消息。“CAP LS 302”即协商会话协议版本,随即用命令“NICK ”设置昵称(登录名),和“USER 0 * ”设置该昵称显示的用户名,最后以“CAP END”结束协商,开始会话。服务端推送过来的消息格式为时间戳+服务器+响应码+接受者昵称+消息内容,即“@time=2020-11-13T01:55:07.714Z :oragono.test 001 :Welcome to the Internet Relay Network ”,001代表欢迎信息,具体响应码可以参考这里。客户端发送消息则直接发送,比如使用“JOIN #channel”加入某个频道,服务器会向该频道里的每个人广播对JOIN应的消息“@msgid=hftqmpu4ieyjybx9bbgf4vpbia;time=2020-11-13T01:55:32.145Z :@gcjc79gmtbe42.irc JOIN #channel * ”并向操作者推送频道用户列表“@time=2020-11-13T01:55:32.145Z :oragono.test 353 = #channel :@!~u@5g96q93tqw3vn.irc !~u@5g96q93tqw3vn.irc !~u@5g96q93tqw3vn.irc
”。Twitch则在消息的基础上加入了其他格式化控制,比如

1
@badge-info=;badges=premium/1;client-nonce=2f70a1efd6f84523d0f180cc44f76c61;color=#B22222;display-name=Khodeine;emotes=58765:9-19;flags=;id=50d8790f-c474-42a8-b873-2d5e8008bd52;mod=0;room-id=30220059;subscriber=0;tmi-sent-ts=1605279854692;turbo=0;user-id=49980352;user-type= :khodeine!khodeine@khodeine.tmi.twitch.tv PRIVMSG #esl_sc2 :the scvs NotLikeThis

会显示以一定的颜色/图标显示用户名/消息。空闲状态下IRC服务端与客户端需要维持心跳信息,比如“PING kiwitime-1605232555315”和“time=2020-11-13T01:57:14.150Z :oragono.test PONG oragono.test kiwitime-1605232555315”,否则连接会被关闭。一些常用的命令,可以看出IRC交互非常简单,IRC核心组件也不多。使用“@+dratf/typing=active TAGMSG ”显示输入状态,借助CTCP协议甚至可以点对点发送文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#使用password注册当前nickname
/NS REGISTER <your password>
#注册一个频道
/CS REGISTER #channelname
/CS REGISTER #channe
#要去某人加入频道
/INVITE nickname #channel
#给予某个用户频道管理权限
/mode +o nickname
#认证成为管理员
/OPER admin <admin passowrd>
#在频道发送消息
/PRIVMSG #channe hello
#给频道某人发送消息
/PRIVMSG #channe :<nick> good morning
#给频道某人发送消息
/INVITE <nick> #channe
#使用密码登录
AUTHENTICATE PLAIN
AUTHENTICATE +
AUTHENTICATE a2l3aS1uMzAAa2l3aS1uMzAAMTAyMDMwNDA=

Thelounge是另外一个Vue实现的web的IRC客户端,同样支持websocket。它还用express实现了服务端,支持额外的密码/LDAP登录,即先认证通过才能连接IRC服务器,它本身也可以作为一个服务代理网关,为用户提供永远在线服务/消息存储-如果一个IRC用户不在线是不能够给它发送消息的,这一点也跟XMPP不一样。


Kiwiirc和Thelounge的设计实现良好,可以作为web chat实现的参考,比如在线客服/销售。比较两个web的IRC客户端发现,他们使用的websocket库并不一样。Kiwiirc使用的是sockjs-client,一个类似websocket的JavaScript网络连接库,支持不同浏览器,能够跨域名通信,在浏览器不支持websocket的情况下退化为polling(需要服务端配合)。而Thelounge使用的是socket.io。Socket.IO由Engine.IO发展而来,致力于为web提供实时通信能力。Socket.IO并不是WebSocket的一个实现,只是提供了类似的连接能力,需要服务端配合。两个连接库在浏览器支持websocket的情况下都可以直接与websocket服务端通信(不需要对应的库)。Kiwiirc还提供了irc-framework开发库,可以快速搭建一个IRC客户端,比如IRC Bot。至今仍然存在着许多IRC聊天机器人,为用户提供方便/服务器报警。Twitch一个用户加入频道时甚至能收到广告!

Websocket提供了与其他socket一样的能力,能够持续连接和双向通讯,甚至可以服用80/443端口(nginx支持101协议升级)。它不止可以使用在web上,也可以使用在非web上面,因为能够穿透防火墙,对树莓派的远程链接支持。传统的web应用都是一应一答,这节省的服务器/客户端的资源能够不需要持续请求的话。而对于有些应用而言,比如消息通知,希望是能够持续从服务器推送的,websocket增强了它的能力,在这之前都是long polling(所以才有那些包装库)。
Http协议在tcp基础上实现,请求头里面带有host、cookie,url里面带有路径等,这样后端能够识别、路由、分发请求。而tcp并没有这些,websocket必须自己实现类似的路由分发/调用,可以在连接建立初期,鉴别授权(可以在url上面带参数token/jwt/security key),对于后续消息的应答可以在url上面指定,也可以在事件上面指定。这样实现出来似乎与http差不多,仍然有event对应route,但却可以持续交互。
再观察IRC协议也不过是对各个命令的应答,其实也跟经典web MVC差不多-使用URL映射对应的controller/action。IRC每个会话都是一个队列,join #channel命令即订阅消息,part #channel命令则是取消订阅。再看Rabbitmq消息队列,实现了exchange,route,queue,channel,topic等,然后是持续的消费/推送,与前面所诉差不多,这样子看IRC也是个消息服务器。IRC实现简单,通过扩展支持的命令,也能够与内部/外部系统结合,快速搭建一个即时消息平台。
再看邮件服务,其实也是消息服务,也有一对一/群发消息/历史消息,甚至能够给不认识(不在线)的人发送邮件!而邮件服务至今流行,在许多平台,不论服务器还是手机都自带,不需要额外安装软件。Slack的产品宣传即对比了自己相对邮件服务的优势。其实手机默认带的电话和短信功能是基于硬件模块的在线功能。
相比Ejabberd/Openfire,IRC服务器的部署简单许多,甚至可以部署在树莓派上!

参考连接:
IRC Bot
基于 RabbitMQ 的实时消息推送

TCP UDP探索

最近开发新接口,为实现更高的性能,深入考察了下TCP,UDP通信协议。之前的开发当中都是基于HTTP的接口开发,简单易用,各个编程语言都支持;web服务器、中间件成熟,方便扩展;支持安全加密等等。与自定义Socket通信相比,HTTP仍然不够高效。

TCP协议,也叫传输控制协议(Transmission Control Protocol)是一种面向连接的、可靠的、基于字节流的传输层通信协议。每建立一个TCP连接需要三次握手,关闭时又需要四次握手;传输过程当中,每发送一个包,都要接收一个确认回复,如果在一定时间内未收到,则重发,如果是同一块数据分包传输,则回复所需要的包序号;在发送和接收时都要计算校验和。由于有严格的数据包传输顺序和校验,因为接收到的数据将会是顺序。

UDP协议,也叫用户数据报协议(User Datagram Protocol)是一个简单的面向数据报的传输层协议。它只管数据包发送,并不没有TCP的连接控制、回复确认,并不知道对方是否接收成功。因为并不等待接收方回复,发送端可以连续发送数据,接收端接收到数据并不一定是顺序的。因此相比TCP,UDP的有更高的传输效率,更低的可靠性。如果需要可靠性,则需要发送确认包,类似TCP那样子。

通常HTTP协议基于TCP实现,保证收到的内容是完整、顺序的。在HTTP 1.0时,每一次请求都需要建立TCP连接,关闭TCP连接;在HTTP 1.1时,同一TCP连接则可以在活跃(Keep-Live)间期内复用。HTTP是无连接的,每次处理完成请求即断开连接,一方面可以节省服务器资源,另一方面却浪费连接时间。如果需要维持客户端的Keep-Alive状态,则又需要消耗更多服务器资源,需要权衡。

WebSocket是基于TCP 连接上进行全双工通讯的协议,允许服务端直接向浏览器发送消息,不再需要ajax轮询。它也是在TCP上面的实现,使用类似HTTP请求的握手协议,进行TCP通信切换。WebRTC,也叫Web即时通信(Web Real-Time Communication),则是一个支持浏览器之间进行实时语音对话或视频对话的API。它是建立于TCP/UDP之上的点对点通信,客户端基于上层RTCDataChannel进行通信而非底层TCP/UDP。

Google推出了基于UDP的QUIC协议(快速UDP网络连接)实现的HTTP传输,可以快速在两个端点间创建连接,支持多路复用连接。对于丢包问题,则采用冗余传输,能够根据接收到的包,对缺失的包进行恢复(类似RAID)。对于一个TCP连接,需要四个参数:源IP/端口,目的IP/端口,任何一个改变都需要重新建立连接。QUIC则不需要这些,因为UDP并不需要来源信;它使用UUID来标识一个连接,只要这个UUID在,那么这个连接会话就能够继续(与HTTP会话有点像)。因此在不同网络,特别是移动网络之间的切换,都将保持稳定(参考:Mosh)。目前Google的网站已经支持,并可以在Chrome浏览器的里面查看连接情况:chrome://net-internals/#quic。

通常说TCP是可靠连接,可用于数据流,复杂网络情况;UDP由于缺少确认机制等,属不可靠连接,通常应用于网络情况良好(内部)或者允许丢包的场景。事实上UDP也应用于游戏,VoIP等需要高效传输的场景。当网络状况糟糕时,使用TCP通信反而可能更差,因为需要更多步骤建立连接和确认。参考这里:QQ 为什么采用 UDP 协议,而不采用 TCP 协议实现?。TCP连接是有状态的,服务端需要去维持,及TIME_WAIT问题。如果连接一段时间没用,TCP并不能感知连接是否仍然有效,需要自定义心跳包,维持连接。UDP则是无连接状态的,只要知道目标IP及端口即可以发送。

UDP通信简单许多,不需要socket_connect和socket_connect,主动暴露一端(服务端)需要socket_bind。例如,服务端:

1
2
3
4
5
6
7
8
9
10
11
12
<?php
$socket = stream_socket_server("udp://127.0.0.1:1113", $errno, $errstr, STREAM_SERVER_BIND);
if (!$socket) {
    die("$errstr ($errno)");
}
 
do {
    $pkt = stream_socket_recvfrom($socket, 1024, 0, $peer);
    echo "$peer,$pkt\n";
    //客户端互相通信时,注释下面一行
    stream_socket_sendto($socket, date("D M j H:i:s Y\r\n"), 0, $peer);
} while ($pkt !== false);

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
<?php
$sock = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);
 
$msg = "Ping !";
$len = strlen($msg);
 
socket_sendto($sock, $msg, $len, 0, '127.0.0.1', 1113);
 
//sleep(30)
$res = socket_recv( $sock , $reply , 2045 , MSG_WAITALL);
echo $reply;
 
socket_close($sock);

服务端的代码里面有个stream_socket_sendto(或者socket_sendto),向客户端发送消息。UDP并不需要向客户端回复消息,这并像TCP在同一个连接里面回复,而是另一次UDP通信罢了。实际上,两个客户端之间就能够互相通信(分别使用不同端口),任何能够拿到对方IP和端口,都能够向其发送消息。复制客户端代码,更改目标IP和端口为服务端打印出来的信息,然后运行,原先的客户端也将收到消息。利用中间服务器交换两个客户端的IP和端口,以便直接通信,是NAT穿透常用的方法。

综上,使用TCP可以保证数据可靠传输,复用连接,比如请求-应答模型;使用UDP则可以更高效发送数据,但能容忍部分丢包的场景,比如视频画面;或者只发送不需响应的数据,比如日志;或者无需连接状态的场景(类似HTTP会话),减少服务器压力和客户端等待。当然,也可以在UDP基础上自定义传输规则,实现譬如Mosh或则TCP那样的应用。选择TCP或者UDP,应该综合考虑使用场景,网络,数据包大小,效率,安全等,而不是盲从字面上的“面向可靠的”或者“不可靠的”或者“高效的”。

参考链接:
如何理解HTTP协议的 “无连接,无状态” 特点?
Google’s QUIC protocol: moving the web from TCP to UDP(Google QUIC协议:从TCP到UDP的Web平台
如何看待谷歌 Google 打算用 QUIC 协议替代 TCP/UDP? – 回答作者: Trotyl Yu
How Mosh works
使用TCP协议的NAT穿透技术
PHP Socket通信
日志系统设计
可靠 UDP 传输
DDOS(拒绝服务攻击)
php使用socket感悟–tcp和udp
UDP socket programming in php
Does WebRTC use TCP or UDP?

PHP ZeroMQ开发

ZeroMQ的名字有点巧妙,看起来是个MQ却加了个0,变得不是MQ。ZeroMQ是一个面向消息传递的网络通信框架,支持程序在进程内部部通信,进程之间通信,网络间通信,多播等。ZeroMQ对Socket进行了封装,支持多种网络结构范式如Request/Reply,Pub/Sub,Pull/Push,中介,路由等,还可以在这些模式再次扩展,动态扩容程序和分布式任务开发,能够轻易搭建服务程序集群。

ZeroMQ与支持AMQP的消息中间件不一样,ZeroMQ是一个网络通信库,需要自行实现中间节点和消息的管理。

在CentOS安装ZeroMQ4

1
2
3
4
5
6
7
8
9
10
11
12
git clone https://github.com/zeromq/zeromq4-x.git
cd zeromq4-x
./autogent.sh
./configure
sudo make
sudo make install
 
#声明libzmq库的位置
sudo vim /etc/ld.so.conf.d/libzmq.conf
#内容:/usr/local/lib
 
sudo ldconfig

ZeroMQ支持多种编程语言,也包括PHP。php-zmq安装

1
2
3
4
5
6
7
8
git clone https://github.com/mkoppanen/php-zmq.git
cd php-zmq
phpize
./configure
sudo make
sudo make install
 
sudo vim /etc/php.ini

编辑PHP.ini增加扩展信息

1
2
[zeromq]
extension = zmq.so

查看扩展是否加载成功:php -m | grep php。
先写一个简单请求-应答,首先是服务端reply.php

1
2
3
4
5
6
7
8
9
<?php
$pContext = new ZMQContext();
$pServer  = new ZMQSocket($pContext, ZMQ::SOCKET_REP);
$pServer->bind("tcp://*:5559");
while (true){
    $strMessage = $pServer->recv();
    echo $strMessage."\r\n";
    $pServer->send("From Server1:".$strMessage);
}

然后是客户端request.php

1
2
3
4
5
6
<?php
$pContext = new ZMQContext();
$pClient  = new ZMQSocket($pContext, ZMQ::SOCKET_REQ);
$pClient->connect("tcp://localhost:5559");
$pClient->send("Hello From Client:".uniqid());
var_dump($pClient->recv());

分别在不同终端预先一下程序

1
2
3
4
#请求者可以先启动
php request.php
#另一个终端
php reply.php

使用ZeroMQ进行通信的步骤

  • 使用ZMQContext创建一个上下文
  • 使用上下文初始化ZMQSocket,这里需要指明socket类型(ZMQ::SOCKET_开头),组合模式包括
    • PUB,SUB
    • REQ,REP
    • REQ,ROUTER (take care, REQ inserts an extra null frame)
    • DEALER,REP (take care, REP assumes a null frame)
    • DEALER,ROUTER
    • DEALER,DEALER
    • ROUTER,ROUTER
    • PUSH,PULL
    • PAIR,PAIR

    分类包括

    • 轮询,REQ,PUSH,DEALER
    • 多播,PUB
    • 公平排队,REP,SUB,PULL,DEALER
    • 明确寻址,ROUTER
    • 单播,PAIR
  • 如果是服务端就bind,如果是客户端就conncet,这里的连接信息支持
    • 进程内部通信,inproc://
    • 进程间通信,ipc://
    • 网络间通信,tcp://
    • 多播,pgm://
  • 使用send/recv发送/接收消息

使用ZeroMQ创建通信比socket简单多了,与stream_socket差不多。但是使用ZeroMQ,客户端可以先启动而不用管服务端是否已经启动了,等服务端连接上了便会自动传递消息,还可以维持节点之间的心跳。

ZeroMQ与socket通信是不一样的。ZeroMQ是无状态的,对socket的细节进行了封装,不能知道彼此的socket连接信息,仅能接收和发送消息;ZeroMQ能够使用一个socket与多个节点进行通信,具有极高的性能。

再回头看一下服务端程序,这里采用while循环来处理,亦即同一时刻只能处理一个请求,多个请求排队直到被轮询到,客户端的发送和接收都是同步等待。由于不知道客户端信息,也不能在子进程内处理完成再返回。这里就需要用到ZeroMQ各种范式的组合,比如下面这个
fig16
这里使用ROUTER和DEALER作为中介,转发请求,客户端可以异步发送求,不用等待服务端响应。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
<?php
$pContext = new ZMQContext();
$pFrontend = new ZMQSocket($pContext, ZMQ::SOCKET_ROUTER);
$pBackend = new ZMQSocket($pContext, ZMQ::SOCKET_DEALER);
$pFrontend->bind("tcp://*:5559");
$pBackend->bind("tcp://*:5560");
 
$pPoll = new ZMQPoll();
$pPoll->add($pFrontend, ZMQ::POLL_IN);
$pPoll->add($pBackend, ZMQ::POLL_IN);
 
 
$arrRead = $arrWrite = array();
while(true){
    $nEvent = $pPoll->poll($arrRead, $arrWrite);
    if ($nEvent > 0) {
        foreach($arrRead as $pSocket){
            if($pSocket === $pFrontend){
                while (true){
                    $strMessage = $pSocket->recv();
                    $nMore = $pSocket->getSockOpt(ZMQ::SOCKOPT_RCVMORE);
                    $pBackend->send($strMessage,$nMore ? ZMQ::MODE_SNDMORE : null);
                    if(!$nMore){
                        break;
                    }
                }
            }
            else if ($pSocket === $pBackend){
                while (true){
                    $strMessage = $pSocket->recv();
                    $nMore = $pSocket->getSockOpt(ZMQ::SOCKOPT_RCVMORE);
                    $pFrontend->send($strMessage,$nMore ? ZMQ::MODE_SNDMORE : null);
                    if(!$nMore){
                        break;
                    }
                }
            }
        }
    }
}

然后更改服务端reply.php,不再绑定监听,而不是连接到DEALER上

1
2
3
4
5
6
7
8
9
10
<?php
$pContext = new ZMQContext();
$pServer  = new ZMQSocket($pContext, ZMQ::SOCKET_REP);
//$pServer->bind("tcp://*:5555");
$pServer->connect("tcp://localhost:5560");
while (true){
    $strMessage = $pServer->recv();
    echo $strMessage."\r\n";
    $pServer->send("From Server1:".$strMessage);
}

这里使用ZMQPoll对ZMQSOcket的输入输出事件进行轮询,将ROUTER收到的REQ转发给服务端,将DEALER收到的REP转发给客户端。事实上,还有更简便的方法:使用ZMQDevice将ROUTER和DEALER组合起来

1
2
3
4
5
6
7
8
9
<?php
$pContext = new ZMQContext();
$pFrontend = new ZMQSocket($pContext, ZMQ::SOCKET_ROUTER);
$pBackend = new ZMQSocket($pContext, ZMQ::SOCKET_DEALER);
$pFrontend->bind("tcp://*:5559");
$pBackend->bind("tcp://*:5560");
 
$pDevice = new ZMQDevice($pFrontend, $pBackend);
$pDevice->run();

ZeroMQ的Pub/Sub的通信模型支持一个发布者发布消息给多个订阅者,也支持一个订阅者从多个发布者订阅消息。首先写一个发布者

1
2
3
4
5
6
7
8
9
10
11
12
<?php
$pContext = new ZMQContext();
$pPublisher = new ZMQSocket($pContext, ZMQ::SOCKET_PUB);
$pPublisher->bind("tcp://*:5563");
 
while (true) {
    $pPublisher->send("A", ZMQ::MODE_SNDMORE);
    $pPublisher->send("1:We don't want to see this");
    $pPublisher->send("B", ZMQ::MODE_SNDMORE);
    $pPublisher->send("1:We would like to see this");
    sleep (1);
}

然后是订阅者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$pContext = new ZMQContext();
$pSubscriber = new ZMQSocket($pContext, ZMQ::SOCKET_SUB);
$pSubscriber->connect("tcp://localhost:5563");
#可以连接多个发布者
$pSubscriber->connect("tcp://localhost:5564");
$pSubscriber->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "B");
 
while (true) {
    //  Read envelope with address
    $address = $pSubscriber->recv();
    //  Read message contents
    $contents = $pSubscriber->recv();
    printf ("[%s] %s%s", $address, $contents, PHP_EOL);
}

Pub/Sub模型,发布者只能发布消息,要求发布消息前,先声明主题(地址),然后发布消息内容;订阅者只能接收消息,先设置订阅主题,然后两次接收,第一次为消息主题,第二次为消息内容。
Pub/Sub模型通消息为单向流动,可以结合其他模型让订阅者与发布者互动,比如REQ\REP。

ZeroMQ的Push/Pull模型,生产者负责推送消息,消费者负责拉取消息。初看之下Pull/Push模型与Pub/sub模型类似,但是Pull/Push下生产者产生的消息只会投递给一个消费者,并不会发布给全部消费者,适合用于任务投递分配
fig5
Push和Pull都既可作为服务端,也可作为客户端。服务端Push.php

1
2
3
4
5
6
7
8
9
<?php
$pContext = new ZMQContext();
$pPush = new ZMQSocket($pContext, ZMQ::SOCKET_PUSH);
 
$pPush->bind("tcp://*:5558");
//$pPush->connect("tcp://localhost:5558");
//$pPush->connect("tcp://localhost:5559");
 
$pPush->send("Hello Client 1");

客户端Pull.php

1
2
3
4
5
6
7
8
9
<?php
$pContext = new ZMQContext();
$pPull = new ZMQSocket($pContext, ZMQ::SOCKET_PULL);
 
//$pPull->bind("tcp://*:5558");
$pPull->connect("tcp://localhost:5558");
$pPull->connect("tcp://localhost:5559");
 
var_dump($pPull->recv());

如果同时启动了两个客户端Pull.php,而只启动一个服务端Push.php,那么一次只会有一个客户端接收到消息。也可以以Pull作为主动监听,Push作为被动连接。可以同时接可以Pub/Sub和Pull/Push来处理任务
fig56
如果是用ZeroMQ传递消息收不到,可以按下面这个流程查问题
chapter1_9
除了客户端可以连接多个服务端,服务端同样可以绑定多个地址。在REQ/REP模型里,让服务端同时使用IPC(进程间通信)来处理本机的连接

1
2
3
4
5
6
7
8
9
10
<?php
$pContext = new ZMQContext();
$pServer  = new ZMQSocket($pContext, ZMQ::SOCKET_REP);
$pServer->bind("tcp://*:5556");
$pServer->bind("ipc:///tmp/req.ipc");
while(true){
    $message = $pServer->recv();
    echo $message . PHP_EOL;
    $pServer->send("Hello from server1:".$message);
}

客户端可以选择走TCP或者IPC进行消息通信

1
2
3
4
5
6
7
8
<?php
$pContext = new ZMQContext();
$pClient  = new ZMQSocket($pContext, ZMQ::SOCKET_REQ);
$pClient->connect("ipc:///tmp/req.ipc");
//$pClient->connect("tcp://localhost:5556");
$pClient->send("Hello From Client1:".uniqid());
$strMessage = $pClient->recv();
echo $strMessage,PHP_EOL;

使用ZeroMQ的进程内部消息通信也很简单

1
2
3
4
5
6
7
8
9
$pServer  = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REP);
$pServer->bind("inproc://reply");
 
 
$pClient  = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REQ);
$pClient->connect("inproc://reply");;
$pClient->send("Hello From Client1:".uniqid());
 
var_dump($pServer->recv());

ZeroMQ为消息传递的提供极简的方法,提供了各种连接模型,可以自由扩展。zguide更像是一个网络编程指南,指导大家如何利用ZeroMQ搭建各种网络通信模式,提高程序扩展性和健壮性。虽然ZeroMQ解决了进程间和网络间的通信问题,但是各个组件本身进程控制仍然需要自行实现。

更新:ZeroMQ的作者用C语言创建了另外一个支持多种通用通信范式的socket库:nanomsg,可以用来代替ZeroMQ做的那些事,提供了更好的伸缩性,也有对应的PHP扩展

参考链接:
ZMQ 指南
ZeroMQ in PHP
zeromq is the answer
ZeroMQ + libevent in PHP
Europycon2011: Implementing distributed application using ZeroMQ
Getting Started with ‘nanomsg’
A Look at Nanomsg and Scalability Protocols (Why ZeroMQ Shouldn’t Be Your First Choice)

PHP Socket通信

前一篇介绍了跨语言的服务调用框架Thrift,模块与模块之间调用,网络通信必不可少。这里具体介绍下如何使用PHP socket客户端与服务端进行通信。

PHP 的Socket扩展是基于流行的BSD sockets,实现了和socket通讯功能的底层接口,它可以和通用客户端一样当做一个socket服务器。这里的通用客户端是指stream_socket_*系列封装的函数。

首先写一个socket服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
<?php
class ServerSocket{
    protected $strHost = "127.0.0.1";
    protected $nPort = 2015;
    protected $nProtocol = SOL_TCP;
    protected $pSocket = null;
    protected $pClient = null;
 
    public $strErrorCode = "";
    public $strErrorMsg  = "";
    public function __construct($p_strHost = "127.0.0.1", $p_nPort =2015, $p_nProtocol = SOL_TCP){
        //参数验证
        $this->strHost = $p_strHost;
        $this->nPort = $p_nPort;
        $this->nProtocol = $p_nProtocol;
        if($this->_create()&&$this->_bind()){
            $this->_listen();
        }
    }
    protected function _create(){
        $this->pSocket = socket_create(AF_INET, SOCK_STREAM, $this->nProtocol);
        if(!$this->pSocket){
            $this->_log();
        }
        return $this->pSocket;
    }
    protected function _bind(){
        $bRes = socket_bind($this->pSocket, $this->strHost, $this->nPort);
        if(!$bRes){
            $this->_log();
        }
        return $bRes;
    }
    protected function _listen(){
        $bRes  = socket_listen($this->pSocket, 10) ;
        if(!$bRes){
            $this->_log();
        }
        return $bRes;
    }
    public function accept(){
        $this->pClient = socket_accept($this->pSocket);
        if(!$this->pClient){
            $this->_log();
        }
        return $this->pClient;
    }
    protected function _connect(){
        $this->accept();
             
        if(socket_getpeername($this->pClient, $address, $port)){
            echo "Client $address : $port is now connected to us. \n";
        }
        $this->write("hello world from server\n");
    }
    protected function _reply(){
        $mxData = $this->read();
        var_dump($mxData);
        if ($mxData == false) {
            socket_close($this->pClient);
 
            echo "client disconnected.\n";
            return false;
        }
        else{
            $strMessage = "Client: ".trim($mxData)."\n";
            $this->write($strMessage);
            return true;
        }
    }
    public function run(){
        $this->_connect();
        $this->_reply();
    }
    public function read(){
        $mxMessage = socket_read($this->pClient, 1024, PHP_BINARY_READ);
        if($mxMessage === false){
            $this->_log();
        }
        return $mxMessage;
    }
    public function write($p_strMessage){
        $bRes = socket_write($this->pClient, $p_strMessage, strlen ($p_strMessage));
        if(!$bRes){
            $this->_log();
        }
        return $bRes;
    }
    public function close(){
        $bRes = socket_close($this->pSocket);
 
        $this->pSocket = null;
    }
    protected function _log(){
        $this->strErrorCode = socket_last_error();
        $this->strErrorMsg = socket_strerror($this->strErrorCode);
        //throw new Exception("exception:".$this->strErrorMsg , $this->strErrorCode);
    }
    public function __destruct(){
        if($this->pSocket){
            $this->close();
        }
    }
}
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
$pServer = new ServerSocket($strHost, $nPort);
$pServer->run();

这里对socket_*系列函数进行了包装,创建socket服务端的步骤

  • 使用socket_create创建socket(套接字)。第一个参数AF_INET指IPV4网络协议,TCP和UDP均可使用,对应IPV6网络协议为AF_INET6,也可以使用UNIX socket协议AF_UNIX,作进程间通信
  • 。第二个参数对应套接字类型,SOCK_STREAM对应TCP协议使用,SOCK_DGRAM对应UDP协议使用,还有SOCK_SEQPACKET,SOCK_RAW,SOCK_RDM等类型。第三个为协议类型,TCP协议对应常量SOL_TCP,UDP协议对应常量SOL_UDP,其他协议可以从getprotobyname函数获取。

  • 使用socket_bind将套接字绑定到对应的主机端口或者UNIX socket上
  • 使用socket_listen监听该套接字上的连接
  • 使用socket_accept接收套接字上的请求连接,返回一个新的套接字用于与客户端通信。如果没有连接接入,将会阻塞住;如果有多个连接,使用第一个达到的连接。
  • 开始通信,使用socket_read获取请求信息,使用socket_write返回响应结果
  • 使用socket_close关闭连接,包括原始的和socket_accept产生的套接字

这个过程中,可以使用socket_last_error和socket_strerror获取错误信息。接着创建客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
<?php
class ClientSocket{
    protected $strHost = "127.0.0.1";
    protected $nPort = 2015;
    protected $nProtocol = SOL_TCP;
    private $pSocket = null;
    public $strErrorCode = "";
    public $strErrorMsg  = "";
 
    public function __construct($p_strHost = "127.0.0.1", $p_nPort =2015, $p_nProtocol = SOL_TCP){
        //参数验证
        $this->strHost = $p_strHost;
        $this->nPort = $p_nPort;
        $this->nProtocol = $p_nProtocol;
        if($this->_create()){
            $this->_connect();
        }
    }
    private function _create(){
        $this->pSocket = socket_create(AF_INET, SOCK_STREAM, $this->nProtocol);
        if(!$this->pSocket){
            $this->_log();
        }
        return $this->pSocket;
    }
    private function _connect(){
        $pSocket = $this->_create();
        $bRes = socket_connect($pSocket, $this->strHost, $this->nPort);
        if(!$bRes){
            $this->_log();
        }
        return $bRes;
    }
    public function read(){
        $strMessage = "";
        $strBuffer = "";
        while ($strBuffer = socket_read ($this->pSocket, 1024, PHP_BINARY_READ)) {
            $strMessage .= $strBuffer;
        }
        return $strMessage;
    }
    public function write($p_strMessage){
        $bRes = socket_write($this->pSocket, $p_strMessage, strlen($p_strMessage));
        if(!$bRes){
            $this->_log();
        }
    }
    public function send($p_strMessage){
        $bRes = socket_send($this->pSocket , $p_strMessage , strlen($p_strMessage) , 0);
        if(!$bRes){
            $this->_log();
        }
        return true;
    }
    public function recv(){
        $strMessage = "";
        $strBuffer = "";
        $bRes = socket_recv($this->pSocket, $strBuffer, 1024 , MSG_WAITALL);
        if(!$bRes){
            $this->_log();
        }
        $strMessage .=$strBuffer;
        return $strMessage;
    }
    public function close(){
        $bRes = socket_close($this->pSocket);
 
        $this->pSocket = null;
    }
    private function _log(){
        $this->strErrorCode = socket_last_error();
        $this->strErrorMsg = socket_strerror($this->strErrorCode);
        //throw new Exception("exception:".$this->strErrorMsg , $this->strErrorCode);
    }
    public function __destruct(){
        if($this->pSocket){
            $this->close();
        }
    }
}
 
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
//$nProtocol   = getprotobyname($strProtocol);
 
$pClient = new ClientSocket($strHost, $nPort);
 
var_dump($pClient->read());
$strMessage = 'Some Thing :'.uniqid();
var_dump($strMessage);
$pClient->write($strMessage);
var_dump($pClient->read());
 
/*
 var_dump($pClient->recv());
$pClient->send('hello');
var_dump($pClient->recv());
*/
$pClient->close();

客户端的创建步骤:

  • 使用socket_create创建socket套接字,与服务端对应
  • 使用socket_connect连接到服务端的地址或UNIX socket
  • 开始通信,可以使用socket_write和socket_read向套接字写入和读取信息,也可以使用socket_send和socket_recv发送和接收信息
  • 使用socket_close关闭连接

运行服务端程序

1
php serversocket.php

在另一个终端里运行

1
2
[root@vagrant socket]# netstat -apn | grep 25003
tcp        0      0 127.0.0.1:25003             0.0.0.0:*                   LISTEN      12139/php

如果运行服务端失败,提示 socket_bind(): unable to bind address [98]: Address already in use ,则是端口绑定失败。查看端口占用

1
2
[root@vagrant socket]# netstat -apn | grep 25003
tcp        0      0 127.0.0.1:25003             127.0.0.1:36618             TIME_WAIT   -

该端口处于TIME_WAIT状态,需要再等一会儿才会释放。这是因为TCP连接关闭需要四次握手,服务端主动关闭了连接,但是未收到客户端发过来的关闭确认,导致处于等待状态,具体原因见火丁笔记《再叙TIME_WAIT》

如果服务端已经运行成功,在另一个终端里运行客户端程序

1
php clientsocket.php

这是一个简单服务端/客户端请求应答模型,通常服务端会一直处于监听状态,处理新的请求,重新写一个循环监听的服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class SelectServerSocket extends ServerSocket{
    public function run(){
        $this->loop();
    }
    public function loop(){
        $arrRead = array();
        $arrWrite = $arrExp = null;
        $arrClient = array($this->pSocket);
     
        while(true){
            $arrRead = $arrClient;
            if (socket_select($arrRead, $arrWrite, $arrExp, null) < 1){
                continue;
            }
            foreach ($arrRead as $pSocket){
                if($pSocket == $this->pSocket){
                    $this->_connect();
                         
                    $arrClient[] = $this->pClient;
                }
                else{
                    $bRes = $this->_reply();
                    if($bRes === false){
                        $nKey = array_search($this->pClient, $arrClient);
                        unset($arrClient[$nKey]);
                        continue;
                    }
                }
     
            }
        }
        //usleep(100);
    }
}
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
$pServer = new SelectServerSocket($strHost, $nPort);
$pServer->run();

在循环里面使用socket_select查询有可以读的套接字,如果套接字为原始监听的套接字,则使用socket_accept获取新接入的通信套接字进行通信;如果是通信套接字,则与客户端进行交互。

这里socket_select($arrRead, $arrWrite, $arrExp, null)的第四个参数为null,表示可以无限阻塞,如果为0则不阻塞立即返回,其他大于0值则等待超时。
socket_recv($this->pSocket, $strBuffer, 1024 , MSG_WAITALL)的第四个参数为MSG_WAITALL,表示阻塞读取结果。
socket_read ($this->pSocket, 1024, PHP_BINARY_READ )的第三个参数PHP_BINARY_READ表示读取以\0结束,PHP_NORMAL_READ表示读取以\n或\r结束

在终端里运行服务端,会一直在那里等待新的连接。这时候运行客户端,客户端确也阻塞住了。解决的办法有两种:超时设置和非阻塞设置。给ClientSocket类增加超时和阻塞设置的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public function setTimeOut($p_nSendTimeOut = 1, $p_nRecvTimeOut = 1){
    $nSend = (int)$p_nSendTimeOut;
    $nRecv = (int)$p_nRecvTimeOut;
 
    $arrSend = array('sec' => $nSend, 'usec' => (int)(($p_nSendTimeOut - $nSend) * 1000 * 1000));
    $arrRecv = array('sec' => $nRecv, 'usec' => (int)(($p_nRecvTimeOut - $nRecv) * 1000 * 1000));
 
    socket_set_option($this->pSocket, SOL_SOCKET, SO_RCVTIMEO, $arrSend);
    socket_set_option($this->pSocket, SOL_SOCKET, SO_SNDTIMEO, $arrRecv);
}
public function setBlock($p_bType = true){
    if($p_bType){
        socket_set_block($this->pSocket);
    }
    else{
        socket_set_nonblock($this->pSocket);
    }
}

客户端端运行前,先设置一下超时或非阻塞即,此时程序不会再阻塞了

1
2
3
4
5
6
$pClient = new ClientSocket($strHost, $nPort);
 
$pClient->setTimeOut(1, 1);
//$pClient->setBlock(false);
 
//do request here

同样在服务端设置超时和非阻塞也是可以的,给ServerSocket增加超时和非阻塞设置的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
protected function _setNoBlock($p_pSocket){
    socket_set_nonblock($p_pSocket);
}
protected function _setTimeOut($p_pSocket, $p_nSendTimeOut = 1, $p_nRecvTimeOut = 1){
    $nSend = (int)$p_nSendTimeOut;
    $nRecv = (int)$p_nRecvTimeOut;
 
    $arrSend = array('sec' => $nSend, 'usec' => (int)(($p_nSendTimeOut - $nSend) * 1000 * 1000));
    $arrRecv = array('sec' => $nRecv, 'usec' => (int)(($p_nRecvTimeOut - $nRecv) * 1000 * 1000));
 
    socket_set_option($p_pSocket, SOL_SOCKET, SO_RCVTIMEO, $arrSend);
    socket_set_option($p_pSocket, SOL_SOCKET, SO_SNDTIMEO, $arrRecv);
}

将SelectServerSocket的socket_accept后产生的连接设置为非阻塞

1
2
3
4
5
6
7
public function loop(){
    $arrRead = array();
    $arrWrite = $arrExp = null;
    $arrClient = array($this->pSocket);
    $this->_setNoBlock($this->pSocket);
 
    while(true){

再次运行服务端,客户端也不会阻塞了。

在while循环里面使用socket_select进行查询,效率比较低下,有先的连接要等下次循环才能处理;有时候并没有连接需要处理,也一直在循环。可以结合前面介绍过的PHP Libev扩展进行监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class EvServerSocket extends ServerSocket{
    protected function _onConnect(){
        $this->_connect();
     
        $pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
            $this->_reply();
        });
        Ev::run();
    }
    public function run(){
        $pReadWatcher = new EvIo($this->pSocket, Ev::READ, function ($watcher, $revents) {
            $this->_onConnect();
        });
        Ev::run();
    }
}
 
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
$pServer = new EvServerSocket($strHost, $nPort);
$pServer->run();

代码看起来简单了很多。当原始套接字监听到可读事件时,便为新的套接字也创建可读事件监听 ,在事件里面处理新的连接。

通常的服务端程序是一个进程监听原始套接字,然后交由其他进程/线程处理新的连接套接字,与客户端进行交互,提升服务端性能。这样子又涉及到了多进程/多线程的控制、通信,需要一套完善的体系才行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class MulProcessServerSocket extends EvServerSocket{
    protected function _execute(){
        if(!$this->_reply()){
            //子进程执行完毕,通知父进程
            exit();
        }
    }
    protected function _onConnect(){
        $pid = pcntl_fork();
        //父进程和子进程都会执行下面代码
        if ($pid == -1) {
            //错误处理:创建子进程失败时返回-1.
            die('could not fork');
        } else if ($pid) {
            //父进程会得到子进程号,所以这里是父进程执行的逻辑
            pcntl_wait($status); //等待子进程中断,防止子进程成为僵尸进程。
        } else {
            //子进程得到的$pid为0, 所以这里是子进程执行的逻辑。
            $this->_connect();
 
            $pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
                $this->_execute();
            });
            Ev::run();
        }
 
    }
}

还可以使用stream_socket_*系列函数来创建sockt服务端和客户端。类似的创建一个客户端与之前的服务端进行交互

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
<?php
class ClientStreamSocket{
    private $pConnetion = null;
    protected $strAddress = "tcp://127.0.0.1:2016";
    protected $nTimeOut   = 3;
    protected $nFlag      = STREAM_CLIENT_CONNECT;
    public $strErrorCode = "";
    public $strErrorMsg  = "";
    const BLOCK   = 1;
    const NOBLOCK = 0;
    public function __construct($p_strAddress, $p_nTimeOut = 3, $p_nFlag = STREAM_CLIENT_CONNECT){
        $this->strAddress = $p_strAddress;
        $this->nTimeOut   = $p_nTimeOut;
        $this->nFlag      = $p_nFlag;
        $this->_connect();
    }
    private function _connect(){
        $this->pConnetion = stream_socket_client($this->strAddress, $this->strErrorCode, $this->strErrorMsg, $this->nTimeOut, $this->nFlag);
        if(!$this->pConnetion){
            throw new Exception("connect exception:".$this->strErrorMsg, $this->strErrorCode);
        }
        return $this->pConnetion;
    }
    public function write($p_strMessage){
        if(fwrite($this->pConnetion, $p_strMessage) !== strlen($p_strMessage))
        {
            throw new Exception('Can not send data');
        }
        return true;
    }
    public function read(){
        //接收一行,阻塞至\n结束
        //$strMessage = fgets($this->pConnetion);
        //指定长度读取
        //$strMessage = fread($this->pConnetion, 1024);
        $strMessage = stream_socket_recvfrom($this->pConnetion, 1024);
        //$strMessage = stream_get_contents($this->pConnetion);
 
        return $strMessage;
    }
    public function close(){
        fclose($this->pConnetion);
        $this->pConnetion = null;
    }
    public function setContext(){
 
    }
    public function setTimeOut($p_nTimeOut = 1){
        $bRes = stream_set_timeout($this->pConnetion, $p_nTimeOut);
    }
    public function setBlock($p_nMode = ClientStreamSocket::BLOCK){
        $bRes = stream_set_blocking($this->pConnetion, $p_nMode);
    }
    public function __destruct(){
        if($this->pConnetion){
            $this->close();
        }
    }
}
 
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
//$nProtocol   = getprotobyname($strProtocol);
 
$strAddress = $strProtocol."://".$strHost.":".$nPort;
 
$pStream = new ClientStreamSocket($strAddress);
//$pStream->setBlock(ClientStreamSocket::NOBLOCK);
//$pStream->setTimeOut(1);
var_dump($pStream->read());
$pStream->write("hello from client\n");
var_dump($pStream->read());
$pStream->close();

使用stream_socket_*系列函数创建客户端要简单不少

  • 首先使用stream_socket_client创建一个socket操作流(stream)
  • 然后就可以像操作流式文件那样造成socket stream,使用fread和fwrite进行读写操作,也可以使用stream_socket_recvfrom和stream_socket_sendto进行操作
  • 使用fclose或stream_socket_shutdown关闭连接

使用stream_socket_*系列函数创建一个服务端来与之前的客户端进行交互,同样很简单,也与ServerSocket类似

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
<?php
class ServerStreamSocket{
    protected $pServer = null;
    protected $pClient = null;
    protected $strAddress = "tcp://127.0.0.1:2016";
    protected $nFlag      = STREAM_SERVER_LISTEN;
     
    const BLOCK   = 1;
    const NOBLOCK = 0;
     
    public $strErrorCode = "";
    public $strErrorMsg  = "";
    public function __construct($p_strAddress, $p_nFlag = STREAM_SERVER_LISTEN){
        $this->strAddress = $p_strAddress;
        $this->nFlag = $p_nFlag;
        $this->_create();
    }
    protected function _create(){
        $this->pServer = stream_socket_server($this->strAddress, $this->strErrorCode, $this->strErrorMsg);
        if(!$this->pServer ){
            throw new Exception("create exception:".$this->strErrorMsg, $this->strErrorCode);
        }
        return $this->pServer ;
    }
    public function accept(){
        $this->pClient = stream_socket_accept($this->pServer);
        if(!$this->pClient ){
            return false;
        }
        return $this->pClient ;
    }
    protected function _connect(){
        $this->accept();
        echo "Client". stream_socket_get_name($this->pClient, true)." is now connected to us. \n";
        $this->write("hello world from server\n");
    }
    protected function _reply(){
        $mxData = $this->read();
        var_dump($mxData);
        if($mxData == false){
            fclose($this->pClient);
                 
            echo "client disconnected.\n";
            return false;
        }
        else{
            $strMessage = "Client:".trim($mxData)."\n";
            $this->write($strMessage);
            return true;
        }
    }
    public function run(){
        $this->_connect();
        $this->_reply();
    }
    public function write($p_strMessage){
        //$nLen = fwrite($this->pClient, $p_strMessage);
        $nLen = stream_socket_sendto($this->pClient, $p_strMessage);
        if($nLen !== strlen($p_strMessage))
        {
            throw new Exception('Can not send data');
        }
        return true;
    }
    public function read(){
        //接收一行,阻塞至\n结束
        //$strMessage = fgets($this->pClient);
        //指定长度读取
        //$strMessage = fread($this->pClient, 1024);
        $strMessage = stream_socket_recvfrom($this->pClient, 1024);
        //$strMessage = stream_get_contents($this->pClient);
 
        return $strMessage;
    }
    public function close(){
        fclose($this->pServer);
         
        $this->pServer = null;
    }
    public function setContext(){
 
    }
    public function setTimeOut($p_pConnetction, $p_nTimeOut = 1){
        $bRes = stream_set_timeout($p_pConnetction, $p_nTimeOut);
    }
    public function setBlock($p_pConnetction, $p_nMode = ServerStreamSocket::BLOCK){
        $bRes = stream_set_blocking($p_pConnetction, $p_nMode);
    }
    public function __destruct(){
        if($this->pServer){
            $this->close();
        }
    }
}
class SelectServerStreamSocket extends ServerStreamSocket{
    public function run(){
        $this->loop();
    }
    public function loop(){
        $arrRead = array();
        $arrWrite = $arrExp = null;
        $arrClient = array($this->pServer);
        while(true){
            $arrRead = $arrClient;
            if (stream_select($arrRead, $arrWrite, $arrExp, null) < 1){
                continue;
            }
            if(in_array($this->pServer, $arrRead)){
                $this->_connect();
     
                $arrClient[] = $this->pClient;
     
                $nKey = array_search($this->pServer, $arrRead);
                unset($arrRead[$nKey]);
            }
            foreach($arrRead as $pConnetcion){
                $bRes = $this->_reply();
                if($bRes === false){
                    $nKey = array_search($this->pClient, $arrClient);
                    unset($arrClient[$nKey]);;
                    continue;
                }
            }
        }
        //usleep(100);
    }
}
class EvServerStreamSocket extends ServerStreamSocket{
    protected function _onConnect(){
        $this->_connect();
         
        $pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
            $this->_reply();
        });
        Ev::run();
    }
    public function run(){
        $pReadWatcher = new EvIo($this->pServer, Ev::READ, function ($watcher, $revents) {
            $this->_onConnect();
        });
        Ev::run();
    }
}
class MulProcessServerStreamSocket extends EvServerStreamSocket{
    protected function _execute(){
        if(!$this->_reply()){
            //子进程执行完毕,通知父进程
            exit();
        }
    }
    protected function _onConnect(){
        $pid = pcntl_fork();
        if ($pid == -1) {
            die('could not fork');
        } else if ($pid) {
            pcntl_wait($status);
        } else {
            $this->_connect();
 
            $pReadClient = new EvIo($this->pClient, Ev::READ, function ($watcher, $revents) {
                $this->_execute();
            });
            Ev::run();
        }
 
    }
}
$strHost     = "127.0.0.1";
$nPort       = 25003;
$strProtocol = "tcp";
//$nProtocol   = getprotobyname($strProtocol);
 
$strAddress = $strProtocol."://".$strHost.":".$nPort;
 
$pServer = new EvServerStreamSocket($strAddress);
 
$pServer->run();

这里演示客户端与服务端交互,都是两步走,先发送一个请求再获取结果。在Thrift RPC远程调用中,既可先发送请求,过一会儿再来获取结果,达到异步交互的目的;也可发送完请求后立即获取结果,达到同步请求的目的。

参考链接:
Socket Programming in PHP
Socket programming with streams in php
PHP Socket programming tutorial
php 实例说明 socket通信机制
Mpass – PHP做Socket服务的解决方案
How to forcibly close a socket in TIME_WAIT?