feat: started master node impl

This commit is contained in:
Strix 2023-10-15 18:06:27 +02:00
parent ebcbd3e899
commit 1132f8c5fb
No known key found for this signature in database
GPG key ID: 49B2E37B8915B774

View file

@ -1,12 +1,16 @@
use crate::config::node::NodeType;
use log::{debug, info, trace};
use log::{debug, error, info, trace};
use std::{env, io};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use connection::Client;
use domo_proto::commands::node_management::NodeManagement;
use domo_proto::packet::ToPacket;
use connection::client::Client;
use domo_proto::packet::identifier::Identifier;
use std::cell::RefCell;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpSocket;
use domo_proto::commands::node_management::NodeManagement;
use domo_proto::packet::ToPacket;
use crate::connection::from_tcp_stream;
pub mod config;
pub mod connection;
@ -35,47 +39,50 @@ fn main() {
CONFIG.with_borrow(|c| c.node.node_type.clone())
);
match CONFIG.with_borrow(|c| c.node.node_type.clone()) {
NodeType::Master => runtime.block_on(async {}),
NodeType::Master { bind, identifier } => runtime.block_on(async {
let identifier = Identifier::from(hex::decode(identifier).unwrap().as_slice());
trace!("identifier: {identifier}");
let mut sock = TcpSocket::new_v4().unwrap();
sock.bind(bind.parse().unwrap()).unwrap();
let mut listener = sock.listen(1024).unwrap();
info!("Listening at {}", bind);
loop {
match listener.accept().await {
Ok((mut stream, addr)) => {
tokio::spawn(async move {
loop {
let packet = from_tcp_stream(&mut stream).await.unwrap();
stream.write(NodeManagement::RegisterNode {
device_id: Identifier::from(packet.data.data.clone())
}.to_packet(
identifier,
Identifier::from(packet.data.data),
Identifier::random(),
Identifier::default()
).build_full_packet().as_slice()).await.unwrap();
}
});
},
Err(e) => error!("Could not accept client: {e}")
}
}
}),
NodeType::Relay { master_address } => {
let client = Client::new(master_address.parse().unwrap());
runtime.block_on(async {
let mut conn = client.client_connection().await.unwrap();
let device_id = match CONFIG.with_borrow(|c| c.node.device_id.clone()) {
Some(s) => {
let device_id = Identifier::from(domo_proto::prelude::as_u32_be(hex::decode(s)
.expect("Could not decode hex device_id").as_slice()));
let p = conn.send(NodeManagement::RegisterNode {
device_id
}.to_v1_packet(
Identifier::from(0),
Identifier::from(0),
Identifier::random(),
Identifier::from(0)
)).await.unwrap();
if p.command() == 0x0E {
panic!("device id is taken")
}
trace!("server assigned us: {}", device_id);
device_id
},
None => {
trace!("requesting device_id...");
let p = conn.send(NodeManagement::RegisterNode {
device_id: Identifier::from(0)
}.to_v1_packet(
Identifier::from(0),
Identifier::from(0),
Identifier::random(),
Identifier::from(0)
)).await;
Identifier::from(p.unwrap().data().get_data())
}
};
CONFIG.with_borrow_mut(move |c| c.node.device_id = Some(hex::encode(device_id.bytes)));
let conn = client.client_connection(
CONFIG.with_borrow(|c| c.node.device_id.clone())
.map(|s| Identifier::from(hex::decode(s)
.expect("Could not decode device_id hex")
.as_slice()))
).await.unwrap();
info!("Registered as: {}", conn.device_id());
})
}
NodeType::Subnet { master_address } => {
},
NodeType::Subnet { master_address: _ } => {
runtime.block_on(async {})
}
}