Database pool working
This commit is contained in:
@ -12,3 +12,5 @@ json = "0.12.4"
|
|||||||
openssl = {version = "0.10", features = ["vendored"]}
|
openssl = {version = "0.10", features = ["vendored"]}
|
||||||
chrono = "0.4"
|
chrono = "0.4"
|
||||||
flate2 = "1.0.0"
|
flate2 = "1.0.0"
|
||||||
|
r2d2 = "0.8.9"
|
||||||
|
r2d2_postgres = "0.18.0"
|
||||||
|
@ -3,6 +3,8 @@ use crate::usimp;
|
|||||||
use crate::websocket;
|
use crate::websocket;
|
||||||
use chrono;
|
use chrono;
|
||||||
use json;
|
use json;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::borrow::Borrow;
|
||||||
|
|
||||||
pub struct HttpStream {
|
pub struct HttpStream {
|
||||||
pub stream: super::Stream,
|
pub stream: super::Stream,
|
||||||
@ -157,7 +159,7 @@ fn endpoint_handler(
|
|||||||
let output = usimp::endpoint(endpoint, input);
|
let output = usimp::endpoint(endpoint, input);
|
||||||
|
|
||||||
// TODO compress
|
// TODO compress
|
||||||
let buf = output.to_string() + "\r\n";
|
let buf = json::stringify(output) + "\r\n";
|
||||||
let length = buf.as_bytes().len();
|
let length = buf.as_bytes().len();
|
||||||
res.add_header("Content-Length", length.to_string().as_str());
|
res.add_header("Content-Length", length.to_string().as_str());
|
||||||
res.add_header("Content-Type", "application/json; charset=utf-8");
|
res.add_header("Content-Type", "application/json; charset=utf-8");
|
||||||
|
158
src/main.rs
158
src/main.rs
@ -4,68 +4,128 @@ mod usimp;
|
|||||||
mod websocket;
|
mod websocket;
|
||||||
|
|
||||||
use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod, SslStream};
|
use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod, SslStream};
|
||||||
use std::net::{TcpListener, UdpSocket};
|
use std::net::{TcpListener, UdpSocket, SocketAddr};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::Mutex;
|
use std::sync::Mutex;
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use threadpool::ThreadPool;
|
use threadpool::ThreadPool;
|
||||||
|
use r2d2_postgres::{PostgresConnectionManager, postgres::NoTls};
|
||||||
|
use r2d2;
|
||||||
|
use r2d2::{ManageConnection, Pool};
|
||||||
|
use std::ops::Deref;
|
||||||
|
|
||||||
|
enum SocketType {
|
||||||
|
Http,
|
||||||
|
Https,
|
||||||
|
Udp,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct SocketConfig {
|
||||||
|
address: SocketAddr,
|
||||||
|
socket_type: SocketType,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum BackendPool {
|
||||||
|
Postgres(Pool<PostgresConnectionManager<NoTls>>),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum BackendClient {
|
||||||
|
Postgres(r2d2::PooledConnection<PostgresConnectionManager<NoTls>>),
|
||||||
|
}
|
||||||
|
|
||||||
|
static mut DB_POOL: Option<Arc<Mutex<BackendPool>>> = None;
|
||||||
|
|
||||||
|
pub fn get_backend() -> BackendClient {
|
||||||
|
unsafe {
|
||||||
|
match DB_POOL.as_ref().unwrap().clone().lock().unwrap().deref() {
|
||||||
|
BackendPool::Postgres(pool) => {
|
||||||
|
BackendClient::Postgres(pool.get().unwrap())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
|
let socket_configs: Vec<SocketConfig> = vec![
|
||||||
|
SocketConfig {
|
||||||
|
address: "[::]:8080".parse().unwrap(),
|
||||||
|
socket_type: SocketType::Http,
|
||||||
|
},
|
||||||
|
SocketConfig {
|
||||||
|
address: "[::]:8443".parse().unwrap(),
|
||||||
|
socket_type: SocketType::Https,
|
||||||
|
},
|
||||||
|
SocketConfig {
|
||||||
|
address: "[::]:3126".parse().unwrap(),
|
||||||
|
socket_type: SocketType::Udp,
|
||||||
|
}
|
||||||
|
];
|
||||||
|
|
||||||
|
let db_manager = PostgresConnectionManager::new(
|
||||||
|
"host=localhost user=postgres dbname=test".parse().unwrap(),
|
||||||
|
NoTls,
|
||||||
|
);
|
||||||
|
let db_pool = r2d2::Pool::new(db_manager).unwrap();
|
||||||
|
unsafe {
|
||||||
|
DB_POOL = Some(Arc::new(Mutex::new(BackendPool::Postgres(db_pool))));
|
||||||
|
}
|
||||||
|
|
||||||
|
let thread_pool = ThreadPool::new(256);
|
||||||
|
let thread_pool_mutex = Arc::new(Mutex::new(thread_pool));
|
||||||
|
|
||||||
let mut threads = Vec::new();
|
let mut threads = Vec::new();
|
||||||
|
|
||||||
let pool = ThreadPool::new(256);
|
for socket_config in socket_configs {
|
||||||
let pool_mutex = Arc::new(Mutex::new(pool));
|
let thread_pool_mutex = thread_pool_mutex.clone();
|
||||||
|
|
||||||
let pool_mutex_ref = pool_mutex.clone();
|
threads.push(match socket_config.socket_type {
|
||||||
threads.push(thread::spawn(move || {
|
SocketType::Http => thread::spawn(move || {
|
||||||
let mut tcp_socket = TcpListener::bind("[::]:8080").unwrap();
|
let mut tcp_socket = TcpListener::bind(socket_config.address).unwrap();
|
||||||
|
|
||||||
for stream in tcp_socket.incoming() {
|
for stream in tcp_socket.incoming() {
|
||||||
pool_mutex_ref.lock().unwrap().execute(|| {
|
thread_pool_mutex.lock().unwrap().execute(|| {
|
||||||
let stream = stream.unwrap();
|
let stream = stream.unwrap();
|
||||||
http::connection_handler(http::Stream::Tcp(stream));
|
http::connection_handler(http::Stream::Tcp(stream));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}));
|
}),
|
||||||
|
SocketType::Https => thread::spawn(move || {
|
||||||
|
let mut ssl_socket = TcpListener::bind(socket_config.address).unwrap();
|
||||||
|
|
||||||
let pool_mutex_ref = pool_mutex.clone();
|
let mut acceptor = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
|
||||||
threads.push(thread::spawn(move || {
|
acceptor
|
||||||
let mut ssl_socket = TcpListener::bind("[::]:8443").unwrap();
|
.set_certificate_chain_file("/home/lorenz/Certificates/chakotay.pem")
|
||||||
|
.unwrap();
|
||||||
|
acceptor
|
||||||
|
.set_private_key_file(
|
||||||
|
"/home/lorenz/Certificates/priv/chakotay.key",
|
||||||
|
SslFiletype::PEM,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
acceptor.check_private_key().unwrap();
|
||||||
|
let acceptor = Arc::new(acceptor.build());
|
||||||
|
|
||||||
let mut acceptor = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
|
for stream in ssl_socket.incoming() {
|
||||||
acceptor
|
let acceptor = acceptor.clone();
|
||||||
.set_certificate_chain_file("/home/lorenz/Certificates/chakotay.pem")
|
thread_pool_mutex.lock().unwrap().execute(move || {
|
||||||
.unwrap();
|
let stream = stream.unwrap();
|
||||||
acceptor
|
let stream = acceptor.accept(stream).unwrap();
|
||||||
.set_private_key_file(
|
http::connection_handler(http::Stream::Ssl(stream));
|
||||||
"/home/lorenz/Certificates/priv/chakotay.key",
|
});
|
||||||
SslFiletype::PEM,
|
}
|
||||||
)
|
}),
|
||||||
.unwrap();
|
SocketType::Udp => thread::spawn(move || {
|
||||||
acceptor.check_private_key().unwrap();
|
let mut udp_socket = UdpSocket::bind(socket_config.address).unwrap();
|
||||||
let acceptor = Arc::new(acceptor.build());
|
let mut buf = [0; 65_536];
|
||||||
|
|
||||||
for stream in ssl_socket.incoming() {
|
loop {
|
||||||
let acceptor = acceptor.clone();
|
let (size, addr) = udp_socket.recv_from(&mut buf).unwrap();
|
||||||
pool_mutex_ref.lock().unwrap().execute(move || {
|
let req = udp::Request::new(&udp_socket, addr, size, &buf);
|
||||||
let stream = stream.unwrap();
|
thread_pool_mutex.lock().unwrap().execute(|| udp::handler(req));
|
||||||
let stream = acceptor.accept(stream).unwrap();
|
}
|
||||||
http::connection_handler(http::Stream::Ssl(stream));
|
}),
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}));
|
|
||||||
|
|
||||||
let pool_mutex_ref = pool_mutex.clone();
|
|
||||||
threads.push(thread::spawn(move || {
|
|
||||||
let mut udp_socket = UdpSocket::bind("[::]:12345").unwrap();
|
|
||||||
let mut buf = [0; 65_536];
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let (size, addr) = udp_socket.recv_from(&mut buf).unwrap();
|
|
||||||
let req = udp::Request::new(&udp_socket, addr, size, &buf);
|
|
||||||
pool_mutex_ref.lock().unwrap().execute(|| udp::handler(req));
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
|
|
||||||
for thread in threads {
|
for thread in threads {
|
||||||
thread.join().unwrap();
|
thread.join().unwrap();
|
||||||
|
@ -1,6 +1,11 @@
|
|||||||
use json;
|
use json;
|
||||||
|
use r2d2::{ManageConnection, Pool, PooledConnection};
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use crate::{get_backend, BackendClient};
|
||||||
|
|
||||||
static ENDPOINTS: [(&str, fn(json::JsonValue) -> json::JsonValue); 1] = [("echo", echo)];
|
static ENDPOINTS: [(&str, fn(json::JsonValue) -> json::JsonValue); 1] = [
|
||||||
|
("echo", echo)
|
||||||
|
];
|
||||||
|
|
||||||
pub fn is_valid_endpoint(endpoint: &str) -> bool {
|
pub fn is_valid_endpoint(endpoint: &str) -> bool {
|
||||||
for (name, _func) in &ENDPOINTS {
|
for (name, _func) in &ENDPOINTS {
|
||||||
@ -21,5 +26,16 @@ pub fn endpoint(endpoint: &str, input: json::JsonValue) -> json::JsonValue {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn echo(input: json::JsonValue) -> json::JsonValue {
|
pub fn echo(input: json::JsonValue) -> json::JsonValue {
|
||||||
input
|
let backend = get_backend();
|
||||||
|
let mut output = input.clone();
|
||||||
|
match backend {
|
||||||
|
BackendClient::Postgres(mut client) => {
|
||||||
|
let res = client.query("SELECT * FROM test", &[]).unwrap();
|
||||||
|
for row in res {
|
||||||
|
let val: i32 = row.get(0);
|
||||||
|
output["database"] = val.into();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
output
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user