Tag Archives: Message Queue

Instant Messaging XMPP Servers: ejabberd/Openfire

The need for instant messaging exists not only on the public Internet (WeChat, QQ) but also on corporate intranets, especially when integrating with internal systems such as OA and customer-service platforms. XMPP, the Extensible Messaging and Presence Protocol, is an open, XML-based instant messaging protocol. XMPP can be used not only for messaging (one-to-one chat, group chat, subscriptions) but also for voice. XMPP even supports federation, where multiple servers interconnect and exchange messages; early on, Gtalk could talk to other XMPP servers. The advantage of building on XMPP is the wealth of existing open-source servers (ejabberd, Openfire) and clients (Spark/Smack/converse.js/XMPPFramework), which makes it quick to get something running.
Ejabberd, from ProcessOne, is an XMPP/MQTT/SIP server written in Erlang, with built-in WebSocket and file-upload support and a rich REST API. Installing ejabberd on CentOS 6 (x64) drops an init script at /etc/init.d/ejabberd, so a plain `service ejabberd start` is enough; it can also be spun up quickly with Docker.
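For a quick trial run, the official community container image works; a minimal sketch (the ejabberd/ecs image name, the exposed ports and the admin credentials are assumptions to adapt to your setup):

# run the ejabberd community server image, exposing the c2s/s2s/web ports
docker run -d --name ejabberd \
  -p 5222:5222 -p 5269:5269 -p 5280:5280 \
  ejabberd/ecs

# register an admin account inside the container
docker exec -it ejabberd bin/ejabberdctl register admin localhost passw0rd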

--2020-10-29 08:26:16--  https://static.process-one.net/ejabberd/downloads/20.04/ejabberd-20.04-0.x86_64.rpm
Resolving static.process-one.net... 13.226.36.69, 13.226.36.105, 13.226.36.34, ...
Connecting to static.process-one.net|13.226.36.69|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 18713774 (18M) [application/x-rpm]
Saving to: “ejabberd-20.04-0.x86_64.rpm”
 
100%[====================================================================================================================================================================================================================================>] 18,713,774  2.84M/s   in 9.1s    
 
2020-10-29 08:26:27 (1.96 MB/s) - “ejabberd-20.04-0.x86_64.rpm” saved [18713774/18713774]
 
[root@vagrant-centos64 tmp]# yum localinstall ejabberd-20.04-0.x86_64.rpm
Loaded plugins: fastestmirror
Setting up Local Package Process
Examining ejabberd-20.04-0.x86_64.rpm: ejabberd-20.04-0.x86_64
Marking ejabberd-20.04-0.x86_64.rpm to be installed
Loading mirror speeds from cached hostfile
 * base: mirrors.163.com
 * epel: mirror.math.princeton.edu
 * extras: mirrors.cn99.com
 * updates: mirrors.cn99.com
 * webtatic: uk.repo.webtatic.com
phalcon_stable/signature                                                                                                                                                                                                                               |  819 B     00:00     
phalcon_stable/signature                                                                                                                                                                                                                               |  951 B     00:00 ... 
phalcon_stable-source/signature                                                                                                                                                                                                                        |  819 B     00:00     
phalcon_stable-source/signature                                                                                                                                                                                                                        |  951 B     00:00 ... 
Resolving Dependencies
--> Running transaction check
---> Package ejabberd.x86_64 0:20.04-0 will be installed
--> Finished Dependency Resolution
 
Dependencies Resolved
 
==============================================================================================================================================================================================================================================================================
 Package                                                       Arch                                                        Version                                                        Repository                                                                     Size
==============================================================================================================================================================================================================================================================================
Installing:
 ejabberd                                                      x86_64                                                      20.04-0                                                        /ejabberd-20.04-0.x86_64                                                       29 M
 
Transaction Summary
==============================================================================================================================================================================================================================================================================
Install       1 Package(s)
 
Total size: 29 M
Installed size: 29 M
Is this ok [y/N]: y
Downloading Packages:
Running rpm_check_debug
Running Transaction Test
Transaction Test Succeeded
Running Transaction
  Installing : ejabberd-20.04-0.x86_64                                                                                                                                                                                                                                    1/1 
  Verifying  : ejabberd-20.04-0.x86_64                                                                                                                                                                                                                                    1/1 
 
Installed:
  ejabberd.x86_64 0:20.04-0                                                                                                                                                                                                                                                   
 
Complete!
[root@vagrant-centos64 tmp]# ls  /etc/init.d/ejabberd
/etc/init.d/ejabberd
[root@vagrant-centos64 tmp]# ls -la /opt/ejabberd
total 24
drwxr-xr-x  5 ejabberd ejabberd 4096 Oct 29 08:27 .
drwxr-xr-x. 8 root     root     4096 Oct 29 08:27 ..
drwxr-xr-x  2 ejabberd ejabberd 4096 Oct 29 08:27 conf
drwxr-xr-x  3 ejabberd ejabberd 4096 Oct 29 08:27 database
-r--------  1 ejabberd ejabberd   20 Oct 29 00:00 .erlang.cookie
drwxr-xr-x  2 ejabberd ejabberd 4096 Oct 29 08:27 logs
[root@vagrant-centos64 bin]# service ejabberd start
Starting ejabberd...
done.
[root@vagrant-centos64 sql]# ps aux | grep ejabberd
ejabberd 10097  0.2  7.9 1817344 48144 ?       Sl   08:51   0:04 /opt/ejabberd-20.04/bin/beam.smp -K true -P 250000 -- -root /opt/ejabberd-20.04 -progname /opt/ejabberd-20.04/bin/erl -- -home /opt/ejabberd -- -sname ejabberd@localhost -smp enable -mnesia dir "/opt/ejabberd/database/ejabberd@localhost" -ejabberd log_rate_limit 100 log_rotate_size 10485760 log_rotate_count 1 log_rotate_date "" -s ejabberd -noshell -noinput
ejabberd 10105  0.0  0.0   4060   496 ?        Ss   08:51   0:00 erl_child_setup 1024
ejabberd 10186  0.0  0.0   4052   572 ?        Ss   08:51   0:00 /opt/ejabberd-20.04/lib/os_mon-2.4.7/priv/bin/memsup
root     11215  0.0  0.1 103324   892 pts/4    S+   09:16   0:00 grep ejabberd
[root@vagrant-centos64 bin]# netstat -apn | grep beam
tcp        0      0 127.0.0.1:7777              0.0.0.0:*                   LISTEN      10097/beam.smp      
tcp        0      0 0.0.0.0:37617               0.0.0.0:*                   LISTEN      1599/beam           
tcp        0      0 0.0.0.0:15672               0.0.0.0:*                   LISTEN      1599/beam           
tcp        0      0 0.0.0.0:55672               0.0.0.0:*                   LISTEN      1599/beam           
tcp        0      0 0.0.0.0:43674               0.0.0.0:*                   LISTEN      10097/beam.smp      
tcp        0      0 127.0.0.1:44055             127.0.0.1:4369              ESTABLISHED 1599/beam           
tcp        0      0 127.0.0.1:35415             127.0.0.1:4369              ESTABLISHED 10097/beam.smp      
tcp        0      0 :::5280                     :::*                        LISTEN      10097/beam.smp      
tcp        0      0 :::5443                     :::*                        LISTEN      10097/beam.smp      
tcp        0      0 :::5222                     :::*                        LISTEN      10097/beam.smp      
tcp        0      0 :::5672                     :::*                        LISTEN      1599/beam           
tcp        0      0 :::5269                     :::*                        LISTEN      10097/beam.smp      
tcp        0      0 :::1883                     :::*                        LISTEN      10097/beam.smp      
unix  3      [ ]         STREAM     CONNECTED     1637533 10097/beam.smp   

Ejabberd ships with Mnesia, Erlang's built-in database, but Mnesia has some limitations, so a third-party database such as MySQL is recommended; it also makes it easier for other systems to interact with the data (for example, authentication tokens). The initialization SQL scripts live in /opt/ejabberd-20.04/lib/ejabberd-20.04/priv/sql; if you only need a single domain, mysql.sql is enough, while multi-domain support uses mysql.new.sql.

[root@vagrant-centos64 sql]# ls
lite.new.sql  lite.sql  mssql.sql  mysql.new.sql  mysql.sql  pg.new.sql  pg.sql
[root@vagrant-centos64 sql]# pwd
/opt/ejabberd-20.04/lib/ejabberd-20.04/priv/sql
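Creating the database and loading the schema is then a single import; a minimal sketch, assuming a local MySQL server (the ejabberd database, user and password here match the sql_* settings in the configuration below and are otherwise arbitrary):

mysql -u root -p -e "CREATE DATABASE ejabberd CHARACTER SET utf8;"
mysql -u root -p -e "GRANT ALL ON ejabberd.* TO 'ejabberd'@'localhost' IDENTIFIED BY 'ejabberd';"
mysql -u ejabberd -pejabberd ejabberd < /opt/ejabberd-20.04/lib/ejabberd-20.04/priv/sql/mysql.sql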

Switch storage and authentication to MySQL

[root@vagrant-centos64 bin]# cat /opt/ejabberd/conf/ejabberd.yml
###
###'           ejabberd configuration file
###
### The parameters used in this configuration file are explained at
###
###
### The configuration file is written in YAML.
### *******************************************************
### *******           !!! WARNING !!!               *******
### *******     YAML IS INDENTATION SENSITIVE       *******
### ******* MAKE SURE YOU INDENT SECTIONS CORRECTLY *******
### *******************************************************
### Refer to http://en.wikipedia.org/wiki/YAML for the brief description.
###
 
hosts:
  - "vagrant-centos64"
 
loglevel: 4
log_rotate_size: 10485760
log_rotate_date: ""
log_rotate_count: 1
log_rate_limit: 100
 
certfiles:
  - "/opt/ejabberd/conf/server.pem"
##  - "/etc/letsencrypt/live/localhost/fullchain.pem"
##  - "/etc/letsencrypt/live/localhost/privkey.pem"
 
ca_file: "/opt/ejabberd/conf/cacert.pem"
 
listen:
  -
    port: 5222
    ip: "::"
    module: ejabberd_c2s
    max_stanza_size: 262144
    shaper: c2s_shaper
    access: c2s
    starttls_required: true
  -
    port: 5269
    ip: "::"
    module: ejabberd_s2s_in
    max_stanza_size: 524288
  -
    port: 5443
    ip: "::"
    module: ejabberd_http
    tls: true
    request_handlers:
      "/admin": ejabberd_web_admin
      "/api": mod_http_api
      "/bosh": mod_bosh
      "/captcha": ejabberd_captcha
      "/upload": mod_http_upload
      "/ws": ejabberd_http_ws
      "/oauth": ejabberd_oauth
  -
    port: 5280
    ip: "::"
    module: ejabberd_http
    request_handlers:
      "/admin": ejabberd_web_admin
  -
    port: 1883
    ip: "::"
    module: mod_mqtt
    backlog: 1000
 
s2s_use_starttls: optional
 
acl:
  local:
    user_regexp: ""
  loopback:
    ip:
      - 127.0.0.0/8
      - ::1/128
      - ::FFFF:127.0.0.1/128
  admin:
    user:
      - "admin@vagrant-centos64"
 
access_rules:
  local:
    allow: local
  c2s:
    deny: blocked
    allow: all
  announce:
    allow: admin
  configure:
    allow: admin
  muc_create:
    allow: local
  pubsub_createnode:
    allow: local
  trusted_network:
    allow: loopback
 
api_permissions:
  "console commands":
    from:
      - ejabberd_ctl
      - mod_http_api
    who: all
    what: "*"
  "admin access":
    who:
      access:
        allow:
          acl: loopback
          acl: admin
      oauth:
        scope: "ejabberd:admin"
        access:
          allow:
            acl: loopback
            acl: admin
    what:
      - "*"
      - "!stop"
      - "!start"
  "public commands":
    who:
      ip: 127.0.0.1/8
    what:
      - status
      - connected_users_number
 
shaper:
  normal: 1000
  fast: 50000
 
shaper_rules:
  max_user_sessions: 10
  max_user_offline_messages:
    5000: admin
    100: all
  c2s_shaper:
    none: admin
    normal: all
  s2s_shaper: fast
 
max_fsm_queue: 10000
 
acme:
   contact: "mailto:admin@vagrant-centos64"
 
modules:
  mod_adhoc: {}
  mod_admin_extra: {}
  mod_announce:
    access: announce
  mod_avatar: {}
  mod_blocking: {}
  mod_bosh: {}
  mod_caps: {}
  mod_carboncopy: {}
  mod_client_state: {}
  mod_configure: {}
  mod_disco: {}
  mod_fail2ban: {}
  mod_http_api: {}
  mod_http_upload:
    put_url: https://@HOST@:5443/upload
  mod_last: {}
  mod_mam:
    ## Mnesia is limited to 2GB, better to use an SQL backend
    ## For small servers SQLite is a good fit and is very easy
    ## to configure. Uncomment this when you have SQL configured:
    ## db_type: sql
    assume_mam_usage: true
    default: never
  mod_mqtt: {}
  mod_muc:
    access:
      - allow
    access_admin:
      - allow: admin
    access_create: muc_create
    access_persistent: muc_create
    access_mam:
      - allow
    default_room_options:
      allow_subscription: true  # enable MucSub
      mam: false
  mod_muc_admin: {}
  mod_offline:
    access_max_user_messages: max_user_offline_messages
  mod_ping: {}
  mod_privacy: {}
  mod_private: {}
  mod_proxy65:
    access: local
    max_connections: 5
  mod_pubsub:
    access_createnode: pubsub_createnode
    plugins:
      - flat
      - pep
    force_node_config:
      ## Avoid buggy clients to make their bookmarks public
      storage:bookmarks:
        access_model: whitelist
  mod_push: {}
  mod_push_keepalive: {}
  mod_register:
    ## Only accept registration requests from the "trusted"
    ## network (see access_rules section above).
    ## Think twice before enabling registration from any
    ## address. See the Jabber SPAM Manifesto for details:
    ip_access: trusted_network
  mod_roster:
    versioning: true
  mod_s2s_dialback: {}
  mod_shared_roster: {}
  mod_stream_mgmt:
    resend_on_timeout: if_offline
  mod_vcard: {}
  mod_vcard_xupdate: {}
  mod_version:
    show_os: false
 
auth_method: sql
 
 
sql_type: mysql
sql_server: "localhost"
sql_database: "ejabberd"
sql_username: "ejabberd"
sql_password: "ejabberd"
 
default_db: sql
## If you want to specify the port:
#sql_port: 3306
### Local Variables:
### mode: yaml
### End:
### vim: set filetype=yaml tabstop=8

Let's try registering an administrator

[root@vagrant-centos64 bin]# ./ejabberdctl register admin1 localhost admin
Error: cannot_register

It failed, and the reason is the hostname: the host configured in ejabberd.yml is not localhost. Registering against the correct domain fixes it; see here and here for details

[root@vagrant-centos64 bin]# ./ejabberdctl status
The node ejabberd@localhost is started with status: started
[root@vagrant-centos64 logs]# hostname -s
vagrant-centos64
[root@vagrant-centos64 bin]# ./ejabberdctl register admin vagrant-centos64 admin
User admin@vagrant-centos64 successfully registered

Visiting http://127.0.0.1:5280/admin opens the management UI. The admin interface is fairly basic and only manages users/messages/chat rooms, but the server supports the full XMPP protocol; the rest is up to the client to implement.


Let's use converse.js to quickly build a web client for verification. Create chat.html

<html>
    <head>
        <title>chat</title>
        <link rel="stylesheet" type="text/css" media="screen" href="https://cdn.conversejs.org/dist/converse.min.css">
        <script src="https://cdn.conversejs.org/dist/converse.min.js" charset="utf-8"></script>
    </head>
    <body>
        <script>
            converse.initialize({
                //bosh_service_url: 'http://chat.vagrant-centos64.com/bosh',
                websocket_url: 'wss://chat.vagrant-centos64.com/ws',
                //show_controlbox_by_default: true,
                view_mode: 'fullscreen'
            });
        </script>
    </body>
</html>

Open a browser and log in at the corresponding URL to see the chat interface. The domain used here is proxied through nginx; for the configuration see API Gateway Kong. If you use nginx-proxy you need to enable WebSocket support so the protocol upgrade (101 Switching Protocols) can go through.



The converse.js configuration has two URLs. One is for BOSH (Bidirectional-streams Over Synchronous HTTP), i.e. HTTP long polling, used to interact with the server and fetch messages; the other is for WebSocket, which runs over TCP and gives bidirectional client/server communication. BOSH needs the browser to keep issuing requests until the server returns a message, while WebSocket communicates in both directions like any other TCP connection, which is much more flexible. If the server does not support the WebSocket protocol upgrade, or the connection fails, converse.js falls back to BOSH automatically. If you test the connection with https://www.websocket.org/echo.html, you must follow the XMPP protocol and authenticate first, otherwise the connection is closed. converse.js authenticates inside the WebSocket rather than via cookies, so one browser can open multiple chat windows logged in as different accounts



Ejabberd exposes a REST API for interaction, such as listing online users or sending messages

➜  chat-example git:(master) ✗ curl -k https://192.168.33.14:5443/api/connected_users
["admin@vagrant-centos64/converse.js-16142875"]                                                                                                                                                                                                                              ➜  chat-example git:(master) ✗ curl -k https://192.168.33.14:5443/api/send_message -X POST -d '{"type":"headline","from":"test@vagrant-centos64","to":"admin@vagrant-centos64","subject":"Restart","body":"In 5 minutes"}'

These APIs greatly expand how ejabberd can interact with third-party software. ejabberd does support OAuth, but that treats ejabberd as the account authority, which mainly helps other systems call into ejabberd. Usually instant messaging is only one part of an internal system and the account authority lives elsewhere, so ejabberd needs to support external authentication. We already configured ejabberd to authenticate against the database; it can also authenticate against LDAP. If that is not enough, it can run an external authentication script or use a third-party HTTP auth module. For simple internal integration, it is enough to refresh the auth service's tokens into ejabberd's database.
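A minimal sketch of that last approach, assuming the mysql.sql schema above (a users table keyed by username with a plain-text password column) and a hypothetical token issued by your own account center; the XMPP client then logs in using the token as its password:

<?php
// push a token issued by the account center into ejabberd's users table
$token = 'token-from-account-center';   // hypothetical token value
$pdo = new PDO('mysql:host=localhost;dbname=ejabberd', 'ejabberd', 'ejabberd');
$stmt = $pdo->prepare('REPLACE INTO users (username, password) VALUES (?, ?)');
$stmt->execute(['alice', $token]);
// the client can now authenticate as alice@<host> with the token as password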
Openfire, from Ignite Realtime, is an XMPP server implemented in Java; the same project provides the Java client Spark, the Java library Smack, and a Chrome extension, Pade. Openfire has a web-based admin console and supports LDAP login and database storage. Some features, such as the REST API, are not built into Openfire directly but shipped as plugins, covering user management, group management, chat-room management, message broadcast, email listener, WebSocket, Meetings, SIP, and more.

User management

Configuring user groups (departments, for example), which show up in the user's roster groups (in Spark)

Online session management, including kicking users


Before creating a chat room you need to create the corresponding service; the default service is called conference.
Openfire's API does not support OAuth/SSO, simple one-to-one message sending, chat-room message subscription, and the like; for those you have to extend it yourself in Java.
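For reference, with the REST API plugin installed, basic calls look roughly like this; a sketch only: the /plugins/restapi/v1 path and the shared secret configured in the admin console should be verified against the plugin's documentation:

# list users through the Openfire REST API plugin (shared secret in the Authorization header)
curl -H "Authorization: s3cr3t" -H "Accept: application/json" \
  http://localhost:9090/plugins/restapi/v1/users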
Ejabberd is written in Erlang, like the message-queue server RabbitMQ, giving it very high throughput, and it offers a REST API/XML-RPC for easy interaction; Openfire is easy to administer and extend. Which one to build on should be weighed against your organization's actual needs.
An XMPP server mainly provides message chat; voice can be added with a SIP server plus a client built on sip.js, for example ctxSip

References:
Exploring TCP and UDP
SIP (Session Initiation Protocol)

PHP ZeroMQ Development

ZeroMQ's name is a bit clever: it looks like an MQ, yet the 0 makes it not an MQ. ZeroMQ is a message-oriented network communication framework that supports in-process communication, inter-process communication, network communication, and multicast. ZeroMQ wraps sockets and supports many network topology patterns such as Request/Reply, Pub/Sub, Push/Pull, brokers, and routing; these patterns can be composed and extended further, making it easy to scale programs dynamically, distribute tasks, and assemble server clusters.

ZeroMQ differs from message middleware that speaks AMQP: ZeroMQ is a network communication library, and broker nodes and message management are yours to implement.

Installing ZeroMQ4 on CentOS

git clone https://github.com/zeromq/zeromq4-x.git
cd zeromq4-x
./autogen.sh
./configure
sudo make
sudo make install
 
# declare the location of the libzmq library
sudo vim /etc/ld.so.conf.d/libzmq.conf
# content: /usr/local/lib
 
sudo ldconfig

ZeroMQ has bindings for many programming languages, PHP included. Install php-zmq

git clone https://github.com/mkoppanen/php-zmq.git
cd php-zmq
phpize
./configure
sudo make
sudo make install
 
sudo vim /etc/php.ini

Edit php.ini to add the extension

[zeromq]
extension = zmq.so

Check that the extension is loaded: php -m | grep zmq.
Let's start with a simple request-reply pair; first the server, reply.php

<?php
$pContext = new ZMQContext();
$pServer  = new ZMQSocket($pContext, ZMQ::SOCKET_REP);
$pServer->bind("tcp://*:5559");
while (true){
    $strMessage = $pServer->recv();
    echo $strMessage."\r\n";
    $pServer->send("From Server1:".$strMessage);
}

Then the client, request.php

<?php
$pContext = new ZMQContext();
$pClient  = new ZMQSocket($pContext, ZMQ::SOCKET_REQ);
$pClient->connect("tcp://localhost:5559");
$pClient->send("Hello From Client:".uniqid());
var_dump($pClient->recv());

Run the two programs in separate terminals

# the requester can be started first
php request.php
# in another terminal
php reply.php

Steps to communicate with ZeroMQ

  • Create a context with ZMQContext
  • Initialize a ZMQSocket from the context, specifying the socket type (constants beginning with ZMQ::SOCKET_); valid pairings include
    • PUB, SUB
    • REQ, REP
    • REQ, ROUTER (take care, REQ inserts an extra null frame)
    • DEALER, REP (take care, REP assumes a null frame)
    • DEALER, ROUTER
    • DEALER, DEALER
    • ROUTER, ROUTER
    • PUSH, PULL
    • PAIR, PAIR

    By delivery behavior they fall into

    • round-robin: REQ, PUSH, DEALER
    • multicast: PUB
    • fair queuing: REP, SUB, PULL, DEALER
    • explicit addressing: ROUTER
    • unicast: PAIR
  • bind on the server side, connect on the client side; the endpoint address supports
    • in-process communication, inproc://
    • inter-process communication, ipc://
    • network communication, tcp://
    • multicast, pgm://
  • Send and receive messages with send/recv

Setting up communication with ZeroMQ is much simpler than raw sockets, roughly on par with stream_socket. But with ZeroMQ the client can start before the server is up; once the server comes online, messages are delivered automatically, and heartbeats between nodes can be maintained.

ZeroMQ communication is not the same as socket communication. ZeroMQ is stateless and hides the socket details: peers cannot see each other's connection information and can only send and receive messages; and ZeroMQ can talk to many nodes through a single socket, with very high performance.

Look back at the server program: it handles requests in a while loop, so only one request is processed at a time while the others queue until polled, and the client's send and receive block synchronously. Since we know nothing about the client, we also cannot process the request in a child process and reply when done. This is where combining ZeroMQ patterns comes in, for example the following
[Figure: extended request-reply pattern, with a ROUTER/DEALER broker in the middle]
Here ROUTER and DEALER act as a broker that forwards requests; clients can send requests asynchronously instead of waiting for the server's response.

<?php
$pContext = new ZMQContext();
$pFrontend = new ZMQSocket($pContext, ZMQ::SOCKET_ROUTER);
$pBackend = new ZMQSocket($pContext, ZMQ::SOCKET_DEALER);
$pFrontend->bind("tcp://*:5559");
$pBackend->bind("tcp://*:5560");
 
$pPoll = new ZMQPoll();
$pPoll->add($pFrontend, ZMQ::POLL_IN);
$pPoll->add($pBackend, ZMQ::POLL_IN);
 
 
$arrRead = $arrWrite = array();
while(true){
    $nEvent = $pPoll->poll($arrRead, $arrWrite);
    if ($nEvent > 0) {
        foreach($arrRead as $pSocket){
            if($pSocket === $pFrontend){
                while (true){
                    $strMessage = $pSocket->recv();
                    $nMore = $pSocket->getSockOpt(ZMQ::SOCKOPT_RCVMORE);
                    $pBackend->send($strMessage,$nMore ? ZMQ::MODE_SNDMORE : null);
                    if(!$nMore){
                        break;
                    }
                }
            }
            else if ($pSocket === $pBackend){
                while (true){
                    $strMessage = $pSocket->recv();
                    $nMore = $pSocket->getSockOpt(ZMQ::SOCKOPT_RCVMORE);
                    $pFrontend->send($strMessage,$nMore ? ZMQ::MODE_SNDMORE : null);
                    if(!$nMore){
                        break;
                    }
                }
            }
        }
    }
}

Then change the server reply.php: instead of binding and listening, it now connects to the DEALER

<?php
$pContext = new ZMQContext();
$pServer  = new ZMQSocket($pContext, ZMQ::SOCKET_REP);
//$pServer->bind("tcp://*:5555");
$pServer->connect("tcp://localhost:5560");
while (true){
    $strMessage = $pServer->recv();
    echo $strMessage."\r\n";
    $pServer->send("From Server1:".$strMessage);
}

Here ZMQPoll polls the ZMQSocket I/O events, forwarding REQ frames received by the ROUTER to the servers and REP frames received by the DEALER back to the clients. There is in fact an even simpler way: use a ZMQDevice to couple the ROUTER and the DEALER

<?php
$pContext = new ZMQContext();
$pFrontend = new ZMQSocket($pContext, ZMQ::SOCKET_ROUTER);
$pBackend = new ZMQSocket($pContext, ZMQ::SOCKET_DEALER);
$pFrontend->bind("tcp://*:5559");
$pBackend->bind("tcp://*:5560");
 
$pDevice = new ZMQDevice($pFrontend, $pBackend);
$pDevice->run();

ZeroMQ's Pub/Sub model supports one publisher publishing to many subscribers, and also one subscriber subscribing to many publishers. First, a publisher

<?php
$pContext = new ZMQContext();
$pPublisher = new ZMQSocket($pContext, ZMQ::SOCKET_PUB);
$pPublisher->bind("tcp://*:5563");
 
while (true) {
    $pPublisher->send("A", ZMQ::MODE_SNDMORE);
    $pPublisher->send("1:We don't want to see this");
    $pPublisher->send("B", ZMQ::MODE_SNDMORE);
    $pPublisher->send("1:We would like to see this");
    sleep (1);
}

Then the subscriber

<?php
$pContext = new ZMQContext();
$pSubscriber = new ZMQSocket($pContext, ZMQ::SOCKET_SUB);
$pSubscriber->connect("tcp://localhost:5563");
// multiple publishers can be connected
$pSubscriber->connect("tcp://localhost:5564");
$pSubscriber->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "B");
 
while (true) {
    //  Read envelope with address
    $address = $pSubscriber->recv();
    //  Read message contents
    $contents = $pSubscriber->recv();
    printf ("[%s] %s%s", $address, $contents, PHP_EOL);
}

In the Pub/Sub model the publisher can only publish: before sending it declares a topic (the envelope address), then publishes the message body. The subscriber can only receive: it first sets the topics to subscribe to, then receives twice, first the topic and then the message content.
In the Pub/Sub model messages flow one way; you can combine it with other patterns, such as REQ/REP, to let subscribers interact with publishers.

In ZeroMQ's Push/Pull model, the producer pushes messages and the consumer pulls them. At first glance Push/Pull looks like Pub/Sub, but under Push/Pull each message a producer emits is delivered to exactly one consumer rather than broadcast to all of them, which makes it a good fit for task distribution
[Figure: Push/Pull pipeline for distributing tasks]
Both Push and Pull can act as either the server or the client. The server, Push.php

<?php
$pContext = new ZMQContext();
$pPush = new ZMQSocket($pContext, ZMQ::SOCKET_PUSH);
 
$pPush->bind("tcp://*:5558");
//$pPush->connect("tcp://localhost:5558");
//$pPush->connect("tcp://localhost:5559");
 
$pPush->send("Hello Client 1");

The client, Pull.php

<?php
$pContext = new ZMQContext();
$pPull = new ZMQSocket($pContext, ZMQ::SOCKET_PULL);
 
//$pPull->bind("tcp://*:5558");
$pPull->connect("tcp://localhost:5558");
$pPull->connect("tcp://localhost:5559");
 
var_dump($pPull->recv());

If you start two Pull.php clients and only one Push.php server, only one client receives each message. You can also flip it around, with Pull doing the active listening and Push connecting. Pub/Sub and Push/Pull can also be combined to process tasks
[Figure: combining Pub/Sub with Push/Pull]
If messages sent through ZeroMQ are not arriving, the following flow can help track down the problem
[Figure: missing-message problem-solver flowchart]
Just as a client can connect to multiple servers, a server can bind multiple endpoints. In the REQ/REP model, let the server also use IPC (inter-process communication) to handle connections from the local machine

<?php
$pContext = new ZMQContext();
$pServer  = new ZMQSocket($pContext, ZMQ::SOCKET_REP);
$pServer->bind("tcp://*:5556");
$pServer->bind("ipc:///tmp/req.ipc");
while(true){
    $message = $pServer->recv();
    echo $message . PHP_EOL;
    $pServer->send("Hello from server1:".$message);
}

The client can choose either TCP or IPC for messaging

<?php
$pContext = new ZMQContext();
$pClient  = new ZMQSocket($pContext, ZMQ::SOCKET_REQ);
$pClient->connect("ipc:///tmp/req.ipc");
//$pClient->connect("tcp://localhost:5556");
$pClient->send("Hello From Client1:".uniqid());
$strMessage = $pClient->recv();
echo $strMessage,PHP_EOL;

In-process messaging with ZeroMQ is just as simple

<?php
// note: php-zmq contexts are persistent by default, so both ZMQContext
// instances share one underlying context, which inproc:// requires
$pServer  = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REP);
$pServer->bind("inproc://reply");
 
 
$pClient  = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REQ);
$pClient->connect("inproc://reply");
$pClient->send("Hello From Client1:".uniqid());
 
var_dump($pServer->recv());

ZeroMQ offers a minimal approach to message passing, with a variety of connection models that can be freely extended. The zguide reads more like a network-programming handbook, showing how to build all kinds of communication patterns with ZeroMQ and make programs more scalable and robust. ZeroMQ solves inter-process and network communication, but process control of the individual components is still yours to implement.

Update: ZeroMQ's author wrote another socket library in C that supports several general communication patterns: nanomsg. It can do what ZeroMQ does with better scalability, and it also has a PHP extension

References:
The ZeroMQ Guide (zguide)
ZeroMQ in PHP
zeromq is the answer
ZeroMQ + libevent in PHP
Europycon2011: Implementing distributed application using ZeroMQ
Getting Started with ‘nanomsg’
A Look at Nanomsg and Scalability Protocols (Why ZeroMQ Shouldn’t Be Your First Choice)

PHP Inter-Process Communication

The previous post covered PHP multi-process development, where processes interacted via signals. This post covers PHP inter-process communication (IPC) methods, including System V IPC (semaphores, message queues, shared memory) and socket-based IPC.

Semaphores mainly serve as a synchronization mechanism between processes, or between threads of one process. A semaphore is a special variable: access to it is atomic, and only wait and post operations are allowed. If the semaphore can take an arbitrary integer value it is called a counting (or general) semaphore; if it only takes the binary values 0 or 1 it is a binary semaphore. On Linux, a binary semaphore is also known as a mutex. The following example uses a semaphore to coordinate access to a resource between processes

<?php
$key = ftok ( __FILE__, 's' );
// at most one process may enter the critical section at a time
$sem_id = sem_get ( $key, 1 );
echo "This is a room,can only stay one people!\n\r";
// fork a child process
$pid = pcntl_fork ();
if ($pid == - 1) {
    exit ( 'fork failed!' );
} else if ($pid > 0) {
    $name = 'parent';
} else {
    $name = 'child';
}
echo "{$name} want to enter the room \n";
sem_acquire ( $sem_id );
// critical section begins
echo "{$name} in the room , other people can't enter!\n";
sleep ( 3 );
echo "{$name} leave the room\n";
// critical section ends
sem_release ( $sem_id );
if ($pid > 0) {
    pcntl_waitpid ( $pid, $status );
    sem_remove ( $sem_id ); // remove the semaphore
}

sem_get and sem_remove create and destroy a semaphore. Once the current process (the parent) acquires the semaphore via sem_acquire, other processes (the child) block until they can acquire it; everything between sem_acquire and sem_release is effectively atomic. When the current process releases the semaphore with sem_release, another process can take it, giving ordered access to the resource. sem_acquire is a blocking call: code after it only runs once the semaphore is obtained. When juggling multiple semaphores, watch out for deadlocks.
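Related: since PHP 5.6.1, sem_acquire accepts a second $non_blocking argument, which helps avoid waiting forever on a busy semaphore; a small sketch:

<?php
$sem_id = sem_get(ftok(__FILE__, 's'), 1);
// try to enter the critical section without blocking (PHP >= 5.6.1)
if (sem_acquire($sem_id, true)) {
    // ... use the shared resource ...
    sem_release($sem_id);
} else {
    echo "resource busy, try again later\n";
}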

A message queue provides a way to asynchronously pass a block of data from one process to another; sender and receiver need not interact with the queue at the same time. Messages stay in the queue until the receiver retrieves them. A process with sufficient permissions can append messages to the queue, and a process granted read permission can take messages off it. Below, PHP child processes use a message queue to communicate with the parent

<?php
// generate a key
$message_queue_key = ftok ( __FILE__, 'a' );
// create the queue from the generated key; a fixed key such as 123456 also works
$message_queue = msg_get_queue ( $message_queue_key, 0666 );
 
$pids = array ();
for($i = 0; $i < 5; $i ++) {
    // fork a child process
    $pids [$i] = pcntl_fork ();
     
    if ($pids [$i]) {
        echo "No.$i child process was created, the pid is $pids[$i]\r\n";
        pcntl_wait ( $status ); // wait for a child to exit, preventing zombie processes
    } elseif ($pids [$i] == 0) {
        $pid = posix_getpid ();
        echo "process.$pid is writing now\r\n";
        // write to the queue
        msg_send ( $message_queue, 1, "this is process.$pid's data\r\n" );
        posix_kill ( $pid, SIGTERM );
    }
}
 
do {
    // read from the queue
    msg_receive ( $message_queue, 0, $message_type, 1024, $message, true, MSG_IPC_NOWAIT );
    echo $message;
    // get the number of messages in the queue
    $a = msg_stat_queue ( $message_queue );
    if ($a ['msg_qnum'] == 0) {
        break;
    }
} while ( true );

Message queues have limits on message size and total queue length; once exceeded, writes fail.
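The limits can be inspected, and with sufficient privileges raised, via msg_stat_queue/msg_set_queue; a small sketch:

<?php
$queue = msg_get_queue(ftok(__FILE__, 'a'), 0666);
$stat  = msg_stat_queue($queue);
echo "queued: {$stat['msg_qnum']} messages, capacity: {$stat['msg_qbytes']} bytes\n";
// raising msg_qbytes beyond the system default usually requires root
msg_set_queue($queue, array('msg_qbytes' => 1048576));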

Shared memory lets multiple processes access the same region of memory and is the fastest form of IPC available. One process creates a shared region in memory, and other processes can access that region too

<?php
//Check the command line arguments
if(sizeof($argv) < 2) {
     echo  "Usage: php shared_memory.php <send|get|delete> <integer identifier> <value>\n";
     exit;
}
  
//Define shared memory segment properties.
$key = "987654";
$permissions = 0666;
$size = 1024;
  
//Create or open the shared memory segment.
$segment = shm_attach($key, $size, $permissions);
  
//Handle operations for the segment.
switch($argv[1]) {
     case "send":
          shm_put_var($segment, $argv[2], $argv[3]);
          echo "Message sent to shared memory segment.\n";
          break;
     case "get":
          $data = shm_get_var($segment, $argv[2]);
          echo "Received data: {$data}\n";
          break;
     case "delete":
          shm_remove($segment);
          echo "Shared memory segment released.\n";
          break;
}

Shared memory provides no synchronization of its own, so it is usually combined with another mechanism such as semaphores to get synchronization and mutual exclusion between processes. PHP also has another shared-memory extension: shmop. Note that shared memory is subject to memory limits.
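A minimal sketch of pairing the two, guarding a shared counter in shm_put_var/shm_get_var with the semaphore calls from the earlier example (the keys and the variable slot are arbitrary):

<?php
$sem = sem_get(ftok(__FILE__, 'm'), 1);
$shm = shm_attach(ftok(__FILE__, 'n'), 1024, 0666);
 
sem_acquire($sem);                        // enter the critical section
$count = shm_has_var($shm, 1) ? shm_get_var($shm, 1) : 0;
shm_put_var($shm, 1, $count + 1);         // read-modify-write is now race-free
sem_release($sem);                        // leave the critical section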

Socket IPC gives processes bidirectional, point-to-point communication. socket_create_pair creates a pair of connected sockets, one per direction; the parent and child each use one of them to read and write

<?php
$sockets = array();
$strone = 'Message From Parent.';
$strtwo = 'Message From Child.';
 
if (socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $sockets) === false) {
    echo "socket_create_pair() failed. Reason: ".socket_strerror(socket_last_error());
}
$pid = pcntl_fork();
if ($pid == -1) {
    echo 'Could not fork Process.';
} elseif ($pid) {
    /*parent*/
    socket_close($sockets[0]);
    if (socket_write($sockets[1], $strone, strlen($strone)) === false) {
        echo "socket_write() failed. Reason: ".socket_strerror(socket_last_error($sockets[1]));
    }
    if (socket_read($sockets[1], strlen($strtwo), PHP_BINARY_READ) == $strtwo) {
        echo "Received $strtwo\n";
    }
    socket_close($sockets[1]);
} else {
    /*child*/
    socket_close($sockets[1]);
    if (socket_write($sockets[0], $strtwo, strlen($strtwo)) === false) {
        echo "socket_write() failed. Reason: ".socket_strerror(socket_last_error($sockets[0]));
    }
    if (socket_read($sockets[0], strlen($strone), PHP_BINARY_READ) == $strone) {
        echo "Received $strone\n";
    }
    socket_close($sockets[0]);
}

The following example is a multi-process task handler from GitHub built on PHP sockets IPC

<?php
namespace Lifo\IPC;
 
declare(ticks = 1);
 
interface ProcessInterface
{
    public function run($parent);
}
 
class ProcessPoolException extends \Exception
{
     
}
 
class ProcessPool
{
    /** @var Integer Maximum workers allowed at once */
    protected $max;
    /** @var boolean If true workers will fork. Otherwise they will run synchronously */
    protected $fork;
    /** @var Integer Total results collected */
    protected $count;
    /** @var array Pending processes that have not been started yet */
    protected $pending;
    /** @var array Processes that have been started */
    protected $workers;
    /** @var array Results that have been collected */
    protected $results;
    /** @var \Closure Function to call every time a child is forked */
    protected $createCallback;
    /** @var array children PID's that died prematurely */
    private   $caught;
    /** @var boolean Is the signal handler initialized? */
    private   $initialized;
    private static $instance = array();
    public function __construct($max = 1, $fork = true)
    {
        //$pid = getmypid();
        //if (isset(self::$instance[$pid])) {
        //    $caller = debug_backtrace();
        //    throw new ProcessPoolException("Cannot instantiate more than 1 ProcessPool in the same process in {$caller[0]['file']} line {$caller[0]['line']}");
        //}
        //self::$instance[$pid] = $this;
        $this->count = 0;
        $this->max = $max;
        $this->fork = $fork;
        $this->results = array();
        $this->workers = array();
        $this->pending = array();
        $this->caught = array();
        $this->initialized = false;
    }
    public function __destruct()
    {
        // make sure signal handler is removed
        $this->uninit();
        //unset(self::$instance[getmygid()]);
    }
    /**
     * Initialize the signal handler.
     *
     * Note: This will replace any current handler for SIGCHLD.
     *
     * @param boolean $force Force initialization even if already initialized
     */
    private function init($force = false)
    {
        if ($this->initialized and !$force) {
            return;
        }
        $this->initialized = true;
        pcntl_signal(SIGCHLD, array($this, 'signalHandler'));
    }
    private function uninit()
    {
        if (!$this->initialized) {
            return;
        }
        $this->initialized = false;
        pcntl_signal(SIGCHLD, SIG_DFL);
    }
    public function signalHandler($signo)
    {
        switch ($signo) {
            case SIGCHLD:
                $this->reaper();
                break;
        }
    }
    /**
     * Reap any dead children
     */
    public function reaper($pid = null, $status = null)
    {
        if ($pid === null) {
            $pid = pcntl_waitpid(-1, $status, WNOHANG);
        }
        while ($pid > 0) {
            if (isset($this->workers[$pid])) {
                // @todo does the socket really need to be closed?
                //@socket_close($this->workers[$pid]['socket']);
                unset($this->workers[$pid]);
            } else {
                // the child died before the parent could initialize the $worker
                // queue. So we track it temporarily so we can handle it in
                // self::create().
                $this->caught[$pid] = $status;
            }
            $pid = pcntl_waitpid(-1, $status, WNOHANG);
        }
    }
    /**
     * Wait for any child to be ready
     *
     * @param integer $timeout Timeout to wait (fractional seconds)
     * @return array|null Returns array of sockets ready to be READ or null
     */
    public function wait($timeout = null)
    {
        $x = null;                      // trash var needed for socket_select
        $startTime = microtime(true);
        while (true) {
            $this->apply();                         // maintain worker queue
            // check each child socket pair for a new result
            $read = array_map(function($w){ return $w['socket']; }, $this->workers);
            // it's possible for no workers/sockets to be present due to REAPING
            if (!empty($read)) {
                $ok = @socket_select($read, $x, $x, $timeout);
                if ($ok !== false and $ok > 0) {
                    return $read;
                }
            }
            // timed out?
            if ($timeout and microtime(true) - $startTime > $timeout) {
                return null;
            }
            // no sense in waiting if we have no workers and no more pending
            if (empty($this->workers) and empty($this->pending)) {
                return null;
            }
        }
    }
    /**
     * Return the next available result.
     *
     * Blocks unless a $timeout is specified.
     *
     * @param integer $timeout Timeout in fractional seconds if no results are available.
     * @return mixed Returns next child response or null on timeout
     * @throws ProcessPoolException On timeout if $nullOnTimeout is false
     */
    public function get($timeout = null, $nullOnTimeout = false)
    {
        $startTime = microtime(true);
        while ($this->getPending()) {
            // return the next result
            if ($this->hasResult()) {
                return $this->getResult();
            }
            // wait for the next result
            $ready = $this->wait($timeout);
            if (is_array($ready)) {
                foreach ($ready as $socket) {
                    $res = self::socket_fetch($socket);
                    if ($res !== null) {
                        $this->results[] = $res;
                        $this->count++;
                    }
                }
                if ($this->hasResult()) {
                    return $this->getResult();
                }
            }
            // timed out?
            if ($timeout and microtime(true) - $startTime > $timeout) {
                if ($nullOnTimeout) {
                    return null;
                }
                throw new ProcessPoolException("Timeout");
            }
        }
    }
    /**
     * Return results from all workers.
     *
     * Does not return until all pending workers are complete or the $timeout
     * is reached.
     *
     * @param integer $timeout Timeout in fractional seconds if no results are available.
     * @return array Returns an array of results
     * @throws ProcessPoolException On timeout if $nullOnTimeout is false
     */
    public function getAll($timeout = null, $nullOnTimeout = false)
    {
        $results = array();
        $startTime = microtime(true);
        while ($this->getPending()) {
            try {
                $res = $this->get($timeout);
                if ($res !== null) {
                    $results[] = $res;
                }
            } catch (ProcessPoolException $e) {
                // timed out
            }
            // timed out?
            if ($timeout and microtime(true) - $startTime > $timeout) {
                if ($nullOnTimeout) {
                    return null;
                }
                throw new ProcessPoolException("Timeout");
            }
        }
        return $results;
    }
    public function hasResult()
    {
        return !empty($this->results);
    }
    /**
     * Return the next available result or null if none are available.
     *
     * This does not wait or manage the worker queue.
     */
    public function getResult()
    {
        if (empty($this->results)) {
            return null;
        }
        return array_shift($this->results);
    }
    /**
     * Apply a worker to the working or pending queue
     *
     * @param Callable $func Callback function to fork into.
     * @return ProcessPool
     */
    public function apply($func = null)
    {
        // add new function to pending queue
        if ($func !== null) {
            if ($func instanceof \Closure or $func instanceof ProcessInterface or is_callable($func)) {
                $this->pending[] = func_get_args();
            } else {
                throw new \UnexpectedValueException("Parameter 1 in ProcessPool#apply must be a Closure or callable");
            }
        }
        // start a new worker if our current worker queue is low
        if (!empty($this->pending) and count($this->workers) < $this->max) {
            call_user_func_array(array($this, 'create'), array_shift($this->pending));
        }
        return $this;
    }
    /**
     * Create a new worker.
     *
     * If forking is disabled this will BLOCK.
     *
     * @param Closure $func Callback function.
     * @param mixed Any extra parameters are passed to the callback function.
     * @throws \RuntimeException if the child can not be forked.
     */
    protected function create($func /*, ...*/)
    {
        // create a socket pair before forking so our child process can write to the PARENT.
        $sockets = array();
        $domain = strtoupper(substr(PHP_OS, 0, 3)) == 'WIN' ? AF_INET : AF_UNIX;
        if (socket_create_pair($domain, SOCK_STREAM, 0, $sockets) === false) {
            throw new \RuntimeException("socket_create_pair failed: " . socket_strerror(socket_last_error()));
        }
        list($child, $parent) = $sockets; // just to make the code below more readable
        unset($sockets);
        $args = array_merge(array($parent), array_slice(func_get_args(), 1));
        $this->init();                  // make sure signal handler is installed
        if ($this->fork) {
            $pid = pcntl_fork();
            if ($pid == -1) {
                throw new \RuntimeException("Could not fork");
            }
            if ($pid > 0) {
                // PARENT PROCESS; Just track the child and return
                socket_close($parent);
                $this->workers[$pid] = array(
                    'pid' => $pid,
                    'socket' => $child,
                );
                // don't pass $parent to callback
                $this->doOnCreate(array_slice($args, 1));
                // If a SIGCHLD was already caught at this point we need to
                // manually handle it to avoid a defunct process.
                if (isset($this->caught[$pid])) {
                    $this->reaper($pid, $this->caught[$pid]);
                    unset($this->caught[$pid]);
                }
            } else {
                // CHILD PROCESS; execute the callback function and wait for response
                socket_close($child);
                try {
                    if ($func instanceof ProcessInterface) {
                        $result = call_user_func_array(array($func, 'run'), $args);
                    } else {
                        $result = call_user_func_array($func, $args);
                    }
                    if ($result !== null) {
                        self::socket_send($parent, $result);
                    }
                } catch (\Exception $e) {
                    // this is kind of useless in a forking context but at
                    // least the developer can see the exception if it occurs.
                    throw $e;
                }
                exit(0);
            }
        } else {
            // forking is disabled so we simply run the child worker and wait
            // synchronously for response.
            try {
                if ($func instanceof ProcessInterface) {
                    $result = call_user_func_array(array($func, 'run'), $args);
                } else {
                    $result = call_user_func_array($func, $args);
                }
                if ($result !== null) {
                    //$this->results[] = $result;
                    self::socket_send($parent, $result);
                }
                // read anything pending from the worker if they chose to write
                // to the socket instead of just returning a value.
                $x = null;
                do {
                    $read = array($child);
                    $ok = socket_select($read, $x, $x, 0);
                    if ($ok !== false and $ok > 0) {
                        $res = self::socket_fetch($read[0]);
                        if ($res !== null) {
                            $this->results[] = $res;
                        }
                    }
                } while ($ok);
            } catch (\Exception $e) {
                // nop; we didn't fork so let the caller handle it
                throw $e;
            }
        }
    }
    /**
     * Clear all pending workers from the queue.
     */
    public function clear()
    {
        $this->pending = array();
        return $this;
    }
    /**
     * Send a SIGTERM (or other) signal to the PID given
     */
    public function kill($pid, $signo = SIGTERM)
    {
        posix_kill($pid, $signo);
        return $this;
    }
    /**
     * Send a SIGTERM (or other) signal to all current workers
     */
    public function killAll($signo = SIGTERM)
    {
        foreach ($this->workers as $w) {
            $this->kill($w['pid'], $signo);
        }
        return $this;
    }
    /**
     * Set a callback when a new forked process is created. This will allow the
     * parent to perform some sort of cleanup after every child is created.
     *
     * This is useful to reinitialize certain resources like DB connections
     * since children will inherit the parent resources.
     *
     * @param \Closure $callback Function to callback after every forked child.
     */
    public function setOnCreate(\Closure $callback = null)
    {
        $this->createCallback = $callback;
    }
    protected function doOnCreate($args = array())
    {
        if ($this->createCallback) {
            call_user_func_array($this->createCallback, $args);
        }
    }
    /**
     * Return the total jobs that have NOT completed yet.
     */
    public function getPending($pendingOnly = false)
    {
        if ($pendingOnly) {
            return count($this->pending);
        }
        return count($this->pending) + count($this->workers) + count($this->results);
    }
    public function getWorkers()
    {
        return count($this->workers);
    }
    public function getActive()
    {
        return count($this->pending) + count($this->workers);
    }
    public function getCompleted()
    {
        return $this->count;
    }
    public function setForking($fork)
    {
        $this->fork = $fork;
        return $this;
    }
    public function setMax($max)
    {
        if (!is_numeric($max) or $max < 1) {
            throw new \InvalidArgumentException("Max value must be > 0");
        }
        $this->max = $max;
        return $this;
    }
    public function getMax()
    {
        return $this->max;
    }
    /**
     * Write the data to the socket in a predetermined format
     */
    public static function socket_send($socket, $data)
    {
        $serialized = serialize($data);
        $hdr = pack('N', strlen($serialized));    // 4 byte length
        $buffer = $hdr . $serialized;
        $total = strlen($buffer);
        while (true) {
            $sent = socket_write($socket, $buffer);
            if ($sent === false) {
                // @todo handle error?
                //$error = socket_strerror(socket_last_error());
                break;
            }
            if ($sent >= $total) {
                break;
            }
            $total -= $sent;
            $buffer = substr($buffer, $sent);
        }
    }
    /**
     * Read a data packet from the socket in a predetermined format.
     *
     * Blocking.
     *
     */
    public static function socket_fetch($socket)
    {
        // read 4 byte length first
        $hdr = '';
        do {
            $read = socket_read($socket, 4 - strlen($hdr));
            if ($read === false or $read === '') {
                return null;
            }
            $hdr .= $read;
        } while (strlen($hdr) < 4);
        list($len) = array_values(unpack("N", $hdr));
        // read the full buffer
        $buffer = '';
        do {
            $read = socket_read($socket, $len - strlen($buffer));
            if ($read === false or $read == '') {
                return null;
            }
            $buffer .= $read;
        } while (strlen($buffer) < $len);
        $data = unserialize($buffer);
        return $data;
    }
}
 
 
$pool = new ProcessPool(16);
for ($i=0; $i<100; $i++) {
    $pool->apply(function($parent) use ($i) {
        echo "$i running...\n";
        mt_srand(); // must re-seed for each child
        $rand = mt_rand(1000000, 2000000);
        usleep($rand);
        return $i . ' : slept for ' . ($rand / 1000000) . ' seconds';
        });
}
while ($pool->getPending()) {
    try {
        $result = $pool->get(1);    // timeout in 1 second
        echo "GOT: ", $result, "\n";
    } catch (ProcessPoolException $e) {
            // timeout
    }
}

When the current process (the parent) adds a task, it is handed off to another process (a child) without blocking the parent; when the child finishes, it returns the result to the parent over the socket.

Of course, processes can also communicate through files (FIFOs) or through an intermediary such as an asynchronous message queue, MySQL, Redis, and so on.
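As a closing sketch, a named pipe (FIFO) between parent and child via posix_mkfifo (the path is arbitrary):

<?php
$fifo = '/tmp/demo.fifo';
if (!file_exists($fifo)) {
    posix_mkfifo($fifo, 0666);   // create the named pipe
}
$pid = pcntl_fork();
if ($pid > 0) {                  // parent: writer
    $w = fopen($fifo, 'w');      // blocks until the child opens the read end
    fwrite($w, "hello from parent\n");
    fclose($w);
    pcntl_waitpid($pid, $status);
} else {                         // child: reader
    $r = fopen($fifo, 'r');
    echo 'child got: ', fgets($r);
    fclose($r);
    exit(0);
}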

References:
Semaphore, Shared Memory and IPC
A Deep Understanding of Linux Inter-Process Communication (IPC)
PHP IPC with Daemon Service using Message Queues, Shared Memory and Semaphores
What You May Not Know About PHP: PHP's Event-Driven Design
Store datasets directly in shared memory with PHP
PHP Dark Arts: Shared Memory Segments (IPC)
Something Like Threading – PHP Process Forking and Interprocess Communication
Mimicking Threading in PHP
A PHP Message Queue Wrapper Based on System V Message Queues
PHP IPC: System V Message Queues
PHP IPC: System V Semaphores
we Do web sockets on PHP from null. A part 2. IPC
proc_open