rabbitmq系列——(4 Exchange Type -- Direct)

标签:gets   publish   receive   nfa   密码   直接   技术   用户   reac   

  场景:收集日志后按日志级别分类;error 级别的日志除记录外还需单独处理(发送邮件通知运维),其余级别仅做记录。

  直接交换机(Direct Exchange)的工作方式类似于单播:Exchange 会将消息发送到绑定键(binding key)与消息的 routing key 完全匹配的 Queue,即通过 routing key 对消息进行筛选。

1. 生产者

using RabbitMQMsgProducer.MessageProducer;
using Microsoft.Extensions.Configuration;
using System;
using System.IO;
using RabbitMQMsgProducer.ExchangeDemo;

namespace RabbitMQMsgProducer
{
    /// <summary>
    /// Console entry point of the producer demo.
    /// </summary>
    internal class Program
    {
        /// <summary>
        /// Runs the direct-exchange producer and then waits for Enter.
        /// Any exception is reported by printing its message to the console.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        private static void Main(string[] args)
        {
            try
            {
                RunDirectExchangeDemo();
            }
            catch (Exception failure)
            {
                Console.WriteLine(failure.Message);
            }
        }

        // ExchangeType: Direct.
        // Scenario: logs are classified by level; error logs additionally need
        // separate handling (an e-mail to operations), the rest are only recorded.
        private static void RunDirectExchangeDemo()
        {
            ProducerDirectExchange.Send();

            // Keep the console window open until the user presses Enter.
            Console.ReadLine();
        }
    }
}

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace RabbitMQMsgProducer.ExchangeDemo
{
    /// <summary>
    /// Producer for the direct-exchange demo: declares two durable queues and a
    /// durable direct exchange, binds the queues by log level, and publishes a
    /// batch of generated log messages routed by their level.
    /// </summary>
    public class ProducerDirectExchange
    {
        // Log levels indexed by (message number % 4). The same names are used as
        // binding keys and as routing keys, which is what a direct exchange matches on.
        private static readonly string[] LogTypes = { "info", "debug", "warn", "error" };

        /// <summary>
        /// Connects to the broker on localhost, sets up the topology and publishes
        /// the generated log messages to "DirectExchange".
        /// "LogAllQueue" is bound to every log level; "LogErrorQueue" only to "error".
        /// </summary>
        public static void Send()
        {
            ConnectionFactory connectionFactory = new ConnectionFactory
            {
                HostName = "localhost", // broker address
                UserName = "guest",     // user name
                Password = "guest"      // password
            };
            string logAllQueueName = "LogAllQueue";     // queue receiving every log level
            string logErrorQueueName = "LogErrorQueue"; // queue receiving error logs only
            string exchangeName = "DirectExchange";
            using (IConnection connection = connectionFactory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    // Declare the queues.
                    channel.QueueDeclare(queue: logAllQueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                    channel.QueueDeclare(queue: logErrorQueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                    // Declare the exchange.
                    channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
                    // Bind the queues: one binding per log level for the "all" queue,
                    // plus a single "error" binding for the error queue.
                    foreach (string logType in LogTypes)
                    {
                        channel.QueueBind(queue: logAllQueueName, exchange: exchangeName, routingKey: logType, arguments: null);
                    }
                    channel.QueueBind(queue: logErrorQueueName, exchange: exchangeName, routingKey: "error", arguments: null);

                    // The queues and the exchange are durable, but a durable queue only
                    // keeps its messages across a broker restart when the messages
                    // themselves are published as persistent.
                    IBasicProperties persistentProperties = channel.CreateBasicProperties();
                    persistentProperties.Persistent = true;

                    Console.WriteLine("ProducerDirectExchange is ready, go.");
                    foreach (LogMsgModel log in GetLogMsgList())
                    {
                        channel.BasicPublish(exchange: exchangeName, routingKey: log.LogType, basicProperties: persistentProperties, body: log.Msg);
                        Console.WriteLine($"{Encoding.UTF8.GetString(log.Msg)} is send.");
                    }
                }
            }
        }

        /// <summary>
        /// Builds 100 sample log messages numbered 1..100. The level of message
        /// <c>i</c> is chosen by <c>i % 4</c> (0 = info, 1 = debug, 2 = warn, 3 = error)
        /// and the body is the UTF-8 encoding of the level name followed by the message number.
        /// </summary>
        /// <returns>The generated messages in ascending message-number order.</returns>
        private static List<LogMsgModel> GetLogMsgList()
        {
            const int messageCount = 100;
            List<LogMsgModel> logList = new List<LogMsgModel>(messageCount);
            for (int i = 1; i <= messageCount; i++)
            {
                string logType = LogTypes[i % LogTypes.Length];
                logList.Add(new LogMsgModel { LogType = logType, Msg = Encoding.UTF8.GetBytes($"{logType}第{i}条信息") });
            }
            return logList;
        }
    }
    /// <summary>
    /// One log message to publish: its level and its encoded payload.
    /// </summary>
    public class LogMsgModel
    {
        /// <summary>Log level name; the producer uses it as the routing key.</summary>
        public string LogType { get; set; }

        /// <summary>Message payload as raw bytes.</summary>
        public byte[] Msg { get; set; }
    }
}

 

2. 消费者

// 消费者001 消费所有日志信息

// 消费者002 消费error信息,并发送邮件给运维

using RabbitMQMsgConsumer001.ExchangeDemo;
using RabbitMQMsgConsumer001.MessageConsumer;
using System;
using System.Threading.Tasks;

namespace RabbitMQMsgConsumer001
{
    /// <summary>
    /// Console entry point of consumer 001.
    /// </summary>
    internal class Program
    {
        /// <summary>
        /// Starts the direct-exchange consumer and then waits for Enter.
        /// Any exception is reported by printing its message to the console.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        private static void Main(string[] args)
        {
            try
            {
                ConsumeAllLogs();
            }
            catch (Exception failure)
            {
                Console.WriteLine(failure.Message);
            }
        }

        // ExchangeType: Direct.
        // Consumer 001 consumes log messages of every level.
        private static void ConsumeAllLogs()
        {
            ConsumerDirectExchange.Receive();

            // Keep the console window open until the user presses Enter.
            Console.ReadLine();
        }
    }
}

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;

namespace RabbitMQMsgConsumer001.ExchangeDemo
{
    /// <summary>
    /// Consumer 001 of the direct-exchange demo: reads every log level from "LogAllQueue".
    /// </summary>
    public class ConsumerDirectExchange
    {
        /// <summary>
        /// Connects to the broker on localhost, declares the queue, the exchange and
        /// the per-level bindings, then consumes with automatic acknowledgement and
        /// prints each message until Enter is pressed.
        /// </summary>
        public static void Receive()
        {
            const string queueName = "LogAllQueue";   // queue receiving every log level
            const string exchange = "DirectExchange";

            var factory = new ConnectionFactory
            {
                HostName = "localhost", // broker address
                UserName = "guest",     // user name
                Password = "guest"      // password
            };

            using (IConnection connection = factory.CreateConnection())
            using (IModel channel = connection.CreateModel())
            {
                // Declare the queue and the exchange with the same settings the producer uses.
                channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                channel.ExchangeDeclare(exchange: exchange, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);

                // One binding per log level so the queue receives all of them.
                foreach (var bindingKey in new[] { "debug", "info", "warn", "error" })
                {
                    channel.QueueBind(queue: queueName, exchange: exchange, routingKey: bindingKey, arguments: null);
                }

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += OnLogReceived;

                // Start consuming; autoAck is on, so no explicit acknowledgement is sent.
                channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);

                // Block here so the channel and connection stay open while messages arrive.
                Console.ReadLine();
            }
        }

        // Decodes the delivered body as UTF-8 and prints it together with its routing key.
        private static void OnLogReceived(object sender, BasicDeliverEventArgs ea)
        {
            string text = Encoding.UTF8.GetString(ea.Body.ToArray());
            Console.WriteLine($"logall:{ea.RoutingKey}, {text} ,is receive.");
        }
    }
}

 

using RabbitMQMsgConsumer002.ExchangeDemo;
using System;

namespace RabbitMQMsgConsumer002
{
    /// <summary>
    /// Console entry point of consumer 002.
    /// </summary>
    internal class Program
    {
        /// <summary>
        /// Starts the direct-exchange consumer and then waits for Enter.
        /// Any exception is reported by printing its message to the console.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        private static void Main(string[] args)
        {
            try
            {
                ConsumeErrorLogs();
            }
            catch (Exception failure)
            {
                Console.WriteLine(failure.Message);
            }
        }

        // ExchangeType: Direct.
        // Consumer 002 consumes error messages and e-mails operations about them.
        private static void ConsumeErrorLogs()
        {
            ConsumerDirectExchange.Receive();

            // Keep the console window open until the user presses Enter.
            Console.ReadLine();
        }
    }
}

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;

namespace RabbitMQMsgConsumer002.ExchangeDemo
{
    /// <summary>
    /// Consumer 002 of the direct-exchange demo: reads only error logs from "LogErrorQueue".
    /// </summary>
    public class ConsumerDirectExchange
    {
        /// <summary>
        /// Connects to the broker on localhost, declares the error queue, the exchange
        /// and the "error" binding, then consumes with automatic acknowledgement and
        /// prints each message until Enter is pressed.
        /// </summary>
        public static void Receive()
        {
            const string queueName = "LogErrorQueue"; // queue receiving error logs only
            const string exchange = "DirectExchange";

            var factory = new ConnectionFactory
            {
                HostName = "localhost", // broker address
                UserName = "guest",     // user name
                Password = "guest"      // password
            };

            using (IConnection connection = factory.CreateConnection())
            using (IModel channel = connection.CreateModel())
            {
                // Declare the queue and the exchange with the same settings the producer uses.
                channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                channel.ExchangeDeclare(exchange: exchange, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);

                // Single binding: only messages routed with "error" reach this queue.
                channel.QueueBind(queue: queueName, exchange: exchange, routingKey: "error", arguments: null);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += OnErrorLogReceived;

                // Start consuming; autoAck is on, so no explicit acknowledgement is sent.
                channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);

                // Block here so the channel and connection stay open while messages arrive.
                Console.ReadLine();
            }
        }

        // Decodes the delivered body as UTF-8 and prints it together with its routing key;
        // the printed line stands in for the e-mail notification.
        private static void OnErrorLogReceived(object sender, BasicDeliverEventArgs ea)
        {
            string text = Encoding.UTF8.GetString(ea.Body.ToArray());
            Console.WriteLine($"logerror:{ea.RoutingKey}, {text} , is receive , the email is send .");
        }
    }
}

 

3. 结果

技术图片

 

技术图片

 

rabbitmq系列——(4 Exchange Type -- Direct)

标签:gets   publish   receive   nfa   密码   直接   技术   用户   reac   

原文地址:https://www.cnblogs.com/Fletcher/p/14174375.html

版权声明:完美者 发表于 2021-01-19 12:13:34。
转载请注明:rabbitmq系列——(4 Exchange Type -- Direct) | 完美导航

暂无评论

暂无评论...