149 lines
		
	
	
		
			3.8 KiB
		
	
	
	
		
			PHP
		
	
	
	
		
		
			
		
	
	
			149 lines
		
	
	
		
			3.8 KiB
		
	
	
	
		
			PHP
		
	
	
	
|  | <?php | ||
|  | 
 | ||
|  | /* | ||
|  |  * This file is part of the Monolog package. | ||
|  |  * | ||
|  |  * (c) Jordi Boggiano <j.boggiano@seld.be> | ||
|  |  * | ||
|  |  * For the full copyright and license information, please view the LICENSE | ||
|  |  * file that was distributed with this source code. | ||
|  |  */ | ||
|  | 
 | ||
|  | namespace Monolog\Handler; | ||
|  | 
 | ||
|  | use Monolog\Logger; | ||
|  | use Monolog\Formatter\JsonFormatter; | ||
|  | use PhpAmqpLib\Message\AMQPMessage; | ||
|  | use PhpAmqpLib\Channel\AMQPChannel; | ||
|  | use AMQPExchange; | ||
|  | 
 | ||
class AmqpHandler extends AbstractProcessingHandler
{
    /**
     * Publishing endpoint: either an AMQPExchange (php AMQP extension) or an
     * AMQPChannel (PHP AMQP lib), as accepted by the constructor.
     *
     * @var AMQPExchange|AMQPChannel $exchange
     */
    protected $exchange;

    /**
     * Exchange name handed to the channel's publish calls. It is only assigned
     * when the handler wraps an AMQPChannel; for an AMQPExchange the name is set
     * on the exchange object itself.
     *
     * @var string
     */
    protected $exchangeName;

    /**
     * Validates the publishing endpoint, applies the exchange name to it and
     * forwards the level/bubble settings to the parent handler.
     *
     * @param AMQPExchange|AMQPChannel $exchange     AMQPExchange (php AMQP ext) or PHP AMQP lib channel, ready for use
     * @param string                   $exchangeName Name of the exchange the records are published to
     * @param int                      $level        Level passed on to the parent constructor
     * @param bool                     $bubble       Whether the messages that are handled can bubble up the stack or not
     *
     * @throws \InvalidArgumentException When $exchange is neither an AMQPExchange nor an AMQPChannel
     */
    public function __construct($exchange, $exchangeName = 'log', $level = Logger::DEBUG, $bubble = true)
    {
        $isExtensionExchange = $exchange instanceof AMQPExchange;

        // Reject unsupported endpoints before touching any state.
        if (!$isExtensionExchange && !($exchange instanceof AMQPChannel)) {
            throw new \InvalidArgumentException('PhpAmqpLib\Channel\AMQPChannel or AMQPExchange instance required');
        }

        if ($isExtensionExchange) {
            // The extension's exchange object carries its own name.
            $exchange->setName($exchangeName);
        } else {
            // A channel is told the exchange name on every publish call, so keep it here.
            $this->exchangeName = $exchangeName;
        }

        $this->exchange = $exchange;

        parent::__construct($level, $bubble);
    }

    /**
     * Publishes the formatted record through whichever endpoint was supplied.
     *
     * {@inheritDoc}
     */
    protected function write(array $record)
    {
        $payload = $record["formatted"];
        $key = $this->getRoutingKey($record);

        if (!($this->exchange instanceof AMQPExchange)) {
            // PHP AMQP lib channel: wrap the payload in a message object first.
            $this->exchange->basic_publish(
                $this->createAmqpMessage($payload),
                $this->exchangeName,
                $key
            );

            return;
        }

        // php AMQP extension: payload, routing key, flags (0) and message attributes.
        $attributes = array(
            'delivery_mode' => 2,
            'content_type' => 'application/json',
        );
        $this->exchange->publish($payload, $key, 0, $attributes);
    }

    /**
     * Handles a set of records. With an AMQPExchange this defers to the parent
     * implementation; with an AMQPChannel every handled record is queued on the
     * channel and the batch is published once after the loop.
     *
     * {@inheritDoc}
     */
    public function handleBatch(array $records)
    {
        if ($this->exchange instanceof AMQPExchange) {
            parent::handleBatch($records);

            return;
        }

        foreach ($records as $entry) {
            if ($this->isHandling($entry)) {
                $processed = $this->processRecord($entry);
                $body = $this->getFormatter()->format($processed);
                $message = $this->createAmqpMessage($body);

                // The routing key is derived from the processed record, not the raw one.
                $this->exchange->batch_basic_publish(
                    $message,
                    $this->exchangeName,
                    $this->getRoutingKey($processed)
                );
            }
        }

        $this->exchange->publish_batch();
    }

    /**
     * Gets the routing key for the AMQP exchange: the first four characters of
     * the level name and the channel, joined by a dot and lower-cased.
     *
     * @param  array  $record Record providing the 'level_name' and 'channel' entries
     * @return string
     */
    protected function getRoutingKey(array $record)
    {
        // TODO 2.0 remove substr call
        $levelPrefix = substr($record['level_name'], 0, 4);
        $channel = $record['channel'];

        return strtolower(sprintf('%s.%s', $levelPrefix, $channel));
    }

    /**
     * Wraps a formatted payload in a PHP AMQP lib message that uses the same
     * delivery mode and content type as the AMQPExchange publish path.
     *
     * @param  string      $data Formatted record; cast to string before wrapping
     * @return AMQPMessage
     */
    private function createAmqpMessage($data)
    {
        $properties = array(
            'delivery_mode' => 2,
            'content_type' => 'application/json',
        );

        return new AMQPMessage((string) $data, $properties);
    }

    /**
     * Supplies a JsonFormatter built with BATCH_MODE_JSON and `false` as its
     * second constructor argument.
     *
     * {@inheritDoc}
     */
    protected function getDefaultFormatter()
    {
        $formatter = new JsonFormatter(JsonFormatter::BATCH_MODE_JSON, false);

        return $formatter;
    }
}