forked from krakjoe/pthreads
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPooling.php
121 lines (107 loc) · 3.19 KB
/
Pooling.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
<?php
class WebWorker extends Worker {
/*
* We accept a SafeLog which is shared among Workers and an array
* containing PDO connection parameters
*
* The PDO connection itself cannot be shared among contexts so
* is declared static (thread-local), so that each Worker has it's
* own PDO connection.
*/
public function __construct(SafeLog $logger, array $config = []) {
$this->logger = $logger;
$this->config = $config;
}
/*
* The only thing to do here is setup the PDO object
*/
public function run() {
if (isset($this->config)) {
self::$connection = new PDO(... $this->config);
}
}
public function getLogger() { return $this->logger; }
public function getConnection() { return self::$connection; }
private $logger;
private $config;
private static $connection;
}
class WebWork extends Threaded {
/*
* An example of some work that depends upon a shared logger
* and a thread-local PDO connection
*/
public function run() {
$logger = $this->worker->getLogger();
$logger->log("%s executing in Thread #%lu",
__CLASS__, $this->worker->getThreadId());
if ($this->worker->getConnection()) {
$logger->log("%s has PDO in Thread #%lu",
__CLASS__, $this->worker->getThreadId());
}
}
}
class SafeLog extends Threaded {
/*
* If logging were allowed to occur without synchronizing
* the output would be illegible
*/
public function log($message, ... $args) {
$this->synchronized(function($message, ... $args){
if (is_callable($message)) {
$message(...$args);
} else echo vsprintf("{$message}\n", ... $args);
}, $message, $args);
}
}
$logger = new SafeLog();
/*
* Constructing the Pool does not create any Threads
*/
$pool = new Pool(8, 'WebWorker', [$logger, ["sqlite:example.db"]]);
/*
* Only when there is work to do are threads created
*/
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
$pool->submit(new WebWork());
/*
* The Workers in the Pool retain references to the WebWork objects submitted
* in order to release that memory the Pool::collect method must be invoked in the same
* context that created the Pool.
*
* The Worker::collect method is invoked for every Worker in the Pool, the garbage list
* for each Worker is traversed and each Collectable is passed to the provided Closure.
*
* The Closure must return true if the Collectable can be removed from the garbage list.
*
* Worker::collect returns the size of the garbage list, Pool::collect returns the sum of the size of
* the garbage list of all Workers in the Pool.
*
* Collecting in a continuous loop will cause the garbage list to be emptied.
*/
while ($pool->collect(function($work) {
return $work->isGarbage();
})) continue;
/*
* We could submit more stuff here, the Pool is still waiting for Collectables
*/
$logger->log(function($pool) {
var_dump($pool);
}, $pool);
/*
* Shutdown Pools at the appropriate time, don't leave it to chance !
*/
$pool->shutdown();
?>