fix: rewrite domo_node

This commit is contained in:
Strix 2023-11-05 20:52:42 +01:00
parent 5811d29032
commit a3dfb5d7ea
No known key found for this signature in database
GPG key ID: 49B2E37B8915B774
10 changed files with 29 additions and 303 deletions

8
domo_node/Cargo.lock generated
View file

@ -74,10 +74,18 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "domo_lib"
version = "0.1.0"
dependencies = [
"domo_proto",
]
[[package]] [[package]]
name = "domo_node" name = "domo_node"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"domo_lib",
"domo_proto", "domo_proto",
"fern", "fern",
"futures", "futures",

View file

@ -17,3 +17,6 @@ hex = "0.4.3"
[dependencies.domo_proto] [dependencies.domo_proto]
path = "../domo_proto" path = "../domo_proto"
[dependencies.domo_lib]
path = "../domo_lib"

View file

@ -2,4 +2,4 @@
bind = "0.0.0.0:4480" bind = "0.0.0.0:4480"
[node] [node]
device_id = "ffffffff" node_id = "ffffffff"

View file

@ -1,73 +0,0 @@
use std::collections::HashMap;
use std::io;
use std::net::SocketAddr;
use log::{info, trace};
use domo_proto::data_types::DataType;
use domo_proto::identifier::Identifier;
pub struct Device {
/// Device identifier
pub device_id: Identifier,
/// Device socket address
pub socket_addr: SocketAddr,
/// List of properties and last state.
pub properties: HashMap<String, DataType>
}
pub struct Devices {
devices: HashMap<Identifier, Device>
}
impl Devices {
pub fn new() -> Self {
Self {
devices: HashMap::new()
}
}
pub fn add_device_random(&mut self, socket_addr: SocketAddr) -> Identifier {
let random_id = loop {
// TODO: random should avoid 0xFFFF----
let id = Identifier::random();
trace!("checking if id is taken: {}", id);
if !self.devices.contains_key(&id) { break id; }
};
self.devices.insert(random_id, Device {
device_id: random_id,
socket_addr,
properties: HashMap::new()
});
info!("Registered new device: {}", random_id);
random_id
}
pub fn add_device(&mut self, socket_addr: SocketAddr, identifier: Identifier) -> Identifier {
self.devices.insert(identifier, Device {
device_id: identifier,
socket_addr,
properties: HashMap::new()
});
info!("Registered new device: {}", identifier);
identifier
}
pub fn add_device_auto(&mut self, socket_addr: SocketAddr, identifier: Option<Identifier>) -> Identifier {
let identifier = if identifier == Some(Identifier::default()) {
None
} else {
identifier
};
if let Some(identifier) = identifier {
match (identifier, self.devices.contains_key(&identifier)) {
(identifier, false) => self.add_device(socket_addr, identifier),
_ => self.add_device_random(socket_addr)
}
} else {
self.add_device_random(socket_addr)
}
}
pub fn get_device(&self, identifier: Identifier) -> Option<&Device> {
self.devices.get(&identifier)
}
}

View file

@ -1,108 +0,0 @@
use std::io;
use log::{error, info, trace, warn};
use domo_proto::commands::node_management::NodeManagementCommand;
use domo_proto::identifier::Identifier;
use domo_proto::packet::{ToPacket};
use domo_proto::packet::packet_data::PacketData;
use crate::{CONFIG, prelude};
use crate::app::master::Devices;
use crate::config::node::NodeType;
use crate::connection::server::Server;
pub mod master;
impl NodeType {
pub async fn start(self) -> io::Result<()> {
match self {
NodeType::Master { bind } => {
let mut server = Server::new(
bind,
Some(
Identifier::from(
hex::decode(CONFIG.with_borrow(|c| c.node.device_id.clone()))
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "could not parse hex"))?
)
),
).await?;
info!("Started server at {}", server.sock().local_addr()?);
let mut devices = Devices::new();
{
std::fs::write("./register_packet", Into::<Vec<u8>>::into(NodeManagementCommand::RegisterNode {
device_id: Identifier::from(0x000000AA)
}.to_packet(
Identifier::random(),
server.device_id(),
Identifier::random(),
Identifier::default(),
)))?;
std::fs::write("./ping_packet", Into::<Vec<u8>>::into(NodeManagementCommand::Ping.to_packet(
Identifier::from(0x000000AA),
server.device_id(),
Identifier::random(),
Identifier::default(),
)))?;
}
loop {
match server.recv_packet().await {
(Ok(p), Some(s)) => {
if devices.get_device(p.src).is_none() {
if let PacketData::NodeManagement(NodeManagementCommand::RegisterNode { device_id }) = p.data {
let device_id = devices.add_device_auto(s, Some(device_id));
let _ = server.send(s, NodeManagementCommand::RegisterNode {
device_id
}.to_packet(
server.device_id(),
device_id,
Identifier::random(),
p.packet_id
)).await;
} else {
warn!("{s} is unregistered and tried to send a packet {p:#?}.");
let _ = server.send(s, prelude::quick_err::errc_not_registered(
server.device_id(),
p.src,
p.packet_id
)).await;
continue;
}
}
match p.data {
PacketData::NodeManagement(NodeManagementCommand::Ping) => {
trace!("ping from: [{s}->{}]", p.src);
let _ = server.send(s, NodeManagementCommand::Ping.to_packet(
server.device_id(),
p.src,
Identifier::random(),
p.packet_id,
)).await;
}
PacketData::NodeManagement(NodeManagementCommand::RegisterNode { .. }) => (),
_ => warn!("dropped packet")
}
}
(Err(e), Some(source)) => {
error!("{source} sent intelligible data: {e}");
let device_id = server.device_id();
if let Err(e) = server.send(source,
prelude::quick_err::net_broken_packet(
device_id,
Identifier::random(),
),
).await {
error!("could not send error packet: {e}");
}
}
_ => warn!("dropped packet")
}
}
}
NodeType::Bridge { .. } => {}
NodeType::Subnet { .. } => {}
}
Ok(())
}
}

View file

@ -43,14 +43,14 @@ pub struct NodeConfig {
#[serde(rename = "type")] #[serde(rename = "type")]
pub node_type: NodeType, pub node_type: NodeType,
#[serde(default)] #[serde(default)]
pub device_id: String pub node_id: String
} }
impl Default for NodeConfig { impl Default for NodeConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
node_type: NodeType::default(), node_type: NodeType::default(),
device_id: Identifier::random().to_string() node_id: Identifier::random().to_string()
} }
} }
} }

View file

@ -1 +0,0 @@
pub mod server;

View file

@ -1,58 +0,0 @@
use tokio::net::UdpSocket;
use domo_proto::identifier::Identifier;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::io;
use domo_proto::packet::{FULL_PACKET_SIZE, Packet};
use std::io::ErrorKind;
use log::{error, trace};
use domo_proto::data_types::DataType;
use domo_proto::packet::raw_packet::RawPacket;
pub struct Server {
sock: UdpSocket,
device_id: Identifier,
}
impl Server {
pub async fn new(socket_addr: SocketAddr, device_id: Option<Identifier>) -> io::Result<Self> {
Ok(Self {
sock: UdpSocket::bind(socket_addr).await?,
device_id: device_id.unwrap_or(Identifier::random()),
})
}
/// Receive 1 packet.
pub async fn recv_packet(&self) -> (io::Result<Packet>, Option<SocketAddr>) {
let mut buf = vec![0; FULL_PACKET_SIZE];
let (size, socket_addr) = match self.sock.recv_from(&mut buf).await {
Ok(v) => v,
Err(e) => return (Err(e), None)
};
trace!("received: {} bytes from {}", size, socket_addr);
(
Packet::try_from(buf[..size].to_vec()),
Some(socket_addr)
)
}
pub async fn send(&self, addr: SocketAddr, packet: Packet) -> io::Result<()> {
self.sock().send_to(
Into::<Vec<u8>>::into(RawPacket::from(packet)).as_slice(),
addr
).await?;
Ok(())
}
pub fn device_id(&self) -> Identifier {
self.device_id.clone()
}
pub fn sock(&self) -> &UdpSocket {
&self.sock
}
}

View file

@ -1,11 +1,11 @@
use domo_lib::node::Node;
use domo_proto::data_types::DataType;
use domo_proto::identifier::Identifier;
use log::{debug, error, info, trace, warn}; use log::{debug, error, info, trace, warn};
use std::{env, io}; use std::{env, io};
use std::cell::RefCell; use std::cell::RefCell;
pub mod prelude;
pub mod config; pub mod config;
pub mod connection;
pub mod app;
thread_local! { thread_local! {
static CONFIG: RefCell<config::Config> = RefCell::new(config::Config::from_path( static CONFIG: RefCell<config::Config> = RefCell::new(config::Config::from_path(
@ -27,13 +27,22 @@ fn main() -> io::Result<()> {
debug!("Built tokio runtime with all features..."); debug!("Built tokio runtime with all features...");
info!( info!(
"Staring as {} node!", "Staring {} node as 0x{}!",
CONFIG.with_borrow(|c| c.node.node_type.clone()) CONFIG.with_borrow(|c| c.node.node_type.clone()),
CONFIG.with_borrow(|c| c.node.node_id.clone())
); );
runtime.block_on(async { runtime.block_on(async {
CONFIG.with_borrow(|c| c.node.node_type.clone()).start().await; let self_node = Node::new(
Identifier::from(
CONFIG.with_borrow(|c|
hex::decode(&c.node.node_id)
.expect("Could not decode node_id")
)
)
);
}); });
Ok(()) Ok(())

View file

@ -1,54 +0,0 @@
pub mod quick_err {
use domo_proto::commands::node_management::{NodeManagementCommand, errors::NodeManagementError};
use domo_proto::identifier::Identifier;
use domo_proto::packet::{Packet, ToPacket};
pub fn net_invalid_packet(src: Identifier, dest: Identifier, reply_to: Identifier) -> Packet {
NodeManagementCommand::Error {
error_code: NodeManagementError::net_invalid_packet.into(),
// todo: non rude meta
metadata: "don't send me stupid data, stupid".to_string().into_bytes()
}.to_packet(
src,
dest,
Identifier::random(),
reply_to
)
}
pub fn net_broken_packet(src: Identifier, reply_to: Identifier) -> Packet {
NodeManagementCommand::Error {
error_code: NodeManagementError::net_broken_packet.into(),
metadata: "intelligible data, send again.".to_string().into_bytes()
}.to_packet(
src,
Identifier::default(),
Identifier::random(),
reply_to
)
}
pub fn net_dest_unreachable(src: Identifier, dest: Identifier, reply_to: Identifier) -> Packet {
NodeManagementCommand::Error {
error_code: NodeManagementError::net_dest_unreachable.into(),
metadata: "packet could not reach destination".to_string().into_bytes()
}.to_packet(
src,
dest,
Identifier::random(),
reply_to
)
}
pub fn errc_not_registered(src: Identifier, dest: Identifier, reply_to: Identifier) -> Packet {
NodeManagementCommand::Error {
error_code: NodeManagementError::errc_not_registered.into(),
metadata: "this node is not registered".to_string().into_bytes()
}.to_packet(
src,
dest,
Identifier::random(),
reply_to
)
}
}