1use std::collections::{HashMap, VecDeque};
9use std::sync::{Arc, Mutex};
10
11use anyhow::Result;
12use tokio::runtime::Runtime;
13use wasmtime::{Caller, Linker};
14use webrtc::api::interceptor_registry::register_default_interceptors;
15use webrtc::api::media_engine::MediaEngine;
16use webrtc::api::APIBuilder;
17use webrtc::data_channel::data_channel_message::DataChannelMessage;
18use webrtc::data_channel::RTCDataChannel;
19use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
20use webrtc::ice_transport::ice_server::RTCIceServer;
21use webrtc::interceptor::registry::Registry;
22use webrtc::peer_connection::configuration::RTCConfiguration;
23use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
24use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
25use webrtc::peer_connection::RTCPeerConnection;
26
27use crate::capabilities::{
28 console_log, read_guest_bytes, read_guest_string, write_guest_bytes, ConsoleLevel, HostState,
29};
30
/// A data-channel message queued by an `on_message` callback until the guest
/// retrieves it.
struct IncomingMessage {
    /// Host-assigned id of the data channel the message arrived on.
    channel_id: u32,
    /// `true` when the transport did not flag the message as a string.
    is_binary: bool,
    /// Raw payload bytes, copied out of the transport message.
    data: Vec<u8>,
}
37
/// A data channel reported by the connection's `on_data_channel` callback that
/// is waiting to be picked up through `poll_new_channel`.
struct PendingChannel {
    /// Host-assigned id under which the channel is stored in the peer's channel map.
    channel_id: u32,
    /// Label of the channel as reported by the data channel itself.
    label: String,
}
43
/// A media track reported by the connection's `on_track` callback that is
/// waiting to be picked up through `poll_track`.
struct PendingTrack {
    /// Track kind code: `0` for audio, `1` for video, `2` for anything else.
    kind: u32,
    /// Track id as reported by the track.
    id: String,
    /// Stream id as reported by the track.
    stream_id: String,
}
50
/// Everything the host tracks for one peer connection.
///
/// The queues are shared (`Arc<Mutex<..>>`) with the callbacks registered on the
/// connection, which push into them; the polling methods on `RtcState` pop from them.
struct PeerState {
    /// The underlying peer connection.
    conn: Arc<RTCPeerConnection>,
    /// Data channels by host-assigned id; holds both locally created channels
    /// and channels reported by `on_data_channel`.
    data_channels: Arc<Mutex<HashMap<u32, Arc<RTCDataChannel>>>>,
    /// Messages received on any of this peer's data channels, in arrival order.
    incoming_messages: Arc<Mutex<VecDeque<IncomingMessage>>>,
    /// Channels reported by `on_data_channel` that have not been polled yet.
    pending_channels: Arc<Mutex<VecDeque<PendingChannel>>>,
    /// Tracks reported by `on_track` that have not been polled yet.
    pending_tracks: Arc<Mutex<VecDeque<PendingTrack>>>,
    /// Locally gathered ICE candidates, serialized as JSON strings.
    ice_candidates: Arc<Mutex<VecDeque<String>>>,
    /// Latest connection state as one of the `STATE_*` codes.
    connection_state: Arc<Mutex<u32>>,
    /// Next id handed out by `create_data_channel` and `add_track`; starts at 1.
    next_channel_id: u32,
}
62
/// Settings for the HTTP signaling endpoint used by the `signal_*` methods.
struct SignalingSession {
    /// Base URL of the signaling server, stored without trailing `/`.
    base_url: String,
    /// Current room name; empty until `signal_join_room` sets it.
    room: String,
    /// Blocking HTTP client used for every signaling request.
    client: reqwest::blocking::Client,
}
69
/// Host-side WebRTC state: the async runtime, all open peers, and the optional
/// signaling session.
pub struct RtcState {
    /// Tokio runtime on which the async `webrtc` calls are driven via `block_on`.
    runtime: Runtime,
    /// Open peer connections keyed by host-assigned peer id.
    peers: HashMap<u32, PeerState>,
    /// Next peer id to hand out; starts at 1.
    next_peer_id: u32,
    /// Signaling session, present once `signal_connect` has been called.
    signaling: Option<SignalingSession>,
}
77
// Numeric connection-state codes stored in `PeerState::connection_state` and
// returned to the guest by `api_rtc_connection_state`.
/// Connection has been created (also used for states without a dedicated code).
const STATE_NEW: u32 = 0;
/// Connection is being established.
const STATE_CONNECTING: u32 = 1;
/// Connection is established.
const STATE_CONNECTED: u32 = 2;
/// Connection has been disconnected.
const STATE_DISCONNECTED: u32 = 3;
/// Connection has failed.
const STATE_FAILED: u32 = 4;
/// Connection is closed; also reported for unknown peers.
const STATE_CLOSED: u32 = 5;
85
86fn map_connection_state(s: RTCPeerConnectionState) -> u32 {
87 match s {
88 RTCPeerConnectionState::New => STATE_NEW,
89 RTCPeerConnectionState::Connecting => STATE_CONNECTING,
90 RTCPeerConnectionState::Connected => STATE_CONNECTED,
91 RTCPeerConnectionState::Disconnected => STATE_DISCONNECTED,
92 RTCPeerConnectionState::Failed => STATE_FAILED,
93 RTCPeerConnectionState::Closed => STATE_CLOSED,
94 _ => STATE_NEW,
95 }
96}
97
98impl RtcState {
99 pub fn new() -> Option<Self> {
100 let runtime = Runtime::new().ok()?;
101 Some(Self {
102 runtime,
103 peers: HashMap::new(),
104 next_peer_id: 1,
105 signaling: None,
106 })
107 }
108
109 fn alloc_peer_id(&mut self) -> u32 {
110 let id = self.next_peer_id;
111 self.next_peer_id = self.next_peer_id.wrapping_add(1).max(1);
112 id
113 }
114
115 fn create_peer(&mut self, stun_urls: Vec<String>) -> Result<u32> {
116 let config = RTCConfiguration {
117 ice_servers: if stun_urls.is_empty() {
118 vec![RTCIceServer {
119 urls: vec!["stun:stun.l.google.com:19302".to_string()],
120 ..Default::default()
121 }]
122 } else {
123 vec![RTCIceServer {
124 urls: stun_urls,
125 ..Default::default()
126 }]
127 },
128 ..Default::default()
129 };
130
131 let peer_id = self.alloc_peer_id();
132 let conn = self.runtime.block_on(async {
133 let mut me = MediaEngine::default();
134 me.register_default_codecs()?;
135 let mut registry = Registry::new();
136 registry = register_default_interceptors(registry, &mut me)?;
137 let api = APIBuilder::new()
138 .with_media_engine(me)
139 .with_interceptor_registry(registry)
140 .build();
141 api.new_peer_connection(config).await
142 })?;
143
144 let conn = Arc::new(conn);
145 let connection_state = Arc::new(Mutex::new(STATE_NEW));
146 let ice_candidates: Arc<Mutex<VecDeque<String>>> = Arc::new(Mutex::new(VecDeque::new()));
147 let incoming_messages: Arc<Mutex<VecDeque<IncomingMessage>>> =
148 Arc::new(Mutex::new(VecDeque::new()));
149 let pending_channels: Arc<Mutex<VecDeque<PendingChannel>>> =
150 Arc::new(Mutex::new(VecDeque::new()));
151 let pending_tracks: Arc<Mutex<VecDeque<PendingTrack>>> =
152 Arc::new(Mutex::new(VecDeque::new()));
153 let data_channels: Arc<Mutex<HashMap<u32, Arc<RTCDataChannel>>>> =
154 Arc::new(Mutex::new(HashMap::new()));
155
156 let cs = connection_state.clone();
158 conn.on_peer_connection_state_change(Box::new(move |s| {
159 *cs.lock().unwrap() = map_connection_state(s);
160 Box::pin(async {})
161 }));
162
163 let ice = ice_candidates.clone();
165 conn.on_ice_candidate(Box::new(move |c| {
166 if let Some(candidate) = c {
167 if let Ok(json) = serde_json::to_string(&candidate.to_json().unwrap_or_default()) {
168 ice.lock().unwrap().push_back(json);
169 }
170 }
171 Box::pin(async {})
172 }));
173
174 let pending = pending_channels.clone();
176 let msgs = incoming_messages.clone();
177 let dc_map = data_channels.clone();
178 let next_ch = Arc::new(Mutex::new(1u32));
179 conn.on_data_channel(Box::new(move |dc| {
180 let ch_id = {
181 let mut n = next_ch.lock().unwrap();
182 let id = *n;
183 *n = n.wrapping_add(1).max(1);
184 id
185 };
186 let label = dc.label().to_string();
187 pending.lock().unwrap().push_back(PendingChannel {
188 channel_id: ch_id,
189 label,
190 });
191
192 let msgs2 = msgs.clone();
193 dc.on_message(Box::new(move |msg: DataChannelMessage| {
194 msgs2.lock().unwrap().push_back(IncomingMessage {
195 channel_id: ch_id,
196 is_binary: !msg.is_string,
197 data: msg.data.to_vec(),
198 });
199 Box::pin(async {})
200 }));
201
202 dc_map.lock().unwrap().insert(ch_id, dc);
203
204 Box::pin(async {})
205 }));
206
207 let tracks = pending_tracks.clone();
209 conn.on_track(Box::new(move |track, _receiver, _transceiver| {
210 let kind = match track.kind() {
211 webrtc::rtp_transceiver::rtp_codec::RTPCodecType::Audio => 0,
212 webrtc::rtp_transceiver::rtp_codec::RTPCodecType::Video => 1,
213 _ => 2,
214 };
215 tracks.lock().unwrap().push_back(PendingTrack {
216 kind,
217 id: track.id().to_string(),
218 stream_id: track.stream_id().to_string(),
219 });
220 Box::pin(async {})
221 }));
222
223 self.peers.insert(
224 peer_id,
225 PeerState {
226 conn,
227 data_channels,
228 incoming_messages,
229 pending_channels,
230 pending_tracks,
231 ice_candidates,
232 connection_state,
233 next_channel_id: 1,
234 },
235 );
236
237 Ok(peer_id)
238 }
239
240 fn close_peer(&mut self, peer_id: u32) -> bool {
241 if let Some(peer) = self.peers.remove(&peer_id) {
242 let _ = self.runtime.block_on(peer.conn.close());
243 true
244 } else {
245 false
246 }
247 }
248
249 fn create_offer(&mut self, peer_id: u32) -> Result<String> {
250 let peer = self
251 .peers
252 .get(&peer_id)
253 .ok_or_else(|| anyhow::anyhow!("unknown peer"))?;
254 let offer = self.runtime.block_on(peer.conn.create_offer(None))?;
255 self.runtime
256 .block_on(peer.conn.set_local_description(offer.clone()))?;
257 Ok(offer.sdp)
258 }
259
260 fn create_answer(&mut self, peer_id: u32) -> Result<String> {
261 let peer = self
262 .peers
263 .get(&peer_id)
264 .ok_or_else(|| anyhow::anyhow!("unknown peer"))?;
265 let answer = self.runtime.block_on(peer.conn.create_answer(None))?;
266 self.runtime
267 .block_on(peer.conn.set_local_description(answer.clone()))?;
268 Ok(answer.sdp)
269 }
270
271 fn set_local_description(&mut self, peer_id: u32, sdp: &str, is_offer: bool) -> Result<()> {
272 let peer = self
273 .peers
274 .get(&peer_id)
275 .ok_or_else(|| anyhow::anyhow!("unknown peer"))?;
276 let desc = if is_offer {
277 RTCSessionDescription::offer(sdp.to_string())?
278 } else {
279 RTCSessionDescription::answer(sdp.to_string())?
280 };
281 self.runtime
282 .block_on(peer.conn.set_local_description(desc))?;
283 Ok(())
284 }
285
286 fn set_remote_description(&mut self, peer_id: u32, sdp: &str, is_offer: bool) -> Result<()> {
287 let peer = self
288 .peers
289 .get(&peer_id)
290 .ok_or_else(|| anyhow::anyhow!("unknown peer"))?;
291 let desc = if is_offer {
292 RTCSessionDescription::offer(sdp.to_string())?
293 } else {
294 RTCSessionDescription::answer(sdp.to_string())?
295 };
296 self.runtime
297 .block_on(peer.conn.set_remote_description(desc))?;
298 Ok(())
299 }
300
301 fn add_ice_candidate(&mut self, peer_id: u32, candidate_json: &str) -> Result<()> {
302 let peer = self
303 .peers
304 .get(&peer_id)
305 .ok_or_else(|| anyhow::anyhow!("unknown peer"))?;
306 let init: RTCIceCandidateInit = serde_json::from_str(candidate_json)?;
307 self.runtime.block_on(peer.conn.add_ice_candidate(init))?;
308 Ok(())
309 }
310
311 fn connection_state(&self, peer_id: u32) -> u32 {
312 self.peers
313 .get(&peer_id)
314 .map(|p| *p.connection_state.lock().unwrap())
315 .unwrap_or(STATE_CLOSED)
316 }
317
318 fn poll_ice_candidate(&self, peer_id: u32) -> Option<String> {
319 self.peers
320 .get(&peer_id)
321 .and_then(|p| p.ice_candidates.lock().unwrap().pop_front())
322 }
323
324 fn create_data_channel(&mut self, peer_id: u32, label: &str, ordered: bool) -> Result<u32> {
325 let peer = self
326 .peers
327 .get_mut(&peer_id)
328 .ok_or_else(|| anyhow::anyhow!("unknown peer"))?;
329
330 let opts = if ordered {
331 None
332 } else {
333 Some(
334 webrtc::data_channel::data_channel_init::RTCDataChannelInit {
335 ordered: Some(false),
336 ..Default::default()
337 },
338 )
339 };
340
341 let dc = self.runtime.block_on(async {
342 if let Some(opts) = opts {
343 peer.conn.create_data_channel(label, Some(opts)).await
344 } else {
345 peer.conn.create_data_channel(label, None).await
346 }
347 })?;
348
349 let ch_id = peer.next_channel_id;
350 peer.next_channel_id = peer.next_channel_id.wrapping_add(1).max(1);
351
352 let msgs = peer.incoming_messages.clone();
353 let ch_id_copy = ch_id;
354 dc.on_message(Box::new(move |msg: DataChannelMessage| {
355 msgs.lock().unwrap().push_back(IncomingMessage {
356 channel_id: ch_id_copy,
357 is_binary: !msg.is_string,
358 data: msg.data.to_vec(),
359 });
360 Box::pin(async {})
361 }));
362
363 peer.data_channels.lock().unwrap().insert(ch_id, dc);
364 Ok(ch_id)
365 }
366
367 fn send_data(&self, peer_id: u32, channel_id: u32, data: &[u8], is_binary: bool) -> Result<()> {
368 let peer = self
369 .peers
370 .get(&peer_id)
371 .ok_or_else(|| anyhow::anyhow!("unknown peer"))?;
372 let dc = peer
373 .data_channels
374 .lock()
375 .unwrap()
376 .get(&channel_id)
377 .cloned()
378 .ok_or_else(|| anyhow::anyhow!("unknown channel"))?;
379 if is_binary {
380 self.runtime
381 .block_on(dc.send(&bytes::Bytes::copy_from_slice(data)))?;
382 } else {
383 let text = String::from_utf8_lossy(data).to_string();
384 self.runtime.block_on(dc.send_text(text))?;
385 }
386 Ok(())
387 }
388
389 fn recv_message(&self, peer_id: u32) -> Option<IncomingMessage> {
390 self.peers
391 .get(&peer_id)
392 .and_then(|p| p.incoming_messages.lock().unwrap().pop_front())
393 }
394
395 fn recv_from_channel(&self, peer_id: u32, channel_id: u32) -> Option<IncomingMessage> {
396 let peer = self.peers.get(&peer_id)?;
397 let mut q = peer.incoming_messages.lock().unwrap();
398 if let Some(pos) = q.iter().position(|m| m.channel_id == channel_id) {
399 q.remove(pos)
400 } else {
401 None
402 }
403 }
404
405 fn poll_new_channel(&self, peer_id: u32) -> Option<PendingChannel> {
406 self.peers
407 .get(&peer_id)
408 .and_then(|p| p.pending_channels.lock().unwrap().pop_front())
409 }
410
411 fn poll_track(&self, peer_id: u32) -> Option<PendingTrack> {
412 self.peers
413 .get(&peer_id)
414 .and_then(|p| p.pending_tracks.lock().unwrap().pop_front())
415 }
416
417 fn add_track(&mut self, peer_id: u32, kind: u32) -> Result<u32> {
418 let peer = self
419 .peers
420 .get_mut(&peer_id)
421 .ok_or_else(|| anyhow::anyhow!("unknown peer"))?;
422
423 let handle = peer.next_channel_id;
424 peer.next_channel_id = peer.next_channel_id.wrapping_add(1).max(1);
425
426 let track_id = format!("track-{kind}-{handle}");
427
428 let mime = if kind == 0 {
429 webrtc::api::media_engine::MIME_TYPE_OPUS
430 } else {
431 webrtc::api::media_engine::MIME_TYPE_VP8
432 };
433
434 let track = Arc::new(
435 webrtc::track::track_local::track_local_static_rtp::TrackLocalStaticRTP::new(
436 webrtc::rtp_transceiver::rtp_codec::RTCRtpCodecCapability {
437 mime_type: mime.to_string(),
438 ..Default::default()
439 },
440 track_id,
441 format!("oxide-{}", if kind == 0 { "audio" } else { "video" }),
442 ),
443 );
444
445 let _sender = self.runtime.block_on(async {
446 peer.conn
447 .add_track(track as Arc<dyn webrtc::track::track_local::TrackLocal + Send + Sync>)
448 .await
449 })?;
450
451 Ok(handle)
452 }
453
454 fn signal_connect(&mut self, url: &str) -> bool {
457 self.signaling = Some(SignalingSession {
458 base_url: url.trim_end_matches('/').to_string(),
459 room: String::new(),
460 client: reqwest::blocking::Client::new(),
461 });
462 true
463 }
464
465 fn signal_join_room(&mut self, room: &str) -> bool {
466 if let Some(ref mut sig) = self.signaling {
467 sig.room = room.to_string();
468 let url = format!("{}/rooms/{}/join", sig.base_url, room);
469 sig.client.post(&url).send().ok();
470 true
471 } else {
472 false
473 }
474 }
475
476 fn signal_send(&self, data: &[u8]) -> bool {
477 if let Some(ref sig) = self.signaling {
478 let url = if sig.room.is_empty() {
479 format!("{}/signal", sig.base_url)
480 } else {
481 format!("{}/rooms/{}/signal", sig.base_url, sig.room)
482 };
483 sig.client
484 .post(&url)
485 .header("Content-Type", "application/json")
486 .body(data.to_vec())
487 .send()
488 .is_ok()
489 } else {
490 false
491 }
492 }
493
494 fn signal_recv(&self) -> Option<Vec<u8>> {
495 let sig = self.signaling.as_ref()?;
496 let url = if sig.room.is_empty() {
497 format!("{}/signal", sig.base_url)
498 } else {
499 format!("{}/rooms/{}/signal", sig.base_url, sig.room)
500 };
501 let resp = sig.client.get(&url).send().ok()?;
502 if resp.status().is_success() {
503 resp.bytes().ok().map(|b| b.to_vec())
504 } else {
505 None
506 }
507 }
508}
509
510fn ensure_rtc(state: &Arc<Mutex<Option<RtcState>>>) -> bool {
511 let mut g = state.lock().unwrap();
512 if g.is_none() {
513 *g = RtcState::new();
514 }
515 g.is_some()
516}
517
518pub fn register_rtc_functions(linker: &mut Linker<HostState>) -> Result<()> {
520 linker.func_wrap(
523 "oxide",
524 "api_rtc_create_peer",
525 |caller: Caller<'_, HostState>, stun_ptr: u32, stun_len: u32| -> u32 {
526 let console = caller.data().console.clone();
527 let rtc = caller.data().rtc.clone();
528 if !ensure_rtc(&rtc) {
529 console_log(&console, ConsoleLevel::Error, "[RTC] Init failed".into());
530 return 0;
531 }
532 let stun_config = if stun_len > 0 {
533 let mem = caller.data().memory.expect("memory not set");
534 read_guest_string(&mem, &caller, stun_ptr, stun_len).unwrap_or_default()
535 } else {
536 String::new()
537 };
538 let stun_urls: Vec<String> = if stun_config.is_empty() {
539 Vec::new()
540 } else {
541 stun_config
542 .split(',')
543 .map(|s| s.trim().to_string())
544 .collect()
545 };
546 let mut g = rtc.lock().unwrap();
547 match g.as_mut().unwrap().create_peer(stun_urls) {
548 Ok(id) => {
549 console_log(
550 &console,
551 ConsoleLevel::Log,
552 format!("[RTC] Peer {id} created"),
553 );
554 id
555 }
556 Err(e) => {
557 console_log(
558 &console,
559 ConsoleLevel::Error,
560 format!("[RTC] Create peer: {e}"),
561 );
562 0
563 }
564 }
565 },
566 )?;
567
568 linker.func_wrap(
569 "oxide",
570 "api_rtc_close_peer",
571 |caller: Caller<'_, HostState>, peer_id: u32| -> u32 {
572 let rtc = caller.data().rtc.clone();
573 let mut g = rtc.lock().unwrap();
574 if let Some(r) = g.as_mut() {
575 if r.close_peer(peer_id) {
576 1
577 } else {
578 0
579 }
580 } else {
581 0
582 }
583 },
584 )?;
585
586 linker.func_wrap(
589 "oxide",
590 "api_rtc_create_offer",
591 |mut caller: Caller<'_, HostState>, peer_id: u32, out_ptr: u32, out_cap: u32| -> i32 {
592 let mem = match caller.data().memory {
593 Some(m) => m,
594 None => return -4,
595 };
596 let console = caller.data().console.clone();
597 let rtc = caller.data().rtc.clone();
598 let mut g = rtc.lock().unwrap();
599 let r = match g.as_mut() {
600 Some(r) => r,
601 None => return -1,
602 };
603 match r.create_offer(peer_id) {
604 Ok(sdp) => {
605 let bytes = sdp.as_bytes();
606 let write_len = bytes.len().min(out_cap as usize);
607 if write_guest_bytes(&mem, &mut caller, out_ptr, &bytes[..write_len]).is_err() {
608 return -4;
609 }
610 write_len as i32
611 }
612 Err(e) => {
613 console_log(&console, ConsoleLevel::Error, format!("[RTC] Offer: {e}"));
614 -2
615 }
616 }
617 },
618 )?;
619
620 linker.func_wrap(
621 "oxide",
622 "api_rtc_create_answer",
623 |mut caller: Caller<'_, HostState>, peer_id: u32, out_ptr: u32, out_cap: u32| -> i32 {
624 let mem = match caller.data().memory {
625 Some(m) => m,
626 None => return -4,
627 };
628 let console = caller.data().console.clone();
629 let rtc = caller.data().rtc.clone();
630 let mut g = rtc.lock().unwrap();
631 let r = match g.as_mut() {
632 Some(r) => r,
633 None => return -1,
634 };
635 match r.create_answer(peer_id) {
636 Ok(sdp) => {
637 let bytes = sdp.as_bytes();
638 let write_len = bytes.len().min(out_cap as usize);
639 if write_guest_bytes(&mem, &mut caller, out_ptr, &bytes[..write_len]).is_err() {
640 return -4;
641 }
642 write_len as i32
643 }
644 Err(e) => {
645 console_log(&console, ConsoleLevel::Error, format!("[RTC] Answer: {e}"));
646 -2
647 }
648 }
649 },
650 )?;
651
652 linker.func_wrap(
655 "oxide",
656 "api_rtc_set_local_description",
657 |caller: Caller<'_, HostState>,
658 peer_id: u32,
659 sdp_ptr: u32,
660 sdp_len: u32,
661 is_offer: u32|
662 -> i32 {
663 let mem = caller.data().memory.expect("memory not set");
664 let console = caller.data().console.clone();
665 let sdp = read_guest_string(&mem, &caller, sdp_ptr, sdp_len).unwrap_or_default();
666 let rtc = caller.data().rtc.clone();
667 let mut g = rtc.lock().unwrap();
668 match g.as_mut() {
669 Some(r) => match r.set_local_description(peer_id, &sdp, is_offer != 0) {
670 Ok(()) => 0,
671 Err(e) => {
672 console_log(
673 &console,
674 ConsoleLevel::Error,
675 format!("[RTC] Set local desc: {e}"),
676 );
677 -2
678 }
679 },
680 None => -1,
681 }
682 },
683 )?;
684
685 linker.func_wrap(
686 "oxide",
687 "api_rtc_set_remote_description",
688 |caller: Caller<'_, HostState>,
689 peer_id: u32,
690 sdp_ptr: u32,
691 sdp_len: u32,
692 is_offer: u32|
693 -> i32 {
694 let mem = caller.data().memory.expect("memory not set");
695 let console = caller.data().console.clone();
696 let sdp = read_guest_string(&mem, &caller, sdp_ptr, sdp_len).unwrap_or_default();
697 let rtc = caller.data().rtc.clone();
698 let mut g = rtc.lock().unwrap();
699 match g.as_mut() {
700 Some(r) => match r.set_remote_description(peer_id, &sdp, is_offer != 0) {
701 Ok(()) => 0,
702 Err(e) => {
703 console_log(
704 &console,
705 ConsoleLevel::Error,
706 format!("[RTC] Set remote desc: {e}"),
707 );
708 -2
709 }
710 },
711 None => -1,
712 }
713 },
714 )?;
715
716 linker.func_wrap(
719 "oxide",
720 "api_rtc_add_ice_candidate",
721 |caller: Caller<'_, HostState>, peer_id: u32, cand_ptr: u32, cand_len: u32| -> i32 {
722 let mem = caller.data().memory.expect("memory not set");
723 let console = caller.data().console.clone();
724 let candidate =
725 read_guest_string(&mem, &caller, cand_ptr, cand_len).unwrap_or_default();
726 let rtc = caller.data().rtc.clone();
727 let mut g = rtc.lock().unwrap();
728 match g.as_mut() {
729 Some(r) => match r.add_ice_candidate(peer_id, &candidate) {
730 Ok(()) => 0,
731 Err(e) => {
732 console_log(
733 &console,
734 ConsoleLevel::Error,
735 format!("[RTC] Add ICE candidate: {e}"),
736 );
737 -2
738 }
739 },
740 None => -1,
741 }
742 },
743 )?;
744
745 linker.func_wrap(
746 "oxide",
747 "api_rtc_connection_state",
748 |caller: Caller<'_, HostState>, peer_id: u32| -> u32 {
749 let rtc = caller.data().rtc.clone();
750 let g = rtc.lock().unwrap();
751 match g.as_ref() {
752 Some(r) => r.connection_state(peer_id),
753 None => STATE_CLOSED,
754 }
755 },
756 )?;
757
758 linker.func_wrap(
759 "oxide",
760 "api_rtc_poll_ice_candidate",
761 |mut caller: Caller<'_, HostState>, peer_id: u32, out_ptr: u32, out_cap: u32| -> i32 {
762 let mem = match caller.data().memory {
763 Some(m) => m,
764 None => return -4,
765 };
766 let rtc = caller.data().rtc.clone();
767 let g = rtc.lock().unwrap();
768 match g.as_ref() {
769 Some(r) => match r.poll_ice_candidate(peer_id) {
770 Some(json) => {
771 let bytes = json.as_bytes();
772 let write_len = bytes.len().min(out_cap as usize);
773 if write_guest_bytes(&mem, &mut caller, out_ptr, &bytes[..write_len])
774 .is_err()
775 {
776 return -4;
777 }
778 write_len as i32
779 }
780 None => 0,
781 },
782 None => -1,
783 }
784 },
785 )?;
786
787 linker.func_wrap(
790 "oxide",
791 "api_rtc_create_data_channel",
792 |caller: Caller<'_, HostState>,
793 peer_id: u32,
794 label_ptr: u32,
795 label_len: u32,
796 ordered: u32|
797 -> u32 {
798 let mem = caller.data().memory.expect("memory not set");
799 let console = caller.data().console.clone();
800 let label = read_guest_string(&mem, &caller, label_ptr, label_len).unwrap_or_default();
801 let rtc = caller.data().rtc.clone();
802 let mut g = rtc.lock().unwrap();
803 match g.as_mut() {
804 Some(r) => match r.create_data_channel(peer_id, &label, ordered != 0) {
805 Ok(ch) => ch,
806 Err(e) => {
807 console_log(
808 &console,
809 ConsoleLevel::Error,
810 format!("[RTC] Create data channel: {e}"),
811 );
812 0
813 }
814 },
815 None => 0,
816 }
817 },
818 )?;
819
820 linker.func_wrap(
821 "oxide",
822 "api_rtc_send",
823 |caller: Caller<'_, HostState>,
824 peer_id: u32,
825 channel_id: u32,
826 data_ptr: u32,
827 data_len: u32,
828 is_binary: u32|
829 -> i32 {
830 let mem = caller.data().memory.expect("memory not set");
831 let console = caller.data().console.clone();
832 let data = read_guest_bytes(&mem, &caller, data_ptr, data_len).unwrap_or_default();
833 let rtc = caller.data().rtc.clone();
834 let g = rtc.lock().unwrap();
835 match g.as_ref() {
836 Some(r) => match r.send_data(peer_id, channel_id, &data, is_binary != 0) {
837 Ok(()) => data.len() as i32,
838 Err(e) => {
839 console_log(&console, ConsoleLevel::Error, format!("[RTC] Send: {e}"));
840 -2
841 }
842 },
843 None => -1,
844 }
845 },
846 )?;
847
848 linker.func_wrap(
849 "oxide",
850 "api_rtc_recv",
851 |mut caller: Caller<'_, HostState>,
852 peer_id: u32,
853 channel_id: u32,
854 out_ptr: u32,
855 out_cap: u32|
856 -> i64 {
857 let mem = match caller.data().memory {
858 Some(m) => m,
859 None => return -4,
860 };
861 let rtc = caller.data().rtc.clone();
862 let g = rtc.lock().unwrap();
863 match g.as_ref() {
864 Some(r) => {
865 let msg = if channel_id == 0 {
866 r.recv_message(peer_id)
867 } else {
868 r.recv_from_channel(peer_id, channel_id)
869 };
870 match msg {
871 Some(m) => {
872 let write_len = m.data.len().min(out_cap as usize);
873 if write_guest_bytes(&mem, &mut caller, out_ptr, &m.data[..write_len])
874 .is_err()
875 {
876 return -4;
877 }
878 let flags = if m.is_binary { 1u64 << 32 } else { 0 };
879 let ch = (m.channel_id as u64) << 48;
880 (ch | flags | write_len as u64) as i64
881 }
882 None => 0,
883 }
884 }
885 None => -1,
886 }
887 },
888 )?;
889
890 linker.func_wrap(
891 "oxide",
892 "api_rtc_poll_data_channel",
893 |mut caller: Caller<'_, HostState>, peer_id: u32, out_ptr: u32, out_cap: u32| -> i32 {
894 let mem = match caller.data().memory {
895 Some(m) => m,
896 None => return -4,
897 };
898 let rtc = caller.data().rtc.clone();
899 let g = rtc.lock().unwrap();
900 match g.as_ref() {
901 Some(r) => match r.poll_new_channel(peer_id) {
902 Some(ch) => {
903 let info = format!("{}:{}", ch.channel_id, ch.label);
904 let bytes = info.as_bytes();
905 let write_len = bytes.len().min(out_cap as usize);
906 if write_guest_bytes(&mem, &mut caller, out_ptr, &bytes[..write_len])
907 .is_err()
908 {
909 return -4;
910 }
911 write_len as i32
912 }
913 None => 0,
914 },
915 None => -1,
916 }
917 },
918 )?;
919
920 linker.func_wrap(
923 "oxide",
924 "api_rtc_add_track",
925 |caller: Caller<'_, HostState>, peer_id: u32, kind: u32| -> u32 {
926 let console = caller.data().console.clone();
927 let rtc = caller.data().rtc.clone();
928 let mut g = rtc.lock().unwrap();
929 match g.as_mut() {
930 Some(r) => match r.add_track(peer_id, kind) {
931 Ok(id) => id,
932 Err(e) => {
933 console_log(
934 &console,
935 ConsoleLevel::Error,
936 format!("[RTC] Add track: {e}"),
937 );
938 0
939 }
940 },
941 None => 0,
942 }
943 },
944 )?;
945
946 linker.func_wrap(
947 "oxide",
948 "api_rtc_poll_track",
949 |mut caller: Caller<'_, HostState>, peer_id: u32, out_ptr: u32, out_cap: u32| -> i32 {
950 let mem = match caller.data().memory {
951 Some(m) => m,
952 None => return -4,
953 };
954 let rtc = caller.data().rtc.clone();
955 let g = rtc.lock().unwrap();
956 match g.as_ref() {
957 Some(r) => match r.poll_track(peer_id) {
958 Some(t) => {
959 let info = format!("{}:{}:{}", t.kind, t.id, t.stream_id);
960 let bytes = info.as_bytes();
961 let write_len = bytes.len().min(out_cap as usize);
962 if write_guest_bytes(&mem, &mut caller, out_ptr, &bytes[..write_len])
963 .is_err()
964 {
965 return -4;
966 }
967 write_len as i32
968 }
969 None => 0,
970 },
971 None => -1,
972 }
973 },
974 )?;
975
976 linker.func_wrap(
979 "oxide",
980 "api_rtc_signal_connect",
981 |caller: Caller<'_, HostState>, url_ptr: u32, url_len: u32| -> u32 {
982 let mem = caller.data().memory.expect("memory not set");
983 let console = caller.data().console.clone();
984 let url = read_guest_string(&mem, &caller, url_ptr, url_len).unwrap_or_default();
985 let rtc = caller.data().rtc.clone();
986 if !ensure_rtc(&rtc) {
987 console_log(&console, ConsoleLevel::Error, "[RTC] Init failed".into());
988 return 0;
989 }
990 let mut g = rtc.lock().unwrap();
991 if g.as_mut().unwrap().signal_connect(&url) {
992 console_log(
993 &console,
994 ConsoleLevel::Log,
995 format!("[RTC] Signaling connected to {url}"),
996 );
997 1
998 } else {
999 0
1000 }
1001 },
1002 )?;
1003
1004 linker.func_wrap(
1005 "oxide",
1006 "api_rtc_signal_join_room",
1007 |caller: Caller<'_, HostState>, room_ptr: u32, room_len: u32| -> i32 {
1008 let mem = caller.data().memory.expect("memory not set");
1009 let room = read_guest_string(&mem, &caller, room_ptr, room_len).unwrap_or_default();
1010 let rtc = caller.data().rtc.clone();
1011 let mut g = rtc.lock().unwrap();
1012 match g.as_mut() {
1013 Some(r) => {
1014 if r.signal_join_room(&room) {
1015 0
1016 } else {
1017 -2
1018 }
1019 }
1020 None => -1,
1021 }
1022 },
1023 )?;
1024
1025 linker.func_wrap(
1026 "oxide",
1027 "api_rtc_signal_send",
1028 |caller: Caller<'_, HostState>, data_ptr: u32, data_len: u32| -> i32 {
1029 let mem = caller.data().memory.expect("memory not set");
1030 let data = read_guest_bytes(&mem, &caller, data_ptr, data_len).unwrap_or_default();
1031 let rtc = caller.data().rtc.clone();
1032 let g = rtc.lock().unwrap();
1033 match g.as_ref() {
1034 Some(r) => {
1035 if r.signal_send(&data) {
1036 0
1037 } else {
1038 -2
1039 }
1040 }
1041 None => -1,
1042 }
1043 },
1044 )?;
1045
1046 linker.func_wrap(
1047 "oxide",
1048 "api_rtc_signal_recv",
1049 |mut caller: Caller<'_, HostState>, out_ptr: u32, out_cap: u32| -> i32 {
1050 let mem = match caller.data().memory {
1051 Some(m) => m,
1052 None => return -4,
1053 };
1054 let rtc = caller.data().rtc.clone();
1055 let g = rtc.lock().unwrap();
1056 match g.as_ref() {
1057 Some(r) => match r.signal_recv() {
1058 Some(data) => {
1059 let write_len = data.len().min(out_cap as usize);
1060 if write_guest_bytes(&mem, &mut caller, out_ptr, &data[..write_len])
1061 .is_err()
1062 {
1063 return -4;
1064 }
1065 write_len as i32
1066 }
1067 None => 0,
1068 },
1069 None => -1,
1070 }
1071 },
1072 )?;
1073
1074 Ok(())
1075}