From 1132f8c5fb115dadbc3fb333858e5ab813607eab Mon Sep 17 00:00:00 2001 From: Raine Date: Sun, 15 Oct 2023 18:06:27 +0200 Subject: [PATCH] feat: started master node impl --- domo_node/src/main.rs | 89 +++++++++++++++++++++++-------------------- 1 file changed, 48 insertions(+), 41 deletions(-) diff --git a/domo_node/src/main.rs b/domo_node/src/main.rs index 1546633..0486287 100644 --- a/domo_node/src/main.rs +++ b/domo_node/src/main.rs @@ -1,12 +1,16 @@ use crate::config::node::NodeType; -use log::{debug, info, trace}; +use log::{debug, error, info, trace}; use std::{env, io}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use connection::Client; -use domo_proto::commands::node_management::NodeManagement; -use domo_proto::packet::ToPacket; +use connection::client::Client; + + use domo_proto::packet::identifier::Identifier; use std::cell::RefCell; +use tokio::io::AsyncWriteExt; +use tokio::net::TcpSocket; +use domo_proto::commands::node_management::NodeManagement; +use domo_proto::packet::ToPacket; +use crate::connection::from_tcp_stream; pub mod config; pub mod connection; @@ -35,47 +39,50 @@ fn main() { CONFIG.with_borrow(|c| c.node.node_type.clone()) ); + match CONFIG.with_borrow(|c| c.node.node_type.clone()) { - NodeType::Master => runtime.block_on(async {}), + NodeType::Master { bind, identifier } => runtime.block_on(async { + let identifier = Identifier::from(hex::decode(identifier).unwrap().as_slice()); + trace!("identifier: {identifier}"); + let mut sock = TcpSocket::new_v4().unwrap(); + sock.bind(bind.parse().unwrap()).unwrap(); + + let mut listener = sock.listen(1024).unwrap(); + info!("Listening at {}", bind); + loop { + match listener.accept().await { + Ok((mut stream, addr)) => { + tokio::spawn(async move { + loop { + let packet = from_tcp_stream(&mut stream).await.unwrap(); + stream.write(NodeManagement::RegisterNode { + device_id: Identifier::from(packet.data.data.clone()) + }.to_packet( + identifier, + Identifier::from(packet.data.data), + Identifier::random(), + Identifier::default() + ).build_full_packet().as_slice()).await.unwrap(); + } + }); + }, + Err(e) => error!("Could not accept client: {e}") + } + } + }), NodeType::Relay { master_address } => { let client = Client::new(master_address.parse().unwrap()); runtime.block_on(async { - let mut conn = client.client_connection().await.unwrap(); - let device_id = match CONFIG.with_borrow(|c| c.node.device_id.clone()) { - Some(s) => { - let device_id = Identifier::from(domo_proto::prelude::as_u32_be(hex::decode(s) - .expect("Could not decode hex device_id").as_slice())); - let p = conn.send(NodeManagement::RegisterNode { - device_id - }.to_v1_packet( - Identifier::from(0), - Identifier::from(0), - Identifier::random(), - Identifier::from(0) - )).await.unwrap(); - if p.command() == 0x0E { - panic!("device id is taken") - } - trace!("server assigned us: {}", device_id); - device_id - }, - None => { - trace!("requesting device_id..."); - let p = conn.send(NodeManagement::RegisterNode { - device_id: Identifier::from(0) - }.to_v1_packet( - Identifier::from(0), - Identifier::from(0), - Identifier::random(), - Identifier::from(0) - )).await; - Identifier::from(p.unwrap().data().get_data()) - } - }; - CONFIG.with_borrow_mut(move |c| c.node.device_id = Some(hex::encode(device_id.bytes))); + let conn = client.client_connection( + CONFIG.with_borrow(|c| c.node.device_id.clone()) + .map(|s| Identifier::from(hex::decode(s) + .expect("Could not decode device_id hex") + .as_slice())) + ).await.unwrap(); + info!("Registered as: {}", conn.device_id()); }) - } - NodeType::Subnet { master_address } => { + }, + NodeType::Subnet { master_address: _ } => { runtime.block_on(async {}) } }