fix: abolish tcp code.. who needs it? not me!
This commit is contained in:
		
							parent
							
								
									a6366d61e0
								
							
						
					
					
						commit
						e386efc5ae
					
				
					 6 changed files with 25 additions and 170 deletions
				
			
		|  | @ -1,2 +1,3 @@ | ||||||
| [node.type.relay] | [node.type.relay] | ||||||
|  | bind = "127.0.0.1:4481" | ||||||
| master_address = "127.0.0.1:4480" | master_address = "127.0.0.1:4480" | ||||||
|  |  | ||||||
|  | @ -1,2 +1,5 @@ | ||||||
|  | [node.type.master] | ||||||
|  | bind = "127.0.0.1:4480" | ||||||
|  | 
 | ||||||
| [node] | [node] | ||||||
| device_id = "000000ff" | device_id = "000000ff" | ||||||
|  | @ -1,16 +1,18 @@ | ||||||
| use serde::{Deserialize, Serialize}; | use serde::{Deserialize, Serialize}; | ||||||
| use std::fmt::{Display, Formatter}; | use std::fmt::{Display, Formatter}; | ||||||
|  | use std::net::SocketAddr; | ||||||
| use domo_proto::packet::identifier::Identifier; | use domo_proto::packet::identifier::Identifier; | ||||||
| 
 | 
 | ||||||
| #[derive(Debug, Serialize, Deserialize, Clone)] | #[derive(Debug, Serialize, Deserialize, Clone)] | ||||||
| pub enum NodeType { | pub enum NodeType { | ||||||
|     #[serde(rename = "master")] |     #[serde(rename = "master")] | ||||||
|     Master { |     Master { | ||||||
|         bind: String, |         bind: SocketAddr, | ||||||
|         identifier: String |         identifier: String | ||||||
|     }, |     }, | ||||||
|     #[serde(rename = "relay")] |     #[serde(rename = "relay")] | ||||||
|     Relay { |     Relay { | ||||||
|  |         bind: SocketAddr, | ||||||
|         master_address: String |         master_address: String | ||||||
|     }, |     }, | ||||||
|     #[serde(rename = "subnet")] |     #[serde(rename = "subnet")] | ||||||
|  | @ -32,7 +34,7 @@ impl Display for NodeType { | ||||||
| impl Default for NodeType { | impl Default for NodeType { | ||||||
|     fn default() -> Self { |     fn default() -> Self { | ||||||
|         Self::Master { |         Self::Master { | ||||||
|             bind: String::from("0.0.0.0:4480"), |             bind: SocketAddr::from(([0,0,0,0], 4480)), | ||||||
|             identifier: hex::encode(Identifier::random().to_string()) |             identifier: hex::encode(Identifier::random().to_string()) | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  | @ -1,79 +0,0 @@ | ||||||
| use std::net::SocketAddr; |  | ||||||
| use domo_proto::packet::identifier::Identifier; |  | ||||||
| use std::io; |  | ||||||
| use tokio::net::{TcpSocket, TcpStream}; |  | ||||||
| use domo_proto::commands::node_management::NodeManagement; |  | ||||||
| use std::io::ErrorKind; |  | ||||||
| use domo_proto::packet::{Packet, ToPacket}; |  | ||||||
| use tokio::io::AsyncWriteExt; |  | ||||||
| use crate::connection; |  | ||||||
| 
 |  | ||||||
| pub struct Client { |  | ||||||
|     socket_addr: SocketAddr, |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| impl Client { |  | ||||||
|     pub fn new(socket_addr: SocketAddr) -> Self { |  | ||||||
|         Self { |  | ||||||
|             socket_addr |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     pub async fn client_connection(self, device_id: Option<Identifier>) -> io::Result<ClientConnection> { |  | ||||||
|         ClientConnection::new(self, device_id).await |  | ||||||
|     } |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| pub struct ClientConnection { |  | ||||||
|     stream: TcpStream, |  | ||||||
|     device_id: Identifier, |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| impl ClientConnection { |  | ||||||
| 
 |  | ||||||
|     async fn register_node(stream: &mut TcpStream, device_id: Identifier) -> io::Result<Identifier> { |  | ||||||
|         stream.write( |  | ||||||
|             NodeManagement::RegisterNode { device_id } |  | ||||||
|                 .to_packet( |  | ||||||
|                     Identifier::default(), |  | ||||||
|                     Identifier::default(), |  | ||||||
|                     Identifier::random(), |  | ||||||
|                     Identifier::default(), |  | ||||||
|                 ) |  | ||||||
|                 .build_full_packet() |  | ||||||
|                 .as_slice() |  | ||||||
|         ).await?; |  | ||||||
|         let response = connection::from_tcp_stream(stream).await?; |  | ||||||
|         if response.command == 0x01 { |  | ||||||
|             if Identifier::from(response.data.data.clone()) != device_id { |  | ||||||
|                 return Err(io::Error::new(ErrorKind::InvalidData, "Server did not send appropiate response.")) |  | ||||||
|             } |  | ||||||
|             Ok(Identifier::from(response.data.data)) |  | ||||||
|         } else { |  | ||||||
|             Err(io::Error::new(ErrorKind::AddrNotAvailable, "Address not available")) |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     pub async fn new(c: Client, device_id: Option<Identifier>) -> io::Result<Self> { |  | ||||||
|         let sock = TcpSocket::new_v4()?; |  | ||||||
|         let mut stream = sock.connect(c.socket_addr).await?; |  | ||||||
| 
 |  | ||||||
|         let device_id = match device_id { |  | ||||||
|             Some(device_id) => ClientConnection::register_node(&mut stream, device_id), |  | ||||||
|             None => ClientConnection::register_node(&mut stream, Identifier::default()) |  | ||||||
|         }.await?; |  | ||||||
| 
 |  | ||||||
|         Ok(Self { |  | ||||||
|             stream, |  | ||||||
|             device_id |  | ||||||
|         }) |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     pub fn stream(&self) -> &TcpStream { &self.stream } |  | ||||||
|     pub fn device_id(&self) -> Identifier { self.device_id } |  | ||||||
| 
 |  | ||||||
|     pub async fn send(&mut self, packet: Packet) -> io::Result<Packet> { |  | ||||||
|         self.stream.write(packet.build_full_packet().as_slice()).await?; |  | ||||||
|         connection::from_tcp_stream(&mut self.stream).await |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  | @ -1,46 +0,0 @@ | ||||||
| use tokio::net::TcpStream; |  | ||||||
| use std::io; |  | ||||||
| use domo_proto::packet::{Packet}; |  | ||||||
| use log::trace; |  | ||||||
| use tokio::io::{AsyncReadExt}; |  | ||||||
| use domo_proto::packet::identifier::Identifier; |  | ||||||
| use domo_proto::packet::packet_data::PacketData; |  | ||||||
| 
 |  | ||||||
| pub mod client; |  | ||||||
| 
 |  | ||||||
| pub async fn from_tcp_stream(stream: &mut TcpStream) -> io::Result<Packet> { |  | ||||||
|     let version = stream.read_u8().await?; |  | ||||||
|     match version { |  | ||||||
|         0x01 => { |  | ||||||
|             // Example: Read packet data and create a Packet::V1 instance
 |  | ||||||
|             let src = Identifier::from(stream.read_u32().await?); // Assuming you have a similar function for Identifier
 |  | ||||||
|             let dest = Identifier::from(stream.read_u32().await?); // Assuming you have a similar function for Identifier
 |  | ||||||
|             let packet_id = Identifier::from(stream.read_u32().await?); // Assuming you have a similar function for Identifier
 |  | ||||||
|             let reply_to = Identifier::from(stream.read_u32().await?); // Assuming you have a similar function for Identifier
 |  | ||||||
|             let command = stream.read_u8().await?; // Read u8 directly
 |  | ||||||
|             let data_length = stream.read_u16().await?; // Read u16 directly
 |  | ||||||
| 
 |  | ||||||
|             trace!("received => src: {src}, dest: {dest}, packet_id: {packet_id}, reply_to: {reply_to}, command: {command}, data_length: {data_length}"); |  | ||||||
| 
 |  | ||||||
|             let mut data = vec![0u8; data_length as usize]; |  | ||||||
|             stream.read_exact(&mut data).await?; |  | ||||||
| 
 |  | ||||||
|             // Create a Packet::V1 instance
 |  | ||||||
|             let packet = Packet { |  | ||||||
|                 src, |  | ||||||
|                 dest, |  | ||||||
|                 packet_id, |  | ||||||
|                 reply_to, |  | ||||||
|                 command, |  | ||||||
|                 data: PacketData::new(data), // Assuming PacketData has a constructor
 |  | ||||||
|             }; |  | ||||||
| 
 |  | ||||||
|             if packet.get_crc32() == stream.read_u32().await? { |  | ||||||
|                 Ok(packet) |  | ||||||
|             } else { |  | ||||||
|                 Err(io::Error::new(io::ErrorKind::InvalidData, "Validation failed")) |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|         _ => Err(io::Error::new(io::ErrorKind::InvalidData, format!("received invalid data; version is {version}"))) |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  | @ -1,16 +1,13 @@ | ||||||
| use crate::config::node::NodeType; | use crate::config::node::NodeType; | ||||||
| use log::{debug, error, info, trace}; | use log::{debug, error, info, trace}; | ||||||
| use std::{env, io}; | use std::{env, io}; | ||||||
| use connection::client::Client; |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| use domo_proto::packet::identifier::Identifier; |  | ||||||
| use std::cell::RefCell; | use std::cell::RefCell; | ||||||
| use tokio::io::AsyncWriteExt; | use std::future::Future; | ||||||
| use tokio::net::TcpSocket; | use std::net::SocketAddr; | ||||||
| use domo_proto::commands::node_management::NodeManagement; | use std::str::FromStr; | ||||||
| use domo_proto::packet::ToPacket; | use tokio::net::UdpSocket; | ||||||
| use crate::connection::from_tcp_stream; |  | ||||||
| 
 | 
 | ||||||
| pub mod config; | pub mod config; | ||||||
| pub mod connection; | pub mod connection; | ||||||
|  | @ -22,7 +19,7 @@ thread_local! { | ||||||
|     ).unwrap_or({ info!("Using default config..."); config::Config::default() })) |     ).unwrap_or({ info!("Using default config..."); config::Config::default() })) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| fn main() { | fn main() -> io::Result<()> { | ||||||
|     setup_logging().expect("could not setup logging"); |     setup_logging().expect("could not setup logging"); | ||||||
| 
 | 
 | ||||||
|     debug!("Building tokio runtime..."); |     debug!("Building tokio runtime..."); | ||||||
|  | @ -42,50 +39,27 @@ fn main() { | ||||||
| 
 | 
 | ||||||
|     match CONFIG.with_borrow(|c| c.node.node_type.clone()) { |     match CONFIG.with_borrow(|c| c.node.node_type.clone()) { | ||||||
|         NodeType::Master { bind, identifier } => runtime.block_on(async { |         NodeType::Master { bind, identifier } => runtime.block_on(async { | ||||||
|             let identifier = Identifier::from(hex::decode(identifier).unwrap().as_slice()); |             let sock = UdpSocket::bind(bind).await.unwrap(); | ||||||
|             trace!("identifier: {identifier}"); |             info!("bound to: {}", sock.local_addr().unwrap()); | ||||||
|             let mut sock = TcpSocket::new_v4().unwrap(); |  | ||||||
|             sock.bind(bind.parse().unwrap()).unwrap(); |  | ||||||
| 
 |  | ||||||
|             let mut listener = sock.listen(1024).unwrap(); |  | ||||||
|             info!("Listening at {}", bind); |  | ||||||
|             loop { |             loop { | ||||||
|                 match listener.accept().await { |                 let mut buf = vec![0; domo_proto::packet::FULL_PACKET_SIZE]; | ||||||
|                     Ok((mut stream, addr)) => { |                 match sock.recv_from(&mut buf).await { | ||||||
|                         tokio::spawn(async move { |                     Ok((size, addr)) => { | ||||||
|                             loop { |                         trace!("recv[{addr} -> {size}]: {:?}", &buf[..size]); | ||||||
|                                 let packet = from_tcp_stream(&mut stream).await.unwrap(); |  | ||||||
|                                 stream.write(NodeManagement::RegisterNode { |  | ||||||
|                                     device_id: Identifier::from(packet.data.data.clone()) |  | ||||||
|                                 }.to_packet( |  | ||||||
|                                     identifier, |  | ||||||
|                                     Identifier::from(packet.data.data), |  | ||||||
|                                     Identifier::random(), |  | ||||||
|                                     Identifier::default() |  | ||||||
|                                 ).build_full_packet().as_slice()).await.unwrap(); |  | ||||||
|                             } |  | ||||||
|                         }); |  | ||||||
|                     }, |                     }, | ||||||
|                     Err(e) => error!("Could not accept client: {e}") |                     Err(e) => { | ||||||
|  |                     } | ||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
|         }), |         }), | ||||||
|         NodeType::Relay { master_address } => { |         NodeType::Relay { bind, master_address } => runtime.block_on(async { | ||||||
|             let client = Client::new(master_address.parse().unwrap()); |         }), | ||||||
|             runtime.block_on(async { |  | ||||||
|                 let conn = client.client_connection( |  | ||||||
|                     CONFIG.with_borrow(|c| c.node.device_id.clone()) |  | ||||||
|                         .map(|s| Identifier::from(hex::decode(s) |  | ||||||
|                             .expect("Could not decode device_id hex") |  | ||||||
|                             .as_slice())) |  | ||||||
|                 ).await.unwrap(); |  | ||||||
|                 info!("Registered as: {}", conn.device_id()); |  | ||||||
|             }) |  | ||||||
|         }, |  | ||||||
|         NodeType::Subnet { master_address: _ } => { |         NodeType::Subnet { master_address: _ } => { | ||||||
|             runtime.block_on(async {}) |             runtime.block_on(async {}) | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  | 
 | ||||||
|  |     Ok(()) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| fn setup_logging() -> Result<(), fern::InitError> { | fn setup_logging() -> Result<(), fern::InitError> { | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue