1use std::collections::{HashMap, VecDeque};
21use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
22use std::sync::{Arc, Mutex};
23use std::time::Duration;
24
25use anyhow::Result;
26use futures_util::StreamExt;
27use tokio::runtime::Runtime;
28use wasmtime::{Caller, Linker};
29
30use crate::capabilities::{
31 console_log, read_guest_bytes, read_guest_string, write_guest_bytes, ConsoleLevel, HostState,
32};
33
/// Request state: issued, but no response headers have been received yet.
/// This is the state every new handle starts in.
pub const FETCH_PENDING: u32 = 0;
/// Request state: the HTTP status is known and body chunks are being queued.
pub const FETCH_STREAMING: u32 = 1;
/// Request state: the response body stream ended normally.
pub const FETCH_DONE: u32 = 2;
/// Request state: the request failed; a message may be available via
/// `api_fetch_error`. Also reported for handle ids the host does not know.
pub const FETCH_ERROR: u32 = 3;
/// Request state: the request was aborted before it completed.
pub const FETCH_ABORTED: u32 = 4;
46
/// `api_fetch_recv` sentinel: no chunk is queued and the request is still running.
const RECV_PENDING: i64 = -1;
/// `api_fetch_recv` sentinel: the queue is empty and the body finished normally.
const RECV_EOF: i64 = -2;
/// `api_fetch_recv` sentinel: the queue is empty and the request failed or was
/// aborted, or the chunk could not be copied into guest memory.
const RECV_ERROR: i64 = -3;
/// `api_fetch_recv` sentinel: the handle id is unknown, or the fetch subsystem
/// has not been initialised.
const RECV_UNKNOWN: i64 = -4;
53
54struct FetchInner {
56 state: AtomicU32,
58 status: AtomicU32,
60 aborted: AtomicBool,
62 chunks: Mutex<VecDeque<Vec<u8>>>,
65 error: Mutex<Option<String>>,
67}
68
69impl FetchInner {
70 fn new() -> Self {
71 Self {
72 state: AtomicU32::new(FETCH_PENDING),
73 status: AtomicU32::new(0),
74 aborted: AtomicBool::new(false),
75 chunks: Mutex::new(VecDeque::new()),
76 error: Mutex::new(None),
77 }
78 }
79
80 fn set_error(&self, msg: impl Into<String>) {
81 *self.error.lock().unwrap() = Some(msg.into());
82 self.state.store(FETCH_ERROR, Ordering::SeqCst);
83 }
84}
85
86pub struct FetchState {
89 runtime: Runtime,
90 handles: HashMap<u32, Arc<FetchInner>>,
91 next_id: u32,
92}
93
94impl FetchState {
95 pub fn new() -> Option<Self> {
96 let runtime = Runtime::new().ok()?;
97 Some(Self {
98 runtime,
99 handles: HashMap::new(),
100 next_id: 1,
101 })
102 }
103
104 fn alloc_id(&mut self) -> u32 {
105 let id = self.next_id;
106 self.next_id = self.next_id.wrapping_add(1).max(1);
107 id
108 }
109
110 fn begin(&mut self, method: String, url: String, content_type: String, body: Vec<u8>) -> u32 {
112 let id = self.alloc_id();
113 let inner = Arc::new(FetchInner::new());
114 let driver_inner = inner.clone();
115
116 self.runtime.spawn(async move {
117 drive_request(driver_inner, method, url, content_type, body).await;
118 });
119
120 self.handles.insert(id, inner);
121 id
122 }
123
124 fn state(&self, id: u32) -> u32 {
125 self.handles
126 .get(&id)
127 .map(|h| h.state.load(Ordering::SeqCst))
128 .unwrap_or(FETCH_ERROR)
129 }
130
131 fn status(&self, id: u32) -> u32 {
132 self.handles
133 .get(&id)
134 .map(|h| h.status.load(Ordering::SeqCst))
135 .unwrap_or(0)
136 }
137
138 fn error(&self, id: u32) -> Option<String> {
139 self.handles
140 .get(&id)
141 .and_then(|h| h.error.lock().unwrap().clone())
142 }
143
144 fn recv(&self, id: u32, cap: usize) -> RecvResult {
151 let Some(inner) = self.handles.get(&id) else {
152 return RecvResult::Sentinel(RECV_UNKNOWN);
153 };
154
155 let mut q = inner.chunks.lock().unwrap();
156 if let Some(mut chunk) = q.pop_front() {
157 if chunk.len() > cap {
158 let remainder = chunk.split_off(cap);
159 q.push_front(remainder);
160 }
161 return RecvResult::Data(chunk);
162 }
163 drop(q);
164
165 match inner.state.load(Ordering::SeqCst) {
166 FETCH_DONE => RecvResult::Sentinel(RECV_EOF),
167 FETCH_ERROR | FETCH_ABORTED => RecvResult::Sentinel(RECV_ERROR),
168 _ => RecvResult::Sentinel(RECV_PENDING),
169 }
170 }
171
172 fn abort(&self, id: u32) -> bool {
173 if let Some(inner) = self.handles.get(&id) {
174 inner.aborted.store(true, Ordering::SeqCst);
175 let _ = inner.state.compare_exchange(
178 FETCH_PENDING,
179 FETCH_ABORTED,
180 Ordering::SeqCst,
181 Ordering::SeqCst,
182 );
183 let _ = inner.state.compare_exchange(
184 FETCH_STREAMING,
185 FETCH_ABORTED,
186 Ordering::SeqCst,
187 Ordering::SeqCst,
188 );
189 true
190 } else {
191 false
192 }
193 }
194
195 fn remove(&mut self, id: u32) {
196 if let Some(inner) = self.handles.remove(&id) {
197 inner.aborted.store(true, Ordering::SeqCst);
198 }
199 }
200}
201
/// Result of one `FetchState::recv` poll.
enum RecvResult {
    /// Body bytes removed from the chunk queue.
    Data(Vec<u8>),
    /// One of the negative `RECV_*` codes; no data was available.
    Sentinel(i64),
}
206
207async fn drive_request(
209 inner: Arc<FetchInner>,
210 method: String,
211 url: String,
212 content_type: String,
213 body: Vec<u8>,
214) {
215 if inner.aborted.load(Ordering::SeqCst) {
216 inner.state.store(FETCH_ABORTED, Ordering::SeqCst);
217 return;
218 }
219
220 let client = match reqwest::Client::builder()
221 .timeout(Duration::from_secs(60))
222 .build()
223 {
224 Ok(c) => c,
225 Err(e) => {
226 inner.set_error(format!("client build failed: {e}"));
227 return;
228 }
229 };
230
231 let parsed_method: reqwest::Method = method.parse().unwrap_or(reqwest::Method::GET);
232 let mut req = client.request(parsed_method, &url);
233 if !content_type.is_empty() {
234 req = req.header("Content-Type", &content_type);
235 }
236 if !body.is_empty() {
237 req = req.body(body);
238 }
239
240 let resp = match req.send().await {
241 Ok(r) => r,
242 Err(e) => {
243 inner.set_error(e.to_string());
244 return;
245 }
246 };
247
248 if inner.aborted.load(Ordering::SeqCst) {
249 inner.state.store(FETCH_ABORTED, Ordering::SeqCst);
250 return;
251 }
252
253 inner
254 .status
255 .store(resp.status().as_u16() as u32, Ordering::SeqCst);
256 inner.state.store(FETCH_STREAMING, Ordering::SeqCst);
257
258 let mut stream = resp.bytes_stream();
259 while let Some(next) = stream.next().await {
260 if inner.aborted.load(Ordering::SeqCst) {
261 inner.state.store(FETCH_ABORTED, Ordering::SeqCst);
262 return;
263 }
264 match next {
265 Ok(chunk) => {
266 if !chunk.is_empty() {
267 inner.chunks.lock().unwrap().push_back(chunk.to_vec());
268 }
269 }
270 Err(e) => {
271 inner.set_error(e.to_string());
272 return;
273 }
274 }
275 }
276
277 let _ = inner.state.compare_exchange(
279 FETCH_STREAMING,
280 FETCH_DONE,
281 Ordering::SeqCst,
282 Ordering::SeqCst,
283 );
284}
285
286fn ensure_fetch(state: &Arc<Mutex<Option<FetchState>>>) -> bool {
287 let mut g = state.lock().unwrap();
288 if g.is_none() {
289 *g = FetchState::new();
290 }
291 g.is_some()
292}
293
294pub fn register_fetch_functions(linker: &mut Linker<HostState>) -> Result<()> {
296 linker.func_wrap(
301 "oxide",
302 "api_fetch_begin",
303 |caller: Caller<'_, HostState>,
304 method_ptr: u32,
305 method_len: u32,
306 url_ptr: u32,
307 url_len: u32,
308 ct_ptr: u32,
309 ct_len: u32,
310 body_ptr: u32,
311 body_len: u32|
312 -> u32 {
313 let console = caller.data().console.clone();
314 let fetch = caller.data().fetch.clone();
315 if !ensure_fetch(&fetch) {
316 console_log(&console, ConsoleLevel::Error, "[FETCH] Init failed".into());
317 return 0;
318 }
319 let mem = match caller.data().memory {
320 Some(m) => m,
321 None => return 0,
322 };
323 let method =
324 read_guest_string(&mem, &caller, method_ptr, method_len).unwrap_or_default();
325 let url = read_guest_string(&mem, &caller, url_ptr, url_len).unwrap_or_default();
326 let content_type = read_guest_string(&mem, &caller, ct_ptr, ct_len).unwrap_or_default();
327 let body = if body_len > 0 {
328 read_guest_bytes(&mem, &caller, body_ptr, body_len).unwrap_or_default()
329 } else {
330 Vec::new()
331 };
332
333 let id = fetch.lock().unwrap().as_mut().unwrap().begin(
334 method.clone(),
335 url.clone(),
336 content_type,
337 body,
338 );
339
340 console_log(
341 &console,
342 ConsoleLevel::Log,
343 format!("[FETCH] {method} {url} (id={id})"),
344 );
345 id
346 },
347 )?;
348
349 linker.func_wrap(
352 "oxide",
353 "api_fetch_state",
354 |caller: Caller<'_, HostState>, id: u32| -> u32 {
355 let fetch = caller.data().fetch.clone();
356 let g = fetch.lock().unwrap();
357 g.as_ref().map(|s| s.state(id)).unwrap_or(FETCH_ERROR)
358 },
359 )?;
360
361 linker.func_wrap(
364 "oxide",
365 "api_fetch_status",
366 |caller: Caller<'_, HostState>, id: u32| -> u32 {
367 let fetch = caller.data().fetch.clone();
368 let g = fetch.lock().unwrap();
369 g.as_ref().map(|s| s.status(id)).unwrap_or(0)
370 },
371 )?;
372
373 linker.func_wrap(
381 "oxide",
382 "api_fetch_recv",
383 |mut caller: Caller<'_, HostState>, id: u32, out_ptr: u32, out_cap: u32| -> i64 {
384 let fetch = caller.data().fetch.clone();
385 let result = {
386 let g = fetch.lock().unwrap();
387 match g.as_ref() {
388 Some(s) => s.recv(id, out_cap as usize),
389 None => return RECV_UNKNOWN,
390 }
391 };
392 match result {
393 RecvResult::Sentinel(code) => code,
394 RecvResult::Data(bytes) => {
395 let mem = match caller.data().memory {
396 Some(m) => m,
397 None => return RECV_ERROR,
398 };
399 if write_guest_bytes(&mem, &mut caller, out_ptr, &bytes).is_err() {
400 return RECV_ERROR;
401 }
402 bytes.len() as i64
403 }
404 }
405 },
406 )?;
407
408 linker.func_wrap(
413 "oxide",
414 "api_fetch_error",
415 |mut caller: Caller<'_, HostState>, id: u32, out_ptr: u32, out_cap: u32| -> i32 {
416 let fetch = caller.data().fetch.clone();
417 let msg = {
418 let g = fetch.lock().unwrap();
419 g.as_ref().and_then(|s| s.error(id))
420 };
421 let msg = match msg {
422 Some(m) => m,
423 None => return -1,
424 };
425 let mem = match caller.data().memory {
426 Some(m) => m,
427 None => return -1,
428 };
429 let bytes = msg.as_bytes();
430 let n = bytes.len().min(out_cap as usize);
431 if write_guest_bytes(&mem, &mut caller, out_ptr, &bytes[..n]).is_err() {
432 return -1;
433 }
434 n as i32
435 },
436 )?;
437
438 linker.func_wrap(
441 "oxide",
442 "api_fetch_abort",
443 |caller: Caller<'_, HostState>, id: u32| -> i32 {
444 let fetch = caller.data().fetch.clone();
445 let g = fetch.lock().unwrap();
446 match g.as_ref() {
447 Some(s) => i32::from(s.abort(id)),
448 None => 0,
449 }
450 },
451 )?;
452
453 linker.func_wrap(
456 "oxide",
457 "api_fetch_remove",
458 |caller: Caller<'_, HostState>, id: u32| {
459 let fetch = caller.data().fetch.clone();
460 let mut g = fetch.lock().unwrap();
461 if let Some(ref mut state) = *g {
462 state.remove(id);
463 }
464 },
465 )?;
466
467 Ok(())
468}