fix: parsing

This commit is contained in:
Strix 2023-10-15 18:06:34 +02:00
parent 913ae5e74a
commit 3eece7ecd9
No known key found for this signature in database
GPG key ID: 49B2E37B8915B774
5 changed files with 105 additions and 104 deletions

80
domo_node/src/app.rs Normal file
View file

@ -0,0 +1,80 @@
use std::io;
use log::{error, trace, warn};
use tokio::sync::mpsc::channel;
use domo_proto::commands::node_management::NodeManagementCommand;
use domo_proto::identifier::Identifier;
use domo_proto::packet::{Packet, ToPacket};
use domo_proto::packet::packet_data::PacketData;
use domo_proto::packet::raw_packet::RawPacket;
use crate::{CONFIG, prelude};
use crate::config::node::NodeType;
use crate::connection::server::Server;
impl NodeType {
pub async fn start(self) -> io::Result<()> {
match self {
NodeType::Master { bind } => {
let server = Server::new(
bind,
Some(
Identifier::from(
hex::decode(CONFIG.with_borrow(|c| c.node.device_id.clone()))
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "could not parse hex"))?
)
),
).await?;
{
let p = NodeManagementCommand::Ping.to_packet(
Identifier::random(),
server.device_id(),
Identifier::random(),
Identifier::default(),
);
std::fs::write("./packet", Into::<Vec<u8>>::into(p))?;
}
loop {
match server.recv_packet().await {
(Ok(p), Some(s)) => {
if p.dest != server.device_id() {
if let Some(d) = server.devices().get(&p.dest) {
let _ = &server.send(d.socket_addr, p).await;
}
continue;
}
match p.data {
PacketData::NodeManagement(NodeManagementCommand::Ping) => {
trace!("ping from: [{s}->{}]", p.src);
let _ = server.send(s, NodeManagementCommand::Ping.to_packet(
server.device_id(),
p.src,
Identifier::random(),
p.packet_id,
)).await;
}
_ => {}
}
}
(Err(e), Some(source)) => {
error!("{source} sent intelligible data: {e}");
let device_id = server.device_id();
if let Err(e) = server.send(source,
prelude::quick_err::net_broken_packet(
device_id,
Identifier::random(),
),
).await {
error!("could not send error packet: {e}");
}
}
_ => warn!("dropped packet")
}
}
}
NodeType::Bridge { .. } => {}
NodeType::Subnet { .. } => {}
}
Ok(())
}
}

View file

@ -5,6 +5,8 @@ use std::net::SocketAddr;
use std::io; use std::io;
use domo_proto::packet::{FULL_PACKET_SIZE, Packet}; use domo_proto::packet::{FULL_PACKET_SIZE, Packet};
use std::io::ErrorKind; use std::io::ErrorKind;
use log::error;
use domo_proto::packet::raw_packet::RawPacket;
pub struct Device { pub struct Device {
/// Device identifier /// Device identifier
@ -29,7 +31,7 @@ impl Server {
} }
/// Receive 1 packet. /// Receive 1 packet.
pub async fn recv_packet(&mut self) -> (io::Result<Packet>, Option<SocketAddr>) { pub async fn recv_packet(&self) -> (io::Result<Packet>, Option<SocketAddr>) {
let mut buf = vec![0; FULL_PACKET_SIZE]; let mut buf = vec![0; FULL_PACKET_SIZE];
let (size, socket_addr) = match self.sock.recv_from(&mut buf).await { let (size, socket_addr) = match self.sock.recv_from(&mut buf).await {
Ok(v) => v, Ok(v) => v,
@ -37,12 +39,19 @@ impl Server {
}; };
( (
Packet::try_from(buf[..size].to_vec()) Packet::try_from(buf[..size].to_vec()),
.map_err(|_| io::Error::new(ErrorKind::InvalidData, "Received invalid data")),
Some(socket_addr) Some(socket_addr)
) )
} }
pub async fn send(&self, addr: SocketAddr, packet: Packet) -> io::Result<()> {
self.sock().send_to(
Into::<Vec<u8>>::into(RawPacket::from(packet)).as_slice(),
addr
).await?;
Ok(())
}
pub fn device_id(&self) -> Identifier { pub fn device_id(&self) -> Identifier {
self.device_id.clone() self.device_id.clone()
} }

View file

@ -1,18 +1,11 @@
use crate::config::node::NodeType;
use log::{debug, error, info, trace, warn}; use log::{debug, error, info, trace, warn};
use std::{env, io}; use std::{env, io};
use std::cell::RefCell; use std::cell::RefCell;
use std::error::Error;
use tokio::net::UdpSocket;
use domo_proto::packet::{FULL_PACKET_SIZE, ToPacket};
use connection::server::Server;
use domo_proto::commands::node_management::NodeManagementCommand;
use domo_proto::identifier::Identifier;
use domo_proto::packet::raw_packet::RawPacket;
pub mod prelude; pub mod prelude;
pub mod config; pub mod config;
pub mod connection; pub mod connection;
pub mod app;
thread_local! { thread_local! {
static CONFIG: RefCell<config::Config> = RefCell::new(config::Config::from_path( static CONFIG: RefCell<config::Config> = RefCell::new(config::Config::from_path(
@ -39,91 +32,9 @@ fn main() -> io::Result<()> {
); );
match CONFIG.with_borrow(|c| c.node.node_type.clone()) { runtime.block_on(async {
NodeType::Master { bind } => runtime.block_on(async { CONFIG.with_borrow(|c| c.node.node_type.clone()).start().await;
let mut server = Server::new( });
bind,
Some(
Identifier::from(
hex::decode(CONFIG.with_borrow(|c| c.node.device_id.clone()))
.unwrap()
)
),
).await.unwrap();
info!("bound server on {} as {}", server.sock().local_addr().unwrap(), server.device_id());
loop {
match server.recv_packet().await {
(Ok(packet), Some(source)) => {
if packet.dest != server.device_id() {
if let Some(d) = server.devices().get(&packet.dest) {
if let Err(e) = server.sock().send_to(packet.build_full_packet().as_slice(), d.socket_addr).await {
error!("Error forwarding to {}: {}", d.device_id, e);
continue
}
trace!("fwd {} -> {}", packet.packet_id, packet.dest);
} else {
if let Err(e) = server.sock().send_to(
prelude::quick_err::net_dest_unreachable(
server.device_id(),
packet.dest,
packet.packet_id
).build_full_packet().as_slice(),
source
).await {
error!("Could not send error packet: {e}");
};
}
continue;
}
match packet.command >> 4 {
// Node management
0x0 => {
if let Ok(nm) = NodeManagementCommand::try_from(RawPacket::from(packet)) {
trace!("{nm:?}");
}
}
// Property control
0x1 => {}
_ => {
let device_id = server.device_id();
match server.sock().send_to(
prelude::quick_err::net_invalid_packet(
device_id,
packet.src,
packet.packet_id,
).build_full_packet().as_slice(),
source,
).await {
Ok(_) => {}
Err(e) => error!("could not send error packet: {e}")
}
}
}
}
(Err(e), Some(source)) => {
error!("{source} sent intelligible data: {e}");
let device_id = server.device_id();
if let Err(e) = server.sock().send_to(
prelude::quick_err::net_broken_packet(
device_id,
Identifier::random(),
).build_full_packet().as_slice(),
source,
).await {
error!("could not send error packet: {e}");
}
}
_ => warn!("dropped intelligible packet")
}
}
}),
NodeType::Bridge { bind, master_address } => runtime.block_on(async {
}),
NodeType::Subnet { master_address: _ } => unimplemented!()
}
Ok(()) Ok(())
} }

View file

@ -9,13 +9,14 @@ impl TryFrom<Vec<u8>> for RawPacket {
if data.len() < PACKET_HEADER_SIZE { if data.len() < PACKET_HEADER_SIZE {
return Err(io::Error::new(io::ErrorKind::InvalidData, "Can't parse data into RawPacket")) return Err(io::Error::new(io::ErrorKind::InvalidData, "Can't parse data into RawPacket"))
} }
let data_length = u16::from_be_bytes([data[0x12], data[0x13]]) as usize; let data_length = (((data[0x12] as u16) << 8) | data[0x13] as u16) as usize;
if data.len() < 0x14 + data_length + 4 { if data.len() < 0x14 + data_length {
return Err(io::Error::new(io::ErrorKind::InvalidData, "Can't parse data into RawPacket")) return Err(io::Error::new(io::ErrorKind::InvalidData, "Can't parse data into RawPacket"))
} }
let checksum = as_u32_be(data[(data.len() - 5)..].as_ref()); let checksum = as_u32_be(data[(data.len() - 4)..].as_ref());
if checksum != crc32fast::hash(data[..(data.len() - 4)].as_ref()) { let built_checksum = crc32fast::hash(data[..(data.len() - 4)].as_ref());
return Err(io::Error::new(io::ErrorKind::InvalidData, "Checksum does not match")) if checksum != built_checksum {
return Err(io::Error::new(io::ErrorKind::InvalidData, format!("Checksum does not match {checksum:X?} != {built_checksum:X?}")))
} }
Ok(RawPacket { Ok(RawPacket {
version: data[0], version: data[0],
@ -25,7 +26,7 @@ impl TryFrom<Vec<u8>> for RawPacket {
reply_to: as_u32_be(data[0x0D..0x11].as_ref()), reply_to: as_u32_be(data[0x0D..0x11].as_ref()),
command: data[0x11], command: data[0x11],
data_length, data_length,
data: data[0x14..data_length].to_vec(), data: data[0x14..(0x14 + data_length)].to_vec(),
checksum, checksum,
}) })
} }