Skip to main content

oxide_browser/
fetch.rs

1//! Non-blocking / streaming HTTP fetch for Oxide guest modules.
2//!
3//! The legacy `api_fetch` import (in [`crate::capabilities`]) blocks the guest
4//! until the entire response body has been downloaded. That prevents guests
5//! from rendering frames during large downloads and makes LLM-style
6//! token-streaming, chunked feeds, or progressive image loads impossible.
7//!
8//! This module exposes a second fetch API that is fully async from the guest's
9//! perspective. The shape mirrors [`crate::websocket`]:
10//!
11//! * `api_fetch_begin` dispatches a request and returns a handle immediately.
12//! * `api_fetch_state` / `api_fetch_status` report progress.
13//! * `api_fetch_recv` pulls the next body chunk from an in-memory queue.
14//! * `api_fetch_abort` cancels an in-flight request.
15//! * `api_fetch_remove` frees host-side resources once the guest is done.
16//!
17//! A background tokio task per request drives `reqwest`'s `bytes_stream()`,
18//! pushing chunks into a `VecDeque<Vec<u8>>` drained by the guest.
19
20use std::collections::{HashMap, VecDeque};
21use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
22use std::sync::{Arc, Mutex};
23use std::time::Duration;
24
25use anyhow::Result;
26use futures_util::StreamExt;
27use tokio::runtime::Runtime;
28use wasmtime::{Caller, Linker};
29
30use crate::capabilities::{
31    console_log, read_guest_bytes, read_guest_string, write_guest_bytes, ConsoleLevel, HostState,
32};
33
// ── Ready-state constants (mirrored in oxide-sdk) ──────────────────────────

/// The request is in flight and the host is still waiting for response headers.
pub const FETCH_PENDING: u32 = 0;
/// Response headers have arrived; body chunks may still be coming in.
pub const FETCH_STREAMING: u32 = 1;
/// The whole body has been received, though the guest may not have drained
/// every queued chunk yet.
pub const FETCH_DONE: u32 = 2;
/// The request failed; `api_fetch_error` yields the failure message.
pub const FETCH_ERROR: u32 = 3;
/// The guest cancelled the request.
pub const FETCH_ABORTED: u32 = 4;

// ── Recv return-code sentinels (i64, negative) ─────────────────────────────

/// `api_fetch_recv`: nothing queued right now, but more data may follow.
const RECV_PENDING: i64 = -1;
/// `api_fetch_recv`: the body is complete and the queue is empty.
const RECV_EOF: i64 = -2;
/// `api_fetch_recv`: the request failed or was aborted, or the copy into
/// guest memory failed.
const RECV_ERROR: i64 = -3;
/// `api_fetch_recv`: the handle is not known to the host.
const RECV_UNKNOWN: i64 = -4;
53
/// State for a single request, shared between the guest-facing host API and
/// the background driver task (each side holds its own `Arc`).
struct FetchInner {
    /// Current lifecycle state; always one of the `FETCH_*` constants.
    state: AtomicU32,
    /// HTTP status code, or 0 while response headers are still pending.
    status: AtomicU32,
    /// Raised by the guest (abort / remove) to ask the driver task to stop;
    /// the driver checks it while waiting on the network.
    aborted: AtomicBool,
    /// Body chunks waiting to be drained by the guest, oldest first. One
    /// network chunk may be handed out in several smaller pieces; see
    /// [`FetchState::recv`].
    chunks: Mutex<VecDeque<Vec<u8>>>,
    /// Failure message, written just before the state becomes [`FETCH_ERROR`].
    error: Mutex<Option<String>>,
}
68
69impl FetchInner {
70    fn new() -> Self {
71        Self {
72            state: AtomicU32::new(FETCH_PENDING),
73            status: AtomicU32::new(0),
74            aborted: AtomicBool::new(false),
75            chunks: Mutex::new(VecDeque::new()),
76            error: Mutex::new(None),
77        }
78    }
79
80    fn set_error(&self, msg: impl Into<String>) {
81        *self.error.lock().unwrap() = Some(msg.into());
82        self.state.store(FETCH_ERROR, Ordering::SeqCst);
83    }
84}
85
86/// All streaming-fetch state for a host. Lazily initialised on the first
87/// `api_fetch_begin` call.
88pub struct FetchState {
89    runtime: Runtime,
90    handles: HashMap<u32, Arc<FetchInner>>,
91    next_id: u32,
92}
93
94impl FetchState {
95    pub fn new() -> Option<Self> {
96        let runtime = Runtime::new().ok()?;
97        Some(Self {
98            runtime,
99            handles: HashMap::new(),
100            next_id: 1,
101        })
102    }
103
104    fn alloc_id(&mut self) -> u32 {
105        let id = self.next_id;
106        self.next_id = self.next_id.wrapping_add(1).max(1);
107        id
108    }
109
110    /// Dispatch a new request. Returns a handle (`> 0`) or `0` on init failure.
111    fn begin(&mut self, method: String, url: String, content_type: String, body: Vec<u8>) -> u32 {
112        let id = self.alloc_id();
113        let inner = Arc::new(FetchInner::new());
114        let driver_inner = inner.clone();
115
116        self.runtime.spawn(async move {
117            drive_request(driver_inner, method, url, content_type, body).await;
118        });
119
120        self.handles.insert(id, inner);
121        id
122    }
123
124    fn state(&self, id: u32) -> u32 {
125        self.handles
126            .get(&id)
127            .map(|h| h.state.load(Ordering::SeqCst))
128            .unwrap_or(FETCH_ERROR)
129    }
130
131    fn status(&self, id: u32) -> u32 {
132        self.handles
133            .get(&id)
134            .map(|h| h.status.load(Ordering::SeqCst))
135            .unwrap_or(0)
136    }
137
138    fn error(&self, id: u32) -> Option<String> {
139        self.handles
140            .get(&id)
141            .and_then(|h| h.error.lock().unwrap().clone())
142    }
143
144    /// Pop up to `cap` bytes from the head of the body queue.
145    ///
146    /// Returns one of the negative sentinels (`RECV_*`) or a non-negative byte
147    /// count. When a single queued chunk is larger than `cap`, the extra bytes
148    /// are re-queued at the front so the next call returns them — no data is
149    /// dropped.
150    fn recv(&self, id: u32, cap: usize) -> RecvResult {
151        let Some(inner) = self.handles.get(&id) else {
152            return RecvResult::Sentinel(RECV_UNKNOWN);
153        };
154
155        let mut q = inner.chunks.lock().unwrap();
156        if let Some(mut chunk) = q.pop_front() {
157            if chunk.len() > cap {
158                let remainder = chunk.split_off(cap);
159                q.push_front(remainder);
160            }
161            return RecvResult::Data(chunk);
162        }
163        drop(q);
164
165        match inner.state.load(Ordering::SeqCst) {
166            FETCH_DONE => RecvResult::Sentinel(RECV_EOF),
167            FETCH_ERROR | FETCH_ABORTED => RecvResult::Sentinel(RECV_ERROR),
168            _ => RecvResult::Sentinel(RECV_PENDING),
169        }
170    }
171
172    fn abort(&self, id: u32) -> bool {
173        if let Some(inner) = self.handles.get(&id) {
174            inner.aborted.store(true, Ordering::SeqCst);
175            // Flip state immediately so `state()` reports the intent even
176            // before the driver task observes the flag.
177            let _ = inner.state.compare_exchange(
178                FETCH_PENDING,
179                FETCH_ABORTED,
180                Ordering::SeqCst,
181                Ordering::SeqCst,
182            );
183            let _ = inner.state.compare_exchange(
184                FETCH_STREAMING,
185                FETCH_ABORTED,
186                Ordering::SeqCst,
187                Ordering::SeqCst,
188            );
189            true
190        } else {
191            false
192        }
193    }
194
195    fn remove(&mut self, id: u32) {
196        if let Some(inner) = self.handles.remove(&id) {
197            inner.aborted.store(true, Ordering::SeqCst);
198        }
199    }
200}
201
/// What [`FetchState::recv`] produced for one call.
enum RecvResult {
    /// Bytes taken from the head of the body queue (at most the requested cap).
    Data(Vec<u8>),
    /// One of the negative `RECV_*` return codes, passed straight to the guest.
    Sentinel(i64),
}
206
207/// Background task body: run the request and feed chunks into `inner`.
208async fn drive_request(
209    inner: Arc<FetchInner>,
210    method: String,
211    url: String,
212    content_type: String,
213    body: Vec<u8>,
214) {
215    if inner.aborted.load(Ordering::SeqCst) {
216        inner.state.store(FETCH_ABORTED, Ordering::SeqCst);
217        return;
218    }
219
220    let client = match reqwest::Client::builder()
221        .timeout(Duration::from_secs(60))
222        .build()
223    {
224        Ok(c) => c,
225        Err(e) => {
226            inner.set_error(format!("client build failed: {e}"));
227            return;
228        }
229    };
230
231    let parsed_method: reqwest::Method = method.parse().unwrap_or(reqwest::Method::GET);
232    let mut req = client.request(parsed_method, &url);
233    if !content_type.is_empty() {
234        req = req.header("Content-Type", &content_type);
235    }
236    if !body.is_empty() {
237        req = req.body(body);
238    }
239
240    let resp = match req.send().await {
241        Ok(r) => r,
242        Err(e) => {
243            inner.set_error(e.to_string());
244            return;
245        }
246    };
247
248    if inner.aborted.load(Ordering::SeqCst) {
249        inner.state.store(FETCH_ABORTED, Ordering::SeqCst);
250        return;
251    }
252
253    inner
254        .status
255        .store(resp.status().as_u16() as u32, Ordering::SeqCst);
256    inner.state.store(FETCH_STREAMING, Ordering::SeqCst);
257
258    let mut stream = resp.bytes_stream();
259    while let Some(next) = stream.next().await {
260        if inner.aborted.load(Ordering::SeqCst) {
261            inner.state.store(FETCH_ABORTED, Ordering::SeqCst);
262            return;
263        }
264        match next {
265            Ok(chunk) => {
266                if !chunk.is_empty() {
267                    inner.chunks.lock().unwrap().push_back(chunk.to_vec());
268                }
269            }
270            Err(e) => {
271                inner.set_error(e.to_string());
272                return;
273            }
274        }
275    }
276
277    // Preserve ABORTED if the guest aborted between the final chunk and here.
278    let _ = inner.state.compare_exchange(
279        FETCH_STREAMING,
280        FETCH_DONE,
281        Ordering::SeqCst,
282        Ordering::SeqCst,
283    );
284}
285
286fn ensure_fetch(state: &Arc<Mutex<Option<FetchState>>>) -> bool {
287    let mut g = state.lock().unwrap();
288    if g.is_none() {
289        *g = FetchState::new();
290    }
291    g.is_some()
292}
293
294/// Register all `api_fetch_*` streaming host functions on the given linker.
295pub fn register_fetch_functions(linker: &mut Linker<HostState>) -> Result<()> {
296    // ── fetch_begin ──────────────────────────────────────────────────────
297    // api_fetch_begin(method_ptr, method_len, url_ptr, url_len,
298    //                 ct_ptr, ct_len, body_ptr, body_len) -> u32
299    //   Returns a handle (> 0), or 0 on error.
300    linker.func_wrap(
301        "oxide",
302        "api_fetch_begin",
303        |caller: Caller<'_, HostState>,
304         method_ptr: u32,
305         method_len: u32,
306         url_ptr: u32,
307         url_len: u32,
308         ct_ptr: u32,
309         ct_len: u32,
310         body_ptr: u32,
311         body_len: u32|
312         -> u32 {
313            let console = caller.data().console.clone();
314            let fetch = caller.data().fetch.clone();
315            if !ensure_fetch(&fetch) {
316                console_log(&console, ConsoleLevel::Error, "[FETCH] Init failed".into());
317                return 0;
318            }
319            let mem = match caller.data().memory {
320                Some(m) => m,
321                None => return 0,
322            };
323            let method =
324                read_guest_string(&mem, &caller, method_ptr, method_len).unwrap_or_default();
325            let url = read_guest_string(&mem, &caller, url_ptr, url_len).unwrap_or_default();
326            let content_type = read_guest_string(&mem, &caller, ct_ptr, ct_len).unwrap_or_default();
327            let body = if body_len > 0 {
328                read_guest_bytes(&mem, &caller, body_ptr, body_len).unwrap_or_default()
329            } else {
330                Vec::new()
331            };
332
333            let id = fetch.lock().unwrap().as_mut().unwrap().begin(
334                method.clone(),
335                url.clone(),
336                content_type,
337                body,
338            );
339
340            console_log(
341                &console,
342                ConsoleLevel::Log,
343                format!("[FETCH] {method} {url} (id={id})"),
344            );
345            id
346        },
347    )?;
348
349    // ── fetch_state ──────────────────────────────────────────────────────
350    // api_fetch_state(id) -> u32  (one of FETCH_* constants)
351    linker.func_wrap(
352        "oxide",
353        "api_fetch_state",
354        |caller: Caller<'_, HostState>, id: u32| -> u32 {
355            let fetch = caller.data().fetch.clone();
356            let g = fetch.lock().unwrap();
357            g.as_ref().map(|s| s.state(id)).unwrap_or(FETCH_ERROR)
358        },
359    )?;
360
361    // ── fetch_status ─────────────────────────────────────────────────────
362    // api_fetch_status(id) -> u32  (HTTP status, 0 before headers)
363    linker.func_wrap(
364        "oxide",
365        "api_fetch_status",
366        |caller: Caller<'_, HostState>, id: u32| -> u32 {
367            let fetch = caller.data().fetch.clone();
368            let g = fetch.lock().unwrap();
369            g.as_ref().map(|s| s.status(id)).unwrap_or(0)
370        },
371    )?;
372
373    // ── fetch_recv ───────────────────────────────────────────────────────
374    // api_fetch_recv(id, out_ptr, out_cap) -> i64
375    //   >= 0 : bytes written into `out_ptr` (one chunk, possibly partial)
376    //   -1   : pending (no chunk available, more may arrive)
377    //   -2   : end of stream (body fully delivered)
378    //   -3   : error (see api_fetch_error)
379    //   -4   : unknown handle
380    linker.func_wrap(
381        "oxide",
382        "api_fetch_recv",
383        |mut caller: Caller<'_, HostState>, id: u32, out_ptr: u32, out_cap: u32| -> i64 {
384            let fetch = caller.data().fetch.clone();
385            let result = {
386                let g = fetch.lock().unwrap();
387                match g.as_ref() {
388                    Some(s) => s.recv(id, out_cap as usize),
389                    None => return RECV_UNKNOWN,
390                }
391            };
392            match result {
393                RecvResult::Sentinel(code) => code,
394                RecvResult::Data(bytes) => {
395                    let mem = match caller.data().memory {
396                        Some(m) => m,
397                        None => return RECV_ERROR,
398                    };
399                    if write_guest_bytes(&mem, &mut caller, out_ptr, &bytes).is_err() {
400                        return RECV_ERROR;
401                    }
402                    bytes.len() as i64
403                }
404            }
405        },
406    )?;
407
408    // ── fetch_error ──────────────────────────────────────────────────────
409    // api_fetch_error(id, out_ptr, out_cap) -> i32
410    //   >= 0 : number of UTF-8 bytes written (possibly truncated)
411    //   -1   : no error set
412    linker.func_wrap(
413        "oxide",
414        "api_fetch_error",
415        |mut caller: Caller<'_, HostState>, id: u32, out_ptr: u32, out_cap: u32| -> i32 {
416            let fetch = caller.data().fetch.clone();
417            let msg = {
418                let g = fetch.lock().unwrap();
419                g.as_ref().and_then(|s| s.error(id))
420            };
421            let msg = match msg {
422                Some(m) => m,
423                None => return -1,
424            };
425            let mem = match caller.data().memory {
426                Some(m) => m,
427                None => return -1,
428            };
429            let bytes = msg.as_bytes();
430            let n = bytes.len().min(out_cap as usize);
431            if write_guest_bytes(&mem, &mut caller, out_ptr, &bytes[..n]).is_err() {
432                return -1;
433            }
434            n as i32
435        },
436    )?;
437
438    // ── fetch_abort ──────────────────────────────────────────────────────
439    // api_fetch_abort(id) -> i32  (1 if aborted, 0 if unknown handle)
440    linker.func_wrap(
441        "oxide",
442        "api_fetch_abort",
443        |caller: Caller<'_, HostState>, id: u32| -> i32 {
444            let fetch = caller.data().fetch.clone();
445            let g = fetch.lock().unwrap();
446            match g.as_ref() {
447                Some(s) => i32::from(s.abort(id)),
448                None => 0,
449            }
450        },
451    )?;
452
453    // ── fetch_remove ─────────────────────────────────────────────────────
454    // api_fetch_remove(id)  — free host-side resources.
455    linker.func_wrap(
456        "oxide",
457        "api_fetch_remove",
458        |caller: Caller<'_, HostState>, id: u32| {
459            let fetch = caller.data().fetch.clone();
460            let mut g = fetch.lock().unwrap();
461            if let Some(ref mut state) = *g {
462                state.remove(id);
463            }
464        },
465    )?;
466
467    Ok(())
468}