|
| 1 | +use crate::graph_monitoring_subgraph::{GraphMonitoringSubgraph, OracleConfig}; |
| 2 | +use common::prelude::*; |
| 3 | +use ethers::abi::Address; |
| 4 | +use ethers::core::types::U256; |
| 5 | +use ethers::middleware::SignerMiddleware; |
| 6 | +use ethers::providers::{Http, Middleware, Provider}; |
| 7 | +use ethers::signers::{LocalWallet, Signer}; |
| 8 | +use ethers::types::TransactionRequest; |
| 9 | +use secp256k1::SecretKey; |
| 10 | +use std::sync::Arc; |
| 11 | +use std::time::Duration; |
| 12 | +use url::Url; |
| 13 | + |
| 14 | +/// Extracts a subgraph deployment ID (CID) from a gateway URL. |
| 15 | +/// Expects URLs in the format: https://gateway.thegraph.com/api/[api-key]/deployments/id/Qm... |
| 16 | +pub fn extract_deployment_id_from_url(url: &str) -> Result<String, Error> { |
| 17 | + let url = Url::parse(url).map_err(|e| anyhow!("Invalid URL: {}", e))?; |
| 18 | + |
| 19 | + let path_segments: Vec<&str> = url.path().split('/').collect(); |
| 20 | + for (i, segment) in path_segments.iter().enumerate() { |
| 21 | + if *segment == "id" && i + 1 < path_segments.len() { |
| 22 | + let deployment_id = path_segments[i + 1]; |
| 23 | + if deployment_id.starts_with("Qm") { |
| 24 | + return Ok(deployment_id.to_string()); |
| 25 | + } |
| 26 | + } |
| 27 | + } |
| 28 | + |
| 29 | + Err(anyhow!( |
| 30 | + "Could not extract deployment ID from URL: {}. Expected format: .../deployments/id/Qm...", |
| 31 | + url |
| 32 | + )) |
| 33 | +} |
| 34 | + |
| 35 | +/// Configuration needed to build an OracleConfig from CLI parameters. |
| 36 | +pub struct OracleConfigParams<'a> { |
| 37 | + pub ipfs_concurrency: usize, |
| 38 | + pub ipfs_timeout: Duration, |
| 39 | + pub min_signal: u64, |
| 40 | + pub period: Duration, |
| 41 | + pub grace_period: u64, |
| 42 | + pub supported_data_source_kinds: &'a [String], |
| 43 | + pub network_subgraph_url: &'a str, |
| 44 | + pub epoch_block_oracle_subgraph_url: &'a str, |
| 45 | + pub subgraph_availability_manager_contract: Option<Address>, |
| 46 | + pub oracle_index: Option<u64>, |
| 47 | +} |
| 48 | + |
| 49 | +/// Builds an OracleConfig from CLI config parameters. |
| 50 | +pub fn build_oracle_config(params: &OracleConfigParams) -> Result<OracleConfig, Error> { |
| 51 | + let network_subgraph_deployment_id = |
| 52 | + extract_deployment_id_from_url(params.network_subgraph_url)?; |
| 53 | + let epoch_block_oracle_subgraph_deployment_id = |
| 54 | + extract_deployment_id_from_url(params.epoch_block_oracle_subgraph_url)?; |
| 55 | + |
| 56 | + Ok(OracleConfig { |
| 57 | + version: format!("v{}", env!("CARGO_PKG_VERSION")), |
| 58 | + ipfs_concurrency: params.ipfs_concurrency.to_string(), |
| 59 | + ipfs_timeout: params.ipfs_timeout.as_millis().to_string(), |
| 60 | + min_signal: params.min_signal.to_string(), |
| 61 | + period: params.period.as_secs().to_string(), |
| 62 | + grace_period: params.grace_period.to_string(), |
| 63 | + supported_data_source_kinds: params.supported_data_source_kinds.join(","), |
| 64 | + network_subgraph_deployment_id, |
| 65 | + epoch_block_oracle_subgraph_deployment_id, |
| 66 | + subgraph_availability_manager_contract: params |
| 67 | + .subgraph_availability_manager_contract |
| 68 | + .map(|a| format!("{:?}", a)) |
| 69 | + .unwrap_or_default(), |
| 70 | + oracle_index: params |
| 71 | + .oracle_index |
| 72 | + .map(|i| i.to_string()) |
| 73 | + .unwrap_or_default(), |
| 74 | + }) |
| 75 | +} |
| 76 | + |
| 77 | +pub struct DataEdgeContract { |
| 78 | + provider: Arc<SignerMiddleware<Provider<Http>, LocalWallet>>, |
| 79 | + contract_address: Address, |
| 80 | + logger: Logger, |
| 81 | +} |
| 82 | + |
| 83 | +impl DataEdgeContract { |
| 84 | + pub async fn new( |
| 85 | + signing_key: &SecretKey, |
| 86 | + rpc_url: Url, |
| 87 | + contract_address: Address, |
| 88 | + logger: Logger, |
| 89 | + ) -> Result<Self, Error> { |
| 90 | + let http_client = reqwest::ClientBuilder::new() |
| 91 | + .tcp_nodelay(true) |
| 92 | + .timeout(Duration::from_secs(30)) |
| 93 | + .build() |
| 94 | + .unwrap(); |
| 95 | + let provider = Provider::new(Http::new_with_client(rpc_url, http_client)); |
| 96 | + let chain_id = provider.get_chainid().await?.as_u64(); |
| 97 | + let wallet = LocalWallet::from_bytes(signing_key.as_ref()) |
| 98 | + .unwrap() |
| 99 | + .with_chain_id(chain_id); |
| 100 | + let provider = Arc::new(SignerMiddleware::new(provider, wallet)); |
| 101 | + |
| 102 | + Ok(Self { |
| 103 | + provider, |
| 104 | + contract_address, |
| 105 | + logger, |
| 106 | + }) |
| 107 | + } |
| 108 | + |
| 109 | + /// Posts the oracle configuration to the DataEdge contract if it has changed. |
| 110 | + /// Returns Ok(true) if posted, Ok(false) if skipped because unchanged. |
| 111 | + pub async fn post_config_if_changed( |
| 112 | + &self, |
| 113 | + local_config: &OracleConfig, |
| 114 | + monitoring_subgraph: &impl GraphMonitoringSubgraph, |
| 115 | + oracle_index: u64, |
| 116 | + ) -> Result<bool, Error> { |
| 117 | + // Check current config from subgraph |
| 118 | + match monitoring_subgraph.fetch_oracle_config(oracle_index).await { |
| 119 | + Ok(Some(current_config)) => { |
| 120 | + if *local_config == current_config { |
| 121 | + info!(self.logger, "Config unchanged, skipping DataEdge post"; |
| 122 | + "oracle_index" => oracle_index |
| 123 | + ); |
| 124 | + return Ok(false); |
| 125 | + } else { |
| 126 | + let changed_fields = local_config.diff(¤t_config); |
| 127 | + info!(self.logger, "Config changed, will post to DataEdge"; |
| 128 | + "oracle_index" => oracle_index, |
| 129 | + "changed_fields" => changed_fields.join(",") |
| 130 | + ); |
| 131 | + } |
| 132 | + } |
| 133 | + Ok(None) => { |
| 134 | + info!(self.logger, "Oracle not found in subgraph, posting initial config"; |
| 135 | + "oracle_index" => oracle_index |
| 136 | + ); |
| 137 | + } |
| 138 | + Err(e) => { |
| 139 | + warn!(self.logger, "Failed to fetch current oracle config from subgraph, will post anyway"; |
| 140 | + "oracle_index" => oracle_index, |
| 141 | + "error" => format!("{:#}", e) |
| 142 | + ); |
| 143 | + } |
| 144 | + } |
| 145 | + |
| 146 | + self.post_config(local_config).await?; |
| 147 | + Ok(true) |
| 148 | + } |
| 149 | + |
| 150 | + /// Posts the oracle configuration to the DataEdge contract. |
| 151 | + async fn post_config(&self, config: &OracleConfig) -> Result<(), Error> { |
| 152 | + // Build the configuration JSON for posting |
| 153 | + let config_json = serde_json::json!({ |
| 154 | + "version": &config.version, |
| 155 | + "config": { |
| 156 | + "ipfs_concurrency": &config.ipfs_concurrency, |
| 157 | + "ipfs_timeout": &config.ipfs_timeout, |
| 158 | + "min_signal": &config.min_signal, |
| 159 | + "period": &config.period, |
| 160 | + "grace_period": &config.grace_period, |
| 161 | + "supported_data_source_kinds": &config.supported_data_source_kinds, |
| 162 | + "network_subgraph_deloyment_id": &config.network_subgraph_deployment_id, |
| 163 | + "epoch_block_oracle_subgraph_deloyment_id": &config.epoch_block_oracle_subgraph_deployment_id, |
| 164 | + "subgraph_availability_manager_contract": &config.subgraph_availability_manager_contract, |
| 165 | + "oracle_index": &config.oracle_index, |
| 166 | + } |
| 167 | + }); |
| 168 | + |
| 169 | + info!(self.logger, "Posting oracle configuration to DataEdge"; |
| 170 | + "version" => &config.version, |
| 171 | + "data_edge_contract" => format!("{:?}", self.contract_address), |
| 172 | + "network_subgraph_deployment_id" => &config.network_subgraph_deployment_id, |
| 173 | + "epoch_block_oracle_subgraph_deployment_id" => &config.epoch_block_oracle_subgraph_deployment_id, |
| 174 | + ); |
| 175 | + |
| 176 | + let calldata = json_oracle_encoder::json_to_calldata(config_json) |
| 177 | + .map_err(|e| anyhow!("Failed to encode config as calldata: {}", e))?; |
| 178 | + |
| 179 | + let gas_price = self.provider.get_gas_price().await?; |
| 180 | + let gas_price_with_buffer = gas_price * U256::from(120) / U256::from(100); |
| 181 | + |
| 182 | + let tx = TransactionRequest::new() |
| 183 | + .to(self.contract_address) |
| 184 | + .data(calldata.clone()); |
| 185 | + |
| 186 | + let estimated_gas = self.provider.estimate_gas(&tx.clone().into(), None).await?; |
| 187 | + let gas_with_buffer = estimated_gas * U256::from(120) / U256::from(100); |
| 188 | + |
| 189 | + let tx = tx.gas(gas_with_buffer).gas_price(gas_price_with_buffer); |
| 190 | + |
| 191 | + let pending_tx = self.provider.send_transaction(tx, None).await?; |
| 192 | + info!(self.logger, "DataEdge transaction sent, waiting for confirmation"; |
| 193 | + "tx_hash" => format!("{:?}", pending_tx.tx_hash()), |
| 194 | + "gas_price" => gas_price_with_buffer.as_u64(), |
| 195 | + "gas_limit" => gas_with_buffer.as_u64() |
| 196 | + ); |
| 197 | + |
| 198 | + let receipt = pending_tx |
| 199 | + .await? |
| 200 | + .ok_or_else(|| anyhow!("DataEdge transaction was dropped from mempool"))?; |
| 201 | + |
| 202 | + info!(self.logger, "Successfully posted config to DataEdge"; |
| 203 | + "tx_hash" => format!("{:?}", receipt.transaction_hash), |
| 204 | + "block_number" => receipt.block_number.map(|b| b.as_u64()), |
| 205 | + "gas_used" => receipt.gas_used.map(|g| g.as_u64()), |
| 206 | + ); |
| 207 | + |
| 208 | + Ok(()) |
| 209 | + } |
| 210 | +} |
| 211 | + |
| 212 | +/// Logs what would happen in dry-run mode by checking against the subgraph. |
| 213 | +pub async fn log_dry_run_config( |
| 214 | + logger: &Logger, |
| 215 | + local_config: &OracleConfig, |
| 216 | + monitoring_subgraph: Option<&impl GraphMonitoringSubgraph>, |
| 217 | + oracle_index: Option<u64>, |
| 218 | +) { |
| 219 | + if let (Some(subgraph), Some(oracle_index)) = (monitoring_subgraph, oracle_index) { |
| 220 | + match subgraph.fetch_oracle_config(oracle_index).await { |
| 221 | + Ok(Some(current_config)) => { |
| 222 | + if *local_config == current_config { |
| 223 | + info!(logger, "Config unchanged, would skip DataEdge post (dry-run)"; |
| 224 | + "oracle_index" => oracle_index |
| 225 | + ); |
| 226 | + } else { |
| 227 | + let changed_fields = local_config.diff(¤t_config); |
| 228 | + info!(logger, "Config changed, would post to DataEdge (dry-run)"; |
| 229 | + "oracle_index" => oracle_index, |
| 230 | + "changed_fields" => changed_fields.join(",") |
| 231 | + ); |
| 232 | + } |
| 233 | + } |
| 234 | + Ok(None) => { |
| 235 | + info!(logger, "Oracle not found in subgraph, would post initial config (dry-run)"; |
| 236 | + "oracle_index" => oracle_index |
| 237 | + ); |
| 238 | + } |
| 239 | + Err(e) => { |
| 240 | + warn!(logger, "Failed to fetch current config (dry-run)"; |
| 241 | + "error" => format!("{:#}", e) |
| 242 | + ); |
| 243 | + } |
| 244 | + } |
| 245 | + } |
| 246 | + |
| 247 | + info!(logger, "Local config values"; |
| 248 | + "version" => &local_config.version, |
| 249 | + "ipfs_concurrency" => &local_config.ipfs_concurrency, |
| 250 | + "ipfs_timeout" => &local_config.ipfs_timeout, |
| 251 | + "min_signal" => &local_config.min_signal, |
| 252 | + "period" => &local_config.period, |
| 253 | + "grace_period" => &local_config.grace_period, |
| 254 | + "supported_data_source_kinds" => &local_config.supported_data_source_kinds, |
| 255 | + "network_subgraph_deployment_id" => &local_config.network_subgraph_deployment_id, |
| 256 | + "epoch_block_oracle_subgraph_deployment_id" => &local_config.epoch_block_oracle_subgraph_deployment_id, |
| 257 | + "subgraph_availability_manager_contract" => &local_config.subgraph_availability_manager_contract, |
| 258 | + "oracle_index" => &local_config.oracle_index, |
| 259 | + ); |
| 260 | +} |
| 261 | + |
| 262 | +#[cfg(test)] |
| 263 | +mod tests { |
| 264 | + use super::*; |
| 265 | + |
| 266 | + #[test] |
| 267 | + fn test_extract_deployment_id_from_url_valid() { |
| 268 | + // Standard gateway URL format |
| 269 | + let url = "https://gateway.thegraph.com/api/some-api-key/deployments/id/QmSWxvd8SaQK6qZKJ7xtfxCCGoRzGnoi2WNzmJYYJW9BXY"; |
| 270 | + assert_eq!( |
| 271 | + extract_deployment_id_from_url(url).unwrap(), |
| 272 | + "QmSWxvd8SaQK6qZKJ7xtfxCCGoRzGnoi2WNzmJYYJW9BXY" |
| 273 | + ); |
| 274 | + |
| 275 | + // Another gateway URL |
| 276 | + let url = "https://gateway-arbitrum.network.thegraph.com/api/key123/deployments/id/QmQEGDTb3xeykCXLdWx7pPX3qeeGMUvHmGWP4SpMkv5QJf"; |
| 277 | + assert_eq!( |
| 278 | + extract_deployment_id_from_url(url).unwrap(), |
| 279 | + "QmQEGDTb3xeykCXLdWx7pPX3qeeGMUvHmGWP4SpMkv5QJf" |
| 280 | + ); |
| 281 | + |
| 282 | + // URL with query parameters |
| 283 | + let url = "https://gateway.thegraph.com/api/key/deployments/id/QmSWxvd8SaQK6qZKJ7xtfxCCGoRzGnoi2WNzmJYYJW9BXY?foo=bar"; |
| 284 | + assert_eq!( |
| 285 | + extract_deployment_id_from_url(url).unwrap(), |
| 286 | + "QmSWxvd8SaQK6qZKJ7xtfxCCGoRzGnoi2WNzmJYYJW9BXY" |
| 287 | + ); |
| 288 | + } |
| 289 | + |
| 290 | + #[test] |
| 291 | + fn test_extract_deployment_id_from_url_invalid() { |
| 292 | + // Missing /id/ segment |
| 293 | + let url = "https://api.thegraph.com/subgraphs/name/graphprotocol/graph-network-arbitrum"; |
| 294 | + assert!(extract_deployment_id_from_url(url).is_err()); |
| 295 | + |
| 296 | + // /id/ segment but no Qm prefix |
| 297 | + let url = "https://gateway.thegraph.com/api/key/deployments/id/not-a-cid"; |
| 298 | + assert!(extract_deployment_id_from_url(url).is_err()); |
| 299 | + |
| 300 | + // Invalid URL |
| 301 | + let url = "not-a-valid-url"; |
| 302 | + assert!(extract_deployment_id_from_url(url).is_err()); |
| 303 | + |
| 304 | + // Empty URL |
| 305 | + let url = ""; |
| 306 | + assert!(extract_deployment_id_from_url(url).is_err()); |
| 307 | + } |
| 308 | +} |
0 commit comments