diff --git a/src/error.rs b/src/error.rs index 5060ec8..9b93841 100644 --- a/src/error.rs +++ b/src/error.rs @@ -41,6 +41,7 @@ impl InputEnvelope { OutputEnvelope { request_nr: self.request_nr, error: Some(Error::new(kind, class, msg)), + action: None, data: Value::Null, } } @@ -49,6 +50,7 @@ impl InputEnvelope { OutputEnvelope { request_nr: self.request_nr, error: Some(error), + action: None, data: Value::Null, } } @@ -85,8 +87,9 @@ impl From for OutputEnvelope { fn from(error: Error) -> Self { OutputEnvelope { error: Some(error), - data: Value::Null, request_nr: None, + action: None, + data: Value::Null, } } } diff --git a/src/http.rs b/src/http.rs index edb021a..8a771e5 100644 --- a/src/http.rs +++ b/src/http.rs @@ -3,7 +3,7 @@ use crate::usimp; use crate::usimp::*; use crate::websocket; use hyper::{body, header, Body, Method, Request, Response, StatusCode}; -use serde_json::{Map, Value}; +use serde_json::Value; use std::str::FromStr; use uuid::Uuid; @@ -146,32 +146,19 @@ pub async fn handler(mut req: Request) -> Result, hyper::Er }; if let Some(output) = output { - let mut data = Value::Object(Map::new()); - - match output.error { - Some(error) => { - res = match error.class { - ErrorClass::ClientProtocolError => res.status(StatusCode::BAD_REQUEST), - ErrorClass::ServerError => { - res.status(StatusCode::INTERNAL_SERVER_ERROR) - } - _ => res.status(StatusCode::OK), - }; - data["status"] = Value::from("error"); - data["error"] = Value::from(error); - } - None => { - data["status"] = Value::from("success"); - } + if let Some(ref error) = output.error { + res = match error.class { + ErrorClass::ClientProtocolError => res.status(StatusCode::BAD_REQUEST), + ErrorClass::ServerError => { + res.status(StatusCode::INTERNAL_SERVER_ERROR) + } + _ => res.status(StatusCode::OK), + }; } - data["request_nr"] = match output.request_nr { - Some(nr) => Value::from(nr), - None => Value::Null, - }; - data["data"] = output.data; + let data: Value = output.into(); return Ok(res - .body(Body::from(serde_json::to_string(&data).unwrap() + "\r\n")) + .body(Body::from(data.to_string() + "\r\n")) .unwrap()); } else { res = res.status(StatusCode::NO_CONTENT); diff --git a/src/usimp/handler/mod.rs b/src/usimp/handler/mod.rs index 33dbfa1..2f67148 100644 --- a/src/usimp/handler/mod.rs +++ b/src/usimp/handler/mod.rs @@ -28,10 +28,10 @@ pub async fn endpoint( println!("Endpoint: {}", input.endpoint); Ok(match input.endpoint.as_str() { - "ping" => input.respond(ping::handle(&input, session).await?), - "authenticate" => input.respond(authenticate::handle(&input, session).await?), - "subscribe" => input.respond(subscribe::handle(&input, session, tx).await?), - "new_event" => input.respond(new_event::handle(&input, session).await?), + "ping" => input.respond(ping::handle(&input, session).await?, None), + "authenticate" => input.respond(authenticate::handle(&input, session).await?, None), + "subscribe" => input.respond(subscribe::handle(&input, session, tx).await?, Some(OutputAction::Subscribe)), + "new_event" => input.respond(new_event::handle(&input, session).await?, None), _ => input.new_error( ErrorKind::UsimpError, ErrorClass::ClientProtocolError, diff --git a/src/usimp/handler/subscribe.rs b/src/usimp/handler/subscribe.rs index 2927dac..159d3dc 100644 --- a/src/usimp/handler/subscribe.rs +++ b/src/usimp/handler/subscribe.rs @@ -25,8 +25,7 @@ pub async fn handle( session, input.request_nr, tx, - ) - .await?, + ).await?, )?) } @@ -46,6 +45,7 @@ async fn subscribe( .send(OutputEnvelope { error: None, request_nr: req_nr, + action: Some(OutputAction::Push), data: serde_json::json![{"events": [event]}], }.into()) .await; diff --git a/src/usimp/mod.rs b/src/usimp/mod.rs index daed83c..c2f4270 100644 --- a/src/usimp/mod.rs +++ b/src/usimp/mod.rs @@ -9,9 +9,25 @@ use base64_url; use crypto::digest::Digest; use crypto::sha2::Sha256; use serde::{Deserialize, Serialize}; -use serde_json::Value; +use serde_json::{Map, Value}; use uuid::Uuid; +pub enum OutputAction { + Subscribe, + Unsubscribe, + Push, +} + +impl From for Value { + fn from(action: OutputAction) -> Self { + Value::from(match action { + OutputAction::Subscribe => "subscribe", + OutputAction::Unsubscribe => "unsubscribe", + OutputAction::Push => "push", + }) + } +} + #[derive(Serialize, Deserialize)] pub struct InputEnvelope { pub endpoint: String, @@ -25,9 +41,40 @@ pub struct InputEnvelope { pub struct OutputEnvelope { pub error: Option, pub request_nr: Option, + pub action: Option, pub data: Value, } +impl From for Value { + fn from(msg: OutputEnvelope) -> Self { + let mut envelope = Value::Object(Map::new()); + + envelope["request_nr"] = match msg.request_nr { + Some(nr) => Value::from(nr), + None => Value::Null, + }; + + match msg.error { + Some(error) => { + envelope["status"] = Value::from("error"); + envelope["error"] = Value::from(error); + } + None => { + envelope["status"] = Value::from("success"); + } + } + + envelope["action"] = match msg.action { + Some(a) => Value::from(a), + None => Value::Null, + }; + + envelope["data"] = msg.data; + + envelope + } +} + #[derive(Clone, Serialize, Deserialize)] pub struct Event { data: Value, @@ -69,10 +116,11 @@ pub fn get_account(session: &Option) -> Result<&Account, Error> { } impl InputEnvelope { - pub fn respond(&self, data: Value) -> OutputEnvelope { + pub fn respond(&self, data: Value, action: Option) -> OutputEnvelope { OutputEnvelope { error: None, request_nr: self.request_nr, + action, data, } } diff --git a/src/websocket.rs b/src/websocket.rs index 88e1037..d7be4e7 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -8,7 +8,7 @@ use hyper::{header, Body, Request, StatusCode}; use hyper_tungstenite::hyper::upgrade::Upgraded; use hyper_tungstenite::tungstenite::{handshake, Message}; use hyper_tungstenite::{tungstenite::protocol::Role, WebSocketStream}; -use serde_json::{Map, Value}; +use serde_json::Value; use tokio::sync::mpsc; pub enum WebSocketEnvelope { @@ -50,21 +50,7 @@ async fn sender( break; } WebSocketEnvelope::Text(msg) => { - let mut envelope = Value::Object(Map::new()); - envelope["data"] = msg.data; - envelope["request_nr"] = match msg.request_nr { - Some(nr) => Value::from(nr), - None => Value::Null, - }; - match msg.error { - Some(error) => { - envelope["status"] = Value::from("error"); - envelope["error"] = Value::from(error); - } - None => { - envelope["status"] = Value::from("success"); - } - } + let envelope: Value = msg.into(); if let Err(error) = sink.send(Message::Text(envelope.to_string())).await { eprintln!("{:?}", error); return; @@ -90,16 +76,11 @@ async fn receiver( _res = tx.send(WebSocketEnvelope::Close).await; break; } else if msg.is_binary() { - _res = tx.send(WebSocketEnvelope::Text(OutputEnvelope { - error: Some(Error { - kind: ErrorKind::WebSocketError, - class: ErrorClass::ClientProtocolError, - msg: Some("Binary frames are not allowed".to_string()), - desc: None, - }), - request_nr: None, - data: Value::Null, - })).await; + _res = tx.send(WebSocketEnvelope::Text(OutputEnvelope::from(Error::new( + ErrorKind::WebSocketError, + ErrorClass::ClientProtocolError, + Some("Binary frames are not allowed".to_string()) + )))).await; } else if msg.is_text() { let input: InputEnvelope = serde_json::from_slice(&msg.into_data()[..]).unwrap(); let output = match usimp::endpoint(&input, Some(tx.clone())).await { @@ -108,16 +89,11 @@ async fn receiver( }; _res = tx.send(WebSocketEnvelope::Text(output)).await; } else { - _res = tx.send(WebSocketEnvelope::Text(OutputEnvelope { - error: Some(Error { - kind: ErrorKind::WebSocketError, - class: ErrorClass::ClientProtocolError, - msg: Some("Unknown frame opcode".to_string()), - desc: None, - }), - request_nr: None, - data: Value::Null, - })).await; + _res = tx.send(WebSocketEnvelope::Text(OutputEnvelope::from(Error::new( + ErrorKind::WebSocketError, + ErrorClass::ClientProtocolError, + Some("Unknown frame opcode".to_string()) + )))).await; } } Err(error) => println!("{:?}", error),