Skip to content

Commit 6444ed3

Browse files
committed
Add support for Connect method, allowing https proxying
1 parent 3e192ae commit 6444ed3

File tree

4 files changed

+114
-3
lines changed

4 files changed

+114
-3
lines changed

examples/proxy_con.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
use hteapot::{Hteapot, HttpMethod, HttpRequest, HttpResponse, TunnelResponse, headers};
2+
3+
fn main() {
4+
let server = Hteapot::new("0.0.0.0", 8081);
5+
server.listen(move |req: HttpRequest| {
6+
println!("New request to {} {}!", req.method.to_str(), &req.path);
7+
if req.method == HttpMethod::CONNECT {
8+
TunnelResponse::new(&req.path)
9+
} else {
10+
let secure_path = req.path.replace("http", "https");
11+
HttpResponse::new(
12+
hteapot::HttpStatus::MovedPermanently,
13+
"Moved",
14+
headers! {"location" => secure_path},
15+
)
16+
}
17+
});
18+
}

src/hteapot/engine.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ impl Hteapot {
279279
response
280280
.base()
281281
.headers
282-
.entry("Connection".to_string())
282+
.entry("connection".to_string())
283283
.or_insert("keep-alive".to_string());
284284
response.base().headers.insert(
285285
"Keep-Alive".to_string(),
@@ -293,8 +293,10 @@ impl Hteapot {
293293
}
294294
status.write = true;
295295
status.response = response;
296+
status.response.set_stream(&socket_data.stream);
296297
}
297298

299+
// Seting the stream in case is needed for the response, (example: streaming)
298300
// Write the response to the client in chunks
299301
loop {
300302
match status.response.peek() {

src/hteapot/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ pub use self::request::HttpRequest;
3131
pub use engine::Hteapot;
3232
pub use methods::HttpMethod;
3333

34-
pub use response::{HttpResponse, StreamedResponse};
34+
pub use response::{HttpResponse, StreamedResponse, TunnelResponse};
3535
pub use status::HttpStatus;
3636

3737
/// Crate version as set by `Cargo.toml`.

src/hteapot/response.rs

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,19 @@
77
//!
88
//! All response types implement the [`HttpResponseCommon`] trait.
99
10+
use crate::headers;
11+
1012
use super::HttpStatus;
1113
use super::{BUFFER_SIZE, VERSION};
1214
use std::collections::{HashMap, VecDeque};
15+
use std::io::Write;
16+
use std::net::TcpStream;
1317
use std::sync::Arc;
1418
use std::sync::atomic::{AtomicBool, Ordering};
1519
use std::sync::mpsc::{self, Receiver, SendError, Sender, TryRecvError};
16-
use std::thread;
1720
use std::thread::JoinHandle;
21+
use std::time::Duration;
22+
use std::{io, thread};
1823

1924
/// Basic HTTP status line + headers.
2025
pub struct BaseResponse {
@@ -62,6 +67,8 @@ pub trait HttpResponseCommon {
6267

6368
/// Advances and returns the next chunk of the response body.
6469
fn peek(&mut self) -> Result<Vec<u8>, IterError>;
70+
71+
fn set_stream(&mut self, stream: &TcpStream);
6572
}
6673

6774
/// Error returned during response iteration.
@@ -165,6 +172,10 @@ impl HttpResponseCommon for HttpResponse {
165172
let byte_chunk = raw.next().ok_or(IterError::Finished)?.to_vec();
166173
return Ok(byte_chunk);
167174
}
175+
176+
fn set_stream(&mut self, _: &TcpStream) {
177+
()
178+
}
168179
}
169180

170181
/// Dummy response used when nothing needs to be returned.
@@ -183,6 +194,10 @@ impl HttpResponseCommon for EmptyHttpResponse {
183194
fn peek(&mut self) -> Result<Vec<u8>, IterError> {
184195
Err(IterError::Finished)
185196
}
197+
198+
fn set_stream(&mut self, _: &TcpStream) {
199+
()
200+
}
186201
}
187202

188203
/// Sends response chunks in a `Transfer-Encoding: chunked` format.
@@ -292,4 +307,80 @@ impl HttpResponseCommon for StreamedResponse {
292307
self.queue.pop_front().ok_or(IterError::WouldBlock)
293308
}
294309
}
310+
311+
fn set_stream(&mut self, _: &TcpStream) {
312+
()
313+
}
314+
}
315+
316+
pub struct TunnelResponse {
317+
base: BaseResponse,
318+
addr: String,
319+
has_end: Arc<AtomicBool>,
320+
stream_in: Option<TcpStream>, // In as Stream from the client *in* this server
321+
stream_out: Option<TcpStream>, // Out as Stream from the server *to* this server
322+
}
323+
324+
impl TunnelResponse {
325+
pub fn new(addr: &str) -> Box<Self> {
326+
return Box::new(TunnelResponse {
327+
base: BaseResponse {
328+
status: HttpStatus::OK,
329+
headers: HashMap::new(),
330+
// headers: headers! {"connection" => "keep-alive"}.unwrap(),
331+
},
332+
addr: addr.to_string(),
333+
has_end: Arc::new(AtomicBool::new(false)),
334+
stream_in: None,
335+
stream_out: None,
336+
});
337+
}
338+
}
339+
340+
impl HttpResponseCommon for TunnelResponse {
341+
fn base(&mut self) -> &mut BaseResponse {
342+
&mut self.base
343+
}
344+
345+
fn next(&mut self) -> Result<Vec<u8>, IterError> {
346+
self.peek()
347+
}
348+
349+
fn peek(&mut self) -> Result<Vec<u8>, IterError> {
350+
if self.has_end.load(Ordering::SeqCst) {
351+
return Err(IterError::Finished);
352+
}
353+
let mut buf = [0; 1];
354+
let r = self.stream_in.as_ref().unwrap().peek(&mut buf);
355+
356+
return Err(IterError::WouldBlock);
357+
}
358+
359+
fn set_stream(&mut self, stream: &TcpStream) {
360+
let mut client_stream = stream.try_clone().expect("clone failed...");
361+
self.stream_in = Some(client_stream.try_clone().expect("clone failed..."));
362+
let server_stream = TcpStream::connect(&self.addr);
363+
if server_stream.is_err() {
364+
println!("Error connecting");
365+
}
366+
let mut server_stream = server_stream.unwrap();
367+
let _ = client_stream.set_nonblocking(false);
368+
let _ = client_stream.set_nodelay(false);
369+
let _ = client_stream.set_read_timeout(Some(Duration::from_secs(3)));
370+
let _ = client_stream.set_write_timeout(Some(Duration::from_secs(3)));
371+
let _ = client_stream.write_all(&self.base.to_bytes());
372+
let mut server_stream_1 = server_stream.try_clone().expect("Error cloning");
373+
let mut client_stream_1 = client_stream.try_clone().expect("clone failed...");
374+
let has_ended = self.has_end.clone();
375+
thread::spawn(move || {
376+
let _ = io::copy(&mut client_stream_1, &mut server_stream_1);
377+
has_ended.store(true, Ordering::SeqCst);
378+
});
379+
380+
let has_ended = self.has_end.clone();
381+
thread::spawn(move || {
382+
let _ = io::copy(&mut server_stream, &mut client_stream);
383+
has_ended.store(true, Ordering::SeqCst);
384+
});
385+
}
295386
}

0 commit comments

Comments
 (0)