1. 队列

1.1. 基本原则

  • 推送消息$params要做到尽量减小体积,在使用某些mq时,不容易发生硬盘数据交换
  • adapter中,逻辑要做到“谁干活谁推送”,即数据推送(生产者)和消费者为同一app
  • 一个队列中,只能放同一个app的worker,当数据量较大时,应该一个队列中只放一个类型的worker,且这个worker应该与queu属于同一个app

1.2. 配置参数

队列默认配置文件为config/queue.php,需修改配置,可执行命令cp config/queue.php config/production/queue.php

return array(
    /*
    |--------------------------------------------------------------------------
    | 默认队列处理类
    |--------------------------------------------------------------------------
    |
    | 默认缓存处理类
    | 目前支持 system_queue_adapter_mysql,system_queue_adapter_redis
    | 对应原系统:  QUEUE_SCHEDULE
    |
    | 配置队列将采用哪种方式
    */
    'default' => 'system_queue_adapter_mysql',

    /*
    |--------------------------------------------------------------------------
    | 默认发送队列
    |--------------------------------------------------------------------------
    |
    | 默认发送队列, 对应queues的定义. 请检查核对, 避免错误
    | 对应原系统:  DEFAULT_PUBLISH_QUEUE
    |
    | 在任务没有绑定队列的情况下默认发送到该定义的队列
    */
    'default_publish_queue' => 'normal',

    /*
    |--------------------------------------------------------------------------
    | 任务和队列绑定关系
    |--------------------------------------------------------------------------
    |
    | 任务和队列绑定关系, 确认哪个任务发送到哪个队列
    | 如果是定时任务触发的任务, 任务key加前缀crontab:
    |
    | 在新开发队列的情况下,如果队列不是绑定到默认发送队列则需要在此定义任务绑定
    | 到对应的队列
    |
    | 这个配置是使用publish()方法推送消息时,判断worker和队列的绑定、分发关系的,
    | publish方法会根据这个绑定关系,自动推送到相应队列中
    */
    'bindings' => array(
        'worker_class'                      => array('queue1', 'queue2'),
        'crontab:site_tasks_createsitemaps' => array('slow'),
        'desktop_tasks_runimport'           => array('normal'),
        'emailbus_tasks_sendemail'          => array('slow'),
        'system_tasks_events'               => array('quick'),
        'system_tasks_notifyPrism'          => array('notifyPrism'),
    ),

    /*
    |--------------------------------------------------------------------------
    | 定义队列
    |--------------------------------------------------------------------------
    |
    | title: 简要说明
    | thread: 处理进程数
    |
    */
    'queues' => array(
        'slow' => array(
            'title' => 'slow queue',
            'thread' => 3,
            'app' =>'topc',
        ),
        'quick' => array(
            'title' => 'quick queue',
            'thread' => 5,
            'app' =>'topc',
        ),
        'normal' => array(
            'title' => 'normal queue',
            'thread' => 3,
            'app' =>'topc',
        ),
        'notifyPrism' => array(
            'title' => 'normal queue',
            'thread' => 1,
            'app' =>'topc',
        )
    ),


    /*
    |--------------------------------------------------------------------------
    | 动作定义
    |--------------------------------------------------------------------------
    |
    | 定义一个动作会触发多少工作
    |
    */
    'action' => array(
        //'action_name' => array('exchange_name'=>'worker_class'),
    ),
);

1.3. 推送

单个队列推送

queue::push($exchange_name, $worker, $params);

//example
queue::push('emailbus_tasks_sendemail', 'emailbus_tasks_sendemail', array('tel'=>'18888888888', 'content'=>'幸运用户您好,你中奖了!'));

redis队列支持推送一个延时队列

/**
 * 创建一个延时队列
 *
 * @param string $exchange_name 队列配置中bindings绑定的任务 key
 * @param string $worker 队列执行类
 * @param array $params 队列执行参数
 * @param int $delay 延时队列延时时间 单位秒
 */
queue::later($exchange_name, $worker, $params, $delay);

//example
queue::later('emailbus_tasks_sendemail', 'emailbus_tasks_sendemail', array('tel'=>'18888888888', 'content'=>'幸运用户您好,你中奖了!'), 300);

批量推送的方法

queue::bulk($workers, $params);

//example
queue::bulk(array('emailbus_tasks_sendemail', 'emailbus_tasks_sendSms'), array('tel'=>'18888888888', 'email'=>'888888@qq.com', 'content'=>'幸运用户您好,你中奖了!'));

目前支持事件方式推送

queue::action($action, $params);

//example
queue::action('winning', array('tel'=>'18888888888', 'email'=>'888888@qq.com', 'content'=>'幸运用户您好,你中奖了!'));

1.4. 消费队列

在服务器上执行一下命令

cd b2b2c && ./script/queueserver/queue.sh

redis队列支持:队列消费失败触发事件,在二次开发的情况下,可以实现该事件,(在队列失败的时候发送短信或者邮件)

/**
 * 队列执行失败超出重试次数,将改队列插入到一个失败队列的数据表中
 * 并且出发队列执行失败事件
 */

 event::fire('queue.failed', ['queue_failed_id'=>$queueFailedId]);

1.5. 创建队列执行类

class system_tasks_testSendSms extends base_task_abstract implements base_interface_task {

      /**
      * 定义队列可以重试次数,默认为5次
      *
      * 目前只有redis队列支持
      */
      public function getTries()
      {
          return 3;
      }

      /**
      * 定义队列执行失败后重试的延时时间 默认为300秒,
      *
      * 目前只有redis队列支持
      */
      public function getDelayTime()
      {
          return 60;
      }

     /**
      * 具体执行队列方法
      */
      public function exec($params=null)
      {
        //这里写具体执行队列代码
      }
}

results matching ""

    No results matching ""