1
0
Fork 0
mirror of https://github.com/Yetangitu/ampache synced 2025-10-05 02:39:47 +02:00

Move from React 0.4.0 to 0.3.4 for PHP 5.3 compatibility reasons

This commit is contained in:
Afterster 2014-04-13 08:58:34 +02:00
parent d3f01a3bc7
commit 7f82bea13e
50 changed files with 608 additions and 1937 deletions

View file

@ -20,7 +20,7 @@ Recommended Version
------------------- -------------------
Currently, the recommended version is [git HEAD](https://github.com/ampache-doped/ampache-doped/archive/master.tar.gz). Currently, the recommended version is [git HEAD](https://github.com/ampache-doped/ampache-doped/archive/master.tar.gz).
[![Build Status](https://api.travis-ci.org/ampache-doped/ampache.png?branch=master)](https://travis-ci.org/ampache-doped/ampache) [![Build Status](https://api.travis-ci.org/ampache-doped/ampache-doped.png?branch=master)](https://travis-ci.org/ampache-doped/ampache-doped)
Requirements Requirements
------------ ------------

View file

@ -297,7 +297,7 @@ class Browse extends Query
} }
} }
} // set_filter_from_request } // set_filter_from_request
public function set_type($type, $custom_base = '') public function set_type($type, $custom_base = '')
{ {
$cn = 'browse_' . $type . '_pages'; $cn = 'browse_' . $type . '_pages';
@ -315,11 +315,12 @@ class Browse extends Query
$this->set_filter('regex_not_match', ''); $this->set_filter('regex_not_match', '');
} }
} }
parent::set_type($type, $custom_base); parent::set_type($type, $custom_base);
} }
public function save_cookie_params($option, $value) { public function save_cookie_params($option, $value)
{
if ($this->get_type()) { if ($this->get_type()) {
setcookie('browse_' . $this->get_type() . '_' . $option, $value, time() + 31536000, "/"); setcookie('browse_' . $this->get_type() . '_' . $option, $value, time() + 31536000, "/");
} }

View file

@ -2,7 +2,7 @@
namespace React\Cache; namespace React\Cache;
use React\Promise; use React\Promise\When;
class ArrayCache implements CacheInterface class ArrayCache implements CacheInterface
{ {
@ -11,10 +11,10 @@ class ArrayCache implements CacheInterface
public function get($key) public function get($key)
{ {
if (!isset($this->data[$key])) { if (!isset($this->data[$key])) {
return Promise\reject(); return When::reject();
} }
return Promise\resolve($this->data[$key]); return When::resolve($this->data[$key]);
} }
public function set($key, $value) public function set($key, $value)

View file

@ -4,15 +4,16 @@
"keywords": ["cache"], "keywords": ["cache"],
"license": "MIT", "license": "MIT",
"require": { "require": {
"php": ">=5.4.0", "php": ">=5.3.2",
"react/promise": "~2.0" "react/promise": "~1.0"
}, },
"autoload": { "autoload": {
"psr-4": { "React\\Cache\\": "" } "psr-0": { "React\\Cache": "" }
}, },
"target-dir": "React/Cache",
"extra": { "extra": {
"branch-alias": { "branch-alias": {
"dev-master": "0.4-dev" "dev-master": "0.3-dev"
} }
} }
} }

View file

@ -1,428 +0,0 @@
<?php
namespace React\ChildProcess;
use Evenement\EventEmitter;
use React\EventLoop\LoopInterface;
use React\EventLoop\Timer\Timer;
use React\Stream\Stream;
/**
* Process component.
*
* This class borrows logic from Symfony's Process component for ensuring
* compatibility when PHP is compiled with the --enable-sigchild option.
*
* @event exit
*/
class Process extends EventEmitter
{
public $stdin;
public $stdout;
public $stderr;
private $cmd;
private $cwd;
private $env;
private $options;
private $enhanceSigchildCompatibility;
private $pipes;
private $process;
private $status;
private $exitCode;
private $fallbackExitCode;
private $stopSignal;
private $termSignal;
private static $sigchild;
/**
* Constructor.
*
* @param string $cmd Command line to run
* @param string $cwd Current working directory or null to inherit
* @param array $env Environment variables or null to inherit
* @param array $options Options for proc_open()
* @throws RuntimeException When proc_open() is not installed
*/
public function __construct($cmd, $cwd = null, array $env = null, array $options = array())
{
if (!function_exists('proc_open')) {
throw new \RuntimeException('The Process class relies on proc_open(), which is not available on your PHP installation.');
}
$this->cmd = $cmd;
$this->cwd = $cwd;
if (null !== $env) {
$this->env = array();
foreach ($env as $key => $value) {
$this->env[(binary) $key] = (binary) $value;
}
}
$this->options = $options;
$this->enhanceSigchildCompatibility = $this->isSigchildEnabled();
}
/**
* Start the process.
*
* After the process is started, the standard IO streams will be constructed
* and available via public properties. STDIN will be paused upon creation.
*
* @param LoopInterface $loop Loop interface for stream construction
* @param float $interval Interval to periodically monitor process state (seconds)
* @throws RuntimeException If the process is already running or fails to start
*/
public function start(LoopInterface $loop, $interval = 0.1)
{
if ($this->isRunning()) {
throw new \RuntimeException('Process is already running');
}
$cmd = $this->cmd;
$fdSpec = array(
array('pipe', 'r'), // stdin
array('pipe', 'w'), // stdout
array('pipe', 'w'), // stderr
);
// Read exit code through fourth pipe to work around --enable-sigchild
if ($this->isSigchildEnabled() && $this->enhanceSigchildCompatibility) {
$fdSpec[] = array('pipe', 'w');
$cmd = sprintf('(%s) 3>/dev/null; code=$?; echo $code >&3; exit $code', $cmd);
}
$this->process = proc_open($cmd, $fdSpec, $this->pipes, $this->cwd, $this->env, $this->options);
if (!is_resource($this->process)) {
throw new \RuntimeException('Unable to launch a new process.');
}
$this->stdin = new Stream($this->pipes[0], $loop);
$this->stdin->pause();
$this->stdout = new Stream($this->pipes[1], $loop);
$this->stderr = new Stream($this->pipes[2], $loop);
foreach ($this->pipes as $pipe) {
stream_set_blocking($pipe, 0);
}
$loop->addPeriodicTimer($interval, function (Timer $timer) {
if (!$this->isRunning()) {
$this->close();
$timer->cancel();
$this->emit('exit', array($this->getExitCode(), $this->getTermSignal()));
}
});
}
/**
* Close the process.
*
* This method should only be invoked via the periodic timer that monitors
* the process state.
*/
public function close()
{
if ($this->process === null) {
return;
}
$this->stdin->close();
$this->stdout->close();
$this->stderr->close();
if ($this->isSigchildEnabled() && $this->enhanceSigchildCompatibility) {
$this->pollExitCodePipe();
$this->closeExitCodePipe();
}
$exitCode = proc_close($this->process);
$this->process = null;
if ($this->exitCode === null && $exitCode !== -1) {
$this->exitCode = $exitCode;
}
if ($this->exitCode === null && $this->status['exitcode'] !== -1) {
$this->exitCode = $this->status['exitcode'];
}
if ($this->exitCode === null && $this->fallbackExitCode !== null) {
$this->exitCode = $this->fallbackExitCode;
$this->fallbackExitCode = null;
}
}
/**
* Terminate the process with an optional signal.
*
* @param int $signal Optional signal (default: SIGTERM)
* @return boolean Whether the signal was sent successfully
*/
public function terminate($signal = null)
{
if ($signal !== null) {
return proc_terminate($this->process, $signal);
}
return proc_terminate($this->process);
}
/**
* Get the command string used to launch the process.
*
* @return string
*/
public function getCommand()
{
return $this->cmd;
}
/**
* Return whether sigchild compatibility is enabled.
*
* @return boolean
*/
public final function getEnhanceSigchildCompatibility()
{
return $this->enhanceSigchildCompatibility;
}
/**
* Enable or disable sigchild compatibility mode.
*
* Sigchild compatibility mode is required to get the exit code and
* determine the success of a process when PHP has been compiled with
* the --enable-sigchild option.
*
* @param boolean $enhance
* @return self
* @throws RuntimeException If the process is already running
*/
public final function setEnhanceSigchildCompatibility($enhance)
{
if ($this->isRunning()) {
throw new \RuntimeException('Process is already running');
}
$this->enhanceSigchildCompatibility = (bool) $enhance;
return $this;
}
/**
* Get the exit code returned by the process.
*
* This value is only meaningful if isRunning() has returned false. Null
* will be returned if the process is still running.
*
* Null may also be returned if the process has terminated, but the exit
* code could not be determined (e.g. sigchild compatibility was disabled).
*
* @return int|null
*/
public function getExitCode()
{
return $this->exitCode;
}
/**
* Get the process ID.
*
* @return int|null
*/
public function getPid()
{
$status = $this->getCachedStatus();
return $status !== null ? $status['pid'] : null;
}
/**
* Get the signal that caused the process to stop its execution.
*
* This value is only meaningful if isStopped() has returned true. Null will
* be returned if the process was never stopped.
*
* @return int|null
*/
public function getStopSignal()
{
return $this->stopSignal;
}
/**
* Get the signal that caused the process to terminate its execution.
*
* This value is only meaningful if isTerminated() has returned true. Null
* will be returned if the process was never terminated.
*
* @return int|null
*/
public function getTermSignal()
{
return $this->termSignal;
}
/**
* Return whether the process is still running.
*
* @return boolean
*/
public function isRunning()
{
if ($this->process === null) {
return false;
}
$status = $this->getFreshStatus();
return $status !== null ? $status['running'] : false;
}
/**
* Return whether the process has been stopped by a signal.
*
* @return boolean
*/
public function isStopped()
{
$status = $this->getFreshStatus();
return $status !== null ? $status['stopped'] : false;
}
/**
* Return whether the process has been terminated by an uncaught signal.
*
* @return boolean
*/
public function isTerminated()
{
$status = $this->getFreshStatus();
return $status !== null ? $status['signaled'] : false;
}
/**
* Return whether PHP has been compiled with the '--enable-sigchild' option.
*
* @see \Symfony\Component\Process\Process::isSigchildEnabled()
* @return bool
*/
public final static function isSigchildEnabled()
{
if (null !== self::$sigchild) {
return self::$sigchild;
}
ob_start();
phpinfo(INFO_GENERAL);
return self::$sigchild = false !== strpos(ob_get_clean(), '--enable-sigchild');
}
/**
* Check the fourth pipe for an exit code.
*
* This should only be used if --enable-sigchild compatibility was enabled.
*/
private function pollExitCodePipe()
{
if ( ! isset($this->pipes[3])) {
return;
}
$r = array($this->pipes[3]);
$w = $e = null;
$n = @stream_select($r, $w, $e, 0);
if (1 !== $n) {
return;
}
$data = fread($r[0], 8192);
if (strlen($data) > 0) {
$this->fallbackExitCode = (int) $data;
}
}
/**
* Close the fourth pipe used to relay an exit code.
*
* This should only be used if --enable-sigchild compatibility was enabled.
*/
private function closeExitCodePipe()
{
if ( ! isset($this->pipes[3])) {
return;
}
fclose($this->pipes[3]);
unset($this->pipes[3]);
}
/**
* Return the cached process status.
*
* @return array
*/
private function getCachedStatus()
{
if ($this->status === null) {
$this->updateStatus();
}
return $this->status;
}
/**
* Return the updated process status.
*
* @return array
*/
private function getFreshStatus()
{
$this->updateStatus();
return $this->status;
}
/**
* Update the process status, stop/term signals, and exit code.
*
* Stop/term signals are only updated if the process is currently stopped or
* signaled, respectively. Otherwise, signal values will remain as-is so the
* corresponding getter methods may be used at a later point in time.
*/
private function updateStatus()
{
if ($this->process === null) {
return;
}
$this->status = proc_get_status($this->process);
if ($this->status === false) {
throw new \UnexpectedValueException('proc_get_status() failed');
}
if ($this->status['stopped']) {
$this->stopSignal = $this->status['stopsig'];
}
if ($this->status['signaled']) {
$this->termSignal = $this->status['termsig'];
}
if (!$this->status['running'] && -1 !== $this->status['exitcode']) {
$this->exitCode = $this->status['exitcode'];
}
}
}

View file

@ -1,93 +0,0 @@
# Child Process Component
Library for executing child processes.
## Introduction
This library integrates the
[Program Execution](http://php.net/manual/en/book.exec.php) extension in PHP
with React's event loop.
Child processes launched within the event loop may be signaled and will emit an
`exit` event upon termination. Additionally, process I/O streams (i.e. stdin,
stdout, stderr) are registered with the loop.
## Processes
### EventEmitter Events
* `exit`: Emitted whenever the process is no longer running. Event listeners
will receive the exit code and termination signal as two arguments.
### Methods
* `start()`: Launches the process and registers its IO streams with the event
loop. The stdin stream will be left in a paused state.
* `terminate()`: Send the process a signal (SIGTERM by default).
There are additional public methods on the Process class, which may be used to
access fields otherwise available through `proc_get_status()`.
### Stream Properties
Once a process is started, its I/O streams will be constructed as instances of
`React\Stream\Stream`. Before `start()` is called, these properties are `null`.
Once a process terminates, the streams will become closed but not unset.
* `$stdin`
* `$stdout`
* `$stderr`
## Usage
$loop = React\EventLoop\Factory::create();
$process = new React\ChildProcess\Process('echo foo');
$process->on('exit', function($exitCode, $termSignal) {
// ...
});
$loop->addTimer(0.001, function($timer) use ($process) {
$process->start($timer->getLoop());
$process->stdout->on('data', function($output) {
// ...
});
});
$loop->run();
### Prepending Commands with `exec`
Symfony pull request [#5759](https://github.com/symfony/symfony/issues/5759)
documents a caveat with the
[Program Execution](http://php.net/manual/en/book.exec.php) extension. PHP will
launch processes via `sh`, which obfuscates the underlying process' PID and
complicates signaling (our process becomes a child of `sh`). As a work-around,
prepend the command string with `exec`, which will cause the `sh` process to be
replaced by our process.
### Sigchild Compatibility
When PHP has been compiled with the `--enabled-sigchild` option, a child
process' exit code cannot be reliably determined via `proc_close()` or
`proc_get_status()`. Instead, we execute the child process with a fourth pipe
and use that to retrieve its exit code.
This behavior is used by default and only when necessary. It may be manually
disabled by calling `setEnhanceSigchildCompatibility(false)` on the Process
before it is started, in which case the `exit` event may receive `null` instead
of the actual exit code.
**Note:** This functionality was taken from Symfony's
[Process](https://github.com/symfony/process) compoment.
### Command Chaining
Command chaning with `&&` or `;`, while possible with `proc_open()`, should not
be used with this component. There is currently no way to discern when each
process in a chain ends, which would complicate working with I/O streams. As an
alternative, considering launching one process at a time and listening on its
`exit` event to conditionally start the next process in the chain. This will
give you an opportunity to configure the subsequent process' I/O streams.

View file

@ -1,20 +0,0 @@
{
"name": "react/child-process",
"description": "Library for executing child processes.",
"keywords": ["process"],
"license": "MIT",
"require": {
"php": ">=5.4.0",
"evenement/evenement": "~2.0",
"react/event-loop": "0.4.*",
"react/stream": "0.4.*"
},
"autoload": {
"psr-4": { "React\\ChildProcess\\": "" }
},
"extra": {
"branch-alias": {
"dev-master": "0.4-dev"
}
}
}

View file

@ -3,8 +3,8 @@
namespace React\Dns\Config; namespace React\Dns\Config;
use React\EventLoop\LoopInterface; use React\EventLoop\LoopInterface;
use React\Promise;
use React\Promise\Deferred; use React\Promise\Deferred;
use React\Promise\When;
use React\Stream\Stream; use React\Stream\Stream;
class FilesystemFactory class FilesystemFactory
@ -38,13 +38,13 @@ class FilesystemFactory
$config = new Config(); $config = new Config();
$config->nameservers = $nameservers; $config->nameservers = $nameservers;
return Promise\resolve($config); return When::resolve($config);
} }
public function loadEtcResolvConf($filename) public function loadEtcResolvConf($filename)
{ {
if (!file_exists($filename)) { if (!file_exists($filename)) {
return Promise\reject(new \InvalidArgumentException("The filename for /etc/resolv.conf given does not exist: $filename")); return When::reject(new \InvalidArgumentException("The filename for /etc/resolv.conf given does not exist: $filename"));
} }
try { try {
@ -68,7 +68,7 @@ class FilesystemFactory
return $deferred->promise(); return $deferred->promise();
} catch (\Exception $e) { } catch (\Exception $e) {
return Promise\reject($e); return When::reject($e);
} }
} }
} }

View file

@ -4,6 +4,7 @@ namespace React\Dns\Query;
use React\Dns\Model\Message; use React\Dns\Model\Message;
use React\Dns\Model\Record; use React\Dns\Model\Record;
use React\Promise\When;
class CachedExecutor implements ExecutorInterface class CachedExecutor implements ExecutorInterface
{ {
@ -18,14 +19,15 @@ class CachedExecutor implements ExecutorInterface
public function query($nameserver, Query $query) public function query($nameserver, Query $query)
{ {
$that = $this;
$executor = $this->executor; $executor = $this->executor;
$cache = $this->cache; $cache = $this->cache;
return $this->cache return $this->cache
->lookup($query) ->lookup($query)
->then( ->then(
function ($cachedRecords) use ($query) { function ($cachedRecords) use ($that, $query) {
return $this->buildResponse($query, $cachedRecords); return $that->buildResponse($query, $cachedRecords);
}, },
function () use ($executor, $cache, $nameserver, $query) { function () use ($executor, $cache, $nameserver, $query) {
return $executor return $executor

View file

@ -48,14 +48,15 @@ class Executor implements ExecutorInterface
public function doQuery($nameserver, $transport, $queryData, $name) public function doQuery($nameserver, $transport, $queryData, $name)
{ {
$that = $this;
$parser = $this->parser; $parser = $this->parser;
$loop = $this->loop; $loop = $this->loop;
$response = new Message(); $response = new Message();
$deferred = new Deferred(); $deferred = new Deferred();
$retryWithTcp = function () use ($nameserver, $queryData, $name) { $retryWithTcp = function () use ($that, $nameserver, $queryData, $name) {
return $this->doQuery($nameserver, 'tcp', $queryData, $name); return $that->doQuery($nameserver, 'tcp', $queryData, $name);
}; };
$timer = $this->loop->addTimer($this->timeout, function () use (&$conn, $name, $deferred) { $timer = $this->loop->addTimer($this->timeout, function () use (&$conn, $name, $deferred) {

View file

@ -5,7 +5,7 @@ namespace React\Dns\Query;
use React\Cache\CacheInterface; use React\Cache\CacheInterface;
use React\Dns\Model\Message; use React\Dns\Model\Message;
use React\Dns\Model\Record; use React\Dns\Model\Record;
use React\Promise; use React\Promise\When;
class RecordCache class RecordCache
{ {
@ -29,7 +29,7 @@ class RecordCache
$recordBag = unserialize($value); $recordBag = unserialize($value);
if (null !== $expiredAt && $expiredAt <= $query->currentTime) { if (null !== $expiredAt && $expiredAt <= $query->currentTime) {
return Promise\reject(); return When::reject();
} }
return $recordBag->all(); return $recordBag->all();

View file

@ -19,16 +19,17 @@ class RetryExecutor implements ExecutorInterface
{ {
$deferred = new Deferred(); $deferred = new Deferred();
$this->tryQuery($nameserver, $query, $this->retries, $deferred); $this->tryQuery($nameserver, $query, $this->retries, $deferred->resolver());
return $deferred->promise(); return $deferred->promise();
} }
public function tryQuery($nameserver, Query $query, $retries, $deferred) public function tryQuery($nameserver, Query $query, $retries, $resolver)
{ {
$errorback = function ($error) use ($nameserver, $query, $retries, $deferred) { $that = $this;
$errorback = function ($error) use ($nameserver, $query, $retries, $resolver, $that) {
if (!$error instanceof TimeoutException) { if (!$error instanceof TimeoutException) {
$deferred->reject($error); $resolver->reject($error);
return; return;
} }
if (0 >= $retries) { if (0 >= $retries) {
@ -37,14 +38,14 @@ class RetryExecutor implements ExecutorInterface
0, 0,
$error $error
); );
$deferred->reject($error); $resolver->reject($error);
return; return;
} }
$this->tryQuery($nameserver, $query, $retries-1, $deferred); $that->tryQuery($nameserver, $query, $retries-1, $resolver);
}; };
$this->executor $this->executor
->query($nameserver, $query) ->query($nameserver, $query)
->then(array($deferred, 'resolve'), $errorback); ->then(array($resolver, 'resolve'), $errorback);
} }
} }

View file

@ -20,79 +20,40 @@ class Resolver
public function resolve($domain) public function resolve($domain)
{ {
$that = $this;
$query = new Query($domain, Message::TYPE_A, Message::CLASS_IN, time()); $query = new Query($domain, Message::TYPE_A, Message::CLASS_IN, time());
return $this->executor return $this->executor
->query($this->nameserver, $query) ->query($this->nameserver, $query)
->then(function (Message $response) use ($query) { ->then(function (Message $response) use ($that) {
return $this->extractAddress($query, $response); return $that->extractAddress($response, Message::TYPE_A);
}); });
} }
public function extractAddress(Query $query, Message $response) public function extractAddress(Message $response, $type)
{ {
$answers = $response->answers; $answer = $this->pickRandomAnswerOfType($response, $type);
$address = $answer->data;
$addresses = $this->resolveAliases($answers, $query->name);
if (0 === count($addresses)) {
$message = 'DNS Request did not return valid answer.';
throw new RecordNotFoundException($message);
}
$address = $addresses[array_rand($addresses)];
return $address; return $address;
} }
public function resolveAliases(array $answers, $name) public function pickRandomAnswerOfType(Message $response, $type)
{ {
$named = $this->filterByName($answers, $name); // TODO: filter by name to make sure domain matches
$aRecords = $this->filterByType($named, Message::TYPE_A); // TODO: resolve CNAME aliases
$cnameRecords = $this->filterByType($named, Message::TYPE_CNAME);
if ($aRecords) { $filteredAnswers = array_filter($response->answers, function ($answer) use ($type) {
return $this->mapRecordData($aRecords); return $type === $answer->type;
}
if ($cnameRecords) {
$aRecords = array();
$cnames = $this->mapRecordData($cnameRecords);
foreach ($cnames as $cname) {
$targets = $this->filterByName($answers, $cname);
$aRecords = array_merge(
$aRecords,
$this->resolveAliases($answers, $cname)
);
}
return $aRecords;
}
return array();
}
private function filterByName(array $answers, $name)
{
return $this->filterByField($answers, 'name', $name);
}
private function filterByType(array $answers, $type)
{
return $this->filterByField($answers, 'type', $type);
}
private function filterByField(array $answers, $field, $value)
{
return array_filter($answers, function ($answer) use ($field, $value) {
return $value === $answer->$field;
}); });
}
private function mapRecordData(array $records) if (0 === count($filteredAnswers)) {
{ $message = sprintf('DNS Request did not return valid answer. Received answers: %s', json_encode($response->answers));
return array_map(function ($record) { throw new RecordNotFoundException($message);
return $record->data; }
}, $records);
$answer = $filteredAnswers[array_rand($filteredAnswers)];
return $answer;
} }
} }

View file

@ -4,17 +4,18 @@
"keywords": ["dns", "dns-resolver"], "keywords": ["dns", "dns-resolver"],
"license": "MIT", "license": "MIT",
"require": { "require": {
"php": ">=5.4.0", "php": ">=5.3.2",
"react/cache": "0.4.*", "react/cache": "0.3.*",
"react/socket": "0.4.*", "react/socket": "0.3.*",
"react/promise": "~2.0" "react/promise": "~1.0"
}, },
"autoload": { "autoload": {
"psr-0": { "React\\Dns\\": "" } "psr-0": { "React\\Dns": "" }
}, },
"target-dir": "React/Dns",
"extra": { "extra": {
"branch-alias": { "branch-alias": {
"dev-master": "0.4-dev" "dev-master": "0.3-dev"
} }
} }
} }

View file

@ -1,326 +0,0 @@
<?php
namespace React\EventLoop;
use Event;
use EventBase;
use React\EventLoop\Tick\FutureTickQueue;
use React\EventLoop\Tick\NextTickQueue;
use React\EventLoop\Timer\Timer;
use React\EventLoop\Timer\TimerInterface;
use SplObjectStorage;
/**
* An ext-event based event-loop.
*/
class ExtEventLoop implements LoopInterface
{
private $eventBase;
private $nextTickQueue;
private $futureTickQueue;
private $timerCallback;
private $timerEvents;
private $streamCallback;
private $streamEvents = [];
private $streamFlags = [];
private $readListeners = [];
private $writeListeners = [];
private $running;
public function __construct()
{
$this->eventBase = new EventBase();
$this->nextTickQueue = new NextTickQueue($this);
$this->futureTickQueue = new FutureTickQueue($this);
$this->timerEvents = new SplObjectStorage();
$this->createTimerCallback();
$this->createStreamCallback();
}
/**
* {@inheritdoc}
*/
public function addReadStream($stream, callable $listener)
{
$key = (int) $stream;
if (!isset($this->readListeners[$key])) {
$this->readListeners[$key] = $listener;
$this->subscribeStreamEvent($stream, Event::READ);
}
}
/**
* {@inheritdoc}
*/
public function addWriteStream($stream, callable $listener)
{
$key = (int) $stream;
if (!isset($this->writeListeners[$key])) {
$this->writeListeners[$key] = $listener;
$this->subscribeStreamEvent($stream, Event::WRITE);
}
}
/**
* {@inheritdoc}
*/
public function removeReadStream($stream)
{
$key = (int) $stream;
if (isset($this->readListeners[$key])) {
unset($this->readListeners[$key]);
$this->unsubscribeStreamEvent($stream, Event::READ);
}
}
/**
* {@inheritdoc}
*/
public function removeWriteStream($stream)
{
$key = (int) $stream;
if (isset($this->writeListeners[$key])) {
unset($this->writeListeners[$key]);
$this->unsubscribeStreamEvent($stream, Event::WRITE);
}
}
/**
* {@inheritdoc}
*/
public function removeStream($stream)
{
$key = (int) $stream;
if (isset($this->streamEvents[$key])) {
$this->streamEvents[$key]->free();
unset(
$this->streamFlags[$key],
$this->streamEvents[$key],
$this->readListeners[$key],
$this->writeListeners[$key]
);
}
}
/**
* {@inheritdoc}
*/
public function addTimer($interval, callable $callback)
{
$timer = new Timer($this, $interval, $callback, false);
$this->scheduleTimer($timer);
return $timer;
}
/**
* {@inheritdoc}
*/
public function addPeriodicTimer($interval, callable $callback)
{
$timer = new Timer($this, $interval, $callback, true);
$this->scheduleTimer($timer);
return $timer;
}
/**
* {@inheritdoc}
*/
public function cancelTimer(TimerInterface $timer)
{
if ($this->isTimerActive($timer)) {
$this->timerEvents[$timer]->free();
$this->timerEvents->detach($timer);
}
}
/**
* {@inheritdoc}
*/
public function isTimerActive(TimerInterface $timer)
{
return $this->timerEvents->contains($timer);
}
/**
* {@inheritdoc}
*/
public function nextTick(callable $listener)
{
$this->nextTickQueue->add($listener);
}
/**
* {@inheritdoc}
*/
public function futureTick(callable $listener)
{
$this->futureTickQueue->add($listener);
}
/**
* {@inheritdoc}
*/
public function tick()
{
$this->nextTickQueue->tick();
$this->futureTickQueue->tick();
// @-suppression: https://github.com/reactphp/react/pull/234#discussion-diff-7759616R226
@$this->eventBase->loop(EventBase::LOOP_ONCE | EventBase::LOOP_NONBLOCK);
}
/**
* {@inheritdoc}
*/
public function run()
{
$this->running = true;
while ($this->running) {
$this->nextTickQueue->tick();
$this->futureTickQueue->tick();
$flags = EventBase::LOOP_ONCE;
if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) {
$flags |= EventBase::LOOP_NONBLOCK;
} elseif (!$this->streamEvents && !$this->timerEvents->count()) {
break;
}
// @-suppression: https://github.com/reactphp/react/pull/234#discussion-diff-7759616R226
@$this->eventBase->loop($flags);
}
}
/**
* {@inheritdoc}
*/
public function stop()
{
$this->running = false;
}
/**
* Schedule a timer for execution.
*
* @param TimerInterface $timer
*/
private function scheduleTimer(TimerInterface $timer)
{
$flags = Event::TIMEOUT;
if ($timer->isPeriodic()) {
$flags |= Event::PERSIST;
}
$event = new Event($this->eventBase, -1, $flags, $this->timerCallback, $timer);
$this->timerEvents[$timer] = $event;
$event->add($timer->getInterval());
}
/**
* Create a new ext-event Event object, or update the existing one.
*
* @param stream $stream
* @param integer $flag Event::READ or Event::WRITE
*/
private function subscribeStreamEvent($stream, $flag)
{
$key = (int) $stream;
if (isset($this->streamEvents[$key])) {
$event = $this->streamEvents[$key];
$flags = ($this->streamFlags[$key] |= $flag);
$event->del();
$event->set($this->eventBase, $stream, Event::PERSIST | $flags, $this->streamCallback);
} else {
$event = new Event($this->eventBase, $stream, Event::PERSIST | $flag, $this->streamCallback);
$this->streamEvents[$key] = $event;
$this->streamFlags[$key] = $flag;
}
$event->add();
}
/**
* Update the ext-event Event object for this stream to stop listening to
* the given event type, or remove it entirely if it's no longer needed.
*
* @param stream $stream
* @param integer $flag Event::READ or Event::WRITE
*/
private function unsubscribeStreamEvent($stream, $flag)
{
$key = (int) $stream;
$flags = $this->streamFlags[$key] &= ~$flag;
if (0 === $flags) {
$this->removeStream($stream);
return;
}
$event = $this->streamEvents[$key];
$event->del();
$event->set($this->eventBase, $stream, Event::PERSIST | $flags, $this->streamCallback);
$event->add();
}
/**
* Create a callback used as the target of timer events.
*
* A reference is kept to the callback for the lifetime of the loop
* to prevent "Cannot destroy active lambda function" fatal error from
* the event extension.
*/
private function createTimerCallback()
{
$this->timerCallback = function ($_, $_, $timer) {
call_user_func($timer->getCallback(), $timer);
if (!$timer->isPeriodic() && $this->isTimerActive($timer)) {
$this->cancelTimer($timer);
}
};
}
/**
* Create a callback used as the target of stream events.
*
* A reference is kept to the callback for the lifetime of the loop
* to prevent "Cannot destroy active lambda function" fatal error from
* the event extension.
*/
private function createStreamCallback()
{
$this->streamCallback = function ($stream, $flags) {
$key = (int) $stream;
if (Event::READ === (Event::READ & $flags) && isset($this->readListeners[$key])) {
call_user_func($this->readListeners[$key], $stream, $this);
}
if (Event::WRITE === (Event::WRITE & $flags) && isset($this->writeListeners[$key])) {
call_user_func($this->writeListeners[$key], $stream, $this);
}
};
}
}

View file

@ -2,6 +2,9 @@
namespace React\EventLoop; namespace React\EventLoop;
use React\EventLoop\StreamSelectLoop;
use React\EventLoop\LibEventLoop;
class Factory class Factory
{ {
public static function create() public static function create()
@ -9,10 +12,6 @@ class Factory
// @codeCoverageIgnoreStart // @codeCoverageIgnoreStart
if (function_exists('event_base_new')) { if (function_exists('event_base_new')) {
return new LibEventLoop(); return new LibEventLoop();
} else if (class_exists('libev\EventLoop')) {
return new LibEvLoop;
} else if (class_exists('EventBase')) {
return new ExtEventLoop;
} }
return new StreamSelectLoop(); return new StreamSelectLoop();

View file

@ -2,14 +2,9 @@
namespace React\EventLoop; namespace React\EventLoop;
use libev\EventLoop; use SplObjectStorage;
use libev\IOEvent;
use libev\TimerEvent;
use React\EventLoop\Tick\FutureTickQueue;
use React\EventLoop\Tick\NextTickQueue;
use React\EventLoop\Timer\Timer; use React\EventLoop\Timer\Timer;
use React\EventLoop\Timer\TimerInterface; use React\EventLoop\Timer\TimerInterface;
use SplObjectStorage;
/** /**
* @see https://github.com/m4rw3r/php-libev * @see https://github.com/m4rw3r/php-libev
@ -18,201 +13,140 @@ use SplObjectStorage;
class LibEvLoop implements LoopInterface class LibEvLoop implements LoopInterface
{ {
private $loop; private $loop;
private $nextTickQueue; private $timers;
private $futureTickQueue; private $readEvents = array();
private $timerEvents; private $writeEvents = array();
private $readEvents = [];
private $writeEvents = [];
private $running;
public function __construct() public function __construct()
{ {
$this->loop = new EventLoop(); $this->loop = new \libev\EventLoop();
$this->nextTickQueue = new NextTickQueue($this); $this->timers = new SplObjectStorage();
$this->futureTickQueue = new FutureTickQueue($this);
$this->timerEvents = new SplObjectStorage();
} }
/** public function addReadStream($stream, $listener)
* {@inheritdoc}
*/
public function addReadStream($stream, callable $listener)
{ {
$callback = function () use ($stream, $listener) { $this->addStream($stream, $listener, \libev\IOEvent::READ);
call_user_func($listener, $stream, $this);
};
$event = new IOEvent($callback, $stream, IOEvent::READ);
$this->loop->add($event);
$this->readEvents[(int) $stream] = $event;
} }
/** public function addWriteStream($stream, $listener)
* {@inheritdoc}
*/
public function addWriteStream($stream, callable $listener)
{ {
$callback = function () use ($stream, $listener) { $this->addStream($stream, $listener, \libev\IOEvent::WRITE);
call_user_func($listener, $stream, $this);
};
$event = new IOEvent($callback, $stream, IOEvent::WRITE);
$this->loop->add($event);
$this->writeEvents[(int) $stream] = $event;
} }
/**
* {@inheritdoc}
*/
public function removeReadStream($stream) public function removeReadStream($stream)
{ {
$key = (int) $stream; if (isset($this->readEvents[(int)$stream])) {
$this->readEvents[(int)$stream]->stop();
if (isset($this->readEvents[$key])) { unset($this->readEvents[(int)$stream]);
$this->readEvents[$key]->stop();
unset($this->readEvents[$key]);
} }
} }
/**
* {@inheritdoc}
*/
public function removeWriteStream($stream) public function removeWriteStream($stream)
{ {
$key = (int) $stream; if (isset($this->writeEvents[(int)$stream])) {
$this->writeEvents[(int)$stream]->stop();
if (isset($this->writeEvents[$key])) { unset($this->writeEvents[(int)$stream]);
$this->writeEvents[$key]->stop();
unset($this->writeEvents[$key]);
} }
} }
/**
* {@inheritdoc}
*/
public function removeStream($stream) public function removeStream($stream)
{ {
$this->removeReadStream($stream); $this->removeReadStream($stream);
$this->removeWriteStream($stream); $this->removeWriteStream($stream);
} }
/** private function addStream($stream, $listener, $flags)
* {@inheritdoc} {
*/ $listener = $this->wrapStreamListener($stream, $listener, $flags);
public function addTimer($interval, callable $callback) $event = new \libev\IOEvent($listener, $stream, $flags);
$this->loop->add($event);
if (($flags & \libev\IOEvent::READ) === $flags) {
$this->readEvents[(int)$stream] = $event;
} elseif (($flags & \libev\IOEvent::WRITE) === $flags) {
$this->writeEvents[(int)$stream] = $event;
}
}
private function wrapStreamListener($stream, $listener, $flags)
{
if (($flags & \libev\IOEvent::READ) === $flags) {
$removeCallback = array($this, 'removeReadStream');
} elseif (($flags & \libev\IOEvent::WRITE) === $flags) {
$removeCallback = array($this, 'removeWriteStream');
}
return function ($event) use ($stream, $listener, $removeCallback) {
call_user_func($listener, $stream);
};
}
public function addTimer($interval, $callback)
{ {
$timer = new Timer($this, $interval, $callback, false); $timer = new Timer($this, $interval, $callback, false);
$this->setupTimer($timer);
$callback = function () use ($timer) {
call_user_func($timer->getCallback(), $timer);
if ($this->isTimerActive($timer)) {
$this->cancelTimer($timer);
}
};
$event = new TimerEvent($callback, $timer->getInterval());
$this->timerEvents->attach($timer, $event);
$this->loop->add($event);
return $timer; return $timer;
} }
/** public function addPeriodicTimer($interval, $callback)
* {@inheritdoc}
*/
public function addPeriodicTimer($interval, callable $callback)
{ {
$timer = new Timer($this, $interval, $callback, true); $timer = new Timer($this, $interval, $callback, true);
$this->setupTimer($timer);
$callback = function () use ($timer) {
call_user_func($timer->getCallback(), $timer);
};
$event = new TimerEvent($callback, $interval, $interval);
$this->timerEvents->attach($timer, $event);
$this->loop->add($event);
return $timer; return $timer;
} }
/**
* {@inheritdoc}
*/
public function cancelTimer(TimerInterface $timer) public function cancelTimer(TimerInterface $timer)
{ {
if (isset($this->timerEvents[$timer])) { if (isset($this->timers[$timer])) {
$this->loop->remove($this->timerEvents[$timer]); $this->loop->remove($this->timers[$timer]);
$this->timerEvents->detach($timer); $this->timers->detach($timer);
} }
} }
/** private function setupTimer(TimerInterface $timer)
* {@inheritdoc} {
*/ $dummyCallback = function () {};
$interval = $timer->getInterval();
if ($timer->isPeriodic()) {
$libevTimer = new \libev\TimerEvent($dummyCallback, $interval, $interval);
} else {
$libevTimer = new \libev\TimerEvent($dummyCallback, $interval);
}
$libevTimer->setCallback(function () use ($timer) {
call_user_func($timer->getCallback(), $timer);
if (!$timer->isPeriodic()) {
$timer->cancel();
}
});
$this->timers->attach($timer, $libevTimer);
$this->loop->add($libevTimer);
return $timer;
}
public function isTimerActive(TimerInterface $timer) public function isTimerActive(TimerInterface $timer)
{ {
return $this->timerEvents->contains($timer); return $this->timers->contains($timer);
} }
/**
* {@inheritdoc}
*/
public function nextTick(callable $listener)
{
$this->nextTickQueue->add($listener);
}
/**
* {@inheritdoc}
*/
public function futureTick(callable $listener)
{
$this->futureTickQueue->add($listener);
}
/**
* {@inheritdoc}
*/
public function tick() public function tick()
{ {
$this->nextTickQueue->tick(); $this->loop->run(\libev\EventLoop::RUN_ONCE);
$this->futureTickQueue->tick();
$this->loop->run(EventLoop::RUN_ONCE | EventLoop::RUN_NOWAIT);
} }
/**
* {@inheritdoc}
*/
public function run() public function run()
{ {
$this->running = true; $this->loop->run();
while ($this->running) {
$this->nextTickQueue->tick();
$this->futureTickQueue->tick();
$flags = EventLoop::RUN_ONCE;
if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) {
$flags |= EventLoop::RUN_NOWAIT;
} elseif (!$this->readEvents && !$this->writeEvents && !$this->timerEvents->count()) {
break;
}
$this->loop->run($flags);
}
} }
/**
* {@inheritdoc}
*/
public function stop() public function stop()
{ {
$this->running = false; $this->loop->breakLoop();
} }
} }

View file

@ -2,342 +2,221 @@
namespace React\EventLoop; namespace React\EventLoop;
use Event; use SplObjectStorage;
use EventBase;
use React\EventLoop\Tick\FutureTickQueue;
use React\EventLoop\Tick\NextTickQueue;
use React\EventLoop\Timer\Timer; use React\EventLoop\Timer\Timer;
use React\EventLoop\Timer\TimerInterface; use React\EventLoop\Timer\TimerInterface;
use SplObjectStorage;
/**
* An ext-libevent based event-loop.
*/
class LibEventLoop implements LoopInterface class LibEventLoop implements LoopInterface
{ {
const MICROSECONDS_PER_SECOND = 1000000; const MIN_TIMER_RESOLUTION = 0.001;
private $eventBase; private $base;
private $nextTickQueue; private $callback;
private $futureTickQueue; private $timers;
private $timerCallback;
private $timerEvents; private $events = array();
private $streamCallback; private $flags = array();
private $streamEvents = []; private $readCallbacks = array();
private $streamFlags = []; private $writeCallbacks = array();
private $readListeners = [];
private $writeListeners = [];
private $running;
public function __construct() public function __construct()
{ {
$this->eventBase = event_base_new(); $this->base = event_base_new();
$this->nextTickQueue = new NextTickQueue($this); $this->callback = $this->createLibeventCallback();
$this->futureTickQueue = new FutureTickQueue($this); $this->timers = new SplObjectStorage();
$this->timerEvents = new SplObjectStorage();
$this->createTimerCallback();
$this->createStreamCallback();
} }
/** protected function createLibeventCallback()
* {@inheritdoc}
*/
public function addReadStream($stream, callable $listener)
{ {
$key = (int) $stream; $readCallbacks = &$this->readCallbacks;
$writeCallbacks = &$this->writeCallbacks;
if (!isset($this->readListeners[$key])) { return function ($stream, $flags, $loop) use (&$readCallbacks, &$writeCallbacks) {
$this->readListeners[$key] = $listener; $id = (int) $stream;
$this->subscribeStreamEvent($stream, EV_READ);
}
}
/** try {
* {@inheritdoc} if (($flags & EV_READ) === EV_READ && isset($readCallbacks[$id])) {
*/ call_user_func($readCallbacks[$id], $stream, $loop);
public function addWriteStream($stream, callable $listener) }
{
$key = (int) $stream;
if (!isset($this->writeListeners[$key])) { if (($flags & EV_WRITE) === EV_WRITE && isset($writeCallbacks[$id])) {
$this->writeListeners[$key] = $listener; call_user_func($writeCallbacks[$id], $stream, $loop);
$this->subscribeStreamEvent($stream, EV_WRITE); }
} } catch (\Exception $ex) {
} // If one of the callbacks throws an exception we must stop the loop
// otherwise libevent will swallow the exception and go berserk.
$loop->stop();
/** throw $ex;
* {@inheritdoc}
*/
public function removeReadStream($stream)
{
$key = (int) $stream;
if (isset($this->readListeners[$key])) {
unset($this->readListeners[$key]);
$this->unsubscribeStreamEvent($stream, EV_READ);
}
}
/**
* {@inheritdoc}
*/
public function removeWriteStream($stream)
{
$key = (int) $stream;
if (isset($this->writeListeners[$key])) {
unset($this->writeListeners[$key]);
$this->unsubscribeStreamEvent($stream, EV_WRITE);
}
}
/**
* {@inheritdoc}
*/
public function removeStream($stream)
{
$key = (int) $stream;
if (isset($this->streamEvents[$key])) {
$event = $this->streamEvents[$key];
event_del($event);
event_free($event);
unset(
$this->streamFlags[$key],
$this->streamEvents[$key],
$this->readListeners[$key],
$this->writeListeners[$key]
);
}
}
/**
* {@inheritdoc}
*/
public function addTimer($interval, callable $callback)
{
$timer = new Timer($this, $interval, $callback, false);
$this->scheduleTimer($timer);
return $timer;
}
/**
* {@inheritdoc}
*/
public function addPeriodicTimer($interval, callable $callback)
{
$timer = new Timer($this, $interval, $callback, true);
$this->scheduleTimer($timer);
return $timer;
}
/**
* {@inheritdoc}
*/
public function cancelTimer(TimerInterface $timer)
{
if ($this->isTimerActive($timer)) {
$event = $this->timerEvents[$timer];
event_del($event);
event_free($event);
$this->timerEvents->detach($timer);
}
}
/**
* {@inheritdoc}
*/
public function isTimerActive(TimerInterface $timer)
{
return $this->timerEvents->contains($timer);
}
/**
* {@inheritdoc}
*/
public function nextTick(callable $listener)
{
$this->nextTickQueue->add($listener);
}
/**
* {@inheritdoc}
*/
public function futureTick(callable $listener)
{
$this->futureTickQueue->add($listener);
}
/**
* {@inheritdoc}
*/
public function tick()
{
$this->nextTickQueue->tick();
$this->futureTickQueue->tick();
event_base_loop($this->eventBase, EVLOOP_ONCE | EVLOOP_NONBLOCK);
}
/**
* {@inheritdoc}
*/
public function run()
{
$this->running = true;
while ($this->running) {
$this->nextTickQueue->tick();
$this->futureTickQueue->tick();
$flags = EVLOOP_ONCE;
if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) {
$flags |= EVLOOP_NONBLOCK;
} elseif (!$this->streamEvents && !$this->timerEvents->count()) {
break;
} }
};
event_base_loop($this->eventBase, $flags);
}
} }
/** public function addReadStream($stream, $listener)
* {@inheritdoc}
*/
public function stop()
{ {
$this->running = false; $this->addStreamEvent($stream, EV_READ, 'read', $listener);
} }
/** public function addWriteStream($stream, $listener)
* Schedule a timer for execution.
*
* @param TimerInterface $timer
*/
private function scheduleTimer(TimerInterface $timer)
{ {
$this->timerEvents[$timer] = $event = event_timer_new(); $this->addStreamEvent($stream, EV_WRITE, 'write', $listener);
event_timer_set($event, $this->timerCallback, $timer);
event_base_set($event, $this->eventBase);
event_add($event, $timer->getInterval() * self::MICROSECONDS_PER_SECOND);
} }
/** protected function addStreamEvent($stream, $eventClass, $type, $listener)
* Create a new ext-libevent event resource, or update the existing one.
*
* @param stream $stream
* @param integer $flag EV_READ or EV_WRITE
*/
private function subscribeStreamEvent($stream, $flag)
{ {
$key = (int) $stream; $id = (int) $stream;
if (isset($this->streamEvents[$key])) {
$event = $this->streamEvents[$key];
$flags = $this->streamFlags[$key] |= $flag;
if ($existing = isset($this->events[$id])) {
if (($this->flags[$id] & $eventClass) === $eventClass) {
return;
}
$event = $this->events[$id];
event_del($event); event_del($event);
event_set($event, $stream, EV_PERSIST | $flags, $this->streamCallback);
} else { } else {
$event = event_new(); $event = event_new();
}
event_set($event, $stream, EV_PERSIST | $flag, $this->streamCallback); $flags = isset($this->flags[$id]) ? $this->flags[$id] | $eventClass : $eventClass;
event_base_set($event, $this->eventBase); event_set($event, $stream, $flags | EV_PERSIST, $this->callback, $this);
$this->streamEvents[$key] = $event; if (!$existing) {
$this->streamFlags[$key] = $flag; // Set the base only if $event has been newly created or be ready for segfaults.
event_base_set($event, $this->base);
} }
event_add($event); event_add($event);
$this->events[$id] = $event;
$this->flags[$id] = $flags;
$this->{"{$type}Callbacks"}[$id] = $listener;
} }
/** public function removeReadStream($stream)
* Update the ext-libevent event resource for this stream to stop listening to
* the given event type, or remove it entirely if it's no longer needed.
*
* @param stream $stream
* @param integer $flag EV_READ or EV_WRITE
*/
private function unsubscribeStreamEvent($stream, $flag)
{ {
$key = (int) $stream; $this->removeStreamEvent($stream, EV_READ, 'read');
}
$flags = $this->streamFlags[$key] &= ~$flag; public function removeWriteStream($stream)
{
$this->removeStreamEvent($stream, EV_WRITE, 'write');
}
if (0 === $flags) { protected function removeStreamEvent($stream, $eventClass, $type)
$this->removeStream($stream); {
$id = (int) $stream;
return; if (isset($this->events[$id])) {
$flags = $this->flags[$id] & ~$eventClass;
if ($flags === 0) {
// Remove if stream is not subscribed to any event at this point.
return $this->removeStream($stream);
}
$event = $this->events[$id];
event_del($event);
event_free($event);
unset($this->{"{$type}Callbacks"}[$id]);
$event = event_new();
event_set($event, $stream, $flags | EV_PERSIST, $this->callback, $this);
event_base_set($event, $this->base);
event_add($event);
$this->events[$id] = $event;
$this->flags[$id] = $flags;
}
}
public function removeStream($stream)
{
$id = (int) $stream;
if (isset($this->events[$id])) {
$event = $this->events[$id];
unset(
$this->events[$id],
$this->flags[$id],
$this->readCallbacks[$id],
$this->writeCallbacks[$id]
);
event_del($event);
event_free($event);
}
}
protected function addTimerInternal($interval, $callback, $periodic = false)
{
if ($interval < self::MIN_TIMER_RESOLUTION) {
throw new \InvalidArgumentException('Timer events do not support sub-millisecond timeouts.');
} }
$event = $this->streamEvents[$key]; $timer = new Timer($this, $interval, $callback, $periodic);
$resource = event_new();
event_del($event); $timers = $this->timers;
event_set($event, $stream, EV_PERSIST | $flags, $this->streamCallback); $timers->attach($timer, $resource);
event_add($event);
}
/** $callback = function () use ($timers, $timer, &$callback) {
* Create a callback used as the target of timer events. if (isset($timers[$timer])) {
* call_user_func($timer->getCallback(), $timer);
* A reference is kept to the callback for the lifetime of the loop
* to prevent "Cannot destroy active lambda function" fatal error from
* the event extension.
*/
private function createTimerCallback()
{
$this->timerCallback = function ($_, $_, $timer) {
call_user_func($timer->getCallback(), $timer);
// Timer already cancelled ... if ($timer->isPeriodic() && isset($timers[$timer])) {
if (!$this->isTimerActive($timer)) { event_add($timers[$timer], $timer->getInterval() * 1000000);
return; } else {
$timer->cancel();
// Reschedule periodic timers ... }
} elseif ($timer->isPeriodic()) {
event_add(
$this->timerEvents[$timer],
$timer->getInterval() * self::MICROSECONDS_PER_SECOND
);
// Clean-up one shot timers ...
} else {
$this->cancelTimer($timer);
} }
}; };
event_timer_set($resource, $callback);
event_base_set($resource, $this->base);
event_add($resource, $interval * 1000000);
return $timer;
} }
/** public function addTimer($interval, $callback)
* Create a callback used as the target of stream events.
*
* A reference is kept to the callback for the lifetime of the loop
* to prevent "Cannot destroy active lambda function" fatal error from
* the event extension.
*/
private function createStreamCallback()
{ {
$this->streamCallback = function ($stream, $flags) { return $this->addTimerInternal($interval, $callback);
$key = (int) $stream; }
if (EV_READ === (EV_READ & $flags) && isset($this->readListeners[$key])) { public function addPeriodicTimer($interval, $callback)
call_user_func($this->readListeners[$key], $stream, $this); {
} return $this->addTimerInternal($interval, $callback, true);
}
if (EV_WRITE === (EV_WRITE & $flags) && isset($this->writeListeners[$key])) { public function cancelTimer(TimerInterface $timer)
call_user_func($this->writeListeners[$key], $stream, $this); {
} if (isset($this->timers[$timer])) {
}; $resource = $this->timers[$timer];
event_del($resource);
event_free($resource);
$this->timers->detach($timer);
}
}
public function isTimerActive(TimerInterface $timer)
{
return $this->timers->contains($timer);
}
public function tick()
{
event_base_loop($this->base, EVLOOP_ONCE | EVLOOP_NONBLOCK);
}
public function run()
{
event_base_loop($this->base);
}
public function stop()
{
event_base_loopexit($this->base);
} }
} }

View file

@ -6,116 +6,19 @@ use React\EventLoop\Timer\TimerInterface;
interface LoopInterface interface LoopInterface
{ {
/** public function addReadStream($stream, $listener);
* Register a listener to be notified when a stream is ready to read. public function addWriteStream($stream, $listener);
*
* @param stream $stream The PHP stream resource to check.
* @param callable $listener Invoked when the stream is ready.
*/
public function addReadStream($stream, callable $listener);
/**
* Register a listener to be notified when a stream is ready to write.
*
* @param stream $stream The PHP stream resource to check.
* @param callable $listener Invoked when the stream is ready.
*/
public function addWriteStream($stream, callable $listener);
/**
* Remove the read event listener for the given stream.
*
* @param stream $stream The PHP stream resource.
*/
public function removeReadStream($stream); public function removeReadStream($stream);
/**
* Remove the write event listener for the given stream.
*
* @param stream $stream The PHP stream resource.
*/
public function removeWriteStream($stream); public function removeWriteStream($stream);
/**
* Remove all listeners for the given stream.
*
* @param stream $stream The PHP stream resource.
*/
public function removeStream($stream); public function removeStream($stream);
/** public function addTimer($interval, $callback);
* Enqueue a callback to be invoked once after the given interval. public function addPeriodicTimer($interval, $callback);
*
* The execution order of timers scheduled to execute at the same time is
* not guaranteed.
*
* @param numeric $interval The number of seconds to wait before execution.
* @param callable $callback The callback to invoke.
*
* @return TimerInterface
*/
public function addTimer($interval, callable $callback);
/**
* Enqueue a callback to be invoked repeatedly after the given interval.
*
* The execution order of timers scheduled to execute at the same time is
* not guaranteed.
*
* @param numeric $interval The number of seconds to wait before execution.
* @param callable $callback The callback to invoke.
*
* @return TimerInterface
*/
public function addPeriodicTimer($interval, callable $callback);
/**
* Cancel a pending timer.
*
* @param TimerInterface $timer The timer to cancel.
*/
public function cancelTimer(TimerInterface $timer); public function cancelTimer(TimerInterface $timer);
/**
* Check if a given timer is active.
*
* @param TimerInterface $timer The timer to check.
*
* @return boolean True if the timer is still enqueued for execution.
*/
public function isTimerActive(TimerInterface $timer); public function isTimerActive(TimerInterface $timer);
/**
* Schedule a callback to be invoked on the next tick of the event loop.
*
* Callbacks are guaranteed to be executed in the order they are enqueued,
* before any timer or stream events.
*
* @param callable $listener The callback to invoke.
*/
public function nextTick(callable $listener);
/**
* Schedule a callback to be invoked on a future tick of the event loop.
*
* Callbacks are guaranteed to be executed in the order they are enqueued.
*
* @param callable $listener The callback to invoke.
*/
public function futureTick(callable $listener);
/**
* Perform a single iteration of the event loop.
*/
public function tick(); public function tick();
/**
* Run the event loop until there are no more tasks to perform.
*/
public function run(); public function run();
/**
* Instruct a running event loop to stop.
*/
public function stop(); public function stop();
} }

View file

@ -20,15 +20,11 @@ In addition to the interface there are some implementations provided:
([github](https://github.com/m4rw3r/php-libev)). It supports the same ([github](https://github.com/m4rw3r/php-libev)). It supports the same
backends as libevent. backends as libevent.
* `ExtEventLoop`: This uses the `event` pecl extension. It supports the same
backends as libevent.
All of the loops support these features: All of the loops support these features:
* File descriptor polling * File descriptor polling
* One-off timers * One-off timers
* Periodic timers * Periodic timers
* Deferred execution of callbacks
## Usage ## Usage

View file

@ -2,258 +2,188 @@
namespace React\EventLoop; namespace React\EventLoop;
use React\EventLoop\Tick\FutureTickQueue;
use React\EventLoop\Tick\NextTickQueue;
use React\EventLoop\Timer\Timer; use React\EventLoop\Timer\Timer;
use React\EventLoop\Timer\TimerInterface; use React\EventLoop\Timer\TimerInterface;
use React\EventLoop\Timer\Timers; use React\EventLoop\Timer\Timers;
/**
* A stream_select() based event-loop.
*/
class StreamSelectLoop implements LoopInterface class StreamSelectLoop implements LoopInterface
{ {
const MICROSECONDS_PER_SECOND = 1000000; const QUANTUM_INTERVAL = 1000000;
private $nextTickQueue;
private $futureTickQueue;
private $timers; private $timers;
private $readStreams = []; private $running = false;
private $readListeners = []; private $readStreams = array();
private $writeStreams = []; private $readListeners = array();
private $writeListeners = []; private $writeStreams = array();
private $running; private $writeListeners = array();
public function __construct() public function __construct()
{ {
$this->nextTickQueue = new NextTickQueue($this);
$this->futureTickQueue = new FutureTickQueue($this);
$this->timers = new Timers(); $this->timers = new Timers();
} }
/** public function addReadStream($stream, $listener)
* {@inheritdoc}
*/
public function addReadStream($stream, callable $listener)
{ {
$key = (int) $stream; $id = (int) $stream;
if (!isset($this->readStreams[$key])) { if (!isset($this->readStreams[$id])) {
$this->readStreams[$key] = $stream; $this->readStreams[$id] = $stream;
$this->readListeners[$key] = $listener; $this->readListeners[$id] = $listener;
} }
} }
/** public function addWriteStream($stream, $listener)
* {@inheritdoc}
*/
public function addWriteStream($stream, callable $listener)
{ {
$key = (int) $stream; $id = (int) $stream;
if (!isset($this->writeStreams[$key])) { if (!isset($this->writeStreams[$id])) {
$this->writeStreams[$key] = $stream; $this->writeStreams[$id] = $stream;
$this->writeListeners[$key] = $listener; $this->writeListeners[$id] = $listener;
} }
} }
/**
* {@inheritdoc}
*/
public function removeReadStream($stream) public function removeReadStream($stream)
{ {
$key = (int) $stream; $id = (int) $stream;
unset( unset(
$this->readStreams[$key], $this->readStreams[$id],
$this->readListeners[$key] $this->readListeners[$id]
); );
} }
/**
* {@inheritdoc}
*/
public function removeWriteStream($stream) public function removeWriteStream($stream)
{ {
$key = (int) $stream; $id = (int) $stream;
unset( unset(
$this->writeStreams[$key], $this->writeStreams[$id],
$this->writeListeners[$key] $this->writeListeners[$id]
); );
} }
/**
* {@inheritdoc}
*/
public function removeStream($stream) public function removeStream($stream)
{ {
$this->removeReadStream($stream); $this->removeReadStream($stream);
$this->removeWriteStream($stream); $this->removeWriteStream($stream);
} }
/** public function addTimer($interval, $callback)
* {@inheritdoc}
*/
public function addTimer($interval, callable $callback)
{ {
$timer = new Timer($this, $interval, $callback, false); $timer = new Timer($this, $interval, $callback, false);
$this->timers->add($timer); $this->timers->add($timer);
return $timer; return $timer;
} }
/** public function addPeriodicTimer($interval, $callback)
* {@inheritdoc}
*/
public function addPeriodicTimer($interval, callable $callback)
{ {
$timer = new Timer($this, $interval, $callback, true); $timer = new Timer($this, $interval, $callback, true);
$this->timers->add($timer); $this->timers->add($timer);
return $timer; return $timer;
} }
/**
* {@inheritdoc}
*/
public function cancelTimer(TimerInterface $timer) public function cancelTimer(TimerInterface $timer)
{ {
$this->timers->cancel($timer); $this->timers->cancel($timer);
} }
/**
* {@inheritdoc}
*/
public function isTimerActive(TimerInterface $timer) public function isTimerActive(TimerInterface $timer)
{ {
return $this->timers->contains($timer); return $this->timers->contains($timer);
} }
/** protected function getNextEventTimeInMicroSeconds()
* {@inheritdoc}
*/
public function nextTick(callable $listener)
{ {
$this->nextTickQueue->add($listener); $nextEvent = $this->timers->getFirst();
if (null === $nextEvent) {
return self::QUANTUM_INTERVAL;
}
$currentTime = microtime(true);
if ($nextEvent > $currentTime) {
return ($nextEvent - $currentTime) * 1000000;
}
return 0;
} }
/** protected function sleepOnPendingTimers()
* {@inheritdoc}
*/
public function futureTick(callable $listener)
{ {
$this->futureTickQueue->add($listener); if ($this->timers->isEmpty()) {
} $this->running = false;
} else {
/** // We use usleep() instead of stream_select() to emulate timeouts
* {@inheritdoc} // since the latter fails when there are no streams registered for
*/ // read / write events. Blame PHP for us needing this hack.
public function tick() usleep($this->getNextEventTimeInMicroSeconds());
{
$this->nextTickQueue->tick();
$this->futureTickQueue->tick();
$this->timers->tick();
$this->waitForStreamActivity(0);
}
/**
* {@inheritdoc}
*/
public function run()
{
$this->running = true;
while ($this->running) {
$this->nextTickQueue->tick();
$this->futureTickQueue->tick();
$this->timers->tick();
// Next-tick or future-tick queues have pending callbacks ...
if (!$this->running || !$this->nextTickQueue->isEmpty() || !$this->futureTickQueue->isEmpty()) {
$timeout = 0;
// There is a pending timer, only block until it is due ...
} elseif ($scheduledAt = $this->timers->getFirst()) {
if (0 > $timeout = $scheduledAt - $this->timers->getTime()) {
$timeout = 0;
}
// The only possible event is stream activity, so wait forever ...
} elseif ($this->readStreams || $this->writeStreams) {
$timeout = null;
// There's nothing left to do ...
} else {
break;
}
$this->waitForStreamActivity($timeout * self::MICROSECONDS_PER_SECOND);
} }
} }
/** protected function runStreamSelect($block)
* {@inheritdoc} {
*/ $read = $this->readStreams ?: null;
$write = $this->writeStreams ?: null;
$except = null;
if (!$read && !$write) {
if ($block) {
$this->sleepOnPendingTimers();
}
return;
}
$timeout = $block ? $this->getNextEventTimeInMicroSeconds() : 0;
if (stream_select($read, $write, $except, 0, $timeout) > 0) {
if ($read) {
foreach ($read as $stream) {
if (!isset($this->readListeners[(int) $stream])) {
continue;
}
$listener = $this->readListeners[(int) $stream];
call_user_func($listener, $stream, $this);
}
}
if ($write) {
foreach ($write as $stream) {
if (!isset($this->writeListeners[(int) $stream])) {
continue;
}
$listener = $this->writeListeners[(int) $stream];
call_user_func($listener, $stream, $this);
}
}
}
}
protected function loop($block = true)
{
$this->timers->tick();
$this->runStreamSelect($block);
return $this->running;
}
public function tick()
{
return $this->loop(false);
}
public function run()
{
$this->running = true;
while ($this->loop());
}
public function stop() public function stop()
{ {
$this->running = false; $this->running = false;
} }
/**
* Wait/check for stream activity, or until the next timer is due.
*/
private function waitForStreamActivity($timeout)
{
$read = $this->readStreams;
$write = $this->writeStreams;
$this->streamSelect($read, $write, $timeout);
foreach ($read as $stream) {
$key = (int) $stream;
if (isset($this->readListeners[$key])) {
call_user_func($this->readListeners[$key], $stream, $this);
}
}
foreach ($write as $stream) {
$key = (int) $stream;
if (isset($this->writeListeners[$key])) {
call_user_func($this->writeListeners[$key], $stream, $this);
}
}
}
/**
* Emulate a stream_select() implementation that does not break when passed
* empty stream arrays.
*
* @param array &$read An array of read streams to select upon.
* @param array &$write An array of write streams to select upon.
* @param integer|null $timeout Activity timeout in microseconds, or null to wait forever.
*
* @return integer The total number of streams that are ready for read/write.
*/
protected function streamSelect(array &$read, array &$write, $timeout)
{
if ($read || $write) {
$except = null;
return stream_select($read, $write, $except, $timeout === null ? null : 0, $timeout);
}
usleep($timeout);
return 0;
}
} }

View file

@ -1,59 +0,0 @@
<?php
namespace React\EventLoop\Tick;
use React\EventLoop\LoopInterface;
use SplQueue;
class FutureTickQueue
{
private $eventLoop;
private $queue;
/**
* @param LoopInterface $eventLoop The event loop passed as the first parameter to callbacks.
*/
public function __construct(LoopInterface $eventLoop)
{
$this->eventLoop = $eventLoop;
$this->queue = new SplQueue();
}
/**
* Add a callback to be invoked on a future tick of the event loop.
*
* Callbacks are guaranteed to be executed in the order they are enqueued.
*
* @param callable $listener The callback to invoke.
*/
public function add(callable $listener)
{
$this->queue->enqueue($listener);
}
/**
* Flush the callback queue.
*/
public function tick()
{
// Only invoke as many callbacks as were on the queue when tick() was called.
$count = $this->queue->count();
while ($count--) {
call_user_func(
$this->queue->dequeue(),
$this->eventLoop
);
}
}
/**
* Check if the next tick queue is empty.
*
* @return boolean
*/
public function isEmpty()
{
return $this->queue->isEmpty();
}
}

View file

@ -1,57 +0,0 @@
<?php
namespace React\EventLoop\Tick;
use React\EventLoop\LoopInterface;
use SplQueue;
class NextTickQueue
{
private $eventLoop;
private $queue;
/**
* @param LoopInterface $eventLoop The event loop passed as the first parameter to callbacks.
*/
public function __construct(LoopInterface $eventLoop)
{
$this->eventLoop = $eventLoop;
$this->queue = new SplQueue();
}
/**
* Add a callback to be invoked on the next tick of the event loop.
*
* Callbacks are guaranteed to be executed in the order they are enqueued,
* before any timer or stream events.
*
* @param callable $listener The callback to invoke.
*/
public function add(callable $listener)
{
$this->queue->enqueue($listener);
}
/**
* Flush the callback queue.
*/
public function tick()
{
while (!$this->queue->isEmpty()) {
call_user_func(
$this->queue->dequeue(),
$this->eventLoop
);
}
}
/**
* Check if the next tick queue is empty.
*
* @return boolean
*/
public function isEmpty()
{
return $this->queue->isEmpty();
}
}

View file

@ -2,22 +2,21 @@
namespace React\EventLoop\Timer; namespace React\EventLoop\Timer;
use InvalidArgumentException;
use React\EventLoop\LoopInterface; use React\EventLoop\LoopInterface;
class Timer implements TimerInterface class Timer implements TimerInterface
{ {
const MIN_INTERVAL = 0.000001;
protected $loop; protected $loop;
protected $interval; protected $interval;
protected $callback; protected $callback;
protected $periodic; protected $periodic;
protected $data; protected $data;
public function __construct(LoopInterface $loop, $interval, callable $callback, $periodic = false, $data = null) public function __construct(LoopInterface $loop, $interval, $callback, $periodic = false, $data = null)
{ {
if ($interval < self::MIN_INTERVAL) { if (false === is_callable($callback)) {
$interval = self::MIN_INTERVAL; throw new InvalidArgumentException('The callback argument must be a valid callable object');
} }
$this->loop = $loop; $this->loop = $loop;

View file

@ -4,9 +4,12 @@ namespace React\EventLoop\Timer;
use SplObjectStorage; use SplObjectStorage;
use SplPriorityQueue; use SplPriorityQueue;
use InvalidArgumentException;
class Timers class Timers
{ {
const MIN_RESOLUTION = 0.001;
private $time; private $time;
private $timers; private $timers;
private $scheduler; private $scheduler;
@ -30,6 +33,11 @@ class Timers
public function add(TimerInterface $timer) public function add(TimerInterface $timer)
{ {
$interval = $timer->getInterval(); $interval = $timer->getInterval();
if ($interval < self::MIN_RESOLUTION) {
throw new InvalidArgumentException('Timer events do not support sub-millisecond timeouts.');
}
$scheduledAt = $interval + $this->getTime(); $scheduledAt = $interval + $this->getTime();
$this->timers->attach($timer, $scheduledAt); $this->timers->attach($timer, $scheduledAt);
@ -48,17 +56,13 @@ class Timers
public function getFirst() public function getFirst()
{ {
while ($this->scheduler->count()) { if ($this->scheduler->isEmpty()) {
$timer = $this->scheduler->top(); return null;
if ($this->timers->contains($timer)) {
return $this->timers[$timer];
}
$this->scheduler->extract();
} }
return null; $scheduledAt = $this->timers[$this->scheduler->top()];
return $scheduledAt;
} }
public function isEmpty() public function isEmpty()

View file

@ -4,19 +4,19 @@
"keywords": ["event-loop"], "keywords": ["event-loop"],
"license": "MIT", "license": "MIT",
"require": { "require": {
"php": ">=5.4.0" "php": ">=5.3.3"
}, },
"suggest": { "suggest": {
"ext-libevent": ">=0.1.0", "ext-libevent": ">=0.0.5",
"ext-event": "~1.0",
"ext-libev": "*" "ext-libev": "*"
}, },
"autoload": { "autoload": {
"psr-4": { "React\\EventLoop\\": "" } "psr-0": { "React\\EventLoop": "" }
}, },
"target-dir": "React/EventLoop",
"extra": { "extra": {
"branch-alias": { "branch-alias": {
"dev-master": "0.4-dev" "dev-master": "0.3-dev"
} }
} }
} }

View file

@ -16,9 +16,6 @@ class Request extends EventEmitter implements ReadableStreamInterface
private $httpVersion; private $httpVersion;
private $headers; private $headers;
// metadata, implicitly added externally
public $remoteAddress;
public function __construct($method, $path, $query = array(), $httpVersion = '1.1', $headers = array()) public function __construct($method, $path, $query = array(), $httpVersion = '1.1', $headers = array())
{ {
$this->method = $method; $this->method = $method;

View file

@ -18,17 +18,19 @@ class Response extends EventEmitter implements WritableStreamInterface
{ {
$this->conn = $conn; $this->conn = $conn;
$this->conn->on('end', function () { $that = $this;
$this->close();
$this->conn->on('end', function () use ($that) {
$that->close();
}); });
$this->conn->on('error', function ($error) { $this->conn->on('error', function ($error) use ($that) {
$this->emit('error', array($error, $this)); $that->emit('error', array($error, $that));
$this->close(); $that->close();
}); });
$this->conn->on('drain', function () { $this->conn->on('drain', function () use ($that) {
$this->emit('drain'); $that->emit('drain');
}); });
} }
@ -78,12 +80,9 @@ class Response extends EventEmitter implements WritableStreamInterface
foreach ($headers as $name => $value) { foreach ($headers as $name => $value) {
$name = str_replace(array("\r", "\n"), '', $name); $name = str_replace(array("\r", "\n"), '', $name);
$value = str_replace(array("\r", "\n"), '', $value);
foreach ((array) $value as $val) { $data .= "$name: $value\r\n";
$val = str_replace(array("\r", "\n"), '', $val);
$data .= "$name: $val\r\n";
}
} }
$data .= "\r\n"; $data .= "\r\n";

View file

@ -15,17 +15,16 @@ class Server extends EventEmitter implements ServerInterface
{ {
$this->io = $io; $this->io = $io;
$this->io->on('connection', function ($conn) { $server = $this;
$this->io->on('connection', function ($conn) use ($server) {
// TODO: http 1.1 keep-alive // TODO: http 1.1 keep-alive
// TODO: chunked transfer encoding (also for outgoing data) // TODO: chunked transfer encoding (also for outgoing data)
// TODO: multipart parsing // TODO: multipart parsing
$parser = new RequestHeaderParser(); $parser = new RequestHeaderParser();
$parser->on('headers', function (Request $request, $bodyBuffer) use ($conn, $parser) { $parser->on('headers', function (Request $request, $bodyBuffer) use ($server, $conn, $parser) {
// attach remote ip to the request as metadata $server->handleRequest($conn, $request, $bodyBuffer);
$request->remoteAddress = $conn->getRemoteAddress();
$this->handleRequest($conn, $request, $bodyBuffer);
$conn->removeListener('data', array($parser, 'feed')); $conn->removeListener('data', array($parser, 'feed'));
$conn->on('end', function () use ($request) { $conn->on('end', function () use ($request) {

View file

@ -4,16 +4,17 @@
"keywords": ["http"], "keywords": ["http"],
"license": "MIT", "license": "MIT",
"require": { "require": {
"php": ">=5.4.0", "php": ">=5.3.3",
"guzzle/parser": "~3.0", "guzzle/parser": "3.0.*",
"react/socket": "0.4.*" "react/socket": "0.3.*"
}, },
"autoload": { "autoload": {
"psr-4": { "React\\Http\\": "" } "psr-0": { "React\\Http": "" }
}, },
"target-dir": "React/Http",
"extra": { "extra": {
"branch-alias": { "branch-alias": {
"dev-master": "0.4-dev" "dev-master": "0.3-dev"
} }
} }
} }

View file

@ -8,11 +8,13 @@ use React\SocketClient\ConnectorInterface;
class Client class Client
{ {
private $loop;
private $connectionManager; private $connectionManager;
private $secureConnectionManager; private $secureConnectionManager;
public function __construct(ConnectorInterface $connector, ConnectorInterface $secureConnector) public function __construct(LoopInterface $loop, ConnectorInterface $connector, ConnectorInterface $secureConnector)
{ {
$this->loop = $loop;
$this->connector = $connector; $this->connector = $connector;
$this->secureConnector = $secureConnector; $this->secureConnector = $secureConnector;
} }
@ -21,7 +23,7 @@ class Client
{ {
$requestData = new RequestData($method, $url, $headers); $requestData = new RequestData($method, $url, $headers);
$connectionManager = $this->getConnectorForScheme($requestData->getScheme()); $connectionManager = $this->getConnectorForScheme($requestData->getScheme());
return new Request($connectionManager, $requestData); return new Request($this->loop, $connectionManager, $requestData);
} }

View file

@ -13,7 +13,7 @@ class Factory
{ {
$connector = new Connector($loop, $resolver); $connector = new Connector($loop, $resolver);
$secureConnector = new SecureConnector($connector, $loop); $secureConnector = new SecureConnector($connector, $loop);
return new Client($connector, $secureConnector); return new Client($loop, $connector, $secureConnector);
} }
} }

View file

@ -22,6 +22,7 @@ class Request extends EventEmitter implements WritableStreamInterface
const STATE_HEAD_WRITTEN = 2; const STATE_HEAD_WRITTEN = 2;
const STATE_END = 3; const STATE_END = 3;
private $loop;
private $connector; private $connector;
private $requestData; private $requestData;
@ -31,8 +32,9 @@ class Request extends EventEmitter implements WritableStreamInterface
private $response; private $response;
private $state = self::STATE_INIT; private $state = self::STATE_INIT;
public function __construct(ConnectorInterface $connector, RequestData $requestData) public function __construct(LoopInterface $loop, ConnectorInterface $connector, RequestData $requestData)
{ {
$this->loop = $loop;
$this->connector = $connector; $this->connector = $connector;
$this->requestData = $requestData; $this->requestData = $requestData;
} }
@ -50,6 +52,7 @@ class Request extends EventEmitter implements WritableStreamInterface
$this->state = self::STATE_WRITING_HEAD; $this->state = self::STATE_WRITING_HEAD;
$that = $this;
$requestData = $this->requestData; $requestData = $this->requestData;
$streamRef = &$this->stream; $streamRef = &$this->stream;
$stateRef = &$this->state; $stateRef = &$this->state;
@ -57,13 +60,13 @@ class Request extends EventEmitter implements WritableStreamInterface
$this $this
->connect() ->connect()
->then( ->then(
function ($stream) use ($requestData, &$streamRef, &$stateRef) { function ($stream) use ($that, $requestData, &$streamRef, &$stateRef) {
$streamRef = $stream; $streamRef = $stream;
$stream->on('drain', array($this, 'handleDrain')); $stream->on('drain', array($that, 'handleDrain'));
$stream->on('data', array($this, 'handleData')); $stream->on('data', array($that, 'handleData'));
$stream->on('end', array($this, 'handleEnd')); $stream->on('end', array($that, 'handleEnd'));
$stream->on('error', array($this, 'handleError')); $stream->on('error', array($that, 'handleError'));
$requestData->setProtocolVersion('1.0'); $requestData->setProtocolVersion('1.0');
$headers = (string) $requestData; $headers = (string) $requestData;
@ -72,7 +75,7 @@ class Request extends EventEmitter implements WritableStreamInterface
$stateRef = Request::STATE_HEAD_WRITTEN; $stateRef = Request::STATE_HEAD_WRITTEN;
$this->emit('headers-written', array($this)); $that->emit('headers-written', array($that));
}, },
array($this, 'handleError') array($this, 'handleError')
); );
@ -88,8 +91,8 @@ class Request extends EventEmitter implements WritableStreamInterface
return $this->stream->write($data); return $this->stream->write($data);
} }
$this->on('headers-written', function ($this) use ($data) { $this->on('headers-written', function ($that) use ($data) {
$this->write($data); $that->write($data);
}); });
if (self::STATE_WRITING_HEAD > $this->state) { if (self::STATE_WRITING_HEAD > $this->state) {
@ -132,12 +135,13 @@ class Request extends EventEmitter implements WritableStreamInterface
$this->stream->removeListener('error', array($this, 'handleError')); $this->stream->removeListener('error', array($this, 'handleError'));
$this->response = $response; $this->response = $response;
$that = $this;
$response->on('end', function () { $response->on('end', function () use ($that) {
$this->close(); $that->close();
}); });
$response->on('error', function (\Exception $error) { $response->on('error', function (\Exception $error) use ($that) {
$this->closeError(new \RuntimeException( $that->closeError(new \RuntimeException(
"An error occured in the response", "An error occured in the response",
0, 0,
$error $error
@ -225,10 +229,12 @@ class Request extends EventEmitter implements WritableStreamInterface
public function getResponseFactory() public function getResponseFactory()
{ {
if (null === $factory = $this->responseFactory) { if (null === $factory = $this->responseFactory) {
$loop = $this->loop;
$stream = $this->stream; $stream = $this->stream;
$factory = function ($protocol, $version, $code, $reasonPhrase, $headers) use ($stream) { $factory = function ($protocol, $version, $code, $reasonPhrase, $headers) use ($loop, $stream) {
return new Response( return new Response(
$loop,
$stream, $stream,
$protocol, $protocol,
$version, $version,

View file

@ -11,16 +11,19 @@ use React\Stream\WritableStreamInterface;
class Response extends EventEmitter implements ReadableStreamInterface class Response extends EventEmitter implements ReadableStreamInterface
{ {
private $loop;
private $stream; private $stream;
private $protocol; private $protocol;
private $version; private $version;
private $code; private $code;
private $reasonPhrase; private $reasonPhrase;
private $headers; private $headers;
private $body;
private $readable = true; private $readable = true;
public function __construct(Stream $stream, $protocol, $version, $code, $reasonPhrase, $headers) public function __construct(LoopInterface $loop, Stream $stream, $protocol, $version, $code, $reasonPhrase, $headers)
{ {
$this->loop = $loop;
$this->stream = $stream; $this->stream = $stream;
$this->protocol = $protocol; $this->protocol = $protocol;
$this->version = $version; $this->version = $version;
@ -32,31 +35,36 @@ class Response extends EventEmitter implements ReadableStreamInterface
$stream->on('error', array($this, 'handleError')); $stream->on('error', array($this, 'handleError'));
$stream->on('end', array($this, 'handleEnd')); $stream->on('end', array($this, 'handleEnd'));
} }
public function getProtocol() public function getProtocol()
{ {
return $this->protocol; return $this->protocol;
} }
public function getVersion() public function getVersion()
{ {
return $this->version; return $this->version;
} }
public function getCode() public function getCode()
{ {
return $this->code; return $this->code;
} }
public function getReasonPhrase() public function getReasonPhrase()
{ {
return $this->reasonPhrase; return $this->reasonPhrase;
} }
public function getHeaders() public function getHeaders()
{ {
return $this->headers; return $this->headers;
} }
public function getBody()
{
return $this->body;
}
public function handleData($data) public function handleData($data)
{ {

View file

@ -4,17 +4,18 @@
"keywords": ["http"], "keywords": ["http"],
"license": "MIT", "license": "MIT",
"require": { "require": {
"php": ">=5.4.0", "php": ">=5.3.3",
"guzzle/parser": "~3.0", "guzzle/parser": "2.8.*",
"react/socket-client": "0.4.*", "react/socket-client": "0.3.*",
"react/dns": "0.4.*" "react/dns": "0.3.*"
}, },
"autoload": { "autoload": {
"psr-4": { "React\\HttpClient\\": "" } "psr-0": { "React\\HttpClient": "" }
}, },
"target-dir": "React/HttpClient",
"extra": { "extra": {
"branch-alias": { "branch-alias": {
"dev-master": "0.4-dev" "dev-master": "0.3-dev"
} }
} }
} }

View file

@ -2,14 +2,17 @@
namespace React\Socket; namespace React\Socket;
use Evenement\EventEmitter;
use React\EventLoop\LoopInterface;
use React\Stream\WritableStreamInterface;
use React\Stream\Buffer;
use React\Stream\Stream; use React\Stream\Stream;
use React\Stream\Util;
class Connection extends Stream implements ConnectionInterface class Connection extends Stream implements ConnectionInterface
{ {
public function handleData($stream) public function handleData($stream)
{ {
// Socket is raw, not using fread as it's interceptable by filters
// See issues #192, #209, and #240
$data = stream_socket_recvfrom($stream, $this->bufferSize); $data = stream_socket_recvfrom($stream, $this->bufferSize);
if ('' === $data || false === $data || feof($stream)) { if ('' === $data || false === $data || feof($stream)) {
$this->end(); $this->end();
@ -21,7 +24,9 @@ class Connection extends Stream implements ConnectionInterface
public function handleClose() public function handleClose()
{ {
if (is_resource($this->stream)) { if (is_resource($this->stream)) {
// http://chat.stackoverflow.com/transcript/message/7727858#7727858
stream_socket_shutdown($this->stream, STREAM_SHUT_RDWR); stream_socket_shutdown($this->stream, STREAM_SHUT_RDWR);
stream_set_blocking($this->stream, false);
fclose($this->stream); fclose($this->stream);
} }
} }

View file

@ -30,14 +30,16 @@ class Server extends EventEmitter implements ServerInterface
} }
stream_set_blocking($this->master, 0); stream_set_blocking($this->master, 0);
$this->loop->addReadStream($this->master, function ($master) { $that = $this;
$this->loop->addReadStream($this->master, function ($master) use ($that) {
$newSocket = stream_socket_accept($master); $newSocket = stream_socket_accept($master);
if (false === $newSocket) { if (false === $newSocket) {
$this->emit('error', array(new \RuntimeException('Error accepting new connection'))); $that->emit('error', array(new \RuntimeException('Error accepting new connection')));
return; return;
} }
$this->handleConnection($newSocket); $that->handleConnection($newSocket);
}); });
} }

View file

@ -4,17 +4,18 @@
"keywords": ["socket"], "keywords": ["socket"],
"license": "MIT", "license": "MIT",
"require": { "require": {
"php": ">=5.4.0", "php": ">=5.3.3",
"evenement/evenement": "~2.0", "evenement/evenement": "1.0.*",
"react/event-loop": "0.4.*", "react/event-loop": "0.3.*",
"react/stream": "0.4.*" "react/stream": "0.3.*"
}, },
"autoload": { "autoload": {
"psr-4": { "React\\Socket\\": "" } "psr-0": { "React\\Socket": "" }
}, },
"target-dir": "React/Socket",
"extra": { "extra": {
"branch-alias": { "branch-alias": {
"dev-master": "0.4-dev" "dev-master": "0.3-dev"
} }
} }
} }

View file

@ -5,7 +5,7 @@ namespace React\SocketClient;
use React\EventLoop\LoopInterface; use React\EventLoop\LoopInterface;
use React\Dns\Resolver\Resolver; use React\Dns\Resolver\Resolver;
use React\Stream\Stream; use React\Stream\Stream;
use React\Promise; use React\Promise\When;
use React\Promise\Deferred; use React\Promise\Deferred;
class Connector implements ConnectorInterface class Connector implements ConnectorInterface
@ -21,10 +21,12 @@ class Connector implements ConnectorInterface
public function create($host, $port) public function create($host, $port)
{ {
$that = $this;
return $this return $this
->resolveHostname($host) ->resolveHostname($host)
->then(function ($address) use ($port) { ->then(function ($address) use ($port, $that) {
return $this->createSocketForAddress($address, $port); return $that->createSocketForAddress($address, $port);
}); });
} }
@ -35,7 +37,7 @@ class Connector implements ConnectorInterface
$socket = stream_socket_client($url, $errno, $errstr, 0, STREAM_CLIENT_CONNECT | STREAM_CLIENT_ASYNC_CONNECT); $socket = stream_socket_client($url, $errno, $errstr, 0, STREAM_CLIENT_CONNECT | STREAM_CLIENT_ASYNC_CONNECT);
if (!$socket) { if (!$socket) {
return Promise\reject(new \RuntimeException( return When::reject(new \RuntimeException(
sprintf("connection to %s:%d failed: %s", $address, $port, $errstr), sprintf("connection to %s:%d failed: %s", $address, $port, $errstr),
$errno $errno
)); ));
@ -71,10 +73,10 @@ class Connector implements ConnectorInterface
// The following hack looks like the only way to // The following hack looks like the only way to
// detect connection refused errors with PHP's stream sockets. // detect connection refused errors with PHP's stream sockets.
if (false === stream_socket_get_name($socket, true)) { if (false === stream_socket_get_name($socket, true)) {
return Promise\reject(new ConnectionException('Connection refused')); return When::reject(new ConnectionException('Connection refused'));
} }
return Promise\resolve($socket); return When::resolve($socket);
} }
public function handleConnectedSocket($socket) public function handleConnectedSocket($socket)
@ -94,7 +96,7 @@ class Connector implements ConnectorInterface
protected function resolveHostname($host) protected function resolveHostname($host)
{ {
if (false !== filter_var($host, FILTER_VALIDATE_IP)) { if (false !== filter_var($host, FILTER_VALIDATE_IP)) {
return Promise\resolve($host); return When::resolve($host);
} }
return $this->resolver->resolve($host); return $this->resolver->resolve($host);

View file

@ -5,8 +5,9 @@ Async Connector to open TCP/IP and SSL/TLS based connections.
## Introduction ## Introduction
Think of this library as an async version of Think of this library as an async version of
[`fsockopen()`](http://www.php.net/function.fsockopen) or [`fsockopen()`](http://php.net/manual/en/function.fsockopen.php) or
[`stream_socket_client()`](http://php.net/function.stream-socket-client). [`stream_socket_client()`](http://php.net/manual/en/function.stream-socket-
client.php).
Before you can actually transmit and receive data to/from a remote server, you Before you can actually transmit and receive data to/from a remote server, you
have to establish a connection to the remote end. Establishing this connection have to establish a connection to the remote end. Establishing this connection

View file

@ -4,6 +4,7 @@ namespace React\SocketClient;
use React\EventLoop\LoopInterface; use React\EventLoop\LoopInterface;
use React\Stream\Stream; use React\Stream\Stream;
use React\Promise\When;
class SecureConnector implements ConnectorInterface class SecureConnector implements ConnectorInterface
{ {
@ -18,9 +19,10 @@ class SecureConnector implements ConnectorInterface
public function create($host, $port) public function create($host, $port)
{ {
return $this->connector->create($host, $port)->then(function (Stream $stream) { $streamEncryption = $this->streamEncryption;
return $this->connector->create($host, $port)->then(function (Stream $stream) use ($streamEncryption) {
// (unencrypted) connection succeeded => try to enable encryption // (unencrypted) connection succeeded => try to enable encryption
return $this->streamEncryption->enable($stream)->then(null, function ($error) use ($stream) { return $streamEncryption->enable($stream)->then(null, function ($error) use ($stream) {
// establishing encryption failed => close invalid connection and return error // establishing encryption failed => close invalid connection and return error
$stream->close(); $stream->close();
throw $error; throw $error;

View file

@ -2,6 +2,7 @@
namespace React\SocketClient; namespace React\SocketClient;
use React\Promise\ResolverInterface;
use React\Promise\Deferred; use React\Promise\Deferred;
use React\Stream\Stream; use React\Stream\Stream;
use React\EventLoop\LoopInterface; use React\EventLoop\LoopInterface;
@ -46,15 +47,16 @@ class StreamEncryption
// get actual stream socket from stream instance // get actual stream socket from stream instance
$socket = $stream->stream; $socket = $stream->stream;
$toggleCrypto = function () use ($socket, $deferred, $toggle) { $that = $this;
$this->toggleCrypto($socket, $deferred, $toggle); $toggleCrypto = function () use ($that, $socket, $deferred, $toggle) {
$that->toggleCrypto($socket, $deferred, $toggle);
}; };
$this->loop->addWriteStream($socket, $toggleCrypto); $this->loop->addWriteStream($socket, $toggleCrypto);
$this->loop->addReadStream($socket, $toggleCrypto); $this->loop->addReadStream($socket, $toggleCrypto);
$toggleCrypto(); $toggleCrypto();
return $deferred->promise()->then(function () use ($stream) { return $deferred->then(function () use ($stream) {
$stream->resume(); $stream->resume();
return $stream; return $stream;
}, function($error) use ($stream) { }, function($error) use ($stream) {
@ -63,7 +65,7 @@ class StreamEncryption
}); });
} }
public function toggleCrypto($socket, Deferred $deferred, $toggle) public function toggleCrypto($socket, ResolverInterface $resolver, $toggle)
{ {
set_error_handler(array($this, 'handleError')); set_error_handler(array($this, 'handleError'));
$result = stream_socket_enable_crypto($socket, $toggle, $this->method); $result = stream_socket_enable_crypto($socket, $toggle, $this->method);
@ -73,12 +75,12 @@ class StreamEncryption
$this->loop->removeWriteStream($socket); $this->loop->removeWriteStream($socket);
$this->loop->removeReadStream($socket); $this->loop->removeReadStream($socket);
$deferred->resolve(); $resolver->resolve();
} else if (false === $result) { } else if (false === $result) {
$this->loop->removeWriteStream($socket); $this->loop->removeWriteStream($socket);
$this->loop->removeReadStream($socket); $this->loop->removeReadStream($socket);
$deferred->reject(new UnexpectedValueException( $resolver->reject(new UnexpectedValueException(
sprintf("Unable to complete SSL/TLS handshake: %s", $this->errstr), sprintf("Unable to complete SSL/TLS handshake: %s", $this->errstr),
$this->errno $this->errno
)); ));

View file

@ -4,17 +4,18 @@
"keywords": ["socket"], "keywords": ["socket"],
"license": "MIT", "license": "MIT",
"require": { "require": {
"php": ">=5.4.0", "php": ">=5.3.3",
"react/dns": "0.4.*", "react/dns": "0.3.*",
"react/event-loop": "0.4.*", "react/event-loop": "0.3.*",
"react/promise": "~2.0" "react/promise": "~1.0"
}, },
"autoload": { "autoload": {
"psr-4": { "React\\SocketClient\\": "" } "psr-0": { "React\\SocketClient": "" }
}, },
"target-dir": "React/SocketClient",
"extra": { "extra": {
"branch-alias": { "branch-alias": {
"dev-master": "0.4-dev" "dev-master": "0.3-dev"
} }
} }
} }

View file

@ -4,7 +4,6 @@ namespace React\Stream;
use Evenement\EventEmitter; use Evenement\EventEmitter;
use React\EventLoop\LoopInterface; use React\EventLoop\LoopInterface;
use React\Stream\WritableStreamInterface;
/** @event full-drain */ /** @event full-drain */
class Buffer extends EventEmitter implements WritableStreamInterface class Buffer extends EventEmitter implements WritableStreamInterface
@ -21,16 +20,11 @@ class Buffer extends EventEmitter implements WritableStreamInterface
'file' => '', 'file' => '',
'line' => 0, 'line' => 0,
); );
private $meta;
public function __construct($stream, LoopInterface $loop) public function __construct($stream, LoopInterface $loop)
{ {
$this->stream = $stream; $this->stream = $stream;
$this->loop = $loop; $this->loop = $loop;
if (is_resource($stream)) {
$this->meta = stream_get_meta_data($stream);
}
} }
public function isWritable() public function isWritable()
@ -78,13 +72,13 @@ class Buffer extends EventEmitter implements WritableStreamInterface
$this->listening = false; $this->listening = false;
$this->data = ''; $this->data = '';
$this->emit('close', [$this]); $this->emit('close');
} }
public function handleWrite() public function handleWrite()
{ {
if (!is_resource($this->stream) || ('generic_socket' === $this->meta['stream_type'] && feof($this->stream))) { if (!is_resource($this->stream)) {
$this->emit('error', array(new \RuntimeException('Tried to write to closed or invalid stream.'), $this)); $this->emit('error', array(new \RuntimeException('Tried to write to invalid stream.'), $this));
return; return;
} }
@ -96,23 +90,26 @@ class Buffer extends EventEmitter implements WritableStreamInterface
restore_error_handler(); restore_error_handler();
if (false === $sent) { if (false === $sent) {
$this->emit('error', array( $this->emit('error', array(new \ErrorException(
new \ErrorException( $this->lastError['message'],
$this->lastError['message'], 0,
0, $this->lastError['number'],
$this->lastError['number'], $this->lastError['file'],
$this->lastError['file'], $this->lastError['line']
$this->lastError['line'] )));
),
$this return;
)); }
if (0 === $sent && feof($this->stream)) {
$this->emit('error', array(new \RuntimeException('Tried to write to closed stream.'), $this));
return; return;
} }
$len = strlen($this->data); $len = strlen($this->data);
if ($len >= $this->softLimit && $len - $sent < $this->softLimit) { if ($len >= $this->softLimit && $len - $sent < $this->softLimit) {
$this->emit('drain', [$this]); $this->emit('drain');
} }
$this->data = (string) substr($this->data, $sent); $this->data = (string) substr($this->data, $sent);
@ -121,7 +118,7 @@ class Buffer extends EventEmitter implements WritableStreamInterface
$this->loop->removeWriteStream($this->stream); $this->loop->removeWriteStream($this->stream);
$this->listening = false; $this->listening = false;
$this->emit('full-drain', [$this]); $this->emit('full-drain');
} }
} }

View file

@ -10,11 +10,10 @@ use Evenement\EventEmitterInterface;
* @event error * @event error
* @event close * @event close
*/ */
interface ReadableStreamInterface extends EventEmitterInterface interface ReadableStreamInterface extends StreamInterface
{ {
public function isReadable(); public function isReadable();
public function pause(); public function pause();
public function resume(); public function resume();
public function pipe(WritableStreamInterface $dest, array $options = array()); public function pipe(WritableStreamInterface $dest, array $options = array());
public function close();
} }

View file

@ -21,13 +21,15 @@ class Stream extends EventEmitter implements ReadableStreamInterface, WritableSt
$this->loop = $loop; $this->loop = $loop;
$this->buffer = new Buffer($this->stream, $this->loop); $this->buffer = new Buffer($this->stream, $this->loop);
$this->buffer->on('error', function ($error) { $that = $this;
$this->emit('error', array($error, $this));
$this->close(); $this->buffer->on('error', function ($error) use ($that) {
$that->emit('error', array($error, $that));
$that->close();
}); });
$this->buffer->on('drain', function () { $this->buffer->on('drain', function () use ($that) {
$this->emit('drain', array($this)); $that->emit('drain');
}); });
$this->resume(); $this->resume();
@ -93,8 +95,10 @@ class Stream extends EventEmitter implements ReadableStreamInterface, WritableSt
$this->readable = false; $this->readable = false;
$this->writable = false; $this->writable = false;
$this->buffer->on('close', function () { $that = $this;
$this->close();
$this->buffer->on('close', function () use ($that) {
$that->close();
}); });
$this->buffer->end($data); $this->buffer->end($data);

View file

@ -0,0 +1,13 @@
<?php
namespace React\Stream;
use Evenement\EventEmitterInterface;
// This class exists because ReadableStreamInterface and WritableStreamInterface
// both need close methods.
// In PHP <= 5.3.8 a class can not implement 2 interfaces with coincidental matching methods
interface StreamInterface extends EventEmitterInterface
{
public function close();
}

View file

@ -19,13 +19,13 @@ class ThroughStream extends CompositeStream
public function write($data) public function write($data)
{ {
$this->readable->emit('data', array($this->filter($data), $this)); $this->readable->emit('data', array($this->filter($data)));
} }
public function end($data = null) public function end($data = null)
{ {
if (null !== $data) { if (null !== $data) {
$this->readable->emit('data', array($this->filter($data), $this)); $this->readable->emit('data', array($this->filter($data)));
} }
$this->writable->end($data); $this->writable->end($data);

View file

@ -10,10 +10,9 @@ use Evenement\EventEmitterInterface;
* @event close * @event close
* @event pipe * @event pipe
*/ */
interface WritableStreamInterface extends EventEmitterInterface interface WritableStreamInterface extends StreamInterface
{ {
public function isWritable(); public function isWritable();
public function write($data); public function write($data);
public function end($data = null); public function end($data = null);
public function close();
} }

View file

@ -4,19 +4,20 @@
"keywords": ["stream", "pipe"], "keywords": ["stream", "pipe"],
"license": "MIT", "license": "MIT",
"require": { "require": {
"php": ">=5.4.0", "php": ">=5.3.3",
"evenement/evenement": "~2.0" "evenement/evenement": "1.0.*"
}, },
"suggest": { "suggest": {
"react/event-loop": "0.4.*", "react/event-loop": "0.3.*",
"react/promise": "~2.0" "react/promise": "~1.0"
}, },
"autoload": { "autoload": {
"psr-4": { "React\\Stream\\": "" } "psr-0": { "React\\Stream": "" }
}, },
"target-dir": "React/Stream",
"extra": { "extra": {
"branch-alias": { "branch-alias": {
"dev-master": "0.4-dev" "dev-master": "0.3-dev"
} }
} }
} }