1
0
Lixcraft/src/passthroughmain.rs

135 Zeilen
3.5 KiB
Rust

2022-03-01 20:45:10 +01:00
extern crate core;
mod serializer;
mod datatypes;
mod deserializer;
mod packets;
use tokio::io;
use tokio::io::{AsyncRead, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use futures::FutureExt;
use std::error::Error;
use std::sync::Arc;
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use async_trait::async_trait;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let listen_addr = String::from("127.0.0.1:25565");
let server_addr = String::from("steamwar.de:25565");
println!("Listening on: {}", listen_addr);
println!("Proxying to: {}", server_addr);
let listener = TcpListener::bind(listen_addr).await?;
while let Ok((inbound, _)) = listener.accept().await {
let transfer = transfer(inbound, server_addr.clone()).map(|r| {
if let Err(e) = r {
println!("Failed to transfer; error={}", e);
}
});
tokio::spawn(transfer);
}
Ok(())
}
pub struct Stoppable {
interruptor: tokio::sync::oneshot::Receiver<bool>,
interrupted: tokio::sync::oneshot::Sender<bool>
}
pub struct Stopper {
interruptable: tokio::sync::oneshot::Sender<bool>,
interrupted: tokio::sync::oneshot::Receiver<bool>
}
impl Stopper {
pub fn new() -> (Stopper, Stoppable) {
let (interruptable, interruptor) = tokio::sync::oneshot::channel();
let (interrupted_s, interrupted_r) = tokio::sync::oneshot::channel();
(Stopper {interruptable, interrupted: interrupted_r}, Stoppable {interruptor, interrupted: interrupted_s})
}
pub async fn stop(&mut self) {
if self.interruptable.is_closed() {
return;
}
self.interruptable.send(true).unwrap_or_default();
self.interrupted.await.expect_err("Send on Stopper.interrupted occured");
}
}
pub trait InputStream {}
pub trait OutputStream {}
pub struct StoppableAsyncRead<T: AsyncRead> {
inner: T,
stoppable: Stoppable
}
impl <T: AsyncRead> StoppableAsyncRead<T> {
pub fn new(inner: T, stoppable: Stoppable) -> StoppableAsyncRead<T> {
StoppableAsyncRead { inner, stoppable }
}
pub fn unwrap(self) -> T {
self.inner
}
}
impl <T: AsyncRead> InputStream for StoppableAsyncRead<T> {}
impl OutputStream for OwnedWriteHalf {}
#[async_trait]
pub trait Reader<T: InputStream> {
async fn run(self, stream: T, connection: Arc<Connection>);
}
pub struct Connection {
writer: Box<parking_lot::Mutex<dyn OutputStream>>,
stopper: Stopper
}
impl Connection {
pub fn new<T: 'static + Reader<StoppableAsyncRead<OwnedReadHalf>>>(stream: TcpStream, reader: T) -> Arc<Connection> {
let (read, write) = stream.into_split();
let (stopper, stoppable) = Stopper::new();
let connection = Arc::new(Connection {
writer: Box::new(parking_lot::Mutex::new(write)),
stopper
});
tokio::spawn(reader.run(StoppableAsyncRead::new(read, stoppable), connection.clone()));
connection
}
}
async fn transfer(mut inbound: TcpStream, proxy_addr: String) -> Result<(), Box<dyn Error>> {
let mut outbound = TcpStream::connect(proxy_addr).await?;
let (mut ri, mut wi) = inbound.split();
let (mut ro, mut wo) = outbound.split();
let client_to_server = async {
io::copy(&mut ri, &mut wo).await?;
wo.shutdown().await
};
let server_to_client = async {
io::copy(&mut ro, &mut wi).await?;
wi.shutdown().await
};
tokio::try_join!(client_to_server, server_to_client)?;
Ok(())
}