mirror of
https://github.com/DanielnetoDotCom/YouPHPTube
synced 2025-10-03 01:39:24 +02:00
490 lines
18 KiB
PHP
490 lines
18 KiB
PHP
<?php
|
|
|
|
namespace Socket;
|
|
|
|
use React\EventLoop\Loop;
|
|
use Ratchet\MessageComponentInterface;
|
|
use Ratchet\ConnectionInterface;
|
|
use Amp\Loop as AMPLoop;
|
|
|
|
require_once dirname(__FILE__) . '/../../videos/configuration.php';
|
|
require_once $global['systemRootPath'] . 'plugin/YPTSocket/functions.php';
|
|
|
|
class Message implements MessageComponentInterface {
|
|
|
|
protected $clients;
|
|
protected $clientsLoggedConnections = 0;
|
|
protected $clientsInVideos = array();
|
|
protected $clientsInLives = array();
|
|
protected $clientsInLivesLinks = array();
|
|
protected $clientsInChatsRooms = array();
|
|
protected $itemsToCheck = array(
|
|
array('parameter' => 'clientsLoggedConnections', 'index' => 'users_id', 'class_prefix'=>''),
|
|
array('parameter' => 'clientsInVideos', 'index' => 'videos_id', 'class_prefix'=>'total_on_videos_id_'),
|
|
array('parameter' => 'clientsInLives', 'index' => 'live_key_servers_id', 'class_prefix'=>'total_on_live_'),
|
|
array('parameter' => 'clientsInLivesLinks', 'index' => 'liveLink', 'class_prefix'=>'total_on_live_links_id_'),
|
|
array('parameter' => 'clientsInChatsRooms', 'index' => 'room_users_id', 'class_prefix'=>'')
|
|
);
|
|
|
|
public function __construct() {
|
|
//$this->loop->ad
|
|
$this->clients = array();
|
|
_log_message("Construct");
|
|
}
|
|
|
|
public function onOpen(ConnectionInterface $conn) {
|
|
$start = microtime(true);
|
|
$query = $conn->httpRequest->getUri()->getQuery();
|
|
parse_str($query, $wsocketGetVars);
|
|
foreach ($wsocketGetVars as $key => $value) {
|
|
$wsocketGetVars[$key] = urldecode($value);
|
|
}
|
|
if (empty($wsocketGetVars['webSocketToken'])) {
|
|
_log_message("Empty websocket token ");
|
|
return false;
|
|
}
|
|
$json = getDecryptedInfo($wsocketGetVars['webSocketToken']);
|
|
if (empty($json)) {
|
|
_log_message("Invalid websocket token ");
|
|
return false;
|
|
}
|
|
|
|
$live_key = object_to_array(@$json->live_key);
|
|
if (empty($live_key)) {
|
|
$live_key = array();
|
|
$live_key['key'] = '';
|
|
$live_key['live_servers_id'] = 0;
|
|
$live_key['liveLink'] = '';
|
|
}
|
|
//var_dump($live_key);
|
|
$client = array();
|
|
$client['resourceId'] = intval($conn->resourceId);
|
|
$client['users_id'] = intval($json->from_users_id);
|
|
$client['room_users_id'] = intval(@$json->room_users_id);
|
|
$client['videos_id'] = intval($json->videos_id);
|
|
$client['live_key_servers_id'] = "{$live_key['key']}_{$live_key['live_servers_id']}";
|
|
$client['liveLink'] = $live_key['liveLink'];
|
|
$client['isAdmin'] = $json->isAdmin;
|
|
$client['live_key'] = $live_key['key'];
|
|
$client['live_servers_id'] = intval($live_key['live_servers_id']);
|
|
$client['user_name'] = $json->user_name;
|
|
$client['browser'] = $json->browser;
|
|
$client['yptDeviceId'] = $json->yptDeviceId;
|
|
$client['client'] = deviceIdToObject($json->yptDeviceId);
|
|
if (!empty($wsocketGetVars['webSocketSelfURI'])) {
|
|
$client['selfURI'] = $wsocketGetVars['webSocketSelfURI'];
|
|
} else {
|
|
$client['selfURI'] = $json->selfURI;
|
|
}
|
|
$client['isCommandLine'] = @$wsocketGetVars['isCommandLine'];
|
|
$client['page_title'] = @utf8_encode(@$wsocketGetVars['page_title']);
|
|
$client['ip'] = $json->ip;
|
|
if (!empty($json->location)) {
|
|
$client['location'] = $json->location->country_name;
|
|
$client['country_name'] = $json->location->country_name;
|
|
$client['country_code'] = $json->location->country_code;
|
|
} else {
|
|
$client['location'] = 0;
|
|
$client['country_name'] = 0;
|
|
$client['country_code'] = 0;
|
|
}
|
|
$client['browser'] = $client['client']->browser;
|
|
$client['os'] = $client['client']->os;
|
|
$client['data'] = $json;
|
|
|
|
var_dump($client['liveLink'], $live_key);
|
|
|
|
$this->setClient($conn, $client);
|
|
dbInsertConnection($client);
|
|
|
|
if ($client['browser'] == \SocketMessageType::TESTING) {
|
|
_log_message("Test detected and received from ($conn->resourceId) " . PHP_EOL . "\e[1;32;40m*** SUCCESS TEST CONNECION {$json->test_msg} ***\e[0m");
|
|
$this->msgToResourceId($json, $conn->resourceId, \SocketMessageType::TESTING);
|
|
} else if ($this->shouldPropagateInfo($client)) {
|
|
//_log_message("shouldPropagateInfo {$json->yptDeviceId}");
|
|
$this->msgToAll($conn, array('users_id' => $client['users_id'], 'user_name' => $client['user_name'], 'yptDeviceId' => $client['yptDeviceId']), \SocketMessageType::NEW_CONNECTION, true);
|
|
} else {
|
|
//_log_message("NOT shouldPropagateInfo ");
|
|
}
|
|
$end = number_format(microtime(true) - $start, 4);
|
|
_log_message("Connection opened in {$end} seconds");
|
|
}
|
|
|
|
public function onClose(ConnectionInterface $conn) {
|
|
global $onMessageSentTo, $SocketGetTotals;
|
|
$client = dbGetRowFromResourcesId($conn->resourceId);
|
|
_log_message("onClose {$conn->resourceId} before deleted");
|
|
dbDeleteConnection($conn->resourceId);
|
|
_log_message("onClose {$conn->resourceId} has deleted");
|
|
$this->unsetClient($conn, $client);
|
|
if ($this->shouldPropagateInfo($client)) {
|
|
$this->msgToAll($conn, array('users_id' => $client['users_id'], 'disconnected'=>$conn->resourceId), \SocketMessageType::NEW_DISCONNECTION);
|
|
}
|
|
_log_message("Connection {$conn->resourceId} has disconnected");
|
|
}
|
|
|
|
protected function setClient(ConnectionInterface $conn, $client) {
|
|
$this->clients[$conn->resourceId] = $conn;
|
|
foreach ($this->itemsToCheck as $value) {
|
|
if (!empty($client[$value['index']])) {
|
|
if (!is_array($this->{$value['parameter']})) {
|
|
$this->{$value['parameter']} = array();
|
|
}
|
|
if (empty($this->{$value['parameter']}[$client[$value['index']]])) {
|
|
$this->{$value['parameter']}[$client[$value['index']]] = 1;
|
|
} else {
|
|
$this->{$value['parameter']}[$client[$value['index']]]++;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
protected function unsetClient(ConnectionInterface $conn, $client) {
|
|
unset($this->clients[$conn->resourceId]);
|
|
|
|
foreach ($this->itemsToCheck as $value) {
|
|
if (!empty($client[$value['index']])) {
|
|
if (!is_array($this->{$value['parameter']})) {
|
|
$this->{$value['parameter']} = array();
|
|
} else {
|
|
$this->{$value['parameter']}[$client[$value['index']]]--;
|
|
if ($this->{$value['parameter']}[$client[$value['index']]] <= 0) {
|
|
unset($this->{$value['parameter']}[$client[$value['index']]]);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
function getTotalFromVars() {
|
|
$totals = array();
|
|
|
|
foreach ($this->itemsToCheck as $value) {
|
|
if(!empty($value['class_prefix'])){
|
|
foreach ($this->{$value['parameter']} as $key2 => $value2) {
|
|
if(empty($key2) || $key2 === '_0' || $key2 === '_'){
|
|
continue;
|
|
}
|
|
$index = "{$value['class_prefix']}{$key2}";
|
|
$totals[$index] = $value2;
|
|
}
|
|
}
|
|
}
|
|
|
|
return $totals;
|
|
}
|
|
|
|
public function getTotals() {
|
|
//$getTotals = dbGetDBTotals();
|
|
$totals = array();
|
|
$totals['total_devices_online'] = dbGetTotalUniqueDevices();
|
|
$totals['total_users_online'] = dbGetTotalUniqueUsers();
|
|
//$totals['class_to_update'] = dbGetTotalUniqueDevices();
|
|
//$totals['users_uri'] = $getTotals['users_uri'];
|
|
$totals['LivesTotals'] = $this->getLivesTotal();
|
|
|
|
//$getTotals = dbGetDBTotals();
|
|
$getTotalFromVars = $this->getTotalFromVars();
|
|
|
|
$totals = array_merge($totals, $getTotalFromVars);
|
|
//var_dump($totals);
|
|
return $totals;
|
|
}
|
|
|
|
public function onMessage(ConnectionInterface $from, $msg) {
|
|
global $onMessageSentTo, $SocketGetTotals;
|
|
$SocketGetTotals = null;
|
|
$onMessageSentTo = array();
|
|
//_log_message("onMessage: {$msg}");
|
|
$json = _json_decode($msg);
|
|
if (empty($json)) {
|
|
_log_message("onMessage ERROR: JSON is empty ");
|
|
return false;
|
|
}
|
|
if (empty($json->webSocketToken)) {
|
|
_log_message("onMessage ERROR: webSocketToken is empty ");
|
|
return false;
|
|
}
|
|
|
|
if (!$msgObj = getDecryptedInfo($json->webSocketToken)) {
|
|
_log_message("onMessage ERROR: could not decrypt webSocketToken");
|
|
return false;
|
|
}
|
|
|
|
switch ($json->msg) {
|
|
case \SocketMessageType::TESTING:
|
|
$this->msgToResourceId($json, $from->resourceId, \SocketMessageType::TESTING);
|
|
break;
|
|
default:
|
|
$this->msgToArray($json);
|
|
//_log_message("onMessage:msgObj: " . json_encode($json));
|
|
if (!empty($msgObj->send_to_uri_pattern)) {
|
|
$this->msgToSelfURI($json, $msgObj->send_to_uri_pattern);
|
|
} else if (!empty($json['resourceId'])) {
|
|
$this->msgToResourceId($json, $json['resourceId']);
|
|
} else if (!empty($json['to_users_id'])) {
|
|
$this->msgToUsers_id($json, $json['to_users_id']);
|
|
} else {
|
|
$this->msgToAll($from, $json);
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
|
|
private function shouldPropagateInfo($row) {
|
|
if (preg_match('/^unknowDevice.*/', $row['yptDeviceId'])) {
|
|
return false;
|
|
}
|
|
if (!empty($row['isCommandLine'])) {
|
|
return false;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
public function msgToResourceId($msg, $resourceId, $type = "", $totals = array()) {
|
|
global $onMessageSentTo, $SocketDataObj;
|
|
if (empty($resourceId)) {
|
|
return false;
|
|
}
|
|
|
|
$row = dbGetRowFromResourcesId($resourceId);
|
|
|
|
if (empty($row)) {
|
|
_log_message("msgToResourceId: resourceId=({$resourceId}) NOT found");
|
|
}
|
|
|
|
if (!$this->shouldPropagateInfo($row)) {
|
|
_log_message("msgToResourceId: we wil NOT send the message to resourceId=({$resourceId}) {$type}");
|
|
}
|
|
|
|
if (!is_array($msg)) {
|
|
$this->msgToArray($msg);
|
|
}
|
|
if (!empty($msg['webSocketToken'])) {
|
|
unset($msg['webSocketToken']);
|
|
}
|
|
if (empty($type)) {
|
|
$type = \SocketMessageType::DEFAULT_MESSAGE;
|
|
}
|
|
|
|
$videos_id = $row['videos_id'];
|
|
$users_id = $row['users_id'];
|
|
$live_key = $row['live_key'];
|
|
|
|
$obj = array();
|
|
$obj['ResourceId'] = $resourceId;
|
|
$obj['type'] = $type;
|
|
|
|
if (isset($msg['callback'])) {
|
|
$obj['callback'] = $msg['callback'];
|
|
unset($msg['callback']);
|
|
} else {
|
|
$obj['callback'] = "";
|
|
}
|
|
|
|
if (!empty($msg['json'])) {
|
|
$obj['msg'] = $msg['json'];
|
|
} else if (!empty($msg['msg'])) {
|
|
$obj['msg'] = $msg['msg'];
|
|
} else {
|
|
$obj['msg'] = $msg;
|
|
}
|
|
|
|
$obj['uniqid'] = uniqid();
|
|
$obj['users_id'] = $users_id;
|
|
$obj['videos_id'] = $videos_id;
|
|
$obj['live_key'] = $live_key;
|
|
$obj['webSocketServerVersion'] = $SocketDataObj->serverVersion;
|
|
$obj['isAdmin'] = $row['isAdmin'];
|
|
|
|
if (empty($totals)) {
|
|
$totals = $this->getTotals();
|
|
}
|
|
$return = $totals;
|
|
|
|
$info = array(
|
|
'webSocketServerVersion' => $SocketDataObj->serverVersion,
|
|
'socket_users_id' => $users_id,
|
|
'socket_resourceId' => $resourceId,
|
|
);
|
|
|
|
$autoUpdateOnHTML = array_merge($info, $return);
|
|
$obj['autoUpdateOnHTML'] = $autoUpdateOnHTML;
|
|
|
|
//$obj['users_uri'] = $return['users_uri'];
|
|
$obj['resourceId'] = $resourceId;
|
|
$obj['users_id_online'] = dbGetUniqueUsers();
|
|
|
|
$msgToSend = json_encode($obj);
|
|
_log_message("msgToResourceId: resourceId=({$resourceId}) {$type}");
|
|
$this->clients[$resourceId]->send($msgToSend);
|
|
}
|
|
|
|
public function onError(ConnectionInterface $conn, \Exception $e) {
|
|
dbDeleteConnection($conn->resourceId);
|
|
$conn->close();
|
|
}
|
|
|
|
public function msgToUsers_id($msg, $users_id, $type = "") {
|
|
if (empty($users_id)) {
|
|
return false;
|
|
}
|
|
try {
|
|
$count = 0;
|
|
if (!is_array($users_id)) {
|
|
$users_id = array($users_id);
|
|
}
|
|
foreach ($users_id as $user_id) {
|
|
$user_id = intval($user_id);
|
|
if (empty($user_id)) {
|
|
continue;
|
|
}
|
|
|
|
$rows = dbGetAllResourcesIdFromUsersId($user_id);
|
|
foreach ($rows as $row) {
|
|
$count++;
|
|
$this->msgToResourceId($msg, $row['resourceId'], $type);
|
|
}
|
|
}
|
|
} catch (Exception $exc) {
|
|
echo $exc->getTraceAsString();
|
|
}
|
|
|
|
_log_message("msgToUsers_id: sent to ($count) clients users_id=" . json_encode($users_id));
|
|
}
|
|
|
|
public function msgToSelfURI($msg, $pattern, $type = "") {
|
|
if (empty($pattern)) {
|
|
return false;
|
|
}
|
|
$count = 0;
|
|
$rows = dbGetAllResourceIdFromSelfURI($pattern);
|
|
$totals = $this->getTotals();
|
|
foreach ($rows as $row) {
|
|
$count++;
|
|
$this->msgToResourceId($msg, $row['resourceId'], $type, $totals);
|
|
}
|
|
_log_message("msgToSelfURI: sent to ($count) clients pattern={$pattern} {$type}");
|
|
}
|
|
|
|
function getLivesTotal() {
|
|
$this->totalUsersOnLives = array('updated' => time());
|
|
$this->totalUsersOnLives['statsList'] = array();
|
|
$rows = dbGetTotalInLive();
|
|
foreach ($rows as $value) {
|
|
$total_viewers = 0;
|
|
if ($this->isLiveUsersEnabled()) {
|
|
$total_viewers = \LiveUsers::getTotalUsers($value['live_key'], $value['live_servers_id']);
|
|
}
|
|
$this->totalUsersOnLives['statsList'][$value['live_key']] = array(
|
|
'total_viewers' => $total_viewers,
|
|
'watching_now' => intval($value['total']),
|
|
);
|
|
}
|
|
|
|
return $this->totalUsersOnLives;
|
|
}
|
|
|
|
private function isLiveUsersEnabled() {
|
|
global $_isLiveUsersEnabled;
|
|
if (!isset($_isLiveUsersEnabled)) {
|
|
$_isLiveUsersEnabled = \AVideoPlugin::isEnabledByName('LiveUsers') && method_exists('LiveUsers', 'getTotalUsers');
|
|
}
|
|
return $_isLiveUsersEnabled;
|
|
}
|
|
|
|
public function msgToAll(ConnectionInterface $from, $msg, $type = "", $includeMe = false) {
|
|
$start = microtime(true);
|
|
$rows = dbGetAll();
|
|
|
|
$totals = $this->getTotals();
|
|
|
|
foreach ($rows as $key => $client) {
|
|
$this->msgToResourceId($msg, $client['resourceId'], $type, $totals);
|
|
}
|
|
$end = number_format(microtime(true) - $start, 4);
|
|
_log_message("msgToAll FROM ({$from->resourceId}) {$type} Total Clients: " . count($rows) . " in {$end} seconds");
|
|
}
|
|
|
|
public function msgToAllSameVideo($videos_id, $msg) {
|
|
if (empty($videos_id)) {
|
|
return false;
|
|
}
|
|
if (!is_array($msg)) {
|
|
$this->msgToArray($msg);
|
|
}
|
|
_log_message("msgToAllSameVideo: {$videos_id}");
|
|
$totals = $this->getTotals();
|
|
foreach (dbGetAllResourcesIdFromVideosId($videos_id) as $client) {
|
|
$this->msgToResourceId($msg, $client['resourceId'], \SocketMessageType::ON_VIDEO_MSG, $totals);
|
|
}
|
|
}
|
|
|
|
public function msgToAllSameLive($live_key, $live_servers_id, $msg) {
|
|
if (empty($live_key)) {
|
|
return false;
|
|
}
|
|
|
|
if (!is_array($msg)) {
|
|
$this->msgToArray($msg);
|
|
}
|
|
|
|
$rows = dbGetAllResourcesIdFromLive($live_key, $live_servers_id);
|
|
$return = $this->getTotals();
|
|
|
|
$info = array(
|
|
'webSocketServerVersion' => $SocketDataObj->serverVersion,
|
|
'socket_users_id' => $users_id,
|
|
'socket_resourceId' => $resourceId,
|
|
);
|
|
|
|
$autoUpdateOnHTML = array_merge($info, $return);
|
|
foreach ($rows as $value) {
|
|
$this->msgToResourceId($msg, $value['resourceId'], \SocketMessageType::ON_LIVE_MSG);
|
|
}
|
|
}
|
|
|
|
private function msgToArray(&$json) {
|
|
$json = $this->makeSureIsArray($json);
|
|
return true;
|
|
}
|
|
|
|
private function makeSureIsArray($msg) {
|
|
if (empty($msg)) {
|
|
return array();
|
|
}
|
|
if (is_string($msg)) {
|
|
$decoded = _json_decode($msg);
|
|
} else {
|
|
$decoded = object_to_array($msg);
|
|
}
|
|
if (is_string($msg) && !$decoded) {
|
|
return array($msg);
|
|
} else if (is_string($msg)) {
|
|
return object_to_array($decoded);
|
|
}
|
|
return object_to_array($msg);
|
|
}
|
|
|
|
public function getTags() {
|
|
return array('free', 'live');
|
|
}
|
|
|
|
public function isUserLive($users_id) {
|
|
return dbIsUserOnLine($users_id);
|
|
}
|
|
|
|
}
|
|
|
|
function _log_message($msg, $type = "") {
|
|
global $SocketDataObj;
|
|
if (!empty($SocketDataObj->debugAllUsersSocket) || !empty($SocketDataObj->debugSocket)) {
|
|
//_error_log($msg, \AVideoLog::$SOCKET);
|
|
$mem_usage = memory_get_usage();
|
|
$mem = humanFileSize($mem_usage);
|
|
echo date('Y-m-d H:i:s') . " Using: {$mem} RAM " . $msg . PHP_EOL;
|
|
} else if ($type == \AVideoLog::$ERROR) {
|
|
_error_log($msg, \AVideoLog::$SOCKET);
|
|
echo "\e[1;31;40m" . date('Y-m-d H:i:s') . ' ' . $msg . "\e[0m" . PHP_EOL;
|
|
}
|
|
}
|