feat: segregated connection and fixed from_tcp_stream
This commit is contained in:
parent
a108ce5226
commit
ebcbd3e899
2 changed files with 93 additions and 45 deletions
79
domo_node/src/connection/client.rs
Normal file
79
domo_node/src/connection/client.rs
Normal file
|
@ -0,0 +1,79 @@
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use domo_proto::packet::identifier::Identifier;
|
||||||
|
use std::io;
|
||||||
|
use tokio::net::{TcpSocket, TcpStream};
|
||||||
|
use domo_proto::commands::node_management::NodeManagement;
|
||||||
|
use std::io::ErrorKind;
|
||||||
|
use domo_proto::packet::{Packet, ToPacket};
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
use crate::connection;
|
||||||
|
|
||||||
|
pub struct Client {
|
||||||
|
socket_addr: SocketAddr,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Client {
|
||||||
|
pub fn new(socket_addr: SocketAddr) -> Self {
|
||||||
|
Self {
|
||||||
|
socket_addr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn client_connection(self, device_id: Option<Identifier>) -> io::Result<ClientConnection> {
|
||||||
|
ClientConnection::new(self, device_id).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct ClientConnection {
|
||||||
|
stream: TcpStream,
|
||||||
|
device_id: Identifier,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ClientConnection {
|
||||||
|
|
||||||
|
async fn register_node(stream: &mut TcpStream, device_id: Identifier) -> io::Result<Identifier> {
|
||||||
|
stream.write(
|
||||||
|
NodeManagement::RegisterNode { device_id }
|
||||||
|
.to_packet(
|
||||||
|
Identifier::default(),
|
||||||
|
Identifier::default(),
|
||||||
|
Identifier::random(),
|
||||||
|
Identifier::default(),
|
||||||
|
)
|
||||||
|
.build_full_packet()
|
||||||
|
.as_slice()
|
||||||
|
).await?;
|
||||||
|
let response = connection::from_tcp_stream(stream).await?;
|
||||||
|
if response.command == 0x01 {
|
||||||
|
if Identifier::from(response.data.data.clone()) != device_id {
|
||||||
|
return Err(io::Error::new(ErrorKind::InvalidData, "Server did not send appropiate response."))
|
||||||
|
}
|
||||||
|
Ok(Identifier::from(response.data.data))
|
||||||
|
} else {
|
||||||
|
Err(io::Error::new(ErrorKind::AddrNotAvailable, "Address not available"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn new(c: Client, device_id: Option<Identifier>) -> io::Result<Self> {
|
||||||
|
let sock = TcpSocket::new_v4()?;
|
||||||
|
let mut stream = sock.connect(c.socket_addr).await?;
|
||||||
|
|
||||||
|
let device_id = match device_id {
|
||||||
|
Some(device_id) => ClientConnection::register_node(&mut stream, device_id),
|
||||||
|
None => ClientConnection::register_node(&mut stream, Identifier::default())
|
||||||
|
}.await?;
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
stream,
|
||||||
|
device_id
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn stream(&self) -> &TcpStream { &self.stream }
|
||||||
|
pub fn device_id(&self) -> Identifier { self.device_id }
|
||||||
|
|
||||||
|
pub async fn send(&mut self, packet: Packet) -> io::Result<Packet> {
|
||||||
|
self.stream.write(packet.build_full_packet().as_slice()).await?;
|
||||||
|
connection::from_tcp_stream(&mut self.stream).await
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,51 +1,16 @@
|
||||||
use tokio::net::{TcpSocket, TcpStream};
|
use tokio::net::TcpStream;
|
||||||
use std::io;
|
use std::io;
|
||||||
use domo_proto::packet::Packet;
|
use domo_proto::packet::{Packet};
|
||||||
use std::net::SocketAddr;
|
|
||||||
use log::trace;
|
use log::trace;
|
||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
use tokio::io::{AsyncReadExt};
|
||||||
use domo_proto::packet::identifier::Identifier;
|
use domo_proto::packet::identifier::Identifier;
|
||||||
use domo_proto::packet::packet_data::PacketData;
|
use domo_proto::packet::packet_data::PacketData;
|
||||||
|
|
||||||
pub struct Client {
|
pub mod client;
|
||||||
socket_addr: SocketAddr,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Client {
|
|
||||||
pub fn new(socket_addr: SocketAddr) -> Self {
|
|
||||||
Self {
|
|
||||||
socket_addr
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn client_connection(self) -> io::Result<ClientConnection> {
|
|
||||||
ClientConnection::new(self).await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct ClientConnection {
|
|
||||||
stream: TcpStream,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ClientConnection {
|
|
||||||
pub async fn new(c: Client) -> io::Result<Self> {
|
|
||||||
let sock = TcpSocket::new_v4()?;
|
|
||||||
let stream = sock.connect(c.socket_addr).await?;
|
|
||||||
Ok(Self {
|
|
||||||
stream
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn send(&mut self, packet: Packet) -> io::Result<Packet> {
|
|
||||||
self.stream.write(packet.build_full_packet().as_slice()).await?;
|
|
||||||
from_tcp_stream(&mut self.stream).await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn from_tcp_stream(stream: &mut TcpStream) -> io::Result<Packet> {
|
pub async fn from_tcp_stream(stream: &mut TcpStream) -> io::Result<Packet> {
|
||||||
// Implement the logic to read data from the TcpStream asynchronously and create a Packet instance
|
let version = stream.read_u8().await?;
|
||||||
|
match version {
|
||||||
match stream.read_u8().await? {
|
|
||||||
0x01 => {
|
0x01 => {
|
||||||
// Example: Read packet data and create a Packet::V1 instance
|
// Example: Read packet data and create a Packet::V1 instance
|
||||||
let src = Identifier::from(stream.read_u32().await?); // Assuming you have a similar function for Identifier
|
let src = Identifier::from(stream.read_u32().await?); // Assuming you have a similar function for Identifier
|
||||||
|
@ -61,7 +26,7 @@ pub async fn from_tcp_stream(stream: &mut TcpStream) -> io::Result<Packet> {
|
||||||
stream.read_exact(&mut data).await?;
|
stream.read_exact(&mut data).await?;
|
||||||
|
|
||||||
// Create a Packet::V1 instance
|
// Create a Packet::V1 instance
|
||||||
let packet = Packet::V1 {
|
let packet = Packet {
|
||||||
src,
|
src,
|
||||||
dest,
|
dest,
|
||||||
packet_id,
|
packet_id,
|
||||||
|
@ -70,8 +35,12 @@ pub async fn from_tcp_stream(stream: &mut TcpStream) -> io::Result<Packet> {
|
||||||
data: PacketData::new(data), // Assuming PacketData has a constructor
|
data: PacketData::new(data), // Assuming PacketData has a constructor
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(packet)
|
if packet.get_crc32() == stream.read_u32().await? {
|
||||||
|
Ok(packet)
|
||||||
|
} else {
|
||||||
|
Err(io::Error::new(io::ErrorKind::InvalidData, "Validation failed"))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
_ => Err(io::Error::new(io::ErrorKind::InvalidData, "received invalid data."))
|
_ => Err(io::Error::new(io::ErrorKind::InvalidData, format!("received invalid data; version is {version}")))
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in a new issue