Skip to content

Commit e54bc21

Browse files
committed
MQTT client examples
1 parent 0d2d0e9 commit e54bc21

File tree

3 files changed

+243
-0
lines changed

3 files changed

+243
-0
lines changed

examples/mqtt_client.rs

+112
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
use core::time::Duration;
2+
3+
use esp_idf_svc::eventloop::EspSystemEventLoop;
4+
use esp_idf_svc::hal::peripherals::Peripherals;
5+
use esp_idf_svc::mqtt::client::*;
6+
use esp_idf_svc::nvs::EspDefaultNvsPartition;
7+
use esp_idf_svc::sys::EspError;
8+
use esp_idf_svc::wifi::*;
9+
10+
use log::*;
11+
12+
const SSID: &str = env!("WIFI_SSID");
13+
const PASSWORD: &str = env!("WIFI_PASS");
14+
15+
const MQTT_URL: &str = "mqtt://broker.emqx.io:1883";
16+
const MQTT_CLIENT_ID: &str = "esp-mqtt-demo";
17+
const MQTT_TOPIC: &str = "esp-mqtt-demo";
18+
19+
fn main() -> Result<(), EspError> {
20+
esp_idf_svc::sys::link_patches();
21+
esp_idf_svc::log::EspLogger::initialize_default();
22+
23+
let _wifi = wifi_create()?;
24+
25+
let (mut client, mut conn) = mqtt_create(MQTT_URL, MQTT_CLIENT_ID)?;
26+
27+
run(&mut client, &mut conn, MQTT_TOPIC)
28+
}
29+
30+
fn run(
31+
client: &mut EspMqttClient<'_>,
32+
connection: &mut EspMqttConnection,
33+
topic: &str,
34+
) -> Result<(), EspError> {
35+
std::thread::scope(|s| {
36+
info!("About to start the MQTT client");
37+
38+
info!("MQTT client started");
39+
40+
s.spawn(move || {
41+
info!("MQTT Listening for messages");
42+
43+
while let Ok(event) = connection.next() {
44+
info!("[Queue] Event: {}", event.payload());
45+
}
46+
47+
info!("Connection closed");
48+
});
49+
50+
client.subscribe(topic, QoS::AtMostOnce)?;
51+
52+
info!("Subscribed to topic \"{topic}\"");
53+
54+
// Just to give a chance of our connection to get even the first published message
55+
std::thread::sleep(Duration::from_millis(500));
56+
57+
let payload = "Hello from esp-mqtt-demo!";
58+
59+
loop {
60+
client.enqueue(topic, QoS::AtMostOnce, false, payload.as_bytes())?;
61+
62+
info!("Published \"{payload}\" to topic \"{topic}\"");
63+
64+
let sleep_secs = 2;
65+
66+
info!("Now sleeping for {sleep_secs}s...");
67+
std::thread::sleep(Duration::from_secs(sleep_secs));
68+
}
69+
})
70+
}
71+
72+
fn mqtt_create(
73+
url: &str,
74+
client_id: &str,
75+
) -> Result<(EspMqttClient<'static>, EspMqttConnection), EspError> {
76+
let (mqtt_client, mqtt_conn) = EspMqttClient::new_with_conn(
77+
url,
78+
&MqttClientConfiguration {
79+
client_id: Some(client_id),
80+
..Default::default()
81+
},
82+
)?;
83+
84+
Ok((mqtt_client, mqtt_conn))
85+
}
86+
87+
fn wifi_create() -> Result<EspWifi<'static>, EspError> {
88+
let peripherals = Peripherals::take()?;
89+
90+
let sys_loop = EspSystemEventLoop::take()?;
91+
let nvs = EspDefaultNvsPartition::take()?;
92+
93+
let mut esp_wifi = EspWifi::new(peripherals.modem, sys_loop.clone(), Some(nvs))?;
94+
95+
let mut wifi = BlockingWifi::wrap(&mut esp_wifi, sys_loop)?;
96+
97+
wifi.set_configuration(&Configuration::Client(ClientConfiguration {
98+
ssid: SSID.try_into().unwrap(),
99+
password: PASSWORD.try_into().unwrap(),
100+
..Default::default()
101+
}))?;
102+
103+
wifi.start()?;
104+
wifi.wait_netif_up()?;
105+
106+
info!(
107+
"Created Wi-Fi with WIFI_SSID `{}` and WIFI_PASS `{}`",
108+
SSID, PASSWORD
109+
);
110+
111+
Ok(esp_wifi)
112+
}

examples/mqtt_client_async.rs

+129
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
use core::time::Duration;
2+
3+
use embassy_futures::select::{select, Either};
4+
5+
use esp_idf_svc::eventloop::EspSystemEventLoop;
6+
use esp_idf_svc::hal::peripherals::Peripherals;
7+
use esp_idf_svc::mqtt::client::*;
8+
use esp_idf_svc::nvs::EspDefaultNvsPartition;
9+
use esp_idf_svc::sys::EspError;
10+
use esp_idf_svc::timer::{EspAsyncTimer, EspTaskTimerService, EspTimerService};
11+
use esp_idf_svc::wifi::*;
12+
13+
use log::*;
14+
15+
const SSID: &str = env!("WIFI_SSID");
16+
const PASSWORD: &str = env!("WIFI_PASS");
17+
18+
const MQTT_URL: &str = "mqtt://broker.emqx.io:1883";
19+
const MQTT_CLIENT_ID: &str = "esp-mqtt-demo";
20+
const MQTT_TOPIC: &str = "esp-mqtt-demo";
21+
22+
fn main() -> Result<(), EspError> {
23+
esp_idf_svc::sys::link_patches();
24+
esp_idf_svc::log::EspLogger::initialize_default();
25+
26+
esp_idf_svc::hal::task::block_on(async {
27+
let timer_service = EspTimerService::new()?;
28+
let _wifi = wifi_create(&timer_service).await?;
29+
30+
let (mut client, mut conn) = mqtt_create(MQTT_URL, MQTT_CLIENT_ID)?;
31+
32+
let mut timer = timer_service.timer_async()?;
33+
run(&mut client, &mut conn, &mut timer, MQTT_TOPIC).await
34+
})
35+
}
36+
37+
async fn run(
38+
client: &mut EspMqttClient<'_>,
39+
connection: &mut EspMqttConnection,
40+
timer: &mut EspAsyncTimer,
41+
topic: &str,
42+
) -> Result<(), EspError> {
43+
info!("About to start the MQTT client");
44+
45+
info!("MQTT client started");
46+
47+
let res = select(
48+
async move {
49+
info!("MQTT Listening for messages");
50+
51+
while let Ok(event) = connection.next_async().await {
52+
info!("[Queue] Event: {}", event.payload());
53+
}
54+
55+
info!("Connection closed");
56+
57+
Ok(())
58+
},
59+
async move {
60+
client.subscribe(topic, QoS::AtMostOnce)?;
61+
62+
info!("Subscribed to topic \"{topic}\"");
63+
64+
// Just to give a chance of our connection to get even the first published message
65+
timer.after(Duration::from_millis(500)).await?;
66+
67+
let payload = "Hello from esp-mqtt-demo!";
68+
69+
loop {
70+
client.enqueue(topic, QoS::AtMostOnce, false, payload.as_bytes())?;
71+
72+
info!("Published \"{payload}\" to topic \"{topic}\"");
73+
74+
let sleep_secs = 2;
75+
76+
info!("Now sleeping for {sleep_secs}s...");
77+
timer.after(Duration::from_secs(sleep_secs)).await?;
78+
}
79+
},
80+
)
81+
.await;
82+
83+
match res {
84+
Either::First(res) => res,
85+
Either::Second(res) => res,
86+
}
87+
}
88+
89+
fn mqtt_create(
90+
url: &str,
91+
client_id: &str,
92+
) -> Result<(EspMqttClient<'static>, EspMqttConnection), EspError> {
93+
let (mqtt_client, mqtt_conn) = EspMqttClient::new_with_conn(
94+
url,
95+
&MqttClientConfiguration {
96+
client_id: Some(client_id),
97+
..Default::default()
98+
},
99+
)?;
100+
101+
Ok((mqtt_client, mqtt_conn))
102+
}
103+
104+
async fn wifi_create(timer_service: &EspTaskTimerService) -> Result<EspWifi<'static>, EspError> {
105+
let peripherals = Peripherals::take()?;
106+
107+
let sys_loop = EspSystemEventLoop::take()?;
108+
let nvs = EspDefaultNvsPartition::take()?;
109+
110+
let mut esp_wifi = EspWifi::new(peripherals.modem, sys_loop.clone(), Some(nvs))?;
111+
112+
let mut wifi = AsyncWifi::wrap(&mut esp_wifi, sys_loop, timer_service.clone())?;
113+
114+
wifi.set_configuration(&Configuration::Client(ClientConfiguration {
115+
ssid: SSID.try_into().unwrap(),
116+
password: PASSWORD.try_into().unwrap(),
117+
..Default::default()
118+
}))?;
119+
120+
wifi.start().await?;
121+
wifi.wait_netif_up().await?;
122+
123+
info!(
124+
"Created Wi-Fi with WIFI_SSID `{}` and WIFI_PASS `{}`",
125+
SSID, PASSWORD
126+
);
127+
128+
Ok(esp_wifi)
129+
}

examples/ws_guessing_game.rs

+2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use std::{borrow::Cow, collections::BTreeMap, str, sync::Mutex};
2929

3030
const SSID: &str = env!("WIFI_SSID");
3131
const PASSWORD: &str = env!("WIFI_PASS");
32+
3233
static INDEX_HTML: &str = include_str!("ws_guessing_game.html");
3334

3435
// Max payload length
@@ -122,6 +123,7 @@ fn nth(n: u32) -> Cow<'static, str> {
122123
fn main() -> anyhow::Result<()> {
123124
esp_idf_svc::sys::link_patches();
124125
esp_idf_svc::log::EspLogger::initialize_default();
126+
125127
let mut server = create_server()?;
126128

127129
server.fn_handler("/", Method::Get, |req| {

0 commit comments

Comments
 (0)