Как создать простую, но эффективную систему очереди сообщений

  • PHP
  • 3,184
  • 0
  • 3
  • 0
  • 1 год назад

На протяжении последних нескольких месяцев перед нами стояла сложная задача, заключавшаяся в выполнении множества микро-услуг для тысяч запросов наиболее эффективным способом. Нашей первой мыслью было использовать Очередь (что намного лучше, чем создавать таблицу x-SQL со своими собственными статусами, датами создания, приоритетами и т.д.) - и оказались правы. Однако существует очень много вариантов работы с очередью. Итак, вот наши находки, основанные не столько на их технических характеристиках, сколько на наших личных предпочтениях:

  • RabbitMQ: Эффективный, быстрый и надежный инструмент. Но после нескольких месяцев работы с ним мы просто задыхались среди множества вариантов и терминов. Нам пришлось учить много всего, а нам была нужна всего лишь очередь!
  • Amazon SQS: Отличная система с ее легендарным «для работы с AWS (Веб-сервисы Amazon) вам необходимо иметь докторскую степень». Но к нашему удивлению, мы смогли создать систему в очень короткие сроки. К сожалению, сообщения приходят с задержкой, а между публикацией и потреблением приходится ждать «до минуты и более».
  • Beanstalkd: В результате поисков мы наткнулись на Beanstalkd. До абсурда простой инструмент, быстро устанавливается, понятная терминология, хороший уровень выполнения. Недостатки: Защита полностью отсутствует (остается только заблокировать порты и молиться). И еще мне придется найти нормальный инструмент для отслеживания сообщений.

Я покажу, как мы устанавливали Beanstalkd. Итак, я исхожу из следующего:

  • Вы знаете достаточно о PHP
  • Вы обладаете базовыми знаниями о компоновщике
  • Вы разбираетесь в объектно-ориентированном PHP
  • Вы как минимум следуете стандартам PSR-0 или используете какой-нибудь достойный фреймворк.

Установка (Ubuntu)

aptitude install -y beanstalkd

Готово. Налейте себе чашечку кофе. Отличная работа.

Использование

Все по порядку. Сначала нам нужен так называемый издатель, а затем потребители/рабочие.

Разместите эту библиотеку в вашем компоновщике (composer.json):

"pda/pheanstalk": "~3.0.2",

Я создал простой класс под названием Queue\Publisher.php и Queue\Consumer.php:

namespace Queue;

use Pheanstalk\Pheanstalk;

class Publisher
{
    private $queue;

    public function __construct(array $args)
    {
        $this->config = ['queue' => ['host' => 'yourdomain.com']; // Не обращайте на это внимания. Обычно у менять есть файл конфигурации, но я переместил его сюда специально для целей этой статьи. В качестве хоста используйте IP или Домен. 

        $this->queue = $args['queue']; // Просто имя очереди. Я передаю его в качестве параметра. Beanstalkd называет очереди «трубами», но вы можете выбрать любое имя. Если его не существует, оно будет создано. 
        $this->client = new Pheanstalk($this->config['queue']['host']); // Инстанцируйте объект. 
    }

    public function send($request)
    {
        return $this->client
            ->useTube($this->queue)
            ->put(json_encode($request)); // Отправьте что угодно с кодировкой в формате json – и готово!
    }
}

И для потребителя (consumer):

namespace Queue;

use Pheanstalk\Pheanstalk;

class Consumer
{
    private $queue;

    public function __construct(array $args)
    {
        $this->config = ['queue' => ['host' => 'yourdomain.com']; // Не обращайте на это внимания. Обычно у меня есть файл конфигурации, но я переместил его сюда специально для целей этой статьи. В качестве хоста используйте IP или Домен. 

        $this->queue = $args['queue'];
        $this->client = new Pheanstalk($this->config['queue']['host']);
    }

    public function listen()
    {
        $this->client->watch($this->queue); // Снова передайте имя очереди. 

        while ($job = $this->client->reserve()) { // Продолжайте это делать... чтобы он всегда слушал. 
            $message = json_decode($job->getData(), true); // Расшифруйте сообщение

            $status = $this->process($message);

            if ($status)
                $this->client->delete($job);
            else
                $this->client->delete($job); // Удаляйте в любом случае. Позже попытка будет повторена. 
        }
    }

    public function process($msg)
    {
         // Выполните какую-нибудь операцию и возвратите значение `true` в случае успеха или значение `false`
    }

Наконец, вам нужно вызвать это:

$publish = new Queues\Publisher(['queue' => "someQueue"]);
$publish->send("Do this..."); // Это может быть совокупность, имя класса и т.д.

А на другой стороне извлечь это:

$consumer = new Queues\Consumer(['queue' => "someQueue"]); // Обычно это выполняется через консоль.

Готово! В нашем приложении мы используем PhalconPHP, и у нас есть такое понятие, как Задачи. Мы используем эти задачи для управления Потребителем, а Cronjob мы используем для «публикации» сообщений. Аналогично, мы можем выполнять большое количество потребительских задач для более быстрого обслуживания очереди.

Для увеличения масштабируемости мы можем настроить любое количество серверов с одним и тем же кодом, направить потребителя на один и тот же IP/Домен Beanstalkd’а и обслуживать еще больше потребителей. Так что возможности для расширения фактически безграничны.

И совет напоследок. Мы выполняем задачи через Supervisor. Скажем, у нас есть 10 рабочих (workers), что позволяет нам очистить очереди где-то за 15 минут. (Задержка происходит во внешних API, к которым мы делаем запрос на каждую тысячу обрабатываемых нами сообщений). Если ваши операции носят местный характер или в состоянии контролировать их выполнение, вы можете обрабатывать сотни сообщений в секунду.

Комментарии

3
LightAir, 1 год назад
0

Не рассматривали в качестве очереди redis? LPUSH queueName "jsonString" LPPOP queueName

Павел, 1 год назад
0

нет, в основном использую rabbitmq с symfony2

acanthis, 3 месяца назад
0

Павел, добрый день! Я был бы Вам очень признателен если бы Вы написали статью об использовании очередей в Symfony 3. P.S. Спасибо Вам за Ваш труд.