Transferring data between workers #1028

Open
muramidaza opened this issue Mar 20, 2024 · 5 comments


muramidaza commented Mar 20, 2024

Good day!
I have the config:

$this->worker = new Worker(self::getWebsocketAddressWorker(), $this->context);
        
$this->worker->count = 4;

How can I transfer data between workers? For example, one client connects to the first worker and another client connects to the second worker, and I need to pass data from one client to the other. That requires the second worker to have access to the first client's connection. What is the correct way to get data (for example, a Connection) from another worker?

It seems that each worker has its own global variables.

Owner

walkor commented Mar 21, 2024

#711

I think this is what you want.

@muramidaza
Author

I tried rewriting it in a functional style as a test, but the global variables are still separate for each worker.

<?php

require_once __DIR__ . '/vendor/autoload.php';

//php /var/www/html/chat.php start

use Channel\Client;
use Channel\Server;
use modules\chat\models\Chat;
use modules\chat\models\ChatMember;
use modules\chat\models\ChatMessage;
use modules\homeCall\models\Users;
use Workerman\Connection\TcpConnection;
use Workerman\Worker;
use yii\helpers\ArrayHelper;

const ACTION_CONNECT = 'connect';
const ACTION_SEND_MESSAGE = 'send_message';
const ACTION_CONNECT_TO_CHAT = 'connect_to_chat';

const AUTH_TYPE_KEY = 'key';
const AUTH_TYPE_TOKEN = 'token';

$websocketUrl = '127.0.0.1';
$websocketHost = '127.0.0.1';
$websocketPort = '2346';
$websocketDomain = 'homecall_php_new';
$websocketSsl = false;

$channel_server = new Server(getWebsocketHost());

$users = [];
$connectionsUsers = [];
$chatsLoggedUsers = [];
$chatsUsers = [];
$connections = [];

$dateTimeFormat = 'php:d M Y, H:i';

$debug = true;

$context = [];

if (getWebsocketSsl()) {
    $context = [
        'ssl' => [
//            'local_cert'  => Yii::$app->params['server.chat.ssl.cert'] ?? '',
//            'local_pk'    => Yii::$app->params['server.chat.ssl.pkey'] ?? '',
            'verify_peer' => false,
        ]
    ];
}

$worker = new Worker(getWebsocketAddressWorker(), $context);

if (getWebsocketSsl()) {
    $worker->transport = 'ssl';
}

$worker->count = 4;

// On worker start, connect this process to the Channel server
$worker->onWorkerStart = function () {
    Client::connect(getWebsocketHost());
    Client::on('publish', function($eventData){
        onPublish($eventData);
    });
};

// Handle a new connection
$worker->onConnect = function(TcpConnection $connection) {
    global $debug;
    if ($debug)
        echo 'New connection '.$connection->id. ' worker ' . $connection->worker->id . "\n";
};

// Handle a closed connection
$worker->onClose = function (TcpConnection $connection) {
    global $debug;

    $userID = getUserIDByConnection($connection);

    if(!$userID) return;

    deleteClient($connection, $userID);
    if ($debug)
        echo 'Connection closed ' . $connection->id . " worker " . $connection->worker->id . "\n";
};

// Handle incoming data
$worker->onMessage = function (TcpConnection $connection, $data) {
    onMessage($connection, $data);
};

// Run all workers
Worker::runAll();

function onMessage(TcpConnection $connection, string $message): void
{
    global $debug;

    $data = json_decode($message, true);

    $action = $data['action'] ?? '';

    $userID = getUserIDByConnection($connection);

    if ($action !== ACTION_CONNECT && is_null($userID)) {
        $connection->send(toJson(['error' => 'You do not have permission to access the chat']));
        $connection->close();
        return;
    }

    switch ($action) {
        case ACTION_CONNECT: {

            $userID = $data['user_id'];

            addClient($connection, $userID, []);

            $connection->send(toJson([
                'action' => 'init',
                'chats' => [],
            ]));

            break;
        }
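        case ACTION_CONNECT_TO_CHAT: {
            $chatID = $data['chat_id'] ?? null;

            if (!Chat::checkAccess($chatID, $userID)) {
                $connection->send(toJson(['error' => 'No access to the chat, try refreshing the page']));
                return;
            }

            // Record that this user has opened the chat so onPublish() can find them;
            // nothing else in this script populates $chatsLoggedUsers.
            $GLOBALS['chatsLoggedUsers'][$chatID][$userID] = $userID;

            $messageList = formatMessages(ChatMessage::find()->where(['chat_id' => $chatID])->all());

            $connection->send(toJson([
                'action' => 'init_chat',
                'messages' => $messageList,
            ]));

            // Without this break, execution fell through into ACTION_SEND_MESSAGE.
            break;
        }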
        case ACTION_SEND_MESSAGE: {
            $chatID = $data['chat_id'] ?? null;
            $message = $data['message'] ?? null;
            $message_type_id = $data['message_type_id'] ?? null;

            if ($debug) echo 'Received message ' . print_r($data, true)."\n";

            //$count_messages = ChatMessage::find()->where(['chat_id' => $chatID])->count();
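            $chat = Chat::findOne($chatID);

            // findOne() returns null for an unknown chat, so check before reading status.
            if ($chat === null || $chat->status != Chat::STATUS_ACTIVE) {
                $connection->send(toJson(['error' => 'The chat is closed']));
                $connection->close();
                break;
            }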

            $chatMessage = new ChatMessage();
            $chatMessage->user_id = $userID;
            $chatMessage->chat_id = $chatID;
            $chatMessage->message = $message;
            $chatMessage->type_id = $message_type_id;

            if (!$chatMessage->save()) {
                $connection->send(toJson($chatMessage->getErrors()));
                break;
            }

            if ($debug) echo 'Chat ID ' . $chatID . "\n" . 'Message ' . $message . "\n";

            Client::publish('publish', array(
                'chat_id' => $chatID,
                'message' => toJson([
                    'action' => 'add_message',
                    'chat_id' => $chatID,
                    'user_id' => $userID,
                    'message' => $message,
                    'message_type_id' => $message_type_id
                ]),
            ));

            break;
        }

        default: {
            $connection->send(toJson(['error' => 'Command not found']));
        }
    }
}

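function formatMessages($messageList)
{
    // Date formatting is stubbed out in this simplified test version.
    foreach ($messageList as &$item) {
        $item['created_at'] = '';
    }
    unset($item); // break the reference left behind by the by-reference foreach

    return $messageList;
}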

function formatChats($chatList, $current_user_id)
{
    global $dateTimeFormat;

    $chatListArray = [];

    foreach ($chatList as $key => $chat) {
        /** @var $chat Chat */

        $chatListArray[$key]['id'] = $chat->id;
        $chatListArray[$key]['type_id'] = $chat->type_id;
        $chatListArray[$key]['created_at'] = '';
        if($chat->type_id == Chat::TYPE_SIMPLE) {
            foreach ($chat->users as $user) {
                /** @var $user Users */
                if($user->id != $current_user_id) {
                    $chatListArray[$key]['name'] = $user->getFullName();
                    break;
                }
            }
        }
    }

    return $chatListArray;
}

function onPublish($eventData)
{
    global $debug;
    global $chatsLoggedUsers;
    global $connections;
    global $users; // was missing, so $users was always undefined in this function

    $chatID = $eventData['chat_id'];
    $message = $eventData['message'];

    // IDs of the users who have opened this chat in THIS worker process
    $userIDs = $chatsLoggedUsers[$chatID] ?? [];

    foreach ($userIDs as $userID) {
        if (isset($users[$userID])) {
            foreach ($users[$userID]['connections'] as $connectionID) {

                /** @var TcpConnection|null $connection */
                $connection = $connections[$connectionID] ?? null;

                if ($connection) $connection->send($message);

                if ($debug) echo 'Send to connection[' . $connectionID . '] - user[' . $userID . ']' . "\n";
                if ($debug) echo 'Message ' . print_r(json_decode($message), true) . "\n";
            }
        }
    }
}

/**
 * Return the user ID for an authorized connection, or null if it is not authorized
 * @param TcpConnection $connection
 * @return int|null
 */
function getUserIDByConnection(TcpConnection $connection): ?int
{
    global $connectionsUsers;
    return $connectionsUsers[$connection->id . '-' . $connection->worker->id] ?? null;
}

/**
 * Add a connection
 * @param TcpConnection $connection
 * @param int $userID
 * @param array $chatIDs
 * @return void
 */
function addClient(TcpConnection $connection, int $userID, array $chatIDs)
{
    global $connections;
    global $users;
    global $connectionsUsers;
    global $chatsUsers; // was missing, so the loop below wrote to a local variable

    // Register the client and its connection
    $connections[$connection->id . '-' . $connection->worker->id] = $connection;

    $users[$userID]['connections'][$connection->id . '-' . $connection->worker->id] = $connection->id . '-' . $connection->worker->id;

    $users[$userID]['chats'] = $chatIDs;

    $connectionsUsers[$connection->id . '-' . $connection->worker->id] = $userID;

    foreach ($chatIDs as $chatID) {
        $chatsUsers[$chatID][$userID] = $userID;
    }

    printConnectionList();
    printClientList();
}

/**
 * Remove a connection
 * @param TcpConnection $connection
 * @param int $userID
 * @return void
 */
function deleteClient($connection, $userID)
{
    global $connections;
    global $users;
    global $connectionsUsers;
    global $chatsUsers;        // these two were missing, so the unset() calls
    global $chatsLoggedUsers;  // below only touched undefined local variables

    // Remove the connection from the connection list (unset() is safe on a missing key)
    unset($connections[$connection->id . '-' . $connection->worker->id]);

    unset($connectionsUsers[$connection->id . '-' . $connection->worker->id]);

    // Remove the connection from the user's connection list
    if (isset($users[$userID])) {
        unset($users[$userID]['connections'][$connection->id . '-' . $connection->worker->id]);
    }

    // If the user has no connections left, remove the user
    if (isset($users[$userID]) && empty($users[$userID]['connections'])) {
        foreach ($users[$userID]['chats'] as $chatID) {
            unset($chatsUsers[$chatID][$userID], $chatsLoggedUsers[$chatID][$userID]);
        }

        unset($users[$userID]);
    }

    printConnectionList();
    printClientList();
}

/**
 * Print the users to the console
 * @return void
 */
function printClientList()
{
    global $debug;
    global $users;

    if (!$debug) return;

    echo "\n------------------------------------------\nClients: \n";
    foreach ($users as $userID => $userItem) {
        echo 'User ID: ' . $userID . " \n";
        foreach ($userItem['connections'] as $connectionID => $connectionItem) {
            echo ' - Connection ID: ' . $connectionID . " \n";
        }
        foreach ($userItem['chats'] as $chat) {
            echo ' - Chat ID: ' . $chat . " \n";
        }
    }
    echo "---------------- " . memory_get_usage() . " ---------------\n";
}

/**
 * Print the connections to the console
 * @return void
 */
function printConnectionList()
{
    global $debug;
    global $connectionsUsers;

    if (!$debug) return;

    echo "\n------------------------------------------\nConnections: \n";

    foreach ($connectionsUsers as $connectionID => $userID) {
        echo 'Connection ID ' . $connectionID . ' User ID ' . $userID . " \n";
    }

    echo "---------------- " . memory_get_usage() . " ---------------\n";
}

/**
 * Build a JSON response
 * @param array $data
 * @return false|string
 */
function toJson(array $data) {
    return json_encode($data, JSON_UNESCAPED_UNICODE);
}

function getWebsocketHost()
{
    global $websocketHost;
    return $websocketHost;
}

function getWebsocketAddressWorker(bool $cli = false): string
{
    if ($cli) {
        return (getWebsocketSsl() ? 'wss://' : 'ws://').getWebsocketDomainWorker().':'.getWebsocketPort();
    }
    return 'websocket://'.getWebsocketDomainWorker().':'.getWebsocketPort();
}

function getWebsocketSsl(): bool
{
    global $websocketSsl;
    return $websocketSsl;
}

function getWebsocketDomain(): string
{
    global $websocketDomain;
    return $websocketDomain;
}

function getWebsocketPort(): string
{
    global $websocketPort;
    return $websocketPort;
}

function getWebsocketDomainWorker(): string
{
    global $websocketDomain;
    return $websocketDomain;
}

I've simplified the code for the test. Clients connect and a connection is created, but each worker has its own copy of variables such as $users.

Author

muramidaza commented Mar 21, 2024

That is, I declare an array variable outside the functions to store connections. Inside a function I access it with the global keyword and save the user and connection there. If a second connection arrives in the same worker, the array then holds 2 connections.

But if a new (third) connection lands in a different worker, it is added to an empty array, so that array holds only one connection.

And if a fourth connection lands in the first worker, it is added to the array that already holds 2 connections.

That is, each worker has its own global variable.

So there end up being 2 global variables: one stores an array with 3 connections and the other stores an array with 1 connection. Each one is visible only from the worker it belongs to.

Initially I wrote this with classes and declared these variables as static, accessing them as, for example, ServerChatController::$connectionsUsers. The onMessage handler looked like this:

$this->worker->onMessage = function (TcpConnection $connection, $data) {
    $this->onMessage($connection, $data);
};

Everything works, but each worker has its own copy of the static variable.

Please help me make global or static variables shared by all workers.
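The behaviour described in this comment is what separate operating-system processes produce: each process works on its own copy of memory, so a global or static variable changed in one is invisible to the others. A minimal sketch of the same effect without Workerman, assuming the pcntl extension is available (PHP CLI on a Unix-like system); the variable name and the 'conn-…' strings are illustrative only:

```php
<?php
// Sketch only: shows that a forked child works on its own copy of a global.
// Assumes the pcntl extension (PHP CLI on a Unix-like system).
if (!function_exists('pcntl_fork')) {
    fwrite(STDERR, "pcntl extension is required\n");
    exit(1);
}

$connections = ['conn-1']; // state created before the fork

$pid = pcntl_fork();
if ($pid === -1) {
    fwrite(STDERR, "fork failed\n");
    exit(1);
}

if ($pid === 0) {
    // Child process: appends to ITS copy of the array, then exits.
    $connections[] = 'conn-2';
    echo 'child count: ' . count($connections) . "\n";
    exit(0);
}

// Parent process: wait for the child, then inspect its own copy.
pcntl_waitpid($pid, $status);
echo 'parent count: ' . count($connections) . "\n";
```

The child reports a count of 2 while the parent still reports 1, which mirrors the "3 connections in one array, 1 in the other" observation above.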

Owner

walkor commented Mar 21, 2024

Connection objects cannot be shared between processes. You can notify the corresponding worker process through the channel and have that process operate on the connection object on your behalf.
Or use a single process to handle everything.
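A rough sketch of the channel approach, under stated assumptions: each worker keeps only its own connections, every message is published through Channel, and whichever worker holds the target user's connection sends it. Note that this broadcasts to every worker rather than addressing one specific worker. The 'to_user' event name, the 'connect' / 'to_user_id' message fields, the helper names deliverLocally() and startChatWorkers(), the ports, and the autoload path are all hypothetical and not taken from this thread.

```php
<?php
use Channel\Client;
use Channel\Server;
use Workerman\Connection\TcpConnection;
use Workerman\Worker;

/**
 * Send $event['message'] to every connection of $event['to_user_id'] that
 * lives in THIS process. $localUsers maps user ID => [connection ID => object
 * with a send() method]. Returns the number of connections written to.
 */
function deliverLocally(array $localUsers, array $event): int
{
    $sent = 0;
    foreach ($localUsers[$event['to_user_id']] ?? [] as $connection) {
        $connection->send($event['message']);
        $sent++;
    }
    return $sent;
}

/** Hypothetical wiring: one Channel server plus four WebSocket workers. */
function startChatWorkers(): void
{
    // The Channel server relays published events to every subscribed worker.
    $channelServer = new Server('127.0.0.1', 2206);

    $worker = new Worker('websocket://127.0.0.1:2346');
    $worker->count = 4;

    // After the fork each worker process has its own copy of this array.
    $localUsers = [];

    $worker->onWorkerStart = function () use (&$localUsers) {
        Client::connect('127.0.0.1', 2206);
        Client::on('to_user', function ($event) use (&$localUsers) {
            deliverLocally($localUsers, $event);
        });
    };

    $worker->onMessage = function (TcpConnection $connection, $data) use (&$localUsers) {
        $msg = json_decode($data, true);
        if (!is_array($msg)) {
            return;
        }
        if (($msg['action'] ?? '') === 'connect') {
            // Remember the connection in the process that owns it.
            $localUsers[(int) ($msg['user_id'] ?? 0)][$connection->id] = $connection;
            return;
        }
        // Publish data only; connection objects never leave their process.
        Client::publish('to_user', [
            'to_user_id' => (int) ($msg['to_user_id'] ?? 0),
            'message'    => (string) ($msg['message'] ?? ''),
        ]);
    };

    $worker->onClose = function (TcpConnection $connection) use (&$localUsers) {
        foreach (array_keys($localUsers) as $userId) {
            unset($localUsers[$userId][$connection->id]);
            if (empty($localUsers[$userId])) {
                unset($localUsers[$userId]);
            }
        }
    };
}

// Start only when Composer dependencies are present (the path is an assumption).
if (is_file(__DIR__ . '/vendor/autoload.php')) {
    require_once __DIR__ . '/vendor/autoload.php';
    startChatWorkers();
    Worker::runAll();
}
```

The alternative mentioned above is to set $worker->count = 1, which keeps every connection in one process and removes the need for Channel, at the cost of a single process handling all clients.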

@muramidaza
Author

Thank you very much! I used channels - now everything works correctly
