php实现多线程(实际是多进程,夸平台)

代码实现了一个简单的多进程管理机制,比向WEB服务器发送多个请求要实现多进程要方便很多。只能使用在cli模式。可以用在特殊场合,如邮件发送任务等。

资源的共享访问使用了文件锁,并不是很可靠,主要是为了能够在Windwos下使用,如果确实有必要可以考虑自己改用相应的信号灯机制(这个扩展只能用于 xUNIX)。

实例

复制PHP内容到剪贴板

PHP代码:

define('DIR_PHP_EXEC', 'php');

define('DIR_MAIN_EXEC', __FILE__);

define('DIR_TMP', '/tmp');

require_once('my_process.php');

class pp extends my_process_base {

    public function run($param = null) {

        for ($i = 0; $i < 4; $i++) {
            echo “111 $param\n”;

            sleep(1);

        }

    }

}

init_my_process();

$obj = $GLOBALS['gal_obj_process_m'];

if ($obj->is_main()) {

    $obj->run_task('pp', 'a');

    $obj->run_task('pp', 'b');

    $obj->run_task('pp', 'c');

    $obj->run_task('pp', 'd');

    //$obj->run_task('pp', 'b');

    $obj->set_max_run(10);

    $obj->run();

}

进程管理类

复制PHP内容到剪贴板

PHP代码:

/**

* @copyright 2007 movivi

* @author  徐智  <[email=xzfred@gmail.com]xzfred@gmail.com[/email]>

*

* $Id: getPage.php 11 2007-09-21 02:15:01Z fred $

*/

if (!defined('DIR_PHP_EXEC')) define('DIR_PHP_EXEC', 'php');

//if (!defined('DIR_MAIN_EXEC')) define('DIR_MAIN_EXEC', '');

if (!defined('DIR_TMP')) define('DIR_TMP', '');

/*****************************************************************************/

/* 初始化 */

define('CMD_MAIN_PROCESS_KEY', 'main_process_key');

define('CMD_CHILD_PROCESS_NAME', 'child_process_name');

define('CMD_CHILD_PROCESS_PARAM', 'child_process_param');

function init_my_process() {

    $GLOBALS['gal_obj_cmd'] = new my_cmd_argv();

    $key = $GLOBALS['gal_obj_cmd']->get_value(CMD_MAIN_PROCESS_KEY);

    $key = $key === false ? '' : $key;

    $GLOBALS['gal_obj_process_m'] = new my_process_m($key);

    if (!$GLOBALS['gal_obj_process_m']->is_main()) $GLOBALS['gal_obj_process_m']->run() ;

}

/**

* php多进程类

*

* 你需要从这个对象继承,然后实现你自己的run处理

*/

abstract class my_process_base {

    public function __construct($auto_run=true, $name='') {

    }

    public function __destruct() {

        echo “@end\n”;

    }

    abstract public function run($param = null);

}

class my_cmd_argv {

    private $cmd_argv = array();

    public function __construct() {

        $argv = $_SERVER['argv'];

        for ($i = 1; $i < count($argv); $i++) {
            $cmd = explode('=', $argv[$i]);

            $this->cmd_argv[$cmd[0]] = isset($cmd[1]) ? $cmd[1] : '';

        }

    }

    public function get_key($key) {

        return isset($this->cmd_argv[$key]);

    }

    public function get_value($key) {

        return isset($this->cmd_argv[$key]) ? $this->cmd_argv[$key] : false;

    }

}

/**

* php多进程管理类

* 可以在PHP中实现多进程处理,只限在控制台方式使用

* 当前的信号实现机制采用文件方式

*

*/

class my_process_m {

    /**

     * @var array $task_list

     * 进程列表

     */

    private $task_list = array();

    private $lock_list = array();

    private $lock = null;

    private $is_main = false;

    private $max_run = 3600000;

    private function release_lock($key = null) {

        $lock = &$this->lock_list;

        if (!is_null($key)) {

            $key = md5($this->build_lock_id($key));

            if (isset($lock[$key])) {

                if (is_resource($lock[$key][0])) fclose($lock[$key][0]);

                unlink($lock[$key][1]);

                unset($lock[$key]);

            }

            return true;

        }

        foreach ($lock as $k => $h) {

            if (is_resource($h)) fclose($h);

            unset($lock[$k]);

        }

        return true;

    }

    private function release_task($key = null) {

        $task = &$this->task_list;

        if (!is_null($key) && isset($task[$key])) {

            if (is_resource($task[$key])) pclose($task[$key]);

            unset($task[$key]);

        } else {

            foreach ($task as $k => $h) {

                if (is_resource($h)) pclose($h);

                unset($task[$k]);

            }

        }

        return true;

    }

            

    private function build_lock_id($key) {

        return DIR_TMP . DIRECTORY_SEPARATOR . $key . '_sem.lock';

    }

    protected function run_child_process() {

        $class = $GLOBALS['gal_obj_cmd']->get_value(CMD_CHILD_PROCESS_NAME);

        $param = $GLOBALS['gal_obj_cmd']->get_value(CMD_CHILD_PROCESS_PARAM);

        $param = $param == '' ? null : unserialize(base64_decode(trim($param)));

        $obj = new $class();

        $obj->run($param);

        $this->task_list[] = $obj;

    }

    public function __construct($lock='') {

        if ($lock === '') {

            $this->is_main = true;

            $key = md5(uniqid()) . '_main.my_process';

            $lock = array($key, $this->get($key));

        } else {

            $this->is_main = false;

            $lock = array($lock, 0);

        }

        $this->lock = $lock;

    }

    public function __destruct() {

        $this->release_lock();

        $this->release_task();

    }

    /**

     * 停止所有进程

     *

     */

    public function stop_all() {

    }

    /**

     * 是否是主进程

     *

     */

    public function is_main() {

        return $this->is_main;

    }

    /**

     * 是不是已经存在一个活动信号

     *

     * @param   string      $key

     * @return  bool        

     */

    public function exist($key) {

        return file_exists($this->build_lock_id($key));

    }

    /**

     * 获取一个信号

     *

     * @param   string      $key    

     * @param   int         $max_acquire    最大请求阻塞数量

     * @return mix 如果成功返回一个信号ID

     *

     */

    public function get($key, $max_acquire=5) {

        $fn = $this->build_lock_id($key);

        if (isset($this->lock_list[md5($fn)])) return false;

        $id = fopen($fn, 'a+');

        if ($id) $this->lock_list[md5($fn)] = array($id, $fn);

        return $id;

    }

    /**

     * 释放一个信号

     *

     * @param   string      $key    

     * @return  bool        如果成功返回一个信号true

     *

     */

    public function remove($key) {

        return $this->release_lock($key);

    }

    /**

     * 获取一个信号

     *

     * @param string    $id         信号ID

     * @param bool      $block      是否阻塞

     */

    public function acquire($id, $block=false) {

        if ($block) {

            return flock($id, LOCK_EX);

        } else {

            return flock($id, LOCK_EX + LOCK_NB);

        }

    }

    /**

     * 释放一个信号

     *

     */

    public function release($id) {

        flock($id, LOCK_UN);

    }

    public function run_task($process_name, $param=null) {

        $this->task_list[] = popen(DIR_PHP_EXEC . ' -f ' . DIR_MAIN_EXEC . ' — '

            . CMD_CHILD_PROCESS_NAME . '=' . $process_name . ' '

            . CMD_CHILD_PROCESS_PARAM . '=”' . base64_encode(serialize($param)) . '” '

            . CMD_MAIN_PROCESS_KEY . '=”' . $this->lock[0] . '” ',

            'r');

    }

    public function run($auto_run = true) {

        if ($this->is_main) {

            $ps = &$this->task_list;

            $max_run = &$this->max_run;

            $id = 0;

            do {

                //echo “process—————————————-: \n”;

                $c = 0;

                foreach ($ps as $k => $h) {

                    $c++;

                    $msg = fread($h, 8000);

                    if (substr($msg, -5, 4) === '@end') {

                        echo “end process:[$k][$id] echo \n{$msg} \n”;

                        $this->release_task($k);

                    } else {

                        echo “process:[$k][$id] echo \n{$msg} \n”;

                    }

                }

                sleep(1);

            } while ($auto_run && $id++ < $max_run && $c > 0);

        } else {

            $this->run_child_process();

        }

    }

    public function set_max_run($max=1000) {

        $this->max_run = $max;

    }

}



发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>