.Net Core/.net 6/.Net 8 实现Mqtt服务端

  • Mqtt服务端代码
  • `IMqttServer` 接口
  • 业务类,实现 `IMqttServer` 接口
  • `Program.cs`

直接上代码
nuget 引用
MQTTnet

Mqtt服务端代码

using MQTTnet;using MQTTnet.Protocol;using MQTTnet.Server;namespace Code.Mqtt{/// /// mqtt服务端/// public class MqttServerBase{public MqttServer _server;readonly IMqttServer _mqttServer;/// /// 向指定主题发送消息/// public Action<string, string> ToTopic;/// /// 主题/客户端列表/// public Dictionary<string,List<string>> Topic_Client=new Dictionary<string, List<string>>();public MqttServerBase(IMqttServer mqttServer){_mqttServer = mqttServer;if(mqttServer == null){throw new Exception("MqttServer配置错误");}var optionbuilder = new MqttServerOptionsBuilder() .WithDefaultEndpoint()//设置默认地址127.0.0.1 .WithDefaultEndpointPort(_mqttServer.Port);//1883_server = new MqttFactory().CreateMqttServer(optionbuilder.Build());ToTopic = (topic, msg) => {_server.InjectApplicationMessage(new InjectedMqttApplicationMessage(new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(msg).Build()));};_server.ClientConnectedAsync += (e) =>{_mqttServer.ClientConnectedAsync(e.ClientId, e);return Task.CompletedTask;};_server.ClientDisconnectedAsync += (e) => {_mqttServer.ClientDisconnectedAsync(e.ClientId, e);return Task.CompletedTask;};_server.InterceptingPublishAsync += (e)=> {var msg = e.ApplicationMessage?.PayloadSegment.Array?.BToString();var Topic = e.ApplicationMessage.Topic;//判断主题是否存在if (Topic_Client.ContainsKey(Topic)){_mqttServer.InterceptingPublishAsync(e.ClientId, Topic, msg, e, ToTopic);}return Task.CompletedTask;};_server.ApplicationMessageNotConsumedAsync += (e) => {var Topic = e.ApplicationMessage.Topic;var msg = e.ApplicationMessage.PayloadSegment.Array.BToString();//判断主题是否存在,否则会进入死循环if (Topic_Client.ContainsKey(Topic)){_mqttServer.ApplicationMessageNotConsumedAsync(Topic, msg, e);}return Task.CompletedTask;};_server.ValidatingConnectionAsync += (e) => {if (_mqttServer.ValidatingConnectionAsync(e.UserName, e.Password,e.ClientId, e)){e.ReasonCode = MqttConnectReasonCode.Success;//验证通过}else{e.ReasonCode = MqttConnectReasonCode.Banned;//验证不通过}return Task.CompletedTask;};//订阅主题_server.ClientSubscribedTopicAsync += (e) =>{var _topic = e.TopicFilter.Topic;//保存主题if (!Topic_Client.ContainsKey(_topic)){Topic_Client.Add(_topic, new List<string>());}//添加订阅主题的客户端if (!Topic_Client[_topic].Any(x=>x== e.ClientId)){Topic_Client[_topic].Add(e.ClientId);}_mqttServer.ClientSubscribedTopicAsync(e.ClientId, _topic, e);return Task.CompletedTask;};//取消订阅_server.ClientUnsubscribedTopicAsync += (e) =>{var _topic = e.TopicFilter;//移除客户端if (!Topic_Client.ContainsKey(_topic)){Topic_Client[_topic].Remove(e.ClientId);if (Topic_Client[_topic].Count == 0){// 移除没有客户端订阅的主题Topic_Client.Remove(_topic);}_mqttServer.ClientUnsubscribedTopicAsync(e.ClientId, e.TopicFilter, e);}return Task.CompletedTask;};//服务启动事件_server.StartedAsync += _mqttServer.StartedAsync;//服务停止事件_server.StoppedAsync += _mqttServer.StoppedAsync;Start();}public async Task Start(){Console.WriteLine("正在启动Mqtt服务");await _server.StartAsync();Console.WriteLine("Mqtt服务启动成功,端口:" + _mqttServer.Port);}public async Task Stop(){Console.WriteLine("正在停止Mqtt服务");await _server.StopAsync();Console.WriteLine("Mqtt服务停止");}/// /// 重启服务/// /// public async Task ReStart(){await Stop();await Start();}}}

IMqttServer 接口

using MQTTnet.Server;namespace Code.Mqtt{public interface IMqttServer{/// /// 服务端口/// int Port { get;}/// /// 服务启动事件/// /// /// public Task StartedAsync(EventArgs args);/// /// 服务停止事件/// /// /// public Task StoppedAsync(EventArgs args);/// /// 客户端上线/// /// /// public Task ClientConnectedAsync(string clientId,ClientConnectedEventArgs args);/// /// 客户端下线/// /// /// public Task ClientDisconnectedAsync(string clientId,ClientDisconnectedEventArgs args);/// /// 消息事件/// /// /// 发送消息/// public Task InterceptingPublishAsync(string clientId,string Topic,string msg,InterceptingPublishEventArgs args, Action<string, string> ToTopic);/// /// 验证/// /// 账号/// 密码/// /// public bool ValidatingConnectionAsync(string username,string password,string clientId,ValidatingConnectionEventArgs args);/// /// 消息未消费事件/// /// /// public Task ApplicationMessageNotConsumedAsync(string Topic,string msg,ApplicationMessageNotConsumedEventArgs args);/// /// 订阅主题事件/// /// /// public Task ClientSubscribedTopicAsync(string clientId,string Topic,ClientSubscribedTopicEventArgs args);/// /// 取消订阅主题事件/// /// /// public Task ClientUnsubscribedTopicAsync(string clientId,string Topic,ClientUnsubscribedTopicEventArgs args);}}

业务类,实现 IMqttServer 接口

public class MqttApp : IMqttServer{/// /// 服务端口/// int IMqttServer.Port { get => 10883; }public MqttApp(){}/// /// 消息未消费/// /// 主题/// 消息内容/// 事件原始参数/// async Task IMqttServer.ApplicationMessageNotConsumedAsync(string Topic,string msg,ApplicationMessageNotConsumedEventArgs args){Console.WriteLine($"消息未消费{Topic}:");Console.WriteLine(msg);}/// /// 客户端上线/// /// 客户端id/// 事件原始参数/// async Task IMqttServer.ClientConnectedAsync(string clientId, ClientConnectedEventArgs args){Console.WriteLine($"客户端上线 id:{clientId}");}/// /// 客户端下线/// /// 客户端id/// 事件原始参数/// async Task IMqttServer.ClientDisconnectedAsync(string clientId, ClientDisconnectedEventArgs args){Console.WriteLine($"客户端下线 id:{clientId}");}/// /// 订阅主题/// /// 客户端id/// 主题/// 事件原始参数/// async Task IMqttServer.ClientSubscribedTopicAsync(string clientId, string Topic, ClientSubscribedTopicEventArgs args){Console.WriteLine($"客户端{clientId}订阅主题:{Topic}");}/// /// 取消主题订阅/// /// 客户端id/// 主题/// 事件原始参数/// async Task IMqttServer.ClientUnsubscribedTopicAsync(string clientId, string Topic,ClientUnsubscribedTopicEventArgs args){Console.WriteLine($"客户端{clientId} 取消主题订阅:{Topic}");}/// /// 收到客户端消息/// /// 客户端id/// 主题/// 消息内容/// 事件原始参数/// 推送消息到指定主题 ("主题","内容")/// async Task IMqttServer.InterceptingPublishAsync(string clientId, string Topic, string msg, InterceptingPublishEventArgs args, Action<string, string> ToTopic){Console.WriteLine($"客户端{clientId} 主题{Topic} 发送消息 内容:");Console.WriteLine(msg);//推送消息到指定主题ToTopic("主题","内容");}/// /// 服务启动事件/// /// 事件原始参数/// async Task IMqttServer.StartedAsync(EventArgs args){}/// /// 服务停止事件/// /// 事件原始参数/// async Task IMqttServer.StoppedAsync(EventArgs args){}/// /// 验证账号密码/// /// 账号/// 密码/// 客户端id/// 事件原始参数/// bool IMqttServer.ValidatingConnectionAsync(string username, string password, string clientId, ValidatingConnectionEventArgs args){Console.WriteLine($"验证客户端{clientId}信息:{args.UserName} {args.Password}");return true;//验证通过//return false;//验证不通过}}

Program.cs

// 注入builder.Services.AddSingleton<IMqttServer, MqttApp>();builder.Services.AddSingleton<MqttServerBase>();/* 如果没有下面这段代码,那么程序启动后不会立即启动mqtt服务,需要在控制器注入来初始化实列,app.Services.GetService 相当于访问了一次对象*///立即启动Mqtt服务//app.Services.GetService();//延时启动Mqtt服务Task.Run(async () => {await Task.Delay(3000);app.Services.GetService<MqttServerBase>();});