//! ANARCHY-ZHTP-NODE server //! - Provides /status, /mesh, /blockchain, /defense, /ui endpoints on port 4820 //! - Lightweight dependencies for cheap VPS use axum::{ body::Body, extract::ConnectInfo, http::{Method, Request}, middleware::Next, response::{Html, Redirect}, routing::{get, post}, Json, Router, }; use anarchy_blockchain as blockchain; use anarchy_core as core; use anarchy_defense as defense; use anarchy_mesh as mesh; use anarchy_webui as webui; use zhtp_blocksync as blocksync; use zhtp_gossip as zgossip; use zhtp_handshake as zhello; use zhtp_identity as zid; use zhtp_mesh as zmesh; use zhtp_peer as zpeer; use serde::Serialize; use std::{net::{SocketAddr, ToSocketAddrs}, sync::{OnceLock, RwLock}, time::{Duration, SystemTime}}; use std::path::Path; use tower_http::{cors::{Any, CorsLayer}, services::ServeDir}; use axum::response::sse::{Sse, Event, KeepAlive}; use tokio_stream::StreamExt; use tokio::sync::broadcast; use tokio_stream::wrappers::BroadcastStream; use std::fs; use std::io::Write; use tracing::{info, Level}; static CHAIN: OnceLock = OnceLock::new(); static DEFENSE_STATUS: OnceLock> = OnceLock::new(); static NODE_ID: OnceLock = OnceLock::new(); static PEERS: OnceLock>> = OnceLock::new(); static GOSSIP: OnceLock> = OnceLock::new(); static LOGS: OnceLock>> = OnceLock::new(); static LOG_TX: OnceLock> = OnceLock::new(); fn write_atomic(path: &str, contents: &str) { let tmp = format!("{}.tmp", path); let _ = fs::write(&tmp, contents); let _ = fs::rename(&tmp, path); } #[derive(Debug, Clone, serde::Deserialize)] struct AppConfig { #[serde(default)] bootstrap: Bootstrap, #[serde(default)] decoys: Decoys, #[serde(default)] dns: Dns, #[serde(default)] node: NodeCfg, } #[derive(Debug, Clone, serde::Deserialize, Default)] struct Bootstrap { #[serde(default)] peers: Vec } #[derive(Debug, Clone, serde::Deserialize, Default)] struct Decoys { #[serde(default)] ips: Vec } #[derive(Debug, Clone, serde::Deserialize, Default)] struct Dns { #[serde(default)] domains: Vec } #[derive(Debug, Clone, serde::Deserialize, Default)] struct NodeCfg { #[serde(default)] nickname: String, #[serde(default)] continuity: bool } static APPCFG: OnceLock = OnceLock::new(); static DATADIR: &str = "/opt/anarchy-zhtp-node/data"; #[derive(Clone, Serialize)] struct LogEntry { ts: u64, path: String } #[derive(Serialize)] struct Status { ok: bool, version: &'static str, bind_addr: String, uptime_secs: u64, } #[derive(Serialize)] struct Stats { peers: usize, onboard_events: usize, ts: u64 } #[derive(Debug, Clone, serde::Deserialize)] struct OnboardReq { user_id: Option, nickname: Option, ua: Option, ip: Option } #[derive(Debug, Clone, serde::Deserialize)] struct PeerSeenReq { addr: String } #[tokio::main] async fn main() { // Logging tracing_subscriber::fmt() .with_env_filter(tracing_subscriber::EnvFilter::from_default_env() .add_directive(Level::INFO.into())) .with_target(false) .with_timer(tracing_subscriber::fmt::time::UtcTime::rfc_3339()) .init(); // Config and state let cfg = core::NodeConfig::default(); // Persisted identity let id_path = format!("{}/identity.json", DATADIR); let loaded_id = fs::read_to_string(&id_path).ok().and_then(|t| serde_json::from_str::(&t).ok()); let id = match loaded_id { Some(i) => i, None => { let g = core::NodeIdentity::generate(); let _ = fs::write(&id_path, serde_json::to_string(&g).unwrap_or("{}".into())); g } }; NODE_ID.set(id).ok(); CHAIN.get_or_init(blockchain::Chain::new); DEFENSE_STATUS.get_or_init(|| RwLock::new(defense::DefenseStatus::default())); PEERS.get_or_init(|| RwLock::new(Vec::new())); GOSSIP.get_or_init(|| RwLock::new(zgossip::GossipStatus { messages_seen: 0, last_msg: None })); LOGS.get_or_init(|| RwLock::new(Vec::new())); // Init broadcast channel for SSE logs let (tx, _rx) = broadcast::channel::(200); LOG_TX.set(tx).ok(); // Load config.toml let cfg_path = "/opt/anarchy-zhtp-node/config.toml"; let loaded = std::fs::read_to_string(cfg_path).ok().and_then(|s| toml::from_str::(&s).ok()); let appcfg = loaded.unwrap_or(AppConfig{ bootstrap: Default::default(), decoys: Default::default(), dns: Default::default(), node: Default::default()}); APPCFG.set(appcfg).ok(); // Ensure data dir exists let _ = fs::create_dir_all(DATADIR); // Attempt to load persisted peers if let Ok(peers_json) = fs::read_to_string(format!("{}/peers.json", DATADIR)) { if let Ok(list) = serde_json::from_str::>(&peers_json) { if let Ok(mut v) = PEERS.get().unwrap().write() { *v = list; } } } // Attempt to load persisted chain (as list of data strings) if let Ok(chain_json) = fs::read_to_string(format!("{}/chain.json", DATADIR)) { if let Ok(datas) = serde_json::from_str::>(&chain_json) { let ch = CHAIN.get().unwrap(); for d in datas { ch.push(d); } } } // Background tasks: decoy rotation and domain finder // These are lightweight periodic tasks to maintain censorship resistance metadata. let ds = DEFENSE_STATUS.get().unwrap(); tokio::spawn(async move { let mut ticker = tokio::time::interval(Duration::from_secs(60)); let decoy_pool = APPCFG.get().map(|c| c.decoys.ips.clone()).unwrap_or_default(); loop { ticker.tick().await; let set = defense::rotate_decoy_ip(&decoy_pool); let ts = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs(); if let Ok(mut g) = ds.write() { g.decoy_current = set.current; g.decoy_pool_size = set.pool.len(); g.last_ip_rotation_ts = Some(ts); } } }); let ds2 = DEFENSE_STATUS.get().unwrap(); tokio::spawn(async move { let mut ticker = tokio::time::interval(Duration::from_secs(120)); let domains = APPCFG.get().map(|c| c.dns.domains.clone()).unwrap_or_else(|| vec!["example.com".to_string()]); loop { ticker.tick().await; for d in &domains { let dom = d.to_string(); let res = tokio::task::spawn_blocking(move || { // Try to resolve d:80 using system resolver (dom.as_str(), 80).to_socket_addrs().ok().map(|mut it| it.next().is_some()).unwrap_or(false) }).await.unwrap_or(false); if let Ok(mut g) = ds2.write() { g.last_domain_checked = Some(d.to_string()); if res { g.last_domain_ok = Some(d.to_string()); break; } } } } }); // Bootstrap peers via DNS lookups (placeholder) and keep last-seen timestamps updated let peers_lock = PEERS.get().unwrap(); tokio::spawn(async move { let mut ticker = tokio::time::interval(Duration::from_secs(90)); let seeds = APPCFG.get().map(|c| c.bootstrap.peers.clone()).unwrap_or_else(|| vec!["example.com:80".to_string()]); loop { ticker.tick().await; let now = zpeer::now(); let mut v = peers_lock.write().unwrap(); v.clear(); for s in &seeds { v.push(zpeer::PeerInfo { addr: s.to_string(), last_seen: now, hops: 1, score: 0, failures: 0, quarantine_until: None, backoff_secs: 60 }); } } }); // Periodic decay and quarantine maintenance (every 60s) let peers_for_decay = PEERS.get().unwrap(); tokio::spawn(async move { let mut ticker = tokio::time::interval(Duration::from_secs(60)); loop { ticker.tick().await; let now = zpeer::now(); let mut v = peers_for_decay.write().unwrap(); for p in v.iter_mut() { // score decay toward zero if p.score > 0 { p.score -= 1; } else if p.score < 0 { p.score += 1; } // quarantine if too low if p.score < -15 { p.quarantine_until = Some(now + 120); } // expire quarantine if let Some(until) = p.quarantine_until { if until <= now { p.quarantine_until = None; } } } } }); // Persistence task (every 120s): write peers and chain to DATADIR tokio::spawn(async move { let mut ticker = tokio::time::interval(Duration::from_secs(120)); loop { ticker.tick().await; let peers = PEERS.get().unwrap().read().unwrap().clone(); if let Ok(mut f) = fs::File::create(format!("{}/peers.json", DATADIR)) { let _ = write!(f, "{}", serde_json::to_string(&peers).unwrap_or("[]".into())); } let ch = CHAIN.get().unwrap(); let datas: Vec = ch.list().into_iter().map(|b| b.data).collect(); if let Ok(mut f) = fs::File::create(format!("{}/chain.json", DATADIR)) { let _ = write!(f, "{}", serde_json::to_string(&datas).unwrap_or("[]".into())); } } }); // Routes // CORS for cross-origin UI (e.g., anarchyspace.com -> 192.3.249.4) let cors = CorsLayer::new() .allow_origin(Any) .allow_methods([Method::GET]) .allow_headers(Any); // per-response noise header to obfuscate simple fingerprinting async fn add_noise_header(req: Request, next: Next) -> impl axum::response::IntoResponse { use rand::{distributions::Alphanumeric, Rng}; // Log request path into in-process ring buffer let ts = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); if let Some(lock) = LOGS.get() { if let Ok(mut v) = lock.write() { v.push(LogEntry { ts, path: req.uri().path().to_string() }); let len = v.len(); if len > 200 { let drop_n = len - 200; let _ = v.drain(0..drop_n); } } } if let Some(tx) = LOG_TX.get() { let _ = tx.send(LogEntry { ts, path: req.uri().path().to_string() }); } let mut res = next.run(req).await; let noise: String = rand::thread_rng().sample_iter(&Alphanumeric).take(12).map(char::from).collect(); res.headers_mut().insert("X-Noise", axum::http::HeaderValue::from_str(&noise).unwrap()); res } #[derive(Debug, Clone, serde::Deserialize)] struct AddPeerReq { addr: String } async fn route_peers_add(Json(req): Json) -> Json { if req.addr.trim().is_empty() { return Json(serde_json::json!({"ok": false, "error": "empty-addr"})); } if let Ok(mut v) = PEERS.get().unwrap().write() { if !v.iter().any(|p| p.addr == req.addr) { v.push(zpeer::PeerInfo { addr: req.addr.clone(), last_seen: zpeer::now(), hops: 1, score: 0, failures: 0, quarantine_until: None, backoff_secs: 60 }); } } Json(serde_json::json!({"ok": true})) } #[derive(Serialize)] struct PeerExport { peers: Vec } async fn route_peers_export() -> Json { let list = PEERS.get().unwrap().read().unwrap(); let addrs = list.iter().map(|p| p.addr.clone()).collect::>(); Json(PeerExport { peers: addrs }) } let app = Router::new() .route("/status", get(route_status)) .route("/mesh", get(route_mesh)) .route("/blockchain", get(route_blockchain)) .route("/defense", get(route_defense)) // ZHTP federation endpoints .route("/zhtp/hello", get(route_zhello)) .route("/zhtp/peers", get(route_zpeers)) .route("/zhtp/gossip", get(route_zgossip)) .route("/zhtp/blocksync", get(route_zblocksync)) .route("/zhtp/mesh", get(route_zmesh)) .route("/zhtp/identity", get(route_zidentity)) .route("/zhtp/logs", get(route_zlogs)) .route("/zhtp/logs/stream", get(route_zlogs_stream)) .route("/zhtp/config", get(route_config)) .route("/zhtp/onboard", post(route_onboard)) .route("/zhtp/peer/seen", post(route_peer_seen)) .route("/zhtp/stats", get(route_stats)) .route("/zhtp/peers/add", post(route_peers_add)) .route("/zhtp/peers/export", get(route_peers_export)) // Redirect /home/zhtp to /ui/index.html for public URL mapping .route("/home/zhtp", get(|| async { Redirect::permanent("/ui/index.html") })) // Serve static UI from /opt/anarchy-zhtp-node/webui .nest_service("/ui", ServeDir::new("/opt/anarchy-zhtp-node/webui").append_index_html_on_directories(true)) .layer(cors) .layer(axum::middleware::from_fn(add_noise_header)); let addr: SocketAddr = cfg.bind_addr.parse().expect("bind addr"); info!("listening on {}", addr); let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); // Provide ConnectInfo to handlers that need the peer addr axum::serve(listener, app.into_make_service_with_connect_info::()) .await .expect("server run"); } async fn route_status() -> Json { let cfg = core::NodeConfig::default(); static START: OnceLock = OnceLock::new(); let now = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs(); let start = *START.get_or_init(|| now); let uptime = now - start; Json(Status { ok: true, version: env!("CARGO_PKG_VERSION"), bind_addr: cfg.bind_addr, uptime_secs: uptime }) } async fn route_mesh() -> Json { Json(mesh::heartbeat()) } async fn route_blockchain() -> Json> { let ch = CHAIN.get().unwrap(); Json(ch.list()) } #[derive(Serialize)] struct DefenseReport { bot: defense::BotCheck, status: defense::DefenseStatus, } async fn route_defense(ConnectInfo(addr): ConnectInfo) -> Json { let check = defense::suspicion_score(None, Some(addr.ip())); let mut status = DEFENSE_STATUS.get().unwrap().read().unwrap().clone(); status.ua_hint = defense::random_user_agent(); Json(DefenseReport { bot: check, status }) } // ZHTP federation handlers async fn route_zhello() -> Json { let id = NODE_ID.get().unwrap(); Json(zhello::Hello { protocol: "zhtp", version: env!("CARGO_PKG_VERSION"), node_id: id.node_id.clone() }) } #[derive(Serialize)] struct PeerTable { node_id: String, peers: Vec, } async fn route_zpeers() -> Json { let id = NODE_ID.get().unwrap(); let table = PEERS.get().unwrap().read().unwrap().clone(); Json(PeerTable { node_id: id.node_id.clone(), peers: table }) } async fn route_zgossip() -> Json { let g = GOSSIP.get().unwrap().read().unwrap().clone(); Json(g) } async fn route_zblocksync() -> Json { let ch = CHAIN.get().unwrap(); let list = ch.list(); let height = (list.len().saturating_sub(1)) as u64; let latest = list.last().map(|b| b.hash.clone()).unwrap_or_default(); Json(blocksync::BlockSyncInfo { height, latest_hash: latest }) } async fn route_zmesh() -> Json { let peers = PEERS.get().unwrap().read().unwrap().len(); Json(zmesh::MeshHealth { peers, last_gossip_ts: None }) } async fn route_zidentity() -> Json { let id = NODE_ID.get().unwrap(); Json(zid::view(id)) } #[derive(Serialize)] struct LogsView { entries: Vec } async fn route_zlogs() -> Json { let v = LOGS.get().unwrap().read().unwrap().clone(); Json(LogsView { entries: v }) } async fn route_stats() -> Json { let peers = PEERS.get().unwrap().read().unwrap().len(); let ch = CHAIN.get().unwrap(); let onboard = ch.list().into_iter().filter(|b| b.data.starts_with("onboard:")).count(); Json(Stats { peers, onboard_events: onboard, ts: zpeer::now() }) } async fn route_onboard(Json(req): Json) -> Json { let now = zpeer::now(); let ch = CHAIN.get().unwrap(); let payload = format!("onboard:{}:{}", req.user_id.clone().unwrap_or_default(), now); ch.push(payload); if let Ok(mut v) = PEERS.get().unwrap().write() { let addr = req.ip.clone().unwrap_or_else(|| req.user_id.clone().unwrap_or_else(|| "unknown".into())); v.push(zpeer::PeerInfo { addr, last_seen: now, hops: 1, score: 3, failures: 0, quarantine_until: None, backoff_secs: 60 }); } Json(serde_json::json!({"ok": true})) } async fn route_peer_seen(Json(req): Json) -> Json { let now = zpeer::now(); if let Ok(mut v) = PEERS.get().unwrap().write() { for p in v.iter_mut() { if p.addr == req.addr { p.last_seen = now; p.score += 1; } } } Json(serde_json::json!({"ok": true})) } async fn route_zlogs_stream() -> Sse>> { let rx = LOG_TX.get().unwrap().subscribe(); let stream = BroadcastStream::new(rx); let mapped = stream.map(|evt| { let e = evt.unwrap_or(LogEntry{ ts:0, path:"stream_error".into() }); Ok(Event::default().json_data(e).unwrap()) }); Sse::new(mapped).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)).text("keepalive")) } #[derive(Serialize)] struct ConfigView { bootstrap: Vec, decoys: Vec, domains: Vec, nickname: String, continuity: bool } async fn route_config() -> Json { let c = APPCFG.get().unwrap(); Json(ConfigView { bootstrap: c.bootstrap.peers.clone(), decoys: c.decoys.ips.clone(), domains: c.dns.domains.clone(), nickname: c.node.nickname.clone(), continuity: c.node.continuity, }) } // Note: static UI now handled by ServeDir at /ui