domo/domo_node/src/main.rs

156 lines
6 KiB
Rust

use crate::config::node::NodeType;
use log::{debug, error, info, trace, warn};
use std::{env, io};
use std::cell::RefCell;
use std::error::Error;
use tokio::net::UdpSocket;
use domo_proto::packet::{FULL_PACKET_SIZE, ToPacket};
use connection::server::Server;
use domo_proto::commands::node_management::NodeManagementCommand;
use domo_proto::identifier::Identifier;
use domo_proto::packet::raw_packet::RawPacket;
pub mod prelude;
pub mod config;
pub mod connection;
thread_local! {
static CONFIG: RefCell<config::Config> = RefCell::new(config::Config::from_path(
env::var("DOMO_CONFIG_PATH")
.unwrap_or("config.toml".into())
).unwrap_or({ info!("Using default config..."); config::Config::default() }))
}
fn main() -> io::Result<()> {
setup_logging().expect("could not setup logging");
debug!("Building tokio runtime...");
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
debug!("Built tokio runtime with all features...");
info!(
"Staring as {} node!",
CONFIG.with_borrow(|c| c.node.node_type.clone())
);
match CONFIG.with_borrow(|c| c.node.node_type.clone()) {
NodeType::Master { bind } => runtime.block_on(async {
let mut server = Server::new(
bind,
Some(
Identifier::from(
hex::decode(CONFIG.with_borrow(|c| c.node.device_id.clone()))
.unwrap()
)
),
).await.unwrap();
info!("bound server on {} as {}", server.sock().local_addr().unwrap(), server.device_id());
loop {
match server.recv_packet().await {
(Ok(packet), Some(source)) => {
if packet.dest != server.device_id() {
if let Some(d) = server.devices().get(&packet.dest) {
if let Err(e) = server.sock().send_to(packet.build_full_packet().as_slice(), d.socket_addr).await {
error!("Error forwarding to {}: {}", d.device_id, e);
continue
}
trace!("fwd {} -> {}", packet.packet_id, packet.dest);
} else {
if let Err(e) = server.sock().send_to(
prelude::quick_err::net_dest_unreachable(
server.device_id(),
packet.dest,
packet.packet_id
).build_full_packet().as_slice(),
source
).await {
error!("Could not send error packet: {e}");
};
}
continue;
}
match packet.command >> 4 {
// Node management
0x0 => {
if let Ok(nm) = NodeManagementCommand::try_from(RawPacket::from(packet)) {
trace!("{nm:?}");
}
}
// Property control
0x1 => {}
_ => {
let device_id = server.device_id();
match server.sock().send_to(
prelude::quick_err::net_invalid_packet(
device_id,
packet.src,
packet.packet_id,
).build_full_packet().as_slice(),
source,
).await {
Ok(_) => {}
Err(e) => error!("could not send error packet: {e}")
}
}
}
}
(Err(e), Some(source)) => {
error!("{source} sent intelligible data: {e}");
let device_id = server.device_id();
if let Err(e) = server.sock().send_to(
prelude::quick_err::net_broken_packet(
device_id,
Identifier::random(),
).build_full_packet().as_slice(),
source,
).await {
error!("could not send error packet: {e}");
}
}
_ => warn!("dropped intelligible packet")
}
}
}),
NodeType::Bridge { bind, master_address } => runtime.block_on(async {
}),
NodeType::Subnet { master_address: _ } => unimplemented!()
}
Ok(())
}
fn setup_logging() -> Result<(), fern::InitError> {
fern::Dispatch::new()
.format(|out, message, record| match record.metadata().target() {
_ => out.finish(format_args!(
"[{} - {}] <{}> {}: {}",
humantime::format_rfc3339(std::time::SystemTime::now()),
record.level(),
record.target(),
record.file().unwrap_or("??"),
message
)),
})
.chain({
fern::Dispatch::new()
.level(CONFIG.with_borrow(|c| c.log.stdout_level.clone()).into())
.chain(io::stdout())
})
.chain({
fern::Dispatch::new()
.level(CONFIG.with_borrow(|c| c.log.stdout_level.clone()).into())
.chain(fern::log_file("./node.log")?)
})
.apply()?;
Ok(())
}