PHP高級編程之守護進程
http://netkiller.github.io/journal/php.daemon.html
Mr. Neo Chen (陳景峯), netkiller, BG7NYT
PHP高級編程之守護進程
http://netkiller.github.io/journal/php.daemon.html
Mr. Neo Chen (陳景峯), netkiller, BG7NYT
1. 什么是守護進程
守護進程是脫離于終端并且在后臺運行的進程。守護進程脫離于終端是為了避免進程在執行過程中的信息在任何終端上顯示并且進程也不會被任何終端所產生的終端信息所打斷。
例如 apache, nginx, mysql 都是守護進程
2. 為什么開發守護進程
很多程序以服務形式存在,他沒有終端或UI交互,它可能采用其他方式與其他程序交互,如TCP/UDP Socket, UNIX Socket, fifo。程序一旦啟動便進入后臺,直到滿足條件他便開始處理任務。
3. 何時采用守護進程開發應用程序
以我當前的需求為例,我需要運行一個程序,然后監聽某端口,持續接受服務端發起的數據,然后對數據分析處理,再將結果寫入到數據庫中; 我采用ZeroMQ實現數據收發。
如果我不采用守護進程方式開發該程序,程序一旦運行就會占用當前終端窗框,還有受到當前終端鍵盤輸入影響,有可能程序誤退出。
4. 守護進程的安全問題
我們希望程序在非超級用戶運行,這樣一旦由于程序出現漏洞被駭客控制,攻擊者只能繼承運行權限,而無法獲得超級用戶權限。
我們希望程序只能運行一個實例,不運行同事開啟兩個以上的程序,因為會出現端口沖突等等問題。
5. 怎樣開發守護進程
<?php
class ExampleWorker extends Worker {
#public function __construct(Logging $logger) {
# $this->logger = $logger;
#}
#protected $logger;
protected static $dbh;
public function __construct() {
}
public function run(){
$dbhost = '192.168.2.1'; // 數據庫服務器
$dbport = 3306;
$dbuser = 'www'; // 數據庫用戶名
$dbpass = 'qwer123'; // 數據庫密碼
$dbname = 'example'; // 數據庫名
self::$dbh = new PDO("mysql:host=$dbhost;port=$dbport;dbname=$dbname", $dbuser, $dbpass, array(
/* PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'', */
PDO::MYSQL_ATTR_COMPRESS => true,
PDO::ATTR_PERSISTENT => true
)
);
}
protected function getInstance(){
return self::$dbh;
}
}
/* the collectable class implements machinery for Pool::collect */
class Fee extends Stackable {
public function __construct($msg) {
$trades = explode(",", $msg);
$this->data = $trades;
print_r($trades);
}
public function run() {
#$this->worker->logger->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId() );
try {
$dbh = $this->worker->getInstance();
$insert = "INSERT INTO fee(ticket, login, volume, `status`) VALUES(:ticket, :login, :volume,'N')";
$sth = $dbh->prepare($insert);
$sth->bindValue(':ticket', $this->data[0]);
$sth->bindValue(':login', $this->data[1]);
$sth->bindValue(':volume', $this->data[2]);
$sth->execute();
$sth = null;
/* ...... */
$update = "UPDATE fee SET `status` = 'Y' WHERE ticket = :ticket and `status` = 'N'";
$sth = $dbh->prepare($update);
$sth->bindValue(':ticket', $this->data[0]);
$sth->execute();
//echo $sth->queryString;
//$dbh = null;
}
catch(PDOException $e) {
$error = sprintf("%s,%s\n", $mobile, $id );
file_put_contents("mobile_error.log", $error, FILE_APPEND);
}
}
}
class Example {
/* config */
const LISTEN = "tcp://192.168.2.15:5555";
const MAXCONN = 100;
const pidfile = __CLASS__;
const uid = 80;
const gid = 80;
protected $pool = NULL;
protected $zmq = NULL;
public function __construct() {
$this->pidfile = '/var/run/'.self::pidfile.'.pid';
}
private function daemon(){
if (file_exists($this->pidfile)) {
echo "The file $this->pidfile exists.\n";
exit();
}
$pid = pcntl_fork();
if ($pid == -1) {
die('could not fork');
} else if ($pid) {
// we are the parent
//pcntl_wait($status); //Protect against Zombie children
exit($pid);
} else {
// we are the child
file_put_contents($this->pidfile, getmypid());
posix_setuid(self::uid);
posix_setgid(self::gid);
return(getmypid());
}
}
private function start(){
$pid = $this->daemon();
$this->pool = new Pool(self::MAXCONN, \ExampleWorker::class, []);
$this->zmq = new ZMQSocket(new ZMQContext(), ZMQ::SOCKET_REP);
$this->zmq->bind(self::LISTEN);
/* Loop receiving and echoing back */
while ($message = $this->zmq->recv()) {
//print_r($message);
//if($trades){
$this->pool->submit(new Fee($message));
$this->zmq->send('TRUE');
//}else{
// $this->zmq->send('FALSE');
//}
}
$pool->shutdown();
}
private function stop(){
if (file_exists($this->pidfile)) {
$pid = file_get_contents($this->pidfile);
posix_kill($pid, 9);
unlink($this->pidfile);
}
}
private function help($proc){
printf("%s start | stop | help \n", $proc);
}
public function main($argv){
if(count($argv) < 2){
printf("please input help parameter\n");
exit();
}
if($argv[1] === 'stop'){
$this->stop();
}else if($argv[1] === 'start'){
$this->start();
}else{
$this->help($argv[0]);
}
}
}
$cgse = new Example();
$cgse->main($argv);
<?php
declare(ticks = 1);
require_once( __DIR__.'/autoload.class.php' );
umask(077);
class EDM {
protected $queue;
public function __construct() {
global $argc, $argv;
$this->argc = $argc;
$this->argv = $argv;
$this->pidfile = $this->argv[0].".pid";
$this->config = new Config('mq');
$this->logging = new Logging(__DIR__.'/log/'.$this->argv[0].'.'.date('Y-m-d').'.log'); //.H:i:s
//print_r( $this->config->getArray('mq') );
//pcntl_signal(SIGHUP, array(&$this,"restart"));
}
protected function msgqueue(){
$exchangeName = 'email'; //交換機名
$queueName = 'email'; //隊列名
$routeKey = 'email'; //路由key
//創建連接和channel
$connection = new AMQPConnection($this->config->getArray('mq'));
if (!$connection->connect()) {
die("Cannot connect to the broker!\n");
}
$this->channel = new AMQPChannel($connection);
$this->exchange = new AMQPExchange($this->channel);
$this->exchange->setName($exchangeName);
$this->exchange->setType(AMQP_EX_TYPE_DIRECT); //direct類型
$this->exchange->setFlags(AMQP_DURABLE); //持久化
$this->exchange->declare();
//echo "Exchange Status:".$this->exchange->declare()."\n";
//創建隊列
$this->queue = new AMQPQueue($this->channel);
$this->queue->setName($queueName);
$this->queue->setFlags(AMQP_DURABLE); //持久化
$this->queue->declare();
//echo "Message Total:".$this->queue->declare()."\n";
//綁定交換機與隊列,并指定路由鍵
$bind = $this->queue->bind($exchangeName, $routeKey);
//echo 'Queue Bind: '.$bind."\n";
//阻塞模式接收消息
while(true){
//$this->queue->consume('processMessage', AMQP_AUTOACK); //自動ACK應答
$this->queue->consume(function($envelope, $queue) {
$msg = $envelope->getBody();
$queue->ack($envelope->getDeliveryTag()); //手動發送ACK應答
$this->logging->info('('.'+'.')'.$msg);
//$this->logging->debug("Message Total:".$this->queue->declare());
});
$this->channel->qos(0,1);
//echo "Message Total:".$this->queue->declare()."\n";
}
$conn->disconnect();
}
protected function start(){
if (file_exists($this->pidfile)) {
printf("%s already running\n", $this->argv[0]);
exit(0);
}
$this->logging->warning("start");
$pid = pcntl_fork();
if ($pid == -1) {
die('could not fork');
} else if ($pid) {
//pcntl_wait($status); //等待子進程中斷,防止子進程成為僵尸進程。
exit(0);
} else {
posix_setsid();
//printf("pid: %s\n", posix_getpid());
file_put_contents($this->pidfile, posix_getpid());
//posix_kill(posix_getpid(), SIGHUP);
$this->msgqueue();
}
}
protected function stop(){
if (file_exists($this->pidfile)) {
$pid = file_get_contents($this->pidfile);
posix_kill($pid, SIGTERM);
//posix_kill($pid, SIGKILL);
unlink($this->pidfile);
$this->logging->warning("stop");
}else{
printf("%s haven't running\n", $this->argv[0]);
}
}
protected function restart(){
$this->stop();
$this->start();
}
protected function status(){
if (file_exists($this->pidfile)) {
$pid = file_get_contents($this->pidfile);
printf("%s already running, pid = %s\n", $this->argv[0], $pid);
}else{
printf("%s haven't running\n", $this->argv[0]);
}
}
protected function usage(){
printf("Usage: %s {start | stop | restart | status}\n", $this->argv[0]);
}
public function main(){
//print_r($this->argv);
if($this->argc != 2){
$this->usage();
}else{
if($this->argv[1] == 'start'){
$this->start();
}else if($this->argv[1] == 'stop'){
$this->stop();
}else if($this->argv[1] == 'restart'){
$this->restart();
}else if($this->argv[1] == 'status'){
$this->status();
}else{
$this->usage();
}
}
}
}
$edm = New EDM();
$edm->main();
5.1. 程序啟動
下面是程序啟動后進入后臺的代碼
通過進程ID文件來判斷,當前進程狀態,如果進程ID文件存在表示程序在運行中,通過代碼file_exists($this->pidfile)實現,但而后進程被kill需要手工刪除該文件才能運行
private function daemon(){
if (file_exists($this->pidfile)) {
echo "The file $this->pidfile exists.\n";
exit();
}
$pid = pcntl_fork();
if ($pid == -1) {
die('could not fork');
} else if ($pid) {
// we are the parent
//pcntl_wait($status); //Protect against Zombie children
exit($pid);
} else {
// we are the child
file_put_contents($this->pidfile, getmypid());
posix_setuid(self::uid);
posix_setgid(self::gid);
return(getmypid());
}
}
程序啟動后,父進程會推出,子進程會在后臺運行,子進程權限從root切換到指定用戶,同時將pid寫入進程ID文件。
5.2. 程序停止
程序停止,只需讀取pid文件,然后調用posix_kill($pid, 9); 最后將該文件刪除。
private function stop(){
if (file_exists($this->pidfile)) {
$pid = file_get_contents($this->pidfile);
posix_kill($pid, 9);
unlink($this->pidfile);
}
}
5.3. 單例模式
所有線程共用數據庫連接,在多線程中這個非常重要,如果每個線程建立以此數據庫連接在關閉,這對數據庫的開銷是巨大的。
protected function getInstance(){
return self::$dbh;
}
5.4. 實現優雅重啟
所謂優雅重啟是指進程不退出的情況加實現重新載入包含重置變量,刷新配置文件,重置日志等等
stop/start 或者 restart都會退出進程,重新啟動,導致進程ID改變,同時瞬間退出導致業務閃斷。所以很多守護進程都會提供一個reload功能,者就是所謂的優雅重啟。
reload 實現原理是給進程發送SIGHUP信號,可以通過kill命令發送 kill -s SIGHUP 64881,也可以通過庫函數實現 posix_kill(posix_getpid(), SIGUSR1);
<?php
pcntl_signal(SIGTERM, function($signo) {
echo "\n This signal is called. [$signo] \n";
Status::$state = -1;
});
pcntl_signal(SIGHUP, function($signo) {
echo "\n This signal is called. [$signo] \n";
Status::$state = 1;
Status::$ini = parse_ini_file('test.ini');
});
class Status{
public static $state = 0;
public static $ini = null;
}
$pid = pcntl_fork();
if ($pid == -1) {
die('could not fork');
}
if($pid) {
// parent
} else {
$loop = true;
Status::$ini = parse_ini_file('test.ini');
while($loop) {
print_r(Status::$ini);
while(true) {
// Dispatching...
pcntl_signal_dispatch();
if(Status::$state == -1) {
// Do something and end loop.
$loop = false;
break;
}
if(Status::$state == 1) {
printf("This program is reload.\r\n");
Status::$state = 0;
break;
}
echo '.';
sleep(1);
}
echo "\n";
}
echo "Finish \n";
exit();
}
創建配置文件
[root@netkiller pcntl]# cat test.ini
[db]
host=192.168.0.1
port=3306
測試方法,首先運行該守護進程
# php signal.reload.php
Array
(
[host] => 192.168.0.1
[port] => 3306
)
現在修改配置文件,增加user=test配置項
[root@netkiller pcntl]# cat test.ini
[db]
host=192.168.0.1
port=3306
user=test
發送信號,在另一個終端窗口,通過ps命令找到該進程的PID,然后使用kill命令發送SIGHUP信號,然后再通過ps查看進程,你會發現進程PID沒有改變
[root@netkiller pcntl]# ps ax | grep reload
64881 pts/0 S 0:00 php -c /srv/php/etc/php-cli.ini signal.reload.php
65073 pts/1 S+ 0:00 grep --color=auto reload
[root@netkiller pcntl]# kill -s SIGHUP 64881
[root@netkiller pcntl]# ps ax | grep reload
64881 pts/0 S 0:00 php -c /srv/php/etc/php-cli.ini signal.reload.php
65093 pts/1 S+ 0:00 grep --color=auto reload
配置文件被重新載入
This signal is called. [1]
This program is reload.
Array
(
[host] => 192.168.0.1
[port] => 3306
[user] => test
)
優雅重啟完成。
6. Example
<?php
/*
* PHP Daemon sample.
* Home: http://netkiller.github.io
* Author: netkiller<netkiller@msn.com>
*
*/
class Logger {
public function __construct(/*Logging $logger*/) {
}
public function logger($type, $message) {
$log = sprintf ( "%s\t%s\t%s\n", date ( 'Y-m-d H:i:s' ), $type, $message );
file_put_contents ( sprintf(__DIR__."/../log/sender.%s.log", date ( 'Y-m-d' )), $log, FILE_APPEND );
}
}
final class Signal{
public static $signo = 0;
protected static $ini = null;
public static function set($signo){
self::$signo = $signo;
}
public static function get(){
return(self::$signo);
}
public static function reset(){
self::$signo = 0;
}
}
class Test extends Logger {
//public static $signal = null;
public function __construct() {
//self::$signal == null;
}
public function run(){
while(true){
pcntl_signal_dispatch();
printf(".");
sleep(1);
if(Signal::get() == SIGHUP){
Signal::reset();
break;
}
}
printf("\n");
}
}
class Daemon extends Logger {
/* config */
const LISTEN = "tcp://192.168.2.15:5555";
const pidfile = __CLASS__;
const uid = 80;
const gid = 80;
const sleep = 5;
protected $pool = NULL;
protected $config = array();
public function __construct($uid, $gid, $class) {
$this->pidfile = '/var/run/'.basename(get_class($class), '.php').'.pid';
//$this->config = parse_ini_file('sender.ini', true); //include_once(__DIR__."/config.php");
$this->uid = $uid;
$this->gid = $gid;
$this->class = $class;
$this->classname = get_class($class);
$this->signal();
}
public function signal(){
pcntl_signal(SIGHUP, function($signo) /*use ()*/{
//echo "\n This signal is called. [$signo] \n";
printf("The process has been reload.\n");
Signal::set($signo);
});
}
private function daemon(){
if (file_exists($this->pidfile)) {
echo "The file $this->pidfile exists.\n";
exit();
}
$pid = pcntl_fork();
if ($pid == -1) {
die('could not fork');
} else if ($pid) {
// we are the parent
//pcntl_wait($status); //Protect against Zombie children
exit($pid);
} else {
file_put_contents($this->pidfile, getmypid());
posix_setuid(self::uid);
posix_setgid(self::gid);
return(getmypid());
}
}
private function run(){
while(true){
printf("The process begin.\n");
$this->class->run();
printf("The process end.\n");
}
}
private function foreground(){
$this->run();
}
private function start(){
$pid = $this->daemon();
for(;;){
$this->run();
sleep(self::sleep);
}
}
private function stop(){
if (file_exists($this->pidfile)) {
$pid = file_get_contents($this->pidfile);
posix_kill($pid, 9);
unlink($this->pidfile);
}
}
private function reload(){
if (file_exists($this->pidfile)) {
$pid = file_get_contents($this->pidfile);
//posix_kill(posix_getpid(), SIGHUP);
posix_kill($pid, SIGHUP);
}
}
private function status(){
if (file_exists($this->pidfile)) {
$pid = file_get_contents($this->pidfile);
system(sprintf("ps ax | grep %s | grep -v grep", $pid));
}
}
private function help($proc){
printf("%s start | stop | restart | status | foreground | help \n", $proc);
}
public function main($argv){
if(count($argv) < 2){
$this->help($argv[0]);
printf("please input help parameter\n");
exit();
}
if($argv[1] === 'stop'){
$this->stop();
}else if($argv[1] === 'start'){
$this->start();
}else if($argv[1] === 'restart'){
$this->stop();
$this->start();
}else if($argv[1] === 'status'){
$this->status();
}else if($argv[1] === 'foreground'){
$this->foreground();
}else if($argv[1] === 'reload'){
$this->reload();
}else{
$this->help($argv[0]);
}
}
}
$daemon = new Daemon(80,80, new Test());
$daemon->main($argv);
?>
7. 進程意外退出解決方案
如果是非常重要的進程,必須要保證程序正常運行,一旦出現任何異常退出,都需要做即時做處理。下面的程序可能檢查進程是否異常退出,如果退出便立即啟動。
#!/bin/sh
LOGFILE=/var/log/$(basename $0 .sh).log
PATTERN="my.php"
RECOVERY="/path/to/my.php start"
while true
do
TIMEPOINT=$(date -d "today" +"%Y-%m-%d_%H:%M:%S")
PROC=$(pgrep -o -f ${PATTERN})
#echo ${PROC}
if [ -z "${PROC}" ]; then
${RECOVERY} >> $LOGFILE
echo "[${TIMEPOINT}] ${PATTERN} ${RECOVERY}" >> $LOGFILE
#else
#echo "[${TIMEPOINT}] ${PATTERN} ${PROC}" >> $LOGFILE
fi
sleep 5
done &