also have tokio do streaming json parsing

This commit is contained in:
Erica Z 2024-11-01 10:16:02 +01:00
parent 6892f99070
commit 66fa21f50f

View file

@ -87,7 +87,48 @@ impl Client {
}) })
} }
async fn get<T: serde::de::DeserializeOwned>( async fn send<T: serde::de::DeserializeOwned + Send + 'static>(
&self,
request: reqwest::RequestBuilder,
) -> Result<T, Error> {
// FIXME: is an entire channel per request overkill? maybe pool them?
let (sender, receiver) = async_channel::bounded(1);
// let tokio take care of the request + further json parsing
// this is because reqwest doesn't like the glib main loop
// note that this is why those silly bounds on T are needed, because we're
// sending back the result of the query from another thread
let future = request.send();
runtime().spawn(async move {
// wrap this logic in a fn so we can use ?
async fn perform<T: serde::de::DeserializeOwned + Send + 'static>(
response: Result<reqwest::Response, reqwest::Error>,
) -> Result<schema::SubsonicResponse<T>, reqwest::Error> {
Ok(response?
.error_for_status()?
.json::<schema::SubsonicResponseOuter<T>>()
.await?
.subsonic_response)
}
sender
.send(perform(future.await).await)
.await
.expect("could not send subsonic response back to the main loop");
});
let response = receiver
.recv()
.await
.expect("could not receive subsonic response from tokio")?;
match response {
schema::SubsonicResponse::Ok { inner } => Ok(inner),
schema::SubsonicResponse::Failed { error } => Err(Error::SubsonicError(error)),
}
}
async fn get<T: serde::de::DeserializeOwned + Send + 'static>(
&self, &self,
path: &[&str], path: &[&str],
query: &[(&str, &str)], query: &[(&str, &str)],
@ -97,32 +138,7 @@ impl Client {
// literally can't fail // literally can't fail
.unwrap_or_else(|_| unsafe { std::hint::unreachable_unchecked() }) .unwrap_or_else(|_| unsafe { std::hint::unreachable_unchecked() })
.extend(path); .extend(path);
self.send(self.client.get(url).query(query)).await
// FIXME: is an entire channel per request overkill? maybe pool them?
let (sender, receiver) = async_channel::bounded(1);
let future = self.client.get(url).query(query).send();
runtime().spawn(async move {
let response = future.await;
sender
.send(response)
.await
.expect("could not send subsonic response back to the main loop");
});
let response = receiver
.recv()
.await
.expect("could not receive subsonic response from tokio")?
.error_for_status()?
.json::<schema::SubsonicResponseOuter<T>>()
.await?
.subsonic_response;
match response {
schema::SubsonicResponse::Ok { inner } => Ok(inner),
schema::SubsonicResponse::Failed { error } => Err(Error::SubsonicError(error)),
}
} }
pub async fn ping(&self) -> Result<(), Error> { pub async fn ping(&self) -> Result<(), Error> {