侯体宗的博客
  • 首页
  • 人生(杂谈)
  • 技术
  • 关于我
  • 更多分类
    • 文件下载
    • 文字修仙
    • 中国象棋ai
    • 群聊
    • 九宫格抽奖
    • 拼图
    • 消消乐
    • 相册

php pthreads多线程的安装与使用

php  /  管理员 发布于 7年前   146

安装Pthreads 基本上需要重新编译PHP,加上 --enable-maintainer-zts 参数,但是用这个文档很少;bug会很多很有很多意想不到的问题,生成环境上只能呵呵了,所以这个东西玩玩就算了,真正多线程还是用Python、C等等

一、安装

这里使用的是 php-7.0.2

./configure \--prefix=/usr/local/php7 \--with-config-file-path=/etc \--with-config-file-scan-dir=/etc/php.d \--enable-debug \--enable-maintainer-zts \--enable-pcntl \--enable-fpm \--enable-opcache \--enable-embed=shared \--enable-json=shared \--enable-phpdbg \--with-curl=shared \--with-mysql=/usr/local/mysql \--with-mysqli=/usr/local/mysql/bin/mysql_config \--with-pdo-mysql

make && make install

安装pthreads

pecl install pthreads

二、Thread

getThreadId()}\n"; } };$thread->start() && $thread->join();#2class workerThread extends Thread { public function __construct($i){$this->i=$i;}public function run(){while(true){echo $this->i."\n";sleep(1);} } }for($i=0;$i<50;$i++){$workers[$i]=new workerThread($i);$workers[$i]->start();}?>

三、 Worker 与 Stackable

Stackables are tasks that are executed by Worker threads. You can synchronize with, read, and write Stackable objects before, after and during their execution.

sql = $sql;}public function run() {$dbh = $this->worker->getConnection();$row = $dbh->query($this->sql);while($member = $row->fetch(PDO::FETCH_ASSOC)){print_r($member);}}}class ExampleWorker extends Worker {public static $dbh;public function __construct($name) {}public function run(){self::$dbh = new PDO('mysql:host=10.0.0.30;dbname=testdb','root','123456');}public function getConnection(){return self::$dbh;}}$worker = new ExampleWorker("My Worker Thread");$sql1 = new SQLQuery('select * from test order by id desc limit 1,5');$worker->stack($sql1);$sql2 = new SQLQuery('select * from test order by id desc limit 5,5');$worker->stack($sql2);$worker->start();$worker->shutdown();?>

四、 互斥锁

什么情况下会用到互斥锁?在你需要控制多个线程同一时刻只能有一个线程工作的情况下可以使用。一个简单的计数器程序,说明有无互斥锁情况下的不同

mutex = $mutex;$this->handle = fopen("/tmp/counter.txt", "w+");}public function __destruct(){fclose($this->handle);}public function run() {if($this->mutex)$locked=Mutex::lock($this->mutex);$counter = intval(fgets($this->handle));$counter++;rewind($this->handle);fputs($this->handle, $counter );printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);if($this->mutex)Mutex::unlock($this->mutex);}}//没有互斥锁for ($i=0;$i<50;$i++){$threads[$i] = new CounterThread();$threads[$i]->start();}//加入互斥锁$mutex = Mutex::create(true);for ($i=0;$i<50;$i++){$threads[$i] = new CounterThread($mutex);$threads[$i]->start();}Mutex::unlock($mutex);for ($i=0;$i<50;$i++){$threads[$i]->join();}Mutex::destroy($mutex);?>

多线程与共享内存

在共享内存的例子中,没有使用任何锁,仍然可能正常工作,可能工作内存操作本身具备锁的功能

shmid = $shmid;}public function run() {$counter = shm_get_var( $this->shmid, 1 );$counter++;shm_put_var( $this->shmid, 1, $counter );printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);}}for ($i=0;$i<100;$i++){$threads[] = new CounterThread($shmid);}for ($i=0;$i<100;$i++){$threads[$i]->start();}for ($i=0;$i<100;$i++){$threads[$i]->join();}shm_remove( $shmid );shm_detach( $shmid );?>

五、 线程同步

有些场景我们不希望 thread->start() 就开始运行程序,而是希望线程等待我们的命令。thread−>wait();测作用是thread−>start()后线程并不会立即运行,只有收到 thread->notify(); 发出的信号后才运行

shmid = $shmid;}public function run() {$this->synchronized(function($thread){$thread->wait();}, $this);$counter = shm_get_var( $this->shmid, 1 );$counter++;shm_put_var( $this->shmid, 1, $counter );printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter);}}for ($i=0;$i<100;$i++){$threads[] = new CounterThread($shmid);}for ($i=0;$i<100;$i++){$threads[$i]->start();}for ($i=0;$i<100;$i++){$threads[$i]->synchronized(function($thread){$thread->notify();}, $threads[$i]);}for ($i=0;$i<100;$i++){$threads[$i]->join();}shm_remove( $shmid );shm_detach( $shmid );?> 

六、线程池

一个Pool类

row = $row;$this->sql = null;}public function run() {if(strlen($this->row['bankno']) > 100 ){$bankno = safenet_decrypt($this->row['bankno']);}else{$error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']);file_put_contents("bankno_error.log", $error, FILE_APPEND);}if( strlen($bankno) > 7 ){$sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);$this->sql = $sql;}printf("%s\n",$this->sql);}}class Pool {public $pool = array();public function __construct($count) {$this->count = $count;}public function push($row){if(count($this->pool) < $this->count){$this->pool[] = new Update($row);return true;}else{return false;}}public function start(){foreach ( $this->pool as $id => $worker){$this->pool[$id]->start();}}public function join(){foreach ( $this->pool as $id => $worker){$this->pool[$id]->join();}}public function clean(){foreach ( $this->pool as $id => $worker){if(! $worker->isRunning()){unset($this->pool[$id]);}}}}try {$dbh = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',PDO::MYSQL_ATTR_COMPRESS => true));$sql = "select id,bankno from members order by id desc";$row = $dbh->query($sql);$pool = new Pool(5);while($member = $row->fetch(PDO::FETCH_ASSOC)){while(true){if($pool->push($member)){ //压入任务到池中break;}else{ //如果池已经满,就开始启动线程$pool->start();$pool->join();$pool->clean();}}}$pool->start();$pool->join();$dbh = null;} catch (Exception $e) {echo '[' , date('H:i:s') , ']', '系统错误', $e->getMessage(), "\n";}?>

动态队列线程池

上面的例子是当线程池满后执行start统一启动,下面的例子是只要线程池中有空闲便立即创建新线程。

row = $row;$this->sql = null;//print_r($this->row);}public function run() {if(strlen($this->row['bankno']) > 100 ){$bankno = safenet_decrypt($this->row['bankno']);}else{$error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']);file_put_contents("bankno_error.log", $error, FILE_APPEND);}if( strlen($bankno) > 7 ){$sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']);$this->sql = $sql;}printf("%s\n",$this->sql);}}try {$dbh = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array(PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'',PDO::MYSQL_ATTR_COMPRESS => true));$sql = "select id,bankno from members order by id desc limit 50";$row = $dbh->query($sql);$pool = array();while($member = $row->fetch(PDO::FETCH_ASSOC)){$id = $member['id'];while (true){if(count($pool) < 5){$pool[$id] = new Update($member);$pool[$id]->start();break;}else{foreach ( $pool as $name => $worker){if(! $worker->isRunning()){unset($pool[$name]);}}}}}$dbh = null;} catch (Exception $e) {echo '【' , date('H:i:s') , '】', '【系统错误】', $e->getMessage(), "\n";}?>

pthreads Pool类

logger = $logger;}protected $loger;}class WebWork extends Stackable {public function isComplete() {return $this->complete;}public function run() {$this->worker->logger->log("%s executing in Thread #%lu",__CLASS__, $this->worker->getThreadId());$this->complete = true;}protected $complete;}class SafeLog extends Stackable {protected function log($message, $args = []) {$args = func_get_args();if (($message = array_shift($args))) {echo vsprintf("{$message}\n", $args);}}}$pool = new Pool(8, \WebWorker::class, [new SafeLog()]);$pool->submit($w=new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->shutdown();$pool->collect(function($work){return $work->isComplete();});var_dump($pool); 

七、多线程文件安全读写

LOCK_SH 取得共享锁定(读取的程序)

LOCK_EX 取得独占锁定(写入的程序

LOCK_UN 释放锁定(无论共享或独占)

LOCK_NB 如果不希望 flock() 在锁定时堵塞

八、多线程与数据连接

pthreads 与 pdo 同时使用是,需要注意一点,需要静态声明public static $dbh;并且通过单例模式访问数据库连接。

Worker 与 PDO

worker->getConnection();$sql = "select id,name from members order by id desc limit ";$row = $dbh->query($sql);while($member = $row->fetch(PDO::FETCH_ASSOC)){print_r($member);}}}class ExampleWorker extends Worker {public static $dbh;public function __construct($name) {}/** The run method should just prepare the environment for the work that is coming ...*/public function run(){self::$dbh = new PDO('mysql:host=...;dbname=example','www','');}public function getConnection(){return self::$dbh;}}$worker = new ExampleWorker("My Worker Thread");$work=new Work();$worker->stack($work);$worker->start();$worker->shutdown();?> 

Pool 与 PDO

在线程池中链接数据库

# cat pool.phplogger = $logger;}protected $logger;}/* the collectable class implements machinery for Pool::collect */class Work extends Stackable {public function __construct($number) {$this->number = $number;}public function run() {$dbhost = 'db.example.com'; // 数据库服务器$dbuser = 'example.com'; // 数据库用户名$dbpw = 'password'; // 数据库密码$dbname = 'example_real';$dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array(PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF\'',PDO::MYSQL_ATTR_COMPRESS => true,PDO::ATTR_PERSISTENT => true));$sql = "select OPEN_TIME, `COMMENT` from MT_TRADES where LOGIN='".$this->number['name']."' and CMD='' and `COMMENT` = '".$this->number['order'].":DEPOSIT'";#echo $sql;$row = $dbh->query($sql);$mt_trades = $row->fetch(PDO::FETCH_ASSOC);if($mt_trades){$row = null;$sql = "UPDATE db_example.accounts SET paystatus='成功', deposit_time='".$mt_trades['OPEN_TIME']."' where `order` = '".$this->number['order']."';";$dbh->query($sql);#printf("%s\n",$sql);}$dbh = null;printf("runtime: %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$this->number['order']);}}class Logging extends Stackable {protected static $dbh;public function __construct() {$dbhost = 'db.example.com'; // 数据库服务器$dbuser = 'example.com'; // 数据库用户名$dbpw = 'password'; // 数据库密码$dbname = 'example_real'; // 数据库名self::$dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array(PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF\'',PDO::MYSQL_ATTR_COMPRESS => true));}protected function log($message, $args = []) {$args = func_get_args();if (($message = array_shift($args))) {echo vsprintf("{$message}\n", $args);}}protected function getConnection(){return self::$dbh;}}$pool = new Pool(, \ExampleWorker::class, [new Logging()]);$dbhost = 'db.example.com'; // 数据库服务器$dbuser = 'example.com'; // 数据库用户名$dbpw = 'password'; // 数据库密码$dbname = 'db_example';$dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array(PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF\'',PDO::MYSQL_ATTR_COMPRESS => true));$sql = "select `order`,name from accounts where deposit_time is null order by id desc";$row = $dbh->query($sql);while($account = $row->fetch(PDO::FETCH_ASSOC)){$pool->submit(new Work($account));}$pool->shutdown();?> 

进一步改进上面程序,我们使用单例模式 $this->worker->getInstance(); 全局仅仅做一次数据库连接,线程使用共享的数据库连接

logger = $logger;#}#protected $logger;protected static $dbh;public function __construct() {}public function run(){$dbhost = 'db.example.com'; // 数据库服务器$dbuser = 'example.com'; // 数据库用户名$dbpw = 'password'; // 数据库密码$dbname = 'example'; // 数据库名self::$dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array(PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF\'',PDO::MYSQL_ATTR_COMPRESS => true,PDO::ATTR_PERSISTENT => true));}protected function getInstance(){return self::$dbh;}}/* the collectable class implements machinery for Pool::collect */class Work extends Stackable {public function __construct($data) {$this->data = $data;#print_r($data);}public function run() {#$this->worker->logger->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId() );try {$dbh = $this->worker->getInstance();#print_r($dbh);$id = $this->data['id'];$mobile = safenet_decrypt($this->data['mobile']);#printf("%d, %s \n", $id, $mobile);if(strlen($mobile) > ){$mobile = substr($mobile, -);}if($mobile == 'null'){# $sql = "UPDATE members_digest SET mobile = '".$mobile."' where id = '".$id."'";# printf("%s\n",$sql);# $dbh->query($sql);$mobile = '';$sql = "UPDATE members_digest SET mobile = :mobile where id = :id";}else{$sql = "UPDATE members_digest SET mobile = md(:mobile) where id = :id";}$sth = $dbh->prepare($sql);$sth->bindValue(':mobile', $mobile);$sth->bindValue(':id', $id);$sth->execute();#echo $sth->debugDumpParams();}catch(PDOException $e) {$error = sprintf("%s,%s\n", $mobile, $id );file_put_contents("mobile_error.log", $error, FILE_APPEND);}#$dbh = null;printf("runtime: %s, %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$mobile, $id);#printf("runtime: %s, %s\n", date('Y-m-d H:i:s'), $this->number);}}$pool = new Pool(, \ExampleWorker::class, []);#foreach (range(, ) as $number) {# $pool->submit(new Work($number));#}$dbhost = 'db.example.com'; // 数据库服务器$dbuser = 'example.com'; // 数据库用户名$dbpw = 'password'; // 数据库密码$dbname = 'example';$dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array(PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF\'',PDO::MYSQL_ATTR_COMPRESS => true));#print_r($dbh);#$sql = "select id, mobile from members where id < :id";#$sth = $dbh->prepare($sql);#$sth->bindValue(':id',);#$sth->execute();#$result = $sth->fetchAll();#print_r($result);##$sql = "UPDATE members_digest SET mobile = :mobile where id = :id";#$sth = $dbh->prepare($sql);#$sth->bindValue(':mobile', 'aa');#$sth->bindValue(':id','');#echo $sth->execute();#echo $sth->queryString;#echo $sth->debugDumpParams();$sql = "select id, mobile from members order by id asc"; // limit ";$row = $dbh->query($sql);while($members = $row->fetch(PDO::FETCH_ASSOC)){#$order = $account['order'];#printf("%s\n",$order);//print_r($members);$pool->submit(new Work($members));#unset($account['order']);}$pool->shutdown();?> 

多线程中操作数据库总结

总的来说 pthreads 仍然处在发展中,仍有一些不足的地方,我们也可以看到pthreads的git在不断改进这个项目

数据库持久链接很重要,否则每个线程都会开启一次数据库连接,然后关闭,会导致很多链接超时。

 true));?>

关于php pthreads多线程的安装与使用的相关知识,就先给大家介绍到这里,后续还会持续更新。

您可能感兴趣的文章:

  • 使用pthreads实现真正的PHP多线程(需PHP5.3以上版本)
  • php使用pthreads v3多线程实现抓取新浪新闻信息操作示例
  • PHP pthreads v3下的Volatile简介与使用方法示例
  • PHP pthreads v3下worker和pool的使用方法示例
  • PHP pthreads v3下同步处理synchronized用法示例
  • PHP pthreads v3使用中的一些坑和注意点分析
  • 实现PHP多线程异步请求的3种方法
  • php异步多线程swoole用法实例
  • PHP安装threads多线程扩展基础教程
  • php多线程并发实现方法
  • PHP pthreads v3在centos7平台下的安装与配置操作方法


  • 上一条:
    CodeIgniter配置之config.php用法实例分析
    下一条:
    PHP各种异常和错误的拦截方法及发生致命错误时进行报警
  • 昵称:

    邮箱:

    0条评论 (评论内容有缓存机制,请悉知!)
    最新最热
    • 分类目录
    • 人生(杂谈)
    • 技术
    • linux
    • Java
    • php
    • 框架(架构)
    • 前端
    • ThinkPHP
    • 数据库
    • 微信(小程序)
    • Laravel
    • Redis
    • Docker
    • Go
    • swoole
    • Windows
    • Python
    • 苹果(mac/ios)
    • 相关文章
    • Laravel从Accel获得5700万美元A轮融资(0个评论)
    • PHP 8.4 Alpha 1现已发布!(0个评论)
    • 用Time Warden监控PHP中的代码处理时间(0个评论)
    • 在PHP中使用array_pop + yield实现读取超大型目录功能示例(0个评论)
    • Property Hooks RFC在PHP 8.4中越来越接近现实(0个评论)
    • 近期文章
    • 在windows10中升级go版本至1.24后LiteIDE的Ctrl+左击无法跳转问题解决方案(0个评论)
    • 智能合约Solidity学习CryptoZombie第四课:僵尸作战系统(0个评论)
    • 智能合约Solidity学习CryptoZombie第三课:组建僵尸军队(高级Solidity理论)(0个评论)
    • 智能合约Solidity学习CryptoZombie第二课:让你的僵尸猎食(0个评论)
    • 智能合约Solidity学习CryptoZombie第一课:生成一只你的僵尸(0个评论)
    • 在go中实现一个常用的先进先出的缓存淘汰算法示例代码(0个评论)
    • 在go+gin中使用"github.com/skip2/go-qrcode"实现url转二维码功能(0个评论)
    • 在go语言中使用api.geonames.org接口实现根据国际邮政编码获取地址信息功能(1个评论)
    • 在go语言中使用github.com/signintech/gopdf实现生成pdf分页文件功能(95个评论)
    • gmail发邮件报错:534 5.7.9 Application-specific password required...解决方案(0个评论)
    • 近期评论
    • 122 在

      学历:一种延缓就业设计,生活需求下的权衡之选中评论 工作几年后,报名考研了,到现在还没认真学习备考,迷茫中。作为一名北漂互联网打工人..
    • 123 在

      Clash for Windows作者删库跑路了,github已404中评论 按理说只要你在国内,所有的流量进出都在监控范围内,不管你怎么隐藏也没用,想搞你分..
    • 原梓番博客 在

      在Laravel框架中使用模型Model分表最简单的方法中评论 好久好久都没看友情链接申请了,今天刚看,已经添加。..
    • 博主 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 @1111老铁这个不行了,可以看看近期评论的其他文章..
    • 1111 在

      佛跳墙vpn软件不会用?上不了网?佛跳墙vpn常见问题以及解决办法中评论 网站不能打开,博主百忙中能否发个APP下载链接,佛跳墙或极光..
    • 2016-10
    • 2016-11
    • 2017-06
    • 2017-07
    • 2017-08
    • 2017-09
    • 2017-11
    • 2017-12
    • 2018-01
    • 2018-02
    • 2018-03
    • 2020-03
    • 2020-04
    • 2020-05
    • 2020-06
    • 2020-07
    • 2020-09
    • 2021-02
    • 2021-03
    • 2021-04
    • 2021-05
    • 2021-06
    • 2021-07
    • 2021-08
    • 2021-09
    • 2021-10
    • 2021-11
    • 2021-12
    • 2022-01
    • 2022-02
    • 2022-05
    • 2022-06
    • 2022-07
    • 2022-08
    • 2022-09
    • 2022-10
    • 2022-11
    • 2022-12
    • 2023-01
    • 2023-02
    • 2023-03
    • 2023-04
    • 2023-05
    • 2023-06
    • 2023-07
    • 2023-08
    • 2023-09
    • 2023-10
    • 2023-11
    • 2023-12
    • 2024-01
    • 2024-02
    • 2024-03
    • 2024-04
    • 2024-05
    • 2024-06
    • 2024-07
    • 2024-09
    Top

    Copyright·© 2019 侯体宗版权所有· 粤ICP备20027696号 PHP交流群

    侯体宗的博客