149 lines
		
	
	
		
			3.8 KiB
		
	
	
	
		
			PHP
		
	
	
	
			
		
		
	
	
			149 lines
		
	
	
		
			3.8 KiB
		
	
	
	
		
			PHP
		
	
	
	
| <?php
 | |
| 
 | |
| /*
 | |
|  * This file is part of the Monolog package.
 | |
|  *
 | |
|  * (c) Jordi Boggiano <j.boggiano@seld.be>
 | |
|  *
 | |
|  * For the full copyright and license information, please view the LICENSE
 | |
|  * file that was distributed with this source code.
 | |
|  */
 | |
| 
 | |
| namespace Monolog\Handler;
 | |
| 
 | |
| use Monolog\Logger;
 | |
| use Monolog\Formatter\JsonFormatter;
 | |
| use PhpAmqpLib\Message\AMQPMessage;
 | |
| use PhpAmqpLib\Channel\AMQPChannel;
 | |
| use AMQPExchange;
 | |
| 
 | |
| class AmqpHandler extends AbstractProcessingHandler
 | |
| {
 | |
|     /**
 | |
|      * @var AMQPExchange|AMQPChannel $exchange
 | |
|      */
 | |
|     protected $exchange;
 | |
| 
 | |
|     /**
 | |
|      * @var string
 | |
|      */
 | |
|     protected $exchangeName;
 | |
| 
 | |
|     /**
 | |
|      * @param AMQPExchange|AMQPChannel $exchange     AMQPExchange (php AMQP ext) or PHP AMQP lib channel, ready for use
 | |
|      * @param string                   $exchangeName
 | |
|      * @param int                      $level
 | |
|      * @param bool                     $bubble       Whether the messages that are handled can bubble up the stack or not
 | |
|      */
 | |
|     public function __construct($exchange, $exchangeName = 'log', $level = Logger::DEBUG, $bubble = true)
 | |
|     {
 | |
|         if ($exchange instanceof AMQPExchange) {
 | |
|             $exchange->setName($exchangeName);
 | |
|         } elseif ($exchange instanceof AMQPChannel) {
 | |
|             $this->exchangeName = $exchangeName;
 | |
|         } else {
 | |
|             throw new \InvalidArgumentException('PhpAmqpLib\Channel\AMQPChannel or AMQPExchange instance required');
 | |
|         }
 | |
|         $this->exchange = $exchange;
 | |
| 
 | |
|         parent::__construct($level, $bubble);
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * {@inheritDoc}
 | |
|      */
 | |
|     protected function write(array $record)
 | |
|     {
 | |
|         $data = $record["formatted"];
 | |
|         $routingKey = $this->getRoutingKey($record);
 | |
| 
 | |
|         if ($this->exchange instanceof AMQPExchange) {
 | |
|             $this->exchange->publish(
 | |
|                 $data,
 | |
|                 $routingKey,
 | |
|                 0,
 | |
|                 array(
 | |
|                     'delivery_mode' => 2,
 | |
|                     'content_type' => 'application/json',
 | |
|                 )
 | |
|             );
 | |
|         } else {
 | |
|             $this->exchange->basic_publish(
 | |
|                 $this->createAmqpMessage($data),
 | |
|                 $this->exchangeName,
 | |
|                 $routingKey
 | |
|             );
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * {@inheritDoc}
 | |
|      */
 | |
|     public function handleBatch(array $records)
 | |
|     {
 | |
|         if ($this->exchange instanceof AMQPExchange) {
 | |
|             parent::handleBatch($records);
 | |
| 
 | |
|             return;
 | |
|         }
 | |
| 
 | |
|         foreach ($records as $record) {
 | |
|             if (!$this->isHandling($record)) {
 | |
|                 continue;
 | |
|             }
 | |
| 
 | |
|             $record = $this->processRecord($record);
 | |
|             $data = $this->getFormatter()->format($record);
 | |
| 
 | |
|             $this->exchange->batch_basic_publish(
 | |
|                 $this->createAmqpMessage($data),
 | |
|                 $this->exchangeName,
 | |
|                 $this->getRoutingKey($record)
 | |
|             );
 | |
|         }
 | |
| 
 | |
|         $this->exchange->publish_batch();
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * Gets the routing key for the AMQP exchange
 | |
|      *
 | |
|      * @param  array  $record
 | |
|      * @return string
 | |
|      */
 | |
|     protected function getRoutingKey(array $record)
 | |
|     {
 | |
|         $routingKey = sprintf(
 | |
|             '%s.%s',
 | |
|             // TODO 2.0 remove substr call
 | |
|             substr($record['level_name'], 0, 4),
 | |
|             $record['channel']
 | |
|         );
 | |
| 
 | |
|         return strtolower($routingKey);
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * @param  string      $data
 | |
|      * @return AMQPMessage
 | |
|      */
 | |
|     private function createAmqpMessage($data)
 | |
|     {
 | |
|         return new AMQPMessage(
 | |
|             (string) $data,
 | |
|             array(
 | |
|                 'delivery_mode' => 2,
 | |
|                 'content_type' => 'application/json',
 | |
|             )
 | |
|         );
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * {@inheritDoc}
 | |
|      */
 | |
|     protected function getDefaultFormatter()
 | |
|     {
 | |
|         return new JsonFormatter(JsonFormatter::BATCH_MODE_JSON, false);
 | |
|     }
 | |
| }
 |