From 66fa21f50f669abc28fb145f4c1c830040324abc Mon Sep 17 00:00:00 2001 From: Erica Z Date: Fri, 1 Nov 2024 10:16:02 +0100 Subject: [PATCH] also have tokio do streaming json parsing --- src/subsonic.rs | 70 ++++++++++++++++++++++++++++++------------------- 1 file changed, 43 insertions(+), 27 deletions(-) diff --git a/src/subsonic.rs b/src/subsonic.rs index 8cd71f6..8c0d36e 100644 --- a/src/subsonic.rs +++ b/src/subsonic.rs @@ -87,7 +87,48 @@ impl Client { }) } - async fn get( + async fn send( + &self, + request: reqwest::RequestBuilder, + ) -> Result { + // FIXME: is an entire channel per request overkill? maybe pool them? + let (sender, receiver) = async_channel::bounded(1); + + // let tokio take care of the request + further json parsing + // this is because reqwest doesn't like the glib main loop + // note that this is why those silly bounds on T are needed, because we're + // sending back the result of the query from another thread + let future = request.send(); + runtime().spawn(async move { + // wrap this logic in a fn so we can use ? + async fn perform( + response: Result, + ) -> Result, reqwest::Error> { + Ok(response? + .error_for_status()? + .json::>() + .await? + .subsonic_response) + } + + sender + .send(perform(future.await).await) + .await + .expect("could not send subsonic response back to the main loop"); + }); + + let response = receiver + .recv() + .await + .expect("could not receive subsonic response from tokio")?; + + match response { + schema::SubsonicResponse::Ok { inner } => Ok(inner), + schema::SubsonicResponse::Failed { error } => Err(Error::SubsonicError(error)), + } + } + + async fn get( &self, path: &[&str], query: &[(&str, &str)], @@ -97,32 +138,7 @@ impl Client { // literally can't fail .unwrap_or_else(|_| unsafe { std::hint::unreachable_unchecked() }) .extend(path); - - // FIXME: is an entire channel per request overkill? maybe pool them? - let (sender, receiver) = async_channel::bounded(1); - - let future = self.client.get(url).query(query).send(); - runtime().spawn(async move { - let response = future.await; - sender - .send(response) - .await - .expect("could not send subsonic response back to the main loop"); - }); - - let response = receiver - .recv() - .await - .expect("could not receive subsonic response from tokio")? - .error_for_status()? - .json::>() - .await? - .subsonic_response; - - match response { - schema::SubsonicResponse::Ok { inner } => Ok(inner), - schema::SubsonicResponse::Failed { error } => Err(Error::SubsonicError(error)), - } + self.send(self.client.get(url).query(query)).await } pub async fn ping(&self) -> Result<(), Error> {