常用的redis,database都可以 -
一个连接可有多个队列 -
队列里的数据 -
make:job 任务创建
public function handle()
//$this->getNameInput() 得到命令参数如php artisan make:job xxx则得到xxx
$name = $this->qualifyClass($this->getNameInput());
//拼接成xxx.php文件 得到任务类文件
$path = $this->getPath($name);
// First we will check to see if the class already exists. If it does, we don't want
// to create the class and overwrite the user's code. So, we will bail out so the
// code is untouched. Otherwise, we will continue generating this class' files.
if ((! $this->hasOption('force') || ! $this->option('force')) && $this->alreadyExists($this->getNameInput())) {
$this->error($this->type.' already exists!');
return false;
// Next, we will generate the path to the location where this class' file should get
// written. Then, we will build the class and make the proper replacements on the
// stub files so that it gets the correctly formatted namespace and class name.
//往$this->files FileSystem文件系统类 】指定的文件里写数据【数据来自模板,模板已经替换好】
$this->files->put($path, $this->buildClass($name));
$this->info($this->type.' created successfully.');
$this->app->singleton(Dispatcher::class, function ($app) {
return new Dispatcher($app, function ($connection = null) use ($app) {
//'queue' => [\Illuminate\Queue\QueueManager::class,
// \Illuminate\Contracts\Queue\Factory::class,
// \Illuminate\Contracts\Queue\Monitor::class],
//QueueFactoryContract = \Illuminate\Queue\QueueManager::class
return $app[QueueFactoryContract::class]->connection($connection);
$this->pushCommandToQueue($queue, $command);
$queue 队列连接
$command 任务实例
- 运行任务
public function handle()
if ($this->downForMaintenance() && $this->option('once')) {
return $this->worker->sleep($this->option('sleep'));
// We'll listen to the processed and failed events so we can write information
// to the console as jobs are processed, which will let the developer watch
// which jobs are coming through a queue and be informed on its progress.
$connection = $this->argument('connection')
?: $this->laravel['config']['queue.default'];
// We need to get the right queue for the connection which is set in the queue
// configuration file for the application. We will pull it based on the set
// connection being run for the queue operation currently being executed.
$queue = $this->getQueue($connection);
$connection, $queue
public function daemon($connectionName, $queue, WorkerOptions $options)
$lastRestart = $this->getTimestampOfLastQueueRestart();
while (true) {
// Before reserving any jobs, we will make sure this queue is not paused and
// if it is we will just pause this worker for a given amount of time and
// make sure we do not need to kill this worker process off completely.
if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
$this->pauseWorker($options, $lastRestart);
// First, we will attempt to get the next job off of the queue. We will also
// register the timeout handler and reset the alarm for this job so it is
// not stuck in a frozen state forever. Then, we can fire off this job.
$job = $this->getNextJob(
$this->manager->connection($connectionName), $queue
$this->registerTimeoutHandler($job, $options);
// If the daemon should run (not in maintenance mode, etc.), then we can run
// fire off this job for processing. Otherwise, we will need to sleep the
// worker so no more jobs are processed until they should be processed.
if ($job) {
$this->runJob($job, $connectionName, $options);
} else {
// Finally, we will check to see if we have exceeded our memory limits or if
// the queue should restart based on other indications. If so, we'll stop
// this worker and let whatever is "monitoring" it restart the process.
$this->stopIfNecessary($options, $lastRestart);
//进程信号监听 $this->listenForSignals();
protected function listenForSignals()
if ($this->supportsAsyncSignals()) {
pcntl_signal(SIGTERM, function () {
$this->shouldQuit = true;
pcntl_signal(SIGUSR2, function () {
$this->paused = true;
pcntl_signal(SIGCONT, function () {
$this->paused = false;