Add action to output envelope
This commit is contained in:
@ -41,6 +41,7 @@ impl InputEnvelope {
|
||||
OutputEnvelope {
|
||||
request_nr: self.request_nr,
|
||||
error: Some(Error::new(kind, class, msg)),
|
||||
action: None,
|
||||
data: Value::Null,
|
||||
}
|
||||
}
|
||||
@ -49,6 +50,7 @@ impl InputEnvelope {
|
||||
OutputEnvelope {
|
||||
request_nr: self.request_nr,
|
||||
error: Some(error),
|
||||
action: None,
|
||||
data: Value::Null,
|
||||
}
|
||||
}
|
||||
@ -85,8 +87,9 @@ impl From<Error> for OutputEnvelope {
|
||||
fn from(error: Error) -> Self {
|
||||
OutputEnvelope {
|
||||
error: Some(error),
|
||||
data: Value::Null,
|
||||
request_nr: None,
|
||||
action: None,
|
||||
data: Value::Null,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
35
src/http.rs
35
src/http.rs
@ -3,7 +3,7 @@ use crate::usimp;
|
||||
use crate::usimp::*;
|
||||
use crate::websocket;
|
||||
use hyper::{body, header, Body, Method, Request, Response, StatusCode};
|
||||
use serde_json::{Map, Value};
|
||||
use serde_json::Value;
|
||||
use std::str::FromStr;
|
||||
use uuid::Uuid;
|
||||
|
||||
@ -146,32 +146,19 @@ pub async fn handler(mut req: Request<Body>) -> Result<Response<Body>, hyper::Er
|
||||
};
|
||||
|
||||
if let Some(output) = output {
|
||||
let mut data = Value::Object(Map::new());
|
||||
|
||||
match output.error {
|
||||
Some(error) => {
|
||||
res = match error.class {
|
||||
ErrorClass::ClientProtocolError => res.status(StatusCode::BAD_REQUEST),
|
||||
ErrorClass::ServerError => {
|
||||
res.status(StatusCode::INTERNAL_SERVER_ERROR)
|
||||
}
|
||||
_ => res.status(StatusCode::OK),
|
||||
};
|
||||
data["status"] = Value::from("error");
|
||||
data["error"] = Value::from(error);
|
||||
}
|
||||
None => {
|
||||
data["status"] = Value::from("success");
|
||||
}
|
||||
if let Some(ref error) = output.error {
|
||||
res = match error.class {
|
||||
ErrorClass::ClientProtocolError => res.status(StatusCode::BAD_REQUEST),
|
||||
ErrorClass::ServerError => {
|
||||
res.status(StatusCode::INTERNAL_SERVER_ERROR)
|
||||
}
|
||||
_ => res.status(StatusCode::OK),
|
||||
};
|
||||
}
|
||||
|
||||
data["request_nr"] = match output.request_nr {
|
||||
Some(nr) => Value::from(nr),
|
||||
None => Value::Null,
|
||||
};
|
||||
data["data"] = output.data;
|
||||
let data: Value = output.into();
|
||||
return Ok(res
|
||||
.body(Body::from(serde_json::to_string(&data).unwrap() + "\r\n"))
|
||||
.body(Body::from(data.to_string() + "\r\n"))
|
||||
.unwrap());
|
||||
} else {
|
||||
res = res.status(StatusCode::NO_CONTENT);
|
||||
|
@ -28,10 +28,10 @@ pub async fn endpoint(
|
||||
|
||||
println!("Endpoint: {}", input.endpoint);
|
||||
Ok(match input.endpoint.as_str() {
|
||||
"ping" => input.respond(ping::handle(&input, session).await?),
|
||||
"authenticate" => input.respond(authenticate::handle(&input, session).await?),
|
||||
"subscribe" => input.respond(subscribe::handle(&input, session, tx).await?),
|
||||
"new_event" => input.respond(new_event::handle(&input, session).await?),
|
||||
"ping" => input.respond(ping::handle(&input, session).await?, None),
|
||||
"authenticate" => input.respond(authenticate::handle(&input, session).await?, None),
|
||||
"subscribe" => input.respond(subscribe::handle(&input, session, tx).await?, Some(OutputAction::Subscribe)),
|
||||
"new_event" => input.respond(new_event::handle(&input, session).await?, None),
|
||||
_ => input.new_error(
|
||||
ErrorKind::UsimpError,
|
||||
ErrorClass::ClientProtocolError,
|
||||
|
@ -25,8 +25,7 @@ pub async fn handle(
|
||||
session,
|
||||
input.request_nr,
|
||||
tx,
|
||||
)
|
||||
.await?,
|
||||
).await?,
|
||||
)?)
|
||||
}
|
||||
|
||||
@ -46,6 +45,7 @@ async fn subscribe(
|
||||
.send(OutputEnvelope {
|
||||
error: None,
|
||||
request_nr: req_nr,
|
||||
action: Some(OutputAction::Push),
|
||||
data: serde_json::json![{"events": [event]}],
|
||||
}.into())
|
||||
.await;
|
||||
|
@ -9,9 +9,25 @@ use base64_url;
|
||||
use crypto::digest::Digest;
|
||||
use crypto::sha2::Sha256;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use serde_json::{Map, Value};
|
||||
use uuid::Uuid;
|
||||
|
||||
pub enum OutputAction {
|
||||
Subscribe,
|
||||
Unsubscribe,
|
||||
Push,
|
||||
}
|
||||
|
||||
impl From<OutputAction> for Value {
|
||||
fn from(action: OutputAction) -> Self {
|
||||
Value::from(match action {
|
||||
OutputAction::Subscribe => "subscribe",
|
||||
OutputAction::Unsubscribe => "unsubscribe",
|
||||
OutputAction::Push => "push",
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct InputEnvelope {
|
||||
pub endpoint: String,
|
||||
@ -25,9 +41,40 @@ pub struct InputEnvelope {
|
||||
pub struct OutputEnvelope {
|
||||
pub error: Option<Error>,
|
||||
pub request_nr: Option<u64>,
|
||||
pub action: Option<OutputAction>,
|
||||
pub data: Value,
|
||||
}
|
||||
|
||||
impl From<OutputEnvelope> for Value {
|
||||
fn from(msg: OutputEnvelope) -> Self {
|
||||
let mut envelope = Value::Object(Map::new());
|
||||
|
||||
envelope["request_nr"] = match msg.request_nr {
|
||||
Some(nr) => Value::from(nr),
|
||||
None => Value::Null,
|
||||
};
|
||||
|
||||
match msg.error {
|
||||
Some(error) => {
|
||||
envelope["status"] = Value::from("error");
|
||||
envelope["error"] = Value::from(error);
|
||||
}
|
||||
None => {
|
||||
envelope["status"] = Value::from("success");
|
||||
}
|
||||
}
|
||||
|
||||
envelope["action"] = match msg.action {
|
||||
Some(a) => Value::from(a),
|
||||
None => Value::Null,
|
||||
};
|
||||
|
||||
envelope["data"] = msg.data;
|
||||
|
||||
envelope
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
pub struct Event {
|
||||
data: Value,
|
||||
@ -69,10 +116,11 @@ pub fn get_account(session: &Option<Session>) -> Result<&Account, Error> {
|
||||
}
|
||||
|
||||
impl InputEnvelope {
|
||||
pub fn respond(&self, data: Value) -> OutputEnvelope {
|
||||
pub fn respond(&self, data: Value, action: Option<OutputAction>) -> OutputEnvelope {
|
||||
OutputEnvelope {
|
||||
error: None,
|
||||
request_nr: self.request_nr,
|
||||
action,
|
||||
data,
|
||||
}
|
||||
}
|
||||
|
@ -8,7 +8,7 @@ use hyper::{header, Body, Request, StatusCode};
|
||||
use hyper_tungstenite::hyper::upgrade::Upgraded;
|
||||
use hyper_tungstenite::tungstenite::{handshake, Message};
|
||||
use hyper_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
|
||||
use serde_json::{Map, Value};
|
||||
use serde_json::Value;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
pub enum WebSocketEnvelope {
|
||||
@ -50,21 +50,7 @@ async fn sender(
|
||||
break;
|
||||
}
|
||||
WebSocketEnvelope::Text(msg) => {
|
||||
let mut envelope = Value::Object(Map::new());
|
||||
envelope["data"] = msg.data;
|
||||
envelope["request_nr"] = match msg.request_nr {
|
||||
Some(nr) => Value::from(nr),
|
||||
None => Value::Null,
|
||||
};
|
||||
match msg.error {
|
||||
Some(error) => {
|
||||
envelope["status"] = Value::from("error");
|
||||
envelope["error"] = Value::from(error);
|
||||
}
|
||||
None => {
|
||||
envelope["status"] = Value::from("success");
|
||||
}
|
||||
}
|
||||
let envelope: Value = msg.into();
|
||||
if let Err(error) = sink.send(Message::Text(envelope.to_string())).await {
|
||||
eprintln!("{:?}", error);
|
||||
return;
|
||||
@ -90,16 +76,11 @@ async fn receiver(
|
||||
_res = tx.send(WebSocketEnvelope::Close).await;
|
||||
break;
|
||||
} else if msg.is_binary() {
|
||||
_res = tx.send(WebSocketEnvelope::Text(OutputEnvelope {
|
||||
error: Some(Error {
|
||||
kind: ErrorKind::WebSocketError,
|
||||
class: ErrorClass::ClientProtocolError,
|
||||
msg: Some("Binary frames are not allowed".to_string()),
|
||||
desc: None,
|
||||
}),
|
||||
request_nr: None,
|
||||
data: Value::Null,
|
||||
})).await;
|
||||
_res = tx.send(WebSocketEnvelope::Text(OutputEnvelope::from(Error::new(
|
||||
ErrorKind::WebSocketError,
|
||||
ErrorClass::ClientProtocolError,
|
||||
Some("Binary frames are not allowed".to_string())
|
||||
)))).await;
|
||||
} else if msg.is_text() {
|
||||
let input: InputEnvelope = serde_json::from_slice(&msg.into_data()[..]).unwrap();
|
||||
let output = match usimp::endpoint(&input, Some(tx.clone())).await {
|
||||
@ -108,16 +89,11 @@ async fn receiver(
|
||||
};
|
||||
_res = tx.send(WebSocketEnvelope::Text(output)).await;
|
||||
} else {
|
||||
_res = tx.send(WebSocketEnvelope::Text(OutputEnvelope {
|
||||
error: Some(Error {
|
||||
kind: ErrorKind::WebSocketError,
|
||||
class: ErrorClass::ClientProtocolError,
|
||||
msg: Some("Unknown frame opcode".to_string()),
|
||||
desc: None,
|
||||
}),
|
||||
request_nr: None,
|
||||
data: Value::Null,
|
||||
})).await;
|
||||
_res = tx.send(WebSocketEnvelope::Text(OutputEnvelope::from(Error::new(
|
||||
ErrorKind::WebSocketError,
|
||||
ErrorClass::ClientProtocolError,
|
||||
Some("Unknown frame opcode".to_string())
|
||||
)))).await;
|
||||
}
|
||||
}
|
||||
Err(error) => println!("{:?}", error),
|
||||
|
Reference in New Issue
Block a user