From ebcbd3e899f69f0d74ac587f82a469953b46c29b Mon Sep 17 00:00:00 2001 From: Raine Date: Sun, 15 Oct 2023 18:06:27 +0200 Subject: [PATCH] feat: segregated connection and fixed from_tcp_stream --- domo_node/src/connection/client.rs | 79 +++++++++++++++++++ .../src/{connection.rs => connection/mod.rs} | 59 ++++---------- 2 files changed, 93 insertions(+), 45 deletions(-) create mode 100644 domo_node/src/connection/client.rs rename domo_node/src/{connection.rs => connection/mod.rs} (55%) diff --git a/domo_node/src/connection/client.rs b/domo_node/src/connection/client.rs new file mode 100644 index 0000000..00c38a1 --- /dev/null +++ b/domo_node/src/connection/client.rs @@ -0,0 +1,79 @@ +use std::net::SocketAddr; +use domo_proto::packet::identifier::Identifier; +use std::io; +use tokio::net::{TcpSocket, TcpStream}; +use domo_proto::commands::node_management::NodeManagement; +use std::io::ErrorKind; +use domo_proto::packet::{Packet, ToPacket}; +use tokio::io::AsyncWriteExt; +use crate::connection; + +pub struct Client { + socket_addr: SocketAddr, +} + +impl Client { + pub fn new(socket_addr: SocketAddr) -> Self { + Self { + socket_addr + } + } + + pub async fn client_connection(self, device_id: Option) -> io::Result { + ClientConnection::new(self, device_id).await + } +} + +pub struct ClientConnection { + stream: TcpStream, + device_id: Identifier, +} + +impl ClientConnection { + + async fn register_node(stream: &mut TcpStream, device_id: Identifier) -> io::Result { + stream.write( + NodeManagement::RegisterNode { device_id } + .to_packet( + Identifier::default(), + Identifier::default(), + Identifier::random(), + Identifier::default(), + ) + .build_full_packet() + .as_slice() + ).await?; + let response = connection::from_tcp_stream(stream).await?; + if response.command == 0x01 { + if Identifier::from(response.data.data.clone()) != device_id { + return Err(io::Error::new(ErrorKind::InvalidData, "Server did not send appropiate response.")) + } + Ok(Identifier::from(response.data.data)) + } else { + Err(io::Error::new(ErrorKind::AddrNotAvailable, "Address not available")) + } + } + + pub async fn new(c: Client, device_id: Option) -> io::Result { + let sock = TcpSocket::new_v4()?; + let mut stream = sock.connect(c.socket_addr).await?; + + let device_id = match device_id { + Some(device_id) => ClientConnection::register_node(&mut stream, device_id), + None => ClientConnection::register_node(&mut stream, Identifier::default()) + }.await?; + + Ok(Self { + stream, + device_id + }) + } + + pub fn stream(&self) -> &TcpStream { &self.stream } + pub fn device_id(&self) -> Identifier { self.device_id } + + pub async fn send(&mut self, packet: Packet) -> io::Result { + self.stream.write(packet.build_full_packet().as_slice()).await?; + connection::from_tcp_stream(&mut self.stream).await + } +} diff --git a/domo_node/src/connection.rs b/domo_node/src/connection/mod.rs similarity index 55% rename from domo_node/src/connection.rs rename to domo_node/src/connection/mod.rs index ac6af41..5031e5d 100644 --- a/domo_node/src/connection.rs +++ b/domo_node/src/connection/mod.rs @@ -1,51 +1,16 @@ -use tokio::net::{TcpSocket, TcpStream}; +use tokio::net::TcpStream; use std::io; -use domo_proto::packet::Packet; -use std::net::SocketAddr; +use domo_proto::packet::{Packet}; use log::trace; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::{AsyncReadExt}; use domo_proto::packet::identifier::Identifier; use domo_proto::packet::packet_data::PacketData; -pub struct Client { - socket_addr: SocketAddr, -} - -impl Client { - pub fn new(socket_addr: SocketAddr) -> Self { - Self { - socket_addr - } - } - - pub async fn client_connection(self) -> io::Result { - ClientConnection::new(self).await - } -} - -pub struct ClientConnection { - stream: TcpStream, -} - -impl ClientConnection { - pub async fn new(c: Client) -> io::Result { - let sock = TcpSocket::new_v4()?; - let stream = sock.connect(c.socket_addr).await?; - Ok(Self { - stream - }) - } - - pub async fn send(&mut self, packet: Packet) -> io::Result { - self.stream.write(packet.build_full_packet().as_slice()).await?; - from_tcp_stream(&mut self.stream).await - } -} +pub mod client; pub async fn from_tcp_stream(stream: &mut TcpStream) -> io::Result { - // Implement the logic to read data from the TcpStream asynchronously and create a Packet instance - - match stream.read_u8().await? { + let version = stream.read_u8().await?; + match version { 0x01 => { // Example: Read packet data and create a Packet::V1 instance let src = Identifier::from(stream.read_u32().await?); // Assuming you have a similar function for Identifier @@ -61,7 +26,7 @@ pub async fn from_tcp_stream(stream: &mut TcpStream) -> io::Result { stream.read_exact(&mut data).await?; // Create a Packet::V1 instance - let packet = Packet::V1 { + let packet = Packet { src, dest, packet_id, @@ -70,8 +35,12 @@ pub async fn from_tcp_stream(stream: &mut TcpStream) -> io::Result { data: PacketData::new(data), // Assuming PacketData has a constructor }; - Ok(packet) + if packet.get_crc32() == stream.read_u32().await? { + Ok(packet) + } else { + Err(io::Error::new(io::ErrorKind::InvalidData, "Validation failed")) + } } - _ => Err(io::Error::new(io::ErrorKind::InvalidData, "received invalid data.")) + _ => Err(io::Error::new(io::ErrorKind::InvalidData, format!("received invalid data; version is {version}"))) } -} \ No newline at end of file +}