149 lines
		
	
	
		
			3.8 KiB
		
	
	
	
		
			PHP
		
	
	
	
		
		
			
		
	
	
			149 lines
		
	
	
		
			3.8 KiB
		
	
	
	
		
			PHP
		
	
	
	
| 
								 | 
							
								<?php
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								/*
							 | 
						||
| 
								 | 
							
								 * This file is part of the Monolog package.
							 | 
						||
| 
								 | 
							
								 *
							 | 
						||
| 
								 | 
							
								 * (c) Jordi Boggiano <j.boggiano@seld.be>
							 | 
						||
| 
								 | 
							
								 *
							 | 
						||
| 
								 | 
							
								 * For the full copyright and license information, please view the LICENSE
							 | 
						||
| 
								 | 
							
								 * file that was distributed with this source code.
							 | 
						||
| 
								 | 
							
								 */
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								namespace Monolog\Handler;
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								use Monolog\Logger;
							 | 
						||
| 
								 | 
							
								use Monolog\Formatter\JsonFormatter;
							 | 
						||
| 
								 | 
							
								use PhpAmqpLib\Message\AMQPMessage;
							 | 
						||
| 
								 | 
							
								use PhpAmqpLib\Channel\AMQPChannel;
							 | 
						||
| 
								 | 
							
								use AMQPExchange;
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								/**
								 * Monolog handler that publishes formatted log records to an AMQP exchange.
								 *
								 * Two transports are supported: an AMQPExchange object (php AMQP extension)
								 * or a PhpAmqpLib AMQPChannel. Every message is published with
								 * delivery_mode 2 and an "application/json" content type.
								 */
								class AmqpHandler extends AbstractProcessingHandler
								{
								    /**
								     * Transport used for publishing; validated in the constructor.
								     *
								     * @var AMQPExchange|AMQPChannel $exchange
								     */
								    protected $exchange;

								    /**
								     * Exchange name handed to the channel on publish. Only assigned when
								     * the transport is an AMQPChannel; otherwise the name is set directly
								     * on the AMQPExchange object.
								     *
								     * @var string
								     */
								    protected $exchangeName;

								    /**
								     * @param AMQPExchange|AMQPChannel $exchange     AMQPExchange (php AMQP ext) or PHP AMQP lib channel, ready for use
								     * @param string                   $exchangeName Name of the exchange to publish to
								     * @param int                      $level        Level forwarded to the parent handler
								     * @param bool                     $bubble       Whether the messages that are handled can bubble up the stack or not
								     *
								     * @throws \InvalidArgumentException When $exchange is neither an AMQPExchange nor an AMQPChannel
								     */
								    public function __construct($exchange, $exchangeName = 'log', $level = Logger::DEBUG, $bubble = true)
								    {
								        $isExtensionExchange = $exchange instanceof AMQPExchange;

								        // Reject unsupported transports before touching any state.
								        if (!$isExtensionExchange && !($exchange instanceof AMQPChannel)) {
								            throw new \InvalidArgumentException('PhpAmqpLib\Channel\AMQPChannel or AMQPExchange instance required');
								        }

								        if ($isExtensionExchange) {
								            // The extension object carries its own name.
								            $exchange->setName($exchangeName);
								        } else {
								            // The channel needs the name passed on every publish call.
								            $this->exchangeName = $exchangeName;
								        }

								        $this->exchange = $exchange;

								        parent::__construct($level, $bubble);
								    }

								    /**
								     * {@inheritDoc}
								     *
								     * Publishes the already formatted record through whichever transport
								     * was supplied to the constructor.
								     */
								    protected function write(array $record)
								    {
								        $payload = $record["formatted"];
								        $key = $this->getRoutingKey($record);

								        if (!($this->exchange instanceof AMQPExchange)) {
								            // PhpAmqpLib channel: wrap the payload in an AMQPMessage.
								            $message = $this->createAmqpMessage($payload);
								            $this->exchange->basic_publish($message, $this->exchangeName, $key);

								            return;
								        }

								        // php AMQP extension: attributes are passed alongside the raw payload.
								        $attributes = array(
								            'delivery_mode' => 2,
								            'content_type' => 'application/json',
								        );
								        $this->exchange->publish($payload, $key, 0, $attributes);
								    }

								    /**
								     * {@inheritDoc}
								     *
								     * With an AMQPExchange the parent implementation is used unchanged.
								     * With an AMQPChannel every handled record is queued through
								     * batch_basic_publish() and the batch is sent once at the end.
								     */
								    public function handleBatch(array $records)
								    {
								        if ($this->exchange instanceof AMQPExchange) {
								            parent::handleBatch($records);

								            return;
								        }

								        foreach ($records as $entry) {
								            if ($this->isHandling($entry)) {
								                $processed = $this->processRecord($entry);
								                $formatted = $this->getFormatter()->format($processed);
								                $message = $this->createAmqpMessage($formatted);

								                // The routing key is derived from the processed record.
								                $this->exchange->batch_basic_publish(
								                    $message,
								                    $this->exchangeName,
								                    $this->getRoutingKey($processed)
								                );
								            }
								        }

								        $this->exchange->publish_batch();
								    }

								    /**
								     * Builds the lower-cased routing key "<level>.<channel>" for a record.
								     *
								     * @param  array  $record Record providing 'level_name' and 'channel'
								     * @return string
								     */
								    protected function getRoutingKey(array $record)
								    {
								        // TODO 2.0 remove substr call (level name is cut to its first 4 characters)
								        $levelPrefix = substr($record['level_name'], 0, 4);
								        $key = sprintf('%s.%s', $levelPrefix, $record['channel']);

								        return strtolower($key);
								    }

								    /**
								     * Wraps a payload in an AMQPMessage carrying the handler's fixed
								     * message properties.
								     *
								     * @param  string      $data Payload; cast to string before use
								     * @return AMQPMessage
								     */
								    private function createAmqpMessage($data)
								    {
								        $body = (string) $data;
								        $properties = array(
								            'delivery_mode' => 2,
								            'content_type' => 'application/json',
								        );

								        return new AMQPMessage($body, $properties);
								    }

								    /**
								     * {@inheritDoc}
								     *
								     * Defaults to a JsonFormatter in BATCH_MODE_JSON with its second
								     * constructor argument set to false.
								     */
								    protected function getDefaultFormatter()
								    {
								        $formatter = new JsonFormatter(JsonFormatter::BATCH_MODE_JSON, false);

								        return $formatter;
								    }
								}
							 |