MqService.php 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. <?php
  2. namespace Tool\MayouTool\Mq;
  3. use PhpAmqpLib\Connection\AMQPStreamConnection;
  4. use PhpAmqpLib\Message\AMQPMessage;
  /**
   * Static helper for publishing messages to a message queue through php-amqplib.
   *
   * Connection settings can be supplied explicitly via setMqAttributes(); any
   * setting that was not supplied is read from the environment (MQ_HOST,
   * MQ_PORT, MQ_USER_NAME, MQ_USER_PASSWORD) each time a connection is opened.
   *
   * Class MqService
   * @package Tool\MayouTool\Mq
   */
  class MqService
  {
      /**
       * Exchange that pushLogMessage() publishes to.
       */
      const LOG_EXCHANGE = "logExchange";

      /**
       * @var string|null Broker host; null until set via setMqAttributes()
       */
      private static $host;

      /**
       * @var string|null Broker port; null until set via setMqAttributes()
       */
      private static $port;

      /**
       * @var string|null Account name; null until set via setMqAttributes()
       */
      private static $userName;

      /**
       * @var string|null Account password; null until set via setMqAttributes()
       */
      private static $userPassword;

      /**
       * Set the connection attributes used by subsequent getMqConnection() calls.
       *
       * @param string $host         connection host
       * @param string $port         connection port
       * @param string $userName     account name
       * @param string $userPassword account password
       */
      public static function setMqAttributes($host, $port, $userName, $userPassword)
      {
          self::$host = $host;
          self::$port = $port;
          self::$userName = $userName;
          self::$userPassword = $userPassword;
      }

      /**
       * Open a new connection to the broker.
       *
       * Values passed to setMqAttributes() take precedence; a value that was
       * never set (still null) falls back to the matching environment variable.
       * Previously the environment values always overwrote the configured ones,
       * which made setMqAttributes() a no-op.
       *
       * NOTE(review): env() is presumably the framework helper (e.g. Laravel);
       * confirm it is available wherever this class is used.
       *
       * @return AMQPStreamConnection a fresh connection; the caller must close it
       */
      public static function getMqConnection()
      {
          return new AMQPStreamConnection(
              self::resolveSetting(self::$host, "MQ_HOST"),
              self::resolveSetting(self::$port, "MQ_PORT"),
              self::resolveSetting(self::$userName, "MQ_USER_NAME"),
              self::resolveSetting(self::$userPassword, "MQ_USER_PASSWORD")
          );
      }

      /**
       * Publish a log message to the LOG_EXCHANGE exchange.
       *
       * The exchange, queue and binding are not declared here and must already
       * exist on the broker.
       *
       * @param string $message  message body
       * @param string $routeKey routing key
       */
      public static function pushLogMessage($message, $routeKey)
      {
          self::pushMessage($message, self::LOG_EXCHANGE, $routeKey);
      }

      /**
       * Publish a message to the given exchange.
       *
       * The exchange and queue are not declared here and must already exist on
       * the broker. A new connection and channel are opened for every call and
       * are closed again even when publishing fails; the failure itself is not
       * caught and propagates to the caller.
       *
       * @param string $message  message body
       * @param string $exchange exchange name
       * @param string $routeKey routing key
       */
      public static function pushMessage($message, $exchange, $routeKey)
      {
          $connection = self::getMqConnection();
          try {
              $channel = $connection->channel();
              try {
                  $channel->basic_publish(new AMQPMessage($message), $exchange, $routeKey);
              } finally {
                  // Release the channel whether or not the publish succeeded.
                  $channel->close();
              }
          } finally {
              // Always release the connection so a failed publish does not leak it.
              $connection->close();
          }
      }

      /**
       * Return the explicitly configured value, or the environment value when
       * nothing was configured.
       *
       * @param string|null $configured value stored by setMqAttributes(), or null
       * @param string      $envKey     environment variable used as the fallback
       * @return mixed the configured value, or whatever env() returns for $envKey
       */
      private static function resolveSetting($configured, $envKey)
      {
          return $configured !== null ? $configured : env($envKey);
      }
  }