Struct jsonrpsee::SubscriptionSink
pub struct SubscriptionSink { /* private fields */ }
Represents a single subscription.
Implementations
impl SubscriptionSink
pub fn reject(&mut self, err: impl Into<ErrorObject<'static>>) -> Result<(), SubscriptionAcceptRejectError>
Reject the subscription call with the provided ErrorObject.
pub fn accept(&mut self) -> Result<(), SubscriptionAcceptRejectError>
Attempt to accept the subscription and respond to the subscription method call.
Fails if the connection was closed, or if called multiple times.
pub fn subscription_id(&self) -> Option<SubscriptionId<'static>>
Return the subscription ID if the subscription was accepted.
SubscriptionSink::accept should be called prior to this method.
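The accept/reject rules described above can be pictured as a small state machine. The sketch below is a standalone model built only on the standard library, not jsonrpsee's implementation: `ModelSink`, `State`, and `AcceptRejectError` are hypothetical names, the ID is a plain `u64` standing in for `SubscriptionId`, and treating a `reject` after `accept` as an error is an assumption.

```rust
/// Hypothetical states of a subscription call (not a jsonrpsee type).
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Pending,
    Accepted(u64),
    Rejected,
}

/// Hypothetical stand-in for `SubscriptionAcceptRejectError`.
#[derive(Debug, PartialEq)]
enum AcceptRejectError {
    AlreadyAnswered,
    ConnectionClosed,
}

/// Hypothetical model of a sink that may be answered exactly once.
struct ModelSink {
    state: State,
    connection_open: bool,
    id: u64,
}

impl ModelSink {
    fn new(id: u64) -> Self {
        ModelSink { state: State::Pending, connection_open: true, id }
    }

    /// Succeeds only on the first answer, while the connection is open.
    fn accept(&mut self) -> Result<(), AcceptRejectError> {
        self.answer(State::Accepted(self.id))
    }

    /// Assumed to follow the same "answer once" rule as `accept`.
    fn reject(&mut self) -> Result<(), AcceptRejectError> {
        self.answer(State::Rejected)
    }

    fn answer(&mut self, next: State) -> Result<(), AcceptRejectError> {
        if !self.connection_open {
            return Err(AcceptRejectError::ConnectionClosed);
        }
        if self.state != State::Pending {
            return Err(AcceptRejectError::AlreadyAnswered);
        }
        self.state = next;
        Ok(())
    }

    /// `Some(id)` only after a successful `accept`.
    fn subscription_id(&self) -> Option<u64> {
        match self.state {
            State::Accepted(id) => Some(id),
            _ => None,
        }
    }
}
```

In this model, `subscription_id` yields `None` until `accept` has succeeded, which mirrors the advice to call `accept` first.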
pub fn send<T>(&mut self, result: &T) -> Result<bool, Error> where T: Serialize
Send a message back to subscribers.
Returns:
Ok(true) if the message could be sent.
Ok(false) if the sink was closed (either because the subscription was closed or the connection was terminated), or the subscription could not be accepted.
Err(err) if the message could not be serialized.
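A producer loop usually has to branch on the three outcomes listed above. The following is a minimal sketch of that branching using only the standard library; `NextStep` and `after_send` are hypothetical names, and the generic `E` stands in for jsonrpsee's `Error` type.

```rust
use std::fmt::Display;

/// What a producer loop might do after one call to `send` (hypothetical type).
#[derive(Debug, PartialEq)]
enum NextStep {
    /// The message was delivered to the sink; keep producing.
    KeepSending,
    /// The sink is closed; stop without treating it as a failure.
    StopQuietly,
    /// The value could not be serialized; surface the problem.
    ReportError(String),
}

/// Maps the three documented outcomes of `send` to a decision.
fn after_send<E: Display>(outcome: Result<bool, E>) -> NextStep {
    match outcome {
        Ok(true) => NextStep::KeepSending,
        Ok(false) => NextStep::StopQuietly,
        Err(e) => NextStep::ReportError(format!("serialization failed: {}", e)),
    }
}
```

Keeping `Ok(false)` separate from `Err(_)` lets the caller treat a disconnected client as a normal end of the subscription rather than as an error.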
pub async fn pipe_from_try_stream<S, T, E>(&mut self, stream: S) -> impl Future<Output = SubscriptionClosed> where S: TryStream<Ok = T, Error = E> + Unpin, T: Serialize, E: Display
Reads data from the stream and sends it back on the subscription as the stream produces items.
The underlying stream must produce Result values; see futures_util::TryStream for further information.
Returns SubscriptionClosed::Success if the stream ran to completion.
Returns SubscriptionClosed::RemotePeerAborted if the client unsubscribed or the connection was terminated.
Returns SubscriptionClosed::Failed(_) immediately if the underlying stream returns an error or if an item from the stream could not be serialized.
Examples
use jsonrpsee_core::server::rpc_module::RpcModule;
use jsonrpsee_core::error::SubscriptionClosed;
use jsonrpsee_types::ErrorObjectOwned;

let mut m = RpcModule::new(());
m.register_subscription("sub", "_", "unsub", |_params, mut sink, _| {
    // The subscriber receives `1` and `2`. The `Err(_)` item then terminates the stream,
    // and `pipe_from_try_stream` resolves to `SubscriptionClosed::Failed(_)`.
    let stream = futures_util::stream::iter(vec![Ok(1_u32), Ok(2), Err("error on the stream")]);

    tokio::spawn(async move {
        // jsonrpsee doesn't send an error notification unless `close` is explicitly called.
        // If we pipe messages to the sink, we can inspect why it ended:
        match sink.pipe_from_try_stream(stream).await {
            SubscriptionClosed::Success => {
                let err_obj: ErrorObjectOwned = SubscriptionClosed::Success.into();
                sink.close(err_obj);
            }
            // We don't want to send a close reason when the client has unsubscribed or disconnected.
            SubscriptionClosed::RemotePeerAborted => (),
            SubscriptionClosed::Failed(e) => {
                sink.close(e);
            }
        }
    });
    Ok(())
});
pub async fn pipe_from_stream<S, T>(&mut self, stream: S) -> impl Future<Output = SubscriptionClosed> where S: Stream<Item = T> + Unpin, T: Serialize
Similar to SubscriptionSink::pipe_from_try_stream, but it doesn’t require the stream to return Result.
Warning: it’s possible to pass in a stream that returns Result if Result: Serialize is satisfied, but the stream won’t be canceled when an error occurs. If you want the stream to be canceled when an error occurs, use SubscriptionSink::pipe_from_try_stream instead.
Examples
use jsonrpsee_core::server::rpc_module::RpcModule;

let mut m = RpcModule::new(());
m.register_subscription("sub", "_", "unsub", |_params, mut sink, _| {
    let stream = futures_util::stream::iter(vec![1_usize, 2, 3]);
    tokio::spawn(async move { sink.pipe_from_stream(stream).await; });
    Ok(())
});
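The warning about streams of Result values can be illustrated without jsonrpsee. The sketch below models the two piping behaviours over a plain iterator using only the standard library; `model_try_pipe` and `model_pipe` are hypothetical helpers, and `Debug` formatting stands in for serialization.

```rust
/// Hypothetical model of `pipe_from_try_stream`: forwards `Ok` items and
/// stops at the first `Err`. Returns the forwarded items and the error that
/// ended the stream, if any.
fn model_try_pipe<I>(items: I) -> (Vec<String>, Option<String>)
where
    I: IntoIterator<Item = Result<u32, String>>,
{
    let mut sent = Vec::new();
    for item in items {
        match item {
            Ok(value) => sent.push(value.to_string()),
            // The stream is canceled here; later items are never read.
            Err(e) => return (sent, Some(e)),
        }
    }
    (sent, None)
}

/// Hypothetical model of `pipe_from_stream`: every item is forwarded as
/// ordinary data, so an `Err` value does not stop the stream.
fn model_pipe<I>(items: I) -> Vec<String>
where
    I: IntoIterator<Item = Result<u32, String>>,
{
    items
        .into_iter()
        .map(|item| format!("{:?}", item)) // `Debug` output stands in for serialization
        .collect()
}
```

Given the items `Ok(1)`, `Err("boom")`, `Ok(2)`, the first helper forwards only `1` and reports the error, while the second forwards all three values as data.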
pub fn close(self, err: impl Into<ErrorObject<'static>>) -> bool
Close the subscription, sending a notification with a special error field containing the provided error.
This can be used to signal an actual error, or just to signal that the subscription has been closed, depending on your preference.
If you’d like to close the subscription without sending an error, just drop it and don’t call this method.
The notification sent to the client has the following shape:
{
  "jsonrpc": "2.0",
  "method": "<method>",
  "params": {
    "subscription": "<subscriptionID>",
    "error": { "code": <code from error>, "message": <message from error>, "data": <data from error> }
  }
}
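To make the notification layout concrete, here is a minimal sketch that assembles such a message as a string using only the standard library. `close_notification` is a hypothetical helper, not part of jsonrpsee; it assumes a numeric subscription ID, omits the optional data field, and performs no JSON escaping, so it is only suitable for simple method names and messages.

```rust
/// Builds a close notification in the layout shown above.
/// Hypothetical helper: the subscription ID is assumed to be numeric,
/// `data` is omitted, and `method` and `message` are not escaped.
fn close_notification(method: &str, subscription_id: u64, code: i32, message: &str) -> String {
    format!(
        r#"{{"jsonrpc":"2.0","method":"{}","params":{{"subscription":{},"error":{{"code":{},"message":"{}"}}}}}}"#,
        method, subscription_id, code, message
    )
}
```

A real server would rely on a JSON serializer for this, which also takes care of string escaping.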