feat: implementation for domo_node

This commit is contained in:
Strix 2023-10-15 18:06:25 +02:00
parent 405cf8b368
commit 3365ca09a5
No known key found for this signature in database
GPG key ID: 49B2E37B8915B774
8 changed files with 445 additions and 72 deletions

160
domo_node/Cargo.lock generated
View file

@ -80,6 +80,8 @@ version = "0.1.0"
dependencies = [ dependencies = [
"domo_proto", "domo_proto",
"fern", "fern",
"futures",
"hex",
"humantime", "humantime",
"log", "log",
"serde", "serde",
@ -92,6 +94,7 @@ name = "domo_proto"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"crc32fast", "crc32fast",
"rand",
] ]
[[package]] [[package]]
@ -109,6 +112,106 @@ dependencies = [
"log", "log",
] ]
[[package]]
name = "futures"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c"
[[package]]
name = "futures-executor"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964"
[[package]]
name = "futures-macro"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e"
[[package]]
name = "futures-task"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65"
[[package]]
name = "futures-util"
version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
]
[[package]]
name = "getrandom"
version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427"
dependencies = [
"cfg-if",
"libc",
"wasi",
]
[[package]] [[package]]
name = "gimli" name = "gimli"
version = "0.28.0" version = "0.28.0"
@ -127,6 +230,12 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7"
[[package]]
name = "hex"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]] [[package]]
name = "humantime" name = "humantime"
version = "2.1.0" version = "2.1.0"
@ -239,6 +348,18 @@ version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "ppv-lite86"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de"
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.67" version = "1.0.67"
@ -257,6 +378,36 @@ dependencies = [
"proc-macro2", "proc-macro2",
] ]
[[package]]
name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_core",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom",
]
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"
version = "0.3.5" version = "0.3.5"
@ -316,6 +467,15 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "slab"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67"
dependencies = [
"autocfg",
]
[[package]] [[package]]
name = "smallvec" name = "smallvec"
version = "1.11.1" version = "1.11.1"

View file

@ -8,10 +8,12 @@ edition = "2021"
[dependencies] [dependencies]
fern = "0.6.2" fern = "0.6.2"
log = "0.4.20" log = "0.4.20"
serde = "1.0.188" serde = { version = "1.0.188", features = ["serde_derive"] }
tokio = { version = "1.32.0", features = ["full"] } tokio = { version = "1.32.0", features = ["full"] }
toml = "0.8.2" toml = "0.8.2"
humantime = "2.1.0" humantime = "2.1.0"
futures = "0.3.28"
hex = "0.4.3"
[dependencies.domo_proto] [dependencies.domo_proto]
path = "../domo_proto" path = "../domo_proto"

2
domo_node/config.toml Normal file
View file

@ -0,0 +1,2 @@
[node.type.relay]
master_address = "0.0.0.0:2000"

View file

@ -0,0 +1,48 @@
use log::LevelFilter;
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum LogLevel {
#[serde(rename = "trace")]
Trace,
#[serde(rename = "debug")]
Debug,
#[serde(rename = "Info")]
Info,
#[serde(rename = "warn")]
Warn,
#[serde(rename = "error")]
Error,
}
impl Into<LevelFilter> for LogLevel {
fn into(self) -> LevelFilter {
match self {
LogLevel::Trace => LevelFilter::Trace,
LogLevel::Debug => LevelFilter::Debug,
LogLevel::Info => LevelFilter::Info,
LogLevel::Warn => LevelFilter::Warn,
LogLevel::Error => LevelFilter::Error,
}
}
}
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LogConfig {
pub stdout_level: LogLevel,
pub file_level: LogLevel,
}
impl Default for LogConfig {
fn default() -> Self {
Self {
// Linter says it's not fine but it is.
#[cfg(not(debug_assertions))]
stdout_level: LogLevel::Info,
#[cfg(debug_assertions)]
stdout_level: LogLevel::Trace,
file_level: LogLevel::Trace,
}
}
}

View file

@ -0,0 +1,30 @@
use log::LogConfig;
use node::NodeConfig;
use serde::{Deserialize, Serialize};
use std::{fs, io};
use ::log::trace;
pub mod log;
pub mod node;
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
#[serde(default)]
pub struct Config {
pub node: NodeConfig,
pub log: LogConfig,
}
impl Config {
pub fn from_path<S: Into<String>>(path: S) -> io::Result<Self> {
let path = path.into().clone();
trace!("importing config from {}...", path);
let s = fs::read_to_string(path.clone())?;
let c: Config = toml::from_str(s.as_str()).map_err(|_| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("{} has invalid data", path),
)
})?;
Ok(c)
}
}

View file

@ -0,0 +1,39 @@
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum NodeType {
#[serde(rename = "master")]
Master,
#[serde(rename = "relay")]
Relay {
master_address: String
},
#[serde(rename = "subnet")]
Subnet {
master_address: String
},
}
impl Display for NodeType {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
NodeType::Master => write!(f, "master"),
NodeType::Relay { .. } => write!(f, "relay"),
NodeType::Subnet { .. } => write!(f, "subnet"),
}
}
}
impl Default for NodeType {
fn default() -> Self {
Self::Master
}
}
#[derive(Debug, Serialize, Deserialize, Default, Clone)]
pub struct NodeConfig {
#[serde(rename = "type")]
pub node_type: NodeType,
pub device_id: Option<String>
}

View file

@ -0,0 +1,77 @@
use tokio::net::{TcpSocket, TcpStream};
use std::io;
use domo_proto::packet::Packet;
use std::net::SocketAddr;
use log::trace;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use domo_proto::packet::identifier::Identifier;
use domo_proto::packet::packet_data::PacketData;
pub struct Client {
socket_addr: SocketAddr,
}
impl Client {
pub fn new(socket_addr: SocketAddr) -> Self {
Self {
socket_addr
}
}
pub async fn client_connection(self) -> io::Result<ClientConnection> {
ClientConnection::new(self).await
}
}
pub struct ClientConnection {
stream: TcpStream,
}
impl ClientConnection {
pub async fn new(c: Client) -> io::Result<Self> {
let sock = TcpSocket::new_v4()?;
let stream = sock.connect(c.socket_addr).await?;
Ok(Self {
stream
})
}
pub async fn send(&mut self, packet: Packet) -> io::Result<Packet> {
self.stream.write(packet.build_full_packet().as_slice()).await?;
from_tcp_stream(&mut self.stream).await
}
}
pub async fn from_tcp_stream(stream: &mut TcpStream) -> io::Result<Packet> {
// Implement the logic to read data from the TcpStream asynchronously and create a Packet instance
match stream.read_u8().await? {
0x01 => {
// Example: Read packet data and create a Packet::V1 instance
let src = Identifier::from(stream.read_u32().await?); // Assuming you have a similar function for Identifier
let dest = Identifier::from(stream.read_u32().await?); // Assuming you have a similar function for Identifier
let packet_id = Identifier::from(stream.read_u32().await?); // Assuming you have a similar function for Identifier
let reply_to = Identifier::from(stream.read_u32().await?); // Assuming you have a similar function for Identifier
let command = stream.read_u8().await?; // Read u8 directly
let data_length = stream.read_u16().await?; // Read u16 directly
trace!("received => src: {src}, dest: {dest}, packet_id: {packet_id}, reply_to: {reply_to}, command: {command}, data_length: {data_length}");
let mut data = vec![0u8; data_length as usize];
stream.read_exact(&mut data).await?;
// Create a Packet::V1 instance
let packet = Packet::V1 {
src,
dest,
packet_id,
reply_to,
command,
data: PacketData::new(data), // Assuming PacketData has a constructor
};
Ok(packet)
}
_ => Err(io::Error::new(io::ErrorKind::InvalidData, "received invalid data."))
}
}

View file

@ -1,81 +1,89 @@
use std::io; use crate::config::node::NodeType;
use std::time::Duration; use log::{debug, info, trace};
use log::{error, info, LevelFilter, trace}; use std::{env, io};
use tokio::io::AsyncWriteExt; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpSocket; use connection::Client;
use tokio::sync::mpsc::Receiver; use domo_proto::commands::node_management::NodeManagement;
use tokio::time::sleep; use domo_proto::packet::ToPacket;
use domo_proto::packet::identifier::Identifier; use domo_proto::packet::identifier::Identifier;
use domo_proto::packet::Packet; use std::cell::RefCell;
use domo_proto::packet::packet_data::PacketData;
async fn app(mut recv: Receiver<Packet>) -> io::Result<()> { pub mod config;
trace!("Entered runtime!"); pub mod connection;
let sock = TcpSocket::new_v4()?; thread_local! {
let mut stream = sock.connect("127.0.0.1:2000".parse().unwrap()).await?; static CONFIG: RefCell<config::Config> = RefCell::new(config::Config::from_path(
env::var("DOMO_CONFIG_PATH")
while let Some(p) = recv.recv().await { .unwrap_or("config.toml".into())
let bytes = p.build_full_packet(); ).unwrap_or({ info!("Using default config..."); config::Config::default() }))
match stream.write(bytes.as_slice()).await {
Ok(s) => trace!("Packet [id: {}] was sent. [sent bytes: {}/{}]", p.packet_id(), s, bytes.len()),
Err(e) => error!("Packet [id: {}] was not sent properly: {}", p.packet_id(), e)
}
}
stream.shutdown().await?;
Ok(())
} }
fn main() { fn main() {
setup_logging() setup_logging().expect("could not setup logging");
.expect("could not setup logging");
trace!("Building runtime...");
let (send, recv) = tokio::sync::mpsc::channel::<Packet>(128); debug!("Building tokio runtime...");
let runtime = tokio::runtime::Builder::new_multi_thread() let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all() .enable_all()
.build() .build()
.unwrap(); .unwrap();
runtime.spawn(async move { debug!("Built tokio runtime with all features...");
sleep(Duration::new(1, 0)).await;
let p =Packet::V1 {
src: Identifier::from(0x00000000),
dest: Identifier::from(0x00000000),
packet_id: Identifier::from(0x00000000),
reply_to: Identifier::from(0x00000000),
command: 0x00,
data: PacketData::new(
vec![0x1,0x11,0xDD]
)
};
info!("{:X?}", p.build_full_packet());
send.send(p).await
});
info!(
"Staring as {} node!",
CONFIG.with_borrow(|c| c.node.node_type.clone())
);
match CONFIG.with_borrow(|c| c.node.node_type.clone()) {
NodeType::Master => runtime.block_on(async {}),
NodeType::Relay { master_address } => {
let client = Client::new(master_address.parse().unwrap());
runtime.block_on(async { runtime.block_on(async {
info!("Starting app..."); let mut conn = client.client_connection().await.unwrap();
match app(recv).await { let device_id = match CONFIG.with_borrow(|c| c.node.device_id.clone()) {
Ok(()) => info!("App exited."), Some(s) => {
Err(e) => error!("App exited with error: {}", e) let device_id = Identifier::from(domo_proto::prelude::as_u32_be(hex::decode(s)
.expect("Could not decode hex device_id").as_slice()));
let p = conn.send(NodeManagement::RegisterNode {
device_id
}.to_v1_packet(
Identifier::from(0),
Identifier::from(0),
Identifier::random(),
Identifier::from(0)
)).await.unwrap();
if p.command() == 0x0E {
panic!("device id is taken")
}
trace!("server assigned us: {}", device_id);
device_id
},
None => {
trace!("requesting device_id...");
let p = conn.send(NodeManagement::RegisterNode {
device_id: Identifier::from(0)
}.to_v1_packet(
Identifier::from(0),
Identifier::from(0),
Identifier::random(),
Identifier::from(0)
)).await;
Identifier::from(p.unwrap().data().get_data())
}
};
CONFIG.with_borrow_mut(move |c| c.node.device_id = Some(hex::encode(device_id.bytes)));
})
}
NodeType::Subnet { master_address } => {
runtime.block_on(async {})
}
} }
});
} }
fn setup_logging() -> Result<(), fern::InitError> { fn setup_logging() -> Result<(), fern::InitError> {
let stdout = fern::Dispatch::new()
.level(LevelFilter::Debug)
.chain(io::stdout());
let file = fern::Dispatch::new()
.level(LevelFilter::Trace)
.chain(fern::log_file("node.log")?);
fern::Dispatch::new() fern::Dispatch::new()
.format(|out, message, record| { .format(|out, message, record| match record.metadata().target() {
match record.metadata().target() {
_ => out.finish(format_args!( _ => out.finish(format_args!(
"[{} - {}] <{}> {}: {}", "[{} - {}] <{}> {}: {}",
humantime::format_rfc3339(std::time::SystemTime::now()), humantime::format_rfc3339(std::time::SystemTime::now()),
@ -83,11 +91,18 @@ fn setup_logging() -> Result<(), fern::InitError> {
record.target(), record.target(),
record.file().unwrap_or("??"), record.file().unwrap_or("??"),
message message
)) )),
} })
.chain({
fern::Dispatch::new()
.level(CONFIG.with_borrow(|c| c.log.stdout_level.clone()).into())
.chain(io::stdout())
})
.chain({
fern::Dispatch::new()
.level(CONFIG.with_borrow(|c| c.log.stdout_level.clone()).into())
.chain(fern::log_file("./node.log")?)
}) })
.chain(stdout)
.chain(file)
.apply()?; .apply()?;
Ok(()) Ok(())