PHP Threading
A thread is a small unit of instructions that a processor can execute; threads of the same process run independently but share that process's memory. An application uses threading when it needs parallelism, that is, when a single program has to process multiple units of instructions at the same time.
In the usual setups, PHP does not serve requests with threads: each request is handled by a separate PHP process. On a server that is not multi-core, no real parallelism can take place anyway; the processes simply take turns on the CPU (time-slicing).
PHP has no built-in multithreading, so we need to add the "pthreads" extension to our PHP installation. The extension requires a thread-safe (ZTS) build of PHP.
http://php.net/manual/en/pthreads.installation.php
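Before trying the examples, it helps to confirm that the runtime can load pthreads at all. The sketch below checks three conditions we assume apply to pthreads v3: a thread-safe (ZTS) build, the command-line SAPI, and the loaded extension. The function names pthreadsStatus() and canUsePthreads() are hypothetical helpers, not part of pthreads.

```php
<?php
// Hypothetical helpers: report whether this PHP runtime can use pthreads.
// Assumption: pthreads v3 needs a thread-safe (ZTS) build and the CLI SAPI.
function pthreadsStatus(): array
{
    return [
        'zts'       => (bool) PHP_ZTS,               // thread-safe build?
        'cli'       => PHP_SAPI === 'cli',           // running from the command line?
        'extension' => extension_loaded('pthreads'), // extension enabled?
    ];
}

// All three conditions must hold before Thread, Worker and Pool are usable
function canUsePthreads(array $status): bool
{
    return $status['zts'] && $status['cli'] && $status['extension'];
}
```

Calling canUsePthreads(pthreadsStatus()) at the top of a CLI script lets it fail fast with a clear message instead of a "Class 'Thread' not found" error later.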
The pthreads extension comes with many different concepts; these three basic classes are enough to get started:
Threaded objects: a unit of executable instructions, i.e. the work you want to execute asynchronously. Its run() method contains the code that executes in the thread; a Thread (a subclass of Threaded) runs it in a new thread when you call start().
Workers: a long-lived thread that executes the Threaded objects stacked onto it, one after another.
Pools: a fixed-size group of Worker instances; the pool manages them and distributes submitted tasks among them.
A simple thread example
<?php
class crawlUrl extends Thread
{
    public function __construct($url)
    {
        $this->url = $url;
    }

    // run() is the code that executes inside the thread
    public function run()
    {
        $this->html = file_get_contents($this->url);
        $this->putMagicallyIntoFiles();
    }

    // Placeholder storage step (hypothetical): save the page under a name derived from its URL
    private function putMagicallyIntoFiles()
    {
        file_put_contents(md5($this->url) . '.html', $this->html);
    }
}

// Sample input (hypothetical URLs)
$UrlArray = ['https://example.com/', 'https://example.org/', 'https://example.net/'];

// Direct invoking threads: start one thread per URL, keep a reference to each, then wait for all
$jobs = [];
foreach ($UrlArray as $url) {
    $job = new crawlUrl($url);
    $job->start();
    $jobs[] = $job;
}
foreach ($jobs as $job) {
    $job->join();
}

// Through worker class
// Stack our jobs on our worker
$worker = new Worker();
foreach ($UrlArray as $url) {
    $urlThread = new crawlUrl($url);
    $worker->stack($urlThread);
}
// Start all jobs
$worker->start();
// Join all jobs and close worker
$worker->shutdown();

// Pooling with the same example: 20 workers share the submitted jobs
$pool = new Pool(20, Worker::class);
foreach ($UrlArray as $url) {
    $pool->submit(new crawlUrl($url));
}
// Close the pool once done
$pool->shutdown();
pthreads with autoloading & context inheritance
The examples above are not tied to a real-world application, where threading has to fit into a framework such as Yii 2 or Slim. Such frameworks rely on an autoloader to load the library and application classes. A child thread, however, runs in its own context (a separate unit of processing inside your process), which is not the same as the parent's, so it can fail with a "Class not found" error even for a class that is already loaded in the parent context.
One option is to include the autoloader in every thread, but that makes things slower. A better solution is to load the autoloader once per Worker, either by stacking jobs on a Worker subclass or by giving a Pool that Worker class, so every job the worker runs finds the autoloader already registered.
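In the worker examples, "../autoload.php" stands for whatever autoloader the application uses (frameworks usually ship one, such as Composer's vendor/autoload.php). As a hypothetical illustration of what such a file does, here is a minimal PSR-4-style autoloader built on spl_autoload_register(); the function name registerAutoloader, the namespace prefix and the directory layout are assumptions.

```php
<?php
// Hypothetical contents of the autoload.php that each Worker requires in run().
// Assumption: classes under $prefix live in $baseDir, with namespace
// separators mapped to directory separators (PSR-4 style).
function registerAutoloader(string $prefix, string $baseDir): void
{
    $baseDir = rtrim($baseDir, DIRECTORY_SEPARATOR) . DIRECTORY_SEPARATOR;
    spl_autoload_register(function (string $class) use ($prefix, $baseDir): void {
        // Ignore classes outside our namespace prefix
        if (strncmp($class, $prefix, strlen($prefix)) !== 0) {
            return;
        }
        // App\Jobs\Demo -> <baseDir>/Jobs/Demo.php
        $relative = substr($class, strlen($prefix));
        $file = $baseDir . str_replace('\\', DIRECTORY_SEPARATOR, $relative) . '.php';
        if (is_file($file)) {
            require_once $file;
        }
    });
}
```

Because the Worker's run() executes once when the worker thread starts, the autoloader is registered once per worker and then serves every job that worker executes.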
class WorkerExample extends Worker
{
    // Runs once when the worker thread starts, before any stacked job executes
    public function run()
    {
        require_once __DIR__ . '/../autoload.php';
    }
}

// Create a worker and stack a job on it
$worker = new WorkerExample();
$job = new crawlUrl($url);
$worker->stack($job);
$worker->start();
// Wait for the stacked job to finish, then stop the worker thread
$worker->shutdown();

// Using pool class: every worker in the pool is a WorkerExample,
// so our worker class takes care of autoloading for the jobs
$pool = new Pool(20, WorkerExample::class);
$pool->submit(new crawlUrl($url));
$pool->shutdown();
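When we start a thread, we can control how much of the parent context it inherits by passing options to start(); the default is PTHREADS_INHERIT_ALL. The options are bit flags, so they can be combined with the bitwise OR operator (|).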
PTHREADS_INHERIT_CLASSES: Inherit only classes
PTHREADS_INHERIT_CONSTANTS: Inherit only constants
PTHREADS_INHERIT_FUNCTIONS: Inherit only functions
PTHREADS_INHERIT_NONE: Inherit nothing from the parent context
http://php.net/manual/en/pthreads.constants.php
With a Pool, we cannot simply pass these options ourselves, because the Pool, not our code, creates the workers and calls their start() method. Instead, we override start() in the Worker subclass. Here is an example:
class SomeWorker extends \Worker
{
    public function run()
    {
        // Auto loading library for threads
        require_once __DIR__ . '/../autoload.php';
    }

    // The Pool calls start() without arguments, so force the inheritance option here
    public function start(?int $options = null)
    {
        // Invoking thread with inherit none
        return parent::start(PTHREADS_INHERIT_NONE);
    }
}
Uses of threading:
In a PHP application, threading is useful for a notification engine, a scheduler engine, or any kind of batch processing.
pthreads with Task Scheduling: A Real-Life Example
My application needed a scheduler engine to process tasks in bulk while other components kept pushing tasks onto a queue. The basic job of the scheduler engine is to run each task at the time it is scheduled for, ordered by priority.
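The selection rule can be sketched in plain PHP, independently of pthreads and MongoDB. This in-memory version is a hypothetical illustration: the field names (time_schedule, status, priority, last_run) and the status strings stand in for the SchedularConstants values used in the real code. It keeps tasks whose scheduled time has passed, that are neither deleted nor already executing, and that have not run since midnight, then orders them by ascending priority value and scheduled time and caps the batch size.

```php
<?php
// Hypothetical in-memory version of the scheduler's job selection.
// Timestamps are Unix seconds; field names and status strings are assumptions.
function selectDueTasks(array $tasks, int $now, int $todayStart, int $limit): array
{
    $due = array_filter($tasks, function (array $task) use ($now, $todayStart): bool {
        return $task['time_schedule'] < $now                                        // time has come
            && !in_array($task['status'], ['deleted', 'in_execution'], true)       // not retired or running
            && ($task['last_run'] === null || $task['last_run'] < $todayStart);   // not run today
    });

    // Lower priority value first, then earlier scheduled time
    usort($due, function (array $a, array $b): int {
        return [$a['priority'], $a['time_schedule']] <=> [$b['priority'], $b['time_schedule']];
    });

    // Never hand out more tasks than there are threads
    return array_slice($due, 0, $limit);
}
```

Capping the batch at the number of threads means each selected task can get its own worker right away.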
Before the pthreads implementation
First Try
We used cron and kept creating a new cron job for every task to execute. In other words, we relied on the system's scheduler to trigger a process that double-checked the job and then executed it.
This was a naive way to get the job done: we found that the number of tasks was too high and the server could go into a deadlock state.
Some Evolution
We realized that the previous strategy fails for large applications, so we created a daemon process together with a diagnosis script. The daemon keeps polling for jobs according to some logic and executes any job it finds. Meanwhile, the diagnosis script keeps watching the daemon: if the daemon quits or is terminated, the script starts it again.
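The watchdog idea can be sketched as follows. This is a swapped-in variant, not the diagnosis script from our setup: instead of a separate script that watches the daemon, a parent process launches the daemon as a child with proc_open() and starts it again whenever it exits. The function name supervise, the restart limit and the delay are hypothetical.

```php
<?php
// Hypothetical supervisor: run $command as a child process and start it again
// each time it exits, up to $maxRestarts restarts. Returns the number of launches.
function supervise(string $command, int $maxRestarts, int $delaySeconds = 5): int
{
    $launches = 0;
    while (true) {
        // Empty descriptor spec: the child shares our stdin/stdout/stderr
        $process = proc_open($command, [], $pipes);
        if (!is_resource($process)) {
            throw new RuntimeException("Could not start: $command");
        }
        $launches++;

        // proc_close() blocks until the child exits and returns its exit code
        $exitCode = proc_close($process);
        error_log("Daemon exited with code $exitCode");

        if ($launches > $maxRestarts) {
            return $launches;
        }
        sleep($delaySeconds); // back off before restarting
    }
}
```

A long-running setup would call something like supervise('php daemon.php', PHP_INT_MAX), where daemon.php is a placeholder for the daemon script; the restart limit mainly keeps the sketch testable.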
And here pthreads comes into the picture
The daemon strategy worked fine at first, but as the number of tasks and the priority logic grew, the single daemon, running jobs one after another, became very slow: scheduled tasks started late, and once again we could not move forward.
So we combined pthreads and pooling with the daemon strategy: the daemon still polls for jobs, but when a job becomes due, it hands the job to a thread pool, which executes it in a separate thread.
Here is the code of a sample scheduler engine built on MongoDB:
// The scheduler task: one instance per due job, executed by a pool worker.
// ScheduleWorker (not shown) is a Worker subclass that loads the autoloader in run(), like SomeWorker above.
class SchedulerExecuteService extends \Thread
{
    public static $count = 0;
    public $result;
    public $param;

    public function __construct($res, $param)
    {
        $this->result = $res;
        $this->param = $param;
    }

    public function run()
    {
        // FILE_PATH holds the class name of the job handler
        $class = $this->result[SchedularConstants::$FILE_PATH];
        $method = $this->result[SchedularConstants::$METHOD_NAME];
        $log = [];

        // We need to convert param to a plain PHP array to pass it on;
        // the thread member variable won't work with call_user_func_array()
        $param = [];
        if ($this->result[SchedularConstants::$METHOD_PARAMS] != false) {
            foreach ($this->param as $p) {
                $param[] = $p;
            }
        }

        // Call the job's method statically, or on a new instance of the class
        $target = ($this->result[SchedularConstants::$STATIC] != false) ? $class : new $class();
        try {
            $cal_status = call_user_func_array([$target, $method], $param);
            $log[] = date('Y-m-d H:i:s') . " $class::$method finished: " . json_encode($cal_status);
        } catch (\Throwable $e) {
            // Record the failure so the job does not stay stuck in the in-execution state
            $log[] = date('Y-m-d H:i:s') . " $class::$method failed: " . $e->getMessage();
        }

        // Variables of the main loop are not visible here, so compute the run time inside the thread
        $mongoDate = MongoDateUtil::getMongoDateFromUnixSeconds(time());

        $searchArray = [SchedularConstants::$SEQ_ID => $this->result[SchedularConstants::$SEQ_ID]];
        $updateArray = [SchedularConstants::$LAST_RUN => $mongoDate];
        if ($this->result[SchedularConstants::$CRON_TYPE] == SchedularConstants::$TYPE_ONCE) {
            // One-time jobs are retired after they run
            $updateArray[SchedularConstants::$STATUS] = SchedularConstants::$CRON_DELETED;
        } else {
            // Recurring jobs become eligible again
            $updateArray[SchedularConstants::$STATUS] = SchedularConstants::$CRON_ACTIVE;
        }
        $SET = [SchedularConstants::$CRON_HIT_TIME => time()];
        $DATA = [
            '$push' => [
                SchedularConstants::$PAST_SCHEDULES => $SET,
                SchedularConstants::$LOG => implode(PHP_EOL, $log),
            ],
            '$set' => $updateArray,
        ];

        // The thread opens its own connection; the main loop's connection is not available here
        $db = DatabaseUtil::getDBConnectionObject('mydb');
        DatabaseUtil::updateDocument($db, DBConstants::$SCHEDULAR_COLLECTION_NAME, $searchArray, $DATA,
            $this->result[DBConstants::$USER_ID], true, false, false);
    }
}

// The daemon loop: poll for due jobs and run each batch on a thread pool
$db = DatabaseUtil::getDBConnectionObject('mydb');
while (true) {
    $mongoDate = MongoDateUtil::getMongoDateFromUnixSeconds(time());
    // Midnight today: a job that already ran today is not picked up again
    $today = MongoDateUtil::getMongoDateFromUnixSeconds(strtotime('today'));

    // Creating a search array for due jobs that are neither deleted nor already running
    $searchArray = [
        SchedularConstants::$TIME_SCHEDULE => ['$lt' => $mongoDate],
        SchedularConstants::$STATUS => ['$nin' => [SchedularConstants::$CRON_DELETED, SchedularConstants::$CRON_INEXECUTION]],
        '$or' => [
            [SchedularConstants::$LAST_RUN => ['$lt' => $today]],
            [SchedularConstants::$LAST_RUN => null],
        ],
    ];

    // Get the active jobs from the queue, by priority and then schedule time, at most MAX_THREADS at once
    $results = DatabaseUtil::retrieveAllByProjection(
        $db,
        DBConstants::$SCHEDULAR_COLLECTION_NAME,
        $searchArray,
        [],
        [SchedularConstants::$PRIORITY => 1, SchedularConstants::$TIME_SCHEDULE => 1],
        SchedularConstants::$MAX_THREADS
    );

    if (empty($results)) {
        // If no job is there let the process sleep
        sleep(SchedularConstants::$JOB_DELAY_SEC);
        continue;
    }

    // Creating pool for jobs
    $pool = new \Pool(SchedularConstants::$MAX_THREADS, ScheduleWorker::class);
    foreach ($results as $result) {
        SchedulerExecuteService::$count++;
        // Setting the in-execution status of the job so the next poll skips it
        $searchArray = [SchedularConstants::$SEQ_ID => $result[SchedularConstants::$SEQ_ID]];
        $DATA = ['$set' => [SchedularConstants::$STATUS => SchedularConstants::$CRON_INEXECUTION]];
        DatabaseUtil::updateDocument($db, DBConstants::$SCHEDULAR_COLLECTION_NAME, $searchArray, $DATA,
            $result[DBConstants::$USER_ID], true, false, false);
        // Submit job to ready queue
        $pool->submit(new SchedulerExecuteService($result, $result[SchedularConstants::$METHOD_PARAMS]));
    }

    // Give the batch time to run, then wait for the remaining jobs and close the pool
    sleep(SchedularConstants::$JOB_DELAY_SEC);
    $pool->shutdown();
}