diff --git a/domo_node/config.relay.toml b/domo_node/config.relay.toml index b7c2eb3..53f66e5 100644 --- a/domo_node/config.relay.toml +++ b/domo_node/config.relay.toml @@ -1,2 +1,3 @@ [node.type.relay] +bind = "127.0.0.1:4481" master_address = "127.0.0.1:4480" diff --git a/domo_node/config.toml b/domo_node/config.toml index 56bd5b1..e104f0b 100644 --- a/domo_node/config.toml +++ b/domo_node/config.toml @@ -1,2 +1,5 @@ +[node.type.master] +bind = "127.0.0.1:4480" + [node] device_id = "000000ff" \ No newline at end of file diff --git a/domo_node/src/config/node.rs b/domo_node/src/config/node.rs index 5ee5b3e..c7930f5 100644 --- a/domo_node/src/config/node.rs +++ b/domo_node/src/config/node.rs @@ -1,16 +1,18 @@ use serde::{Deserialize, Serialize}; use std::fmt::{Display, Formatter}; +use std::net::SocketAddr; use domo_proto::packet::identifier::Identifier; #[derive(Debug, Serialize, Deserialize, Clone)] pub enum NodeType { #[serde(rename = "master")] Master { - bind: String, + bind: SocketAddr, identifier: String }, #[serde(rename = "relay")] Relay { + bind: SocketAddr, master_address: String }, #[serde(rename = "subnet")] @@ -32,7 +34,7 @@ impl Display for NodeType { impl Default for NodeType { fn default() -> Self { Self::Master { - bind: String::from("0.0.0.0:4480"), + bind: SocketAddr::from(([0,0,0,0], 4480)), identifier: hex::encode(Identifier::random().to_string()) } } diff --git a/domo_node/src/connection/client.rs b/domo_node/src/connection/client.rs deleted file mode 100644 index 00c38a1..0000000 --- a/domo_node/src/connection/client.rs +++ /dev/null @@ -1,79 +0,0 @@ -use std::net::SocketAddr; -use domo_proto::packet::identifier::Identifier; -use std::io; -use tokio::net::{TcpSocket, TcpStream}; -use domo_proto::commands::node_management::NodeManagement; -use std::io::ErrorKind; -use domo_proto::packet::{Packet, ToPacket}; -use tokio::io::AsyncWriteExt; -use crate::connection; - -pub struct Client { - socket_addr: SocketAddr, -} - -impl Client { - pub fn new(socket_addr: SocketAddr) -> Self { - Self { - socket_addr - } - } - - pub async fn client_connection(self, device_id: Option) -> io::Result { - ClientConnection::new(self, device_id).await - } -} - -pub struct ClientConnection { - stream: TcpStream, - device_id: Identifier, -} - -impl ClientConnection { - - async fn register_node(stream: &mut TcpStream, device_id: Identifier) -> io::Result { - stream.write( - NodeManagement::RegisterNode { device_id } - .to_packet( - Identifier::default(), - Identifier::default(), - Identifier::random(), - Identifier::default(), - ) - .build_full_packet() - .as_slice() - ).await?; - let response = connection::from_tcp_stream(stream).await?; - if response.command == 0x01 { - if Identifier::from(response.data.data.clone()) != device_id { - return Err(io::Error::new(ErrorKind::InvalidData, "Server did not send appropiate response.")) - } - Ok(Identifier::from(response.data.data)) - } else { - Err(io::Error::new(ErrorKind::AddrNotAvailable, "Address not available")) - } - } - - pub async fn new(c: Client, device_id: Option) -> io::Result { - let sock = TcpSocket::new_v4()?; - let mut stream = sock.connect(c.socket_addr).await?; - - let device_id = match device_id { - Some(device_id) => ClientConnection::register_node(&mut stream, device_id), - None => ClientConnection::register_node(&mut stream, Identifier::default()) - }.await?; - - Ok(Self { - stream, - device_id - }) - } - - pub fn stream(&self) -> &TcpStream { &self.stream } - pub fn device_id(&self) -> Identifier { self.device_id } - - pub async fn send(&mut self, packet: Packet) -> io::Result { - self.stream.write(packet.build_full_packet().as_slice()).await?; - connection::from_tcp_stream(&mut self.stream).await - } -} diff --git a/domo_node/src/connection/mod.rs b/domo_node/src/connection/mod.rs index 5031e5d..e69de29 100644 --- a/domo_node/src/connection/mod.rs +++ b/domo_node/src/connection/mod.rs @@ -1,46 +0,0 @@ -use tokio::net::TcpStream; -use std::io; -use domo_proto::packet::{Packet}; -use log::trace; -use tokio::io::{AsyncReadExt}; -use domo_proto::packet::identifier::Identifier; -use domo_proto::packet::packet_data::PacketData; - -pub mod client; - -pub async fn from_tcp_stream(stream: &mut TcpStream) -> io::Result { - let version = stream.read_u8().await?; - match version { - 0x01 => { - // Example: Read packet data and create a Packet::V1 instance - let src = Identifier::from(stream.read_u32().await?); // Assuming you have a similar function for Identifier - let dest = Identifier::from(stream.read_u32().await?); // Assuming you have a similar function for Identifier - let packet_id = Identifier::from(stream.read_u32().await?); // Assuming you have a similar function for Identifier - let reply_to = Identifier::from(stream.read_u32().await?); // Assuming you have a similar function for Identifier - let command = stream.read_u8().await?; // Read u8 directly - let data_length = stream.read_u16().await?; // Read u16 directly - - trace!("received => src: {src}, dest: {dest}, packet_id: {packet_id}, reply_to: {reply_to}, command: {command}, data_length: {data_length}"); - - let mut data = vec![0u8; data_length as usize]; - stream.read_exact(&mut data).await?; - - // Create a Packet::V1 instance - let packet = Packet { - src, - dest, - packet_id, - reply_to, - command, - data: PacketData::new(data), // Assuming PacketData has a constructor - }; - - if packet.get_crc32() == stream.read_u32().await? { - Ok(packet) - } else { - Err(io::Error::new(io::ErrorKind::InvalidData, "Validation failed")) - } - } - _ => Err(io::Error::new(io::ErrorKind::InvalidData, format!("received invalid data; version is {version}"))) - } -} diff --git a/domo_node/src/main.rs b/domo_node/src/main.rs index 0486287..e1e25d7 100644 --- a/domo_node/src/main.rs +++ b/domo_node/src/main.rs @@ -1,16 +1,13 @@ use crate::config::node::NodeType; use log::{debug, error, info, trace}; use std::{env, io}; -use connection::client::Client; -use domo_proto::packet::identifier::Identifier; use std::cell::RefCell; -use tokio::io::AsyncWriteExt; -use tokio::net::TcpSocket; -use domo_proto::commands::node_management::NodeManagement; -use domo_proto::packet::ToPacket; -use crate::connection::from_tcp_stream; +use std::future::Future; +use std::net::SocketAddr; +use std::str::FromStr; +use tokio::net::UdpSocket; pub mod config; pub mod connection; @@ -22,7 +19,7 @@ thread_local! { ).unwrap_or({ info!("Using default config..."); config::Config::default() })) } -fn main() { +fn main() -> io::Result<()> { setup_logging().expect("could not setup logging"); debug!("Building tokio runtime..."); @@ -42,50 +39,27 @@ fn main() { match CONFIG.with_borrow(|c| c.node.node_type.clone()) { NodeType::Master { bind, identifier } => runtime.block_on(async { - let identifier = Identifier::from(hex::decode(identifier).unwrap().as_slice()); - trace!("identifier: {identifier}"); - let mut sock = TcpSocket::new_v4().unwrap(); - sock.bind(bind.parse().unwrap()).unwrap(); - - let mut listener = sock.listen(1024).unwrap(); - info!("Listening at {}", bind); + let sock = UdpSocket::bind(bind).await.unwrap(); + info!("bound to: {}", sock.local_addr().unwrap()); loop { - match listener.accept().await { - Ok((mut stream, addr)) => { - tokio::spawn(async move { - loop { - let packet = from_tcp_stream(&mut stream).await.unwrap(); - stream.write(NodeManagement::RegisterNode { - device_id: Identifier::from(packet.data.data.clone()) - }.to_packet( - identifier, - Identifier::from(packet.data.data), - Identifier::random(), - Identifier::default() - ).build_full_packet().as_slice()).await.unwrap(); - } - }); + let mut buf = vec![0; domo_proto::packet::FULL_PACKET_SIZE]; + match sock.recv_from(&mut buf).await { + Ok((size, addr)) => { + trace!("recv[{addr} -> {size}]: {:?}", &buf[..size]); }, - Err(e) => error!("Could not accept client: {e}") + Err(e) => { + } } } }), - NodeType::Relay { master_address } => { - let client = Client::new(master_address.parse().unwrap()); - runtime.block_on(async { - let conn = client.client_connection( - CONFIG.with_borrow(|c| c.node.device_id.clone()) - .map(|s| Identifier::from(hex::decode(s) - .expect("Could not decode device_id hex") - .as_slice())) - ).await.unwrap(); - info!("Registered as: {}", conn.device_id()); - }) - }, + NodeType::Relay { bind, master_address } => runtime.block_on(async { + }), NodeType::Subnet { master_address: _ } => { runtime.block_on(async {}) } } + + Ok(()) } fn setup_logging() -> Result<(), fern::InitError> {