From 3365ca09a5ebb21c6cc392fde5ad47ef324295c5 Mon Sep 17 00:00:00 2001 From: Raine Date: Sun, 15 Oct 2023 18:06:25 +0200 Subject: [PATCH] feat: implementation for domo_node --- domo_node/Cargo.lock | 160 +++++++++++++++++++++++++++++++++++ domo_node/Cargo.toml | 4 +- domo_node/config.toml | 2 + domo_node/src/config/log.rs | 48 +++++++++++ domo_node/src/config/mod.rs | 30 +++++++ domo_node/src/config/node.rs | 39 +++++++++ domo_node/src/connection.rs | 77 +++++++++++++++++ domo_node/src/main.rs | 157 ++++++++++++++++++---------------- 8 files changed, 445 insertions(+), 72 deletions(-) create mode 100644 domo_node/config.toml create mode 100644 domo_node/src/config/log.rs create mode 100644 domo_node/src/config/mod.rs create mode 100644 domo_node/src/config/node.rs create mode 100644 domo_node/src/connection.rs diff --git a/domo_node/Cargo.lock b/domo_node/Cargo.lock index fe9cb00..80a8835 100644 --- a/domo_node/Cargo.lock +++ b/domo_node/Cargo.lock @@ -80,6 +80,8 @@ version = "0.1.0" dependencies = [ "domo_proto", "fern", + "futures", + "hex", "humantime", "log", "serde", @@ -92,6 +94,7 @@ name = "domo_proto" version = "0.1.0" dependencies = [ "crc32fast", + "rand", ] [[package]] @@ -109,6 +112,106 @@ dependencies = [ "log", ] +[[package]] +name = "futures" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" + +[[package]] +name = "futures-executor" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" + +[[package]] +name = "futures-macro" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" + +[[package]] +name = "futures-task" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" + +[[package]] +name = "futures-util" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] +name = "getrandom" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + [[package]] name = "gimli" version = "0.28.0" @@ -127,6 +230,12 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "humantime" version = "2.1.0" @@ -239,6 +348,18 @@ version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "ppv-lite86" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" + [[package]] name = "proc-macro2" version = "1.0.67" @@ -257,6 +378,36 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + [[package]] name = "redox_syscall" version = "0.3.5" @@ -316,6 +467,15 @@ dependencies = [ "libc", ] +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + [[package]] name = "smallvec" version = "1.11.1" diff --git a/domo_node/Cargo.toml b/domo_node/Cargo.toml index 9452bd1..d173be7 100644 --- a/domo_node/Cargo.toml +++ b/domo_node/Cargo.toml @@ -8,10 +8,12 @@ edition = "2021" [dependencies] fern = "0.6.2" log = "0.4.20" -serde = "1.0.188" +serde = { version = "1.0.188", features = ["serde_derive"] } tokio = { version = "1.32.0", features = ["full"] } toml = "0.8.2" humantime = "2.1.0" +futures = "0.3.28" +hex = "0.4.3" [dependencies.domo_proto] path = "../domo_proto" diff --git a/domo_node/config.toml b/domo_node/config.toml new file mode 100644 index 0000000..17c2b2a --- /dev/null +++ b/domo_node/config.toml @@ -0,0 +1,2 @@ +[node.type.relay] +master_address = "0.0.0.0:2000" \ No newline at end of file diff --git a/domo_node/src/config/log.rs b/domo_node/src/config/log.rs new file mode 100644 index 0000000..256fba5 --- /dev/null +++ b/domo_node/src/config/log.rs @@ -0,0 +1,48 @@ +use log::LevelFilter; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub enum LogLevel { + #[serde(rename = "trace")] + Trace, + #[serde(rename = "debug")] + Debug, + #[serde(rename = "Info")] + Info, + #[serde(rename = "warn")] + Warn, + #[serde(rename = "error")] + Error, +} + +impl Into for LogLevel { + fn into(self) -> LevelFilter { + match self { + LogLevel::Trace => LevelFilter::Trace, + LogLevel::Debug => LevelFilter::Debug, + LogLevel::Info => LevelFilter::Info, + LogLevel::Warn => LevelFilter::Warn, + LogLevel::Error => LevelFilter::Error, + } + } +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct LogConfig { + pub stdout_level: LogLevel, + pub file_level: LogLevel, +} + +impl Default for LogConfig { + fn default() -> Self { + Self { + // Linter says it's not fine but it is. + #[cfg(not(debug_assertions))] + stdout_level: LogLevel::Info, + #[cfg(debug_assertions)] + stdout_level: LogLevel::Trace, + + file_level: LogLevel::Trace, + } + } +} diff --git a/domo_node/src/config/mod.rs b/domo_node/src/config/mod.rs new file mode 100644 index 0000000..5a8e7e8 --- /dev/null +++ b/domo_node/src/config/mod.rs @@ -0,0 +1,30 @@ +use log::LogConfig; +use node::NodeConfig; +use serde::{Deserialize, Serialize}; +use std::{fs, io}; +use ::log::trace; + +pub mod log; +pub mod node; + +#[derive(Debug, Serialize, Deserialize, Default, Clone)] +#[serde(default)] +pub struct Config { + pub node: NodeConfig, + pub log: LogConfig, +} + +impl Config { + pub fn from_path>(path: S) -> io::Result { + let path = path.into().clone(); + trace!("importing config from {}...", path); + let s = fs::read_to_string(path.clone())?; + let c: Config = toml::from_str(s.as_str()).map_err(|_| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("{} has invalid data", path), + ) + })?; + Ok(c) + } +} diff --git a/domo_node/src/config/node.rs b/domo_node/src/config/node.rs new file mode 100644 index 0000000..618350f --- /dev/null +++ b/domo_node/src/config/node.rs @@ -0,0 +1,39 @@ +use serde::{Deserialize, Serialize}; +use std::fmt::{Display, Formatter}; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub enum NodeType { + #[serde(rename = "master")] + Master, + #[serde(rename = "relay")] + Relay { + master_address: String + }, + #[serde(rename = "subnet")] + Subnet { + master_address: String + }, +} + +impl Display for NodeType { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + NodeType::Master => write!(f, "master"), + NodeType::Relay { .. } => write!(f, "relay"), + NodeType::Subnet { .. } => write!(f, "subnet"), + } + } +} + +impl Default for NodeType { + fn default() -> Self { + Self::Master + } +} + +#[derive(Debug, Serialize, Deserialize, Default, Clone)] +pub struct NodeConfig { + #[serde(rename = "type")] + pub node_type: NodeType, + pub device_id: Option +} diff --git a/domo_node/src/connection.rs b/domo_node/src/connection.rs new file mode 100644 index 0000000..ac6af41 --- /dev/null +++ b/domo_node/src/connection.rs @@ -0,0 +1,77 @@ +use tokio::net::{TcpSocket, TcpStream}; +use std::io; +use domo_proto::packet::Packet; +use std::net::SocketAddr; +use log::trace; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use domo_proto::packet::identifier::Identifier; +use domo_proto::packet::packet_data::PacketData; + +pub struct Client { + socket_addr: SocketAddr, +} + +impl Client { + pub fn new(socket_addr: SocketAddr) -> Self { + Self { + socket_addr + } + } + + pub async fn client_connection(self) -> io::Result { + ClientConnection::new(self).await + } +} + +pub struct ClientConnection { + stream: TcpStream, +} + +impl ClientConnection { + pub async fn new(c: Client) -> io::Result { + let sock = TcpSocket::new_v4()?; + let stream = sock.connect(c.socket_addr).await?; + Ok(Self { + stream + }) + } + + pub async fn send(&mut self, packet: Packet) -> io::Result { + self.stream.write(packet.build_full_packet().as_slice()).await?; + from_tcp_stream(&mut self.stream).await + } +} + +pub async fn from_tcp_stream(stream: &mut TcpStream) -> io::Result { + // Implement the logic to read data from the TcpStream asynchronously and create a Packet instance + + match stream.read_u8().await? { + 0x01 => { + // Example: Read packet data and create a Packet::V1 instance + let src = Identifier::from(stream.read_u32().await?); // Assuming you have a similar function for Identifier + let dest = Identifier::from(stream.read_u32().await?); // Assuming you have a similar function for Identifier + let packet_id = Identifier::from(stream.read_u32().await?); // Assuming you have a similar function for Identifier + let reply_to = Identifier::from(stream.read_u32().await?); // Assuming you have a similar function for Identifier + let command = stream.read_u8().await?; // Read u8 directly + let data_length = stream.read_u16().await?; // Read u16 directly + + trace!("received => src: {src}, dest: {dest}, packet_id: {packet_id}, reply_to: {reply_to}, command: {command}, data_length: {data_length}"); + + let mut data = vec![0u8; data_length as usize]; + stream.read_exact(&mut data).await?; + + // Create a Packet::V1 instance + let packet = Packet::V1 { + src, + dest, + packet_id, + reply_to, + command, + data: PacketData::new(data), // Assuming PacketData has a constructor + }; + + Ok(packet) + } + _ => Err(io::Error::new(io::ErrorKind::InvalidData, "received invalid data.")) + } +} \ No newline at end of file diff --git a/domo_node/src/main.rs b/domo_node/src/main.rs index b1d1a15..1546633 100644 --- a/domo_node/src/main.rs +++ b/domo_node/src/main.rs @@ -1,94 +1,109 @@ -use std::io; -use std::time::Duration; -use log::{error, info, LevelFilter, trace}; -use tokio::io::AsyncWriteExt; -use tokio::net::TcpSocket; -use tokio::sync::mpsc::Receiver; -use tokio::time::sleep; +use crate::config::node::NodeType; +use log::{debug, info, trace}; +use std::{env, io}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use connection::Client; +use domo_proto::commands::node_management::NodeManagement; +use domo_proto::packet::ToPacket; use domo_proto::packet::identifier::Identifier; -use domo_proto::packet::Packet; -use domo_proto::packet::packet_data::PacketData; +use std::cell::RefCell; -async fn app(mut recv: Receiver) -> io::Result<()> { - trace!("Entered runtime!"); +pub mod config; +pub mod connection; - let sock = TcpSocket::new_v4()?; - let mut stream = sock.connect("127.0.0.1:2000".parse().unwrap()).await?; - - while let Some(p) = recv.recv().await { - let bytes = p.build_full_packet(); - match stream.write(bytes.as_slice()).await { - Ok(s) => trace!("Packet [id: {}] was sent. [sent bytes: {}/{}]", p.packet_id(), s, bytes.len()), - Err(e) => error!("Packet [id: {}] was not sent properly: {}", p.packet_id(), e) - } - } - - stream.shutdown().await?; - Ok(()) +thread_local! { + static CONFIG: RefCell = RefCell::new(config::Config::from_path( + env::var("DOMO_CONFIG_PATH") + .unwrap_or("config.toml".into()) + ).unwrap_or({ info!("Using default config..."); config::Config::default() })) } fn main() { - setup_logging() - .expect("could not setup logging"); - trace!("Building runtime..."); + setup_logging().expect("could not setup logging"); - let (send, recv) = tokio::sync::mpsc::channel::(128); + debug!("Building tokio runtime..."); let runtime = tokio::runtime::Builder::new_multi_thread() .enable_all() .build() .unwrap(); - runtime.spawn(async move { - sleep(Duration::new(1, 0)).await; - let p =Packet::V1 { - src: Identifier::from(0x00000000), - dest: Identifier::from(0x00000000), - packet_id: Identifier::from(0x00000000), - reply_to: Identifier::from(0x00000000), - command: 0x00, - data: PacketData::new( - vec![0x1,0x11,0xDD] - ) - }; - info!("{:X?}", p.build_full_packet()); - send.send(p).await - }); + debug!("Built tokio runtime with all features..."); - runtime.block_on(async { - info!("Starting app..."); - match app(recv).await { - Ok(()) => info!("App exited."), - Err(e) => error!("App exited with error: {}", e) + info!( + "Staring as {} node!", + CONFIG.with_borrow(|c| c.node.node_type.clone()) + ); + + match CONFIG.with_borrow(|c| c.node.node_type.clone()) { + NodeType::Master => runtime.block_on(async {}), + NodeType::Relay { master_address } => { + let client = Client::new(master_address.parse().unwrap()); + runtime.block_on(async { + let mut conn = client.client_connection().await.unwrap(); + let device_id = match CONFIG.with_borrow(|c| c.node.device_id.clone()) { + Some(s) => { + let device_id = Identifier::from(domo_proto::prelude::as_u32_be(hex::decode(s) + .expect("Could not decode hex device_id").as_slice())); + let p = conn.send(NodeManagement::RegisterNode { + device_id + }.to_v1_packet( + Identifier::from(0), + Identifier::from(0), + Identifier::random(), + Identifier::from(0) + )).await.unwrap(); + if p.command() == 0x0E { + panic!("device id is taken") + } + trace!("server assigned us: {}", device_id); + device_id + }, + None => { + trace!("requesting device_id..."); + let p = conn.send(NodeManagement::RegisterNode { + device_id: Identifier::from(0) + }.to_v1_packet( + Identifier::from(0), + Identifier::from(0), + Identifier::random(), + Identifier::from(0) + )).await; + Identifier::from(p.unwrap().data().get_data()) + } + }; + CONFIG.with_borrow_mut(move |c| c.node.device_id = Some(hex::encode(device_id.bytes))); + }) } - }); + NodeType::Subnet { master_address } => { + runtime.block_on(async {}) + } + } } fn setup_logging() -> Result<(), fern::InitError> { - let stdout = fern::Dispatch::new() - .level(LevelFilter::Debug) - .chain(io::stdout()); - - let file = fern::Dispatch::new() - .level(LevelFilter::Trace) - .chain(fern::log_file("node.log")?); - fern::Dispatch::new() - .format(|out, message, record| { - match record.metadata().target() { - _ => out.finish(format_args!( - "[{} - {}] <{}> {}: {}", - humantime::format_rfc3339(std::time::SystemTime::now()), - record.level(), - record.target(), - record.file().unwrap_or("??"), - message - )) - } + .format(|out, message, record| match record.metadata().target() { + _ => out.finish(format_args!( + "[{} - {}] <{}> {}: {}", + humantime::format_rfc3339(std::time::SystemTime::now()), + record.level(), + record.target(), + record.file().unwrap_or("??"), + message + )), + }) + .chain({ + fern::Dispatch::new() + .level(CONFIG.with_borrow(|c| c.log.stdout_level.clone()).into()) + .chain(io::stdout()) + }) + .chain({ + fern::Dispatch::new() + .level(CONFIG.with_borrow(|c| c.log.stdout_level.clone()).into()) + .chain(fern::log_file("./node.log")?) }) - .chain(stdout) - .chain(file) .apply()?; Ok(()) -} \ No newline at end of file +}