vendor/symfony/messenger/EventListener/SendFailedMessageToFailureTransportListener.php line 46

Open in your IDE?
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\Messenger\EventListener;
  11. use Psr\Container\ContainerInterface;
  12. use Psr\Log\LoggerInterface;
  13. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  14. use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
  15. use Symfony\Component\Messenger\Stamp\DelayStamp;
  16. use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
  17. use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
  18. use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
  19. /**
  20. * Sends a rejected message to a "failure transport".
  21. *
  22. * @author Ryan Weaver <ryan@symfonycasts.com>
  23. */
  24. class SendFailedMessageToFailureTransportListener implements EventSubscriberInterface
  25. {
  26. private $failureSenders;
  27. private $logger;
  28. /**
  29. * @param ContainerInterface $failureSenders
  30. */
  31. public function __construct($failureSenders, ?LoggerInterface $logger = null)
  32. {
  33. if (!$failureSenders instanceof ContainerInterface) {
  34. trigger_deprecation('symfony/messenger', '5.3', 'Passing a SenderInterface value as 1st argument to "%s()" is deprecated, pass a ServiceLocator instead.', __METHOD__);
  35. }
  36. $this->failureSenders = $failureSenders;
  37. $this->logger = $logger;
  38. }
  39. public function onMessageFailed(WorkerMessageFailedEvent $event)
  40. {
  41. if ($event->willRetry()) {
  42. return;
  43. }
  44. if (!$this->hasFailureTransports($event)) {
  45. return;
  46. }
  47. $failureSender = $this->getFailureSender($event->getReceiverName());
  48. if (null === $failureSender) {
  49. return;
  50. }
  51. $envelope = $event->getEnvelope();
  52. // avoid re-sending to the failed sender
  53. if (null !== $envelope->last(SentToFailureTransportStamp::class)) {
  54. return;
  55. }
  56. $envelope = $envelope->with(
  57. new SentToFailureTransportStamp($event->getReceiverName()),
  58. new DelayStamp(0),
  59. new RedeliveryStamp(0)
  60. );
  61. if (null !== $this->logger) {
  62. $this->logger->info('Rejected message {class} will be sent to the failure transport {transport}.', [
  63. 'class' => \get_class($envelope->getMessage()),
  64. 'transport' => \get_class($failureSender),
  65. ]);
  66. }
  67. $failureSender->send($envelope);
  68. }
  69. public static function getSubscribedEvents()
  70. {
  71. return [
  72. WorkerMessageFailedEvent::class => ['onMessageFailed', -100],
  73. ];
  74. }
  75. private function getFailureSender(string $receiverName): SenderInterface
  76. {
  77. if ($this->failureSenders instanceof SenderInterface) {
  78. return $this->failureSenders;
  79. }
  80. return $this->failureSenders->get($receiverName);
  81. }
  82. private function hasFailureTransports(WorkerMessageFailedEvent $event): bool
  83. {
  84. return ($this->failureSenders instanceof ContainerInterface && $this->failureSenders->has($event->getReceiverName())) || $this->failureSenders instanceof SenderInterface;
  85. }
  86. }