diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 24993639945..dd8ed4766a2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -107,6 +107,13 @@ jobs: command: test args: --package graph-tests --test runner_tests + - name: Run file link resolver test + id: file-link-resolver-test + uses: actions-rs/cargo@v1 + with: + command: test + args: --package graph-tests --test file_link_resolver + integration-tests: name: Run integration tests runs-on: ubuntu-latest diff --git a/.github/workflows/gnd-binary-build.yml b/.github/workflows/gnd-binary-build.yml new file mode 100644 index 00000000000..7622d608a91 --- /dev/null +++ b/.github/workflows/gnd-binary-build.yml @@ -0,0 +1,189 @@ +name: Build gnd Binaries + +on: + workflow_dispatch: + +jobs: + build: + name: Build gnd for ${{ matrix.target }} + runs-on: ${{ matrix.runner }} + strategy: + fail-fast: false + matrix: + include: + - target: x86_64-unknown-linux-gnu + runner: ubuntu-latest + asset_name: gnd-linux-x86_64 + - target: aarch64-unknown-linux-gnu + runner: ubuntu-24.04-arm + asset_name: gnd-linux-aarch64 + - target: x86_64-apple-darwin + runner: macos-13 + asset_name: gnd-macos-x86_64 + - target: aarch64-apple-darwin + runner: macos-latest + asset_name: gnd-macos-aarch64 + - target: x86_64-pc-windows-msvc + runner: windows-latest + asset_name: gnd-windows-x86_64.exe + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Install Rust toolchain + run: | + rustup toolchain install stable + rustup target add ${{ matrix.target }} + rustup default stable + + - name: Rust Cache + uses: Swatinem/rust-cache@v2 + with: + key: ${{ matrix.target }} + + - name: Install dependencies (Ubuntu) + if: startsWith(matrix.runner, 'ubuntu') + run: | + sudo apt-get update + sudo apt-get install -y libpq-dev protobuf-compiler musl-tools libssl-dev + + - name: Install dependencies (macOS) + if: startsWith(matrix.runner, 'macos') + run: | + brew install postgresql protobuf + + - name: Install protobuf (Windows) + if: startsWith(matrix.runner, 'windows') + run: choco install protoc + + - name: Cache vcpkg + uses: actions/cache@v4 + if: startsWith(matrix.runner, 'windows') + id: vcpkg-cache + with: + path: | + ${{ github.workspace }}/vcpkg + C:/vcpkg/installed + C:/vcpkg/packages + key: ${{ runner.os }}-vcpkg-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-vcpkg- + + - name: Install vcpkg and dependencies (Windows) + if: startsWith(matrix.runner, 'windows') && steps.vcpkg-cache.outputs.cache-hit != 'true' + run: | + # Install vcpkg + git clone https://github.com/microsoft/vcpkg.git + cd vcpkg + .\bootstrap-vcpkg.bat + + # Install libpq using vcpkg + .\vcpkg.exe install libpq:x64-windows + shell: pwsh + + - name: Set Windows environment variables + if: startsWith(matrix.runner, 'windows') + run: | + echo "VCPKG_ROOT=${{ github.workspace }}/vcpkg" | Out-File -FilePath $env:GITHUB_ENV -Append + echo "LIBPQ_DIR=${{ github.workspace }}/vcpkg/installed/x64-windows" | Out-File -FilePath $env:GITHUB_ENV -Append + echo "RUSTFLAGS=-L ${{ github.workspace }}/vcpkg/installed/x64-windows/lib" | Out-File -FilePath $env:GITHUB_ENV -Append + shell: pwsh + + - name: Build gnd binary (Unix/Mac) + if: ${{ !startsWith(matrix.runner, 'windows') }} + run: cargo build --bin gnd --release --target ${{ matrix.target }} + + - name: Build gnd binary (Windows) + if: startsWith(matrix.runner, 'windows') + run: cargo build --bin gnd --release --target ${{ matrix.target }} + env: + LIBPQ_DIR: ${{ 
format('{0}/vcpkg/installed/x64-windows', github.workspace) }} + VCPKGRS_DYNAMIC: 1 + + - name: Sign macOS binary + if: startsWith(matrix.runner, 'macos') + uses: lando/code-sign-action@v3 + with: + file: target/${{ matrix.target }}/release/gnd + certificate-data: ${{ secrets.APPLE_CERT_DATA }} + certificate-password: ${{ secrets.APPLE_CERT_PASSWORD }} + certificate-id: ${{ secrets.APPLE_TEAM_ID }} + options: --options runtime + + - name: Notarize macOS binary + if: startsWith(matrix.runner, 'macos') + uses: lando/notarize-action@v2 + with: + product-path: target/${{ matrix.target }}/release/gnd + appstore-connect-username: ${{ secrets.NOTARIZATION_USERNAME }} + appstore-connect-password: ${{ secrets.NOTARIZATION_PASSWORD }} + appstore-connect-team-id: ${{ secrets.APPLE_TEAM_ID }} + + - name: Prepare binary (Unix) + if: ${{ !startsWith(matrix.runner, 'windows') }} + run: | + cp target/${{ matrix.target }}/release/gnd ${{ matrix.asset_name }} + chmod +x ${{ matrix.asset_name }} + gzip ${{ matrix.asset_name }} + + - name: Prepare binary (Windows) + if: startsWith(matrix.runner, 'windows') + run: | + copy target\${{ matrix.target }}\release\gnd.exe ${{ matrix.asset_name }} + 7z a -tzip ${{ matrix.asset_name }}.zip ${{ matrix.asset_name }} + + - name: Upload artifact + uses: actions/upload-artifact@v4 + with: + name: ${{ matrix.asset_name }} + path: | + ${{ matrix.asset_name }}.gz + ${{ matrix.asset_name }}.zip + if-no-files-found: error + + release: + name: Create Release + needs: build + if: startsWith(github.ref, 'refs/tags/') + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Setup GitHub CLI + run: | + # GitHub CLI is pre-installed on GitHub-hosted runners + gh --version + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + - name: Download all artifacts + uses: actions/download-artifact@v4 + with: + path: artifacts + + - name: Display structure of downloaded artifacts + run: ls -R artifacts + + - name: Upload Assets to Release + run: | + # Extract version from ref (remove refs/tags/ prefix) + VERSION=${GITHUB_REF#refs/tags/} + + # Upload Linux x86_64 asset + gh release upload $VERSION artifacts/gnd-linux-x86_64/gnd-linux-x86_64.gz --repo $GITHUB_REPOSITORY + + # Upload Linux ARM64 asset + gh release upload $VERSION artifacts/gnd-linux-aarch64/gnd-linux-aarch64.gz --repo $GITHUB_REPOSITORY + + # Upload macOS x86_64 asset + gh release upload $VERSION artifacts/gnd-macos-x86_64/gnd-macos-x86_64.gz --repo $GITHUB_REPOSITORY + + # Upload macOS ARM64 asset + gh release upload $VERSION artifacts/gnd-macos-aarch64/gnd-macos-aarch64.gz --repo $GITHUB_REPOSITORY + + # Upload Windows x86_64 asset + gh release upload $VERSION artifacts/gnd-windows-x86_64.exe/gnd-windows-x86_64.exe.zip --repo $GITHUB_REPOSITORY + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 4515f57f7b9..51ae390703d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -540,9 +540,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.6.0" +version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" +checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" [[package]] name = "bitvec" @@ -1235,7 +1235,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"04001f23ba8843dc315804fa324000376084dfb1c30794ff68dd279e6e5696d5" dependencies = [ "bigdecimal 0.3.1", - "bitflags 2.6.0", + "bitflags 2.9.0", "byteorder", "chrono", "diesel_derives", @@ -1562,6 +1562,18 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" +[[package]] +name = "filetime" +version = "0.2.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35c0522e981e68cbfa8c3f978441a5f34b30b96e146b33cd3359176b50fe8586" +dependencies = [ + "cfg-if 1.0.0", + "libc", + "libredox", + "windows-sys 0.59.0", +] + [[package]] name = "firestorm" version = "0.4.6" @@ -1638,6 +1650,15 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "funty" version = "2.0.0" @@ -1761,7 +1782,7 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27d12c0aed7f1e24276a241aadc4cb8ea9f83000f34bc062b7cc2d51e3b0fabd" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.0", "debugid", "fxhash", "serde", @@ -1842,9 +1863,9 @@ dependencies = [ [[package]] name = "globset" -version = "0.4.14" +version = "0.4.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57da3b9b5b85bd66f31093f8c408b90a74431672542466497dcbdfdc02034be1" +checksum = "54a1028dfc5f5df5da8a56a73e6c153c9a9708ec57232470703592a3f18e49f5" dependencies = [ "aho-corasick", "bstr", @@ -2062,6 +2083,7 @@ dependencies = [ "diesel", "env_logger", "git-testament", + "globset", "graph", "graph-chain-arweave", "graph-chain-ethereum", @@ -2079,6 +2101,8 @@ dependencies = [ "itertools 0.13.0", "json-structural-diff", "lazy_static", + "notify", + "pgtemp", "prometheus", "serde", "shellexpand", @@ -2929,6 +2953,26 @@ dependencies = [ "serde", ] +[[package]] +name = "inotify" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f37dccff2791ab604f9babef0ba14fbe0be30bd368dc541e2b08d07c8aa908f3" +dependencies = [ + "bitflags 2.9.0", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "ipnet" version = "2.9.0" @@ -3134,6 +3178,26 @@ dependencies = [ "cpufeatures", ] +[[package]] +name = "kqueue" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -3158,8 +3222,9 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.0", "libc", + "redox_syscall 0.5.2", ] [[package]] @@ -3322,6 +3387,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" dependencies = [ "libc", + "log", "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys 0.52.0", ] @@ -3393,6 +3459,31 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c96aba5aa877601bb3f6dd6a63a969e1f82e60646e81e71b14496995e9853c91" +[[package]] +name = "notify" +version = "8.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fee8403b3d66ac7b26aee6e40a897d85dc5ce26f44da36b8b73e987cc52e943" +dependencies = [ + "bitflags 2.9.0", + "filetime", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio", + "notify-types", + "walkdir", + "windows-sys 0.59.0", +] + +[[package]] +name = "notify-types" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e0826a989adedc2a244799e823aece04662b66609d96af8dff7ac6df9a8925d" + [[package]] name = "num-bigint" version = "0.2.6" @@ -3518,7 +3609,7 @@ version = "0.10.72" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fedfea7d58a1f73118430a55da6a286e7b044961736ce96a16a17068ea25e5da" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.0", "cfg-if 1.0.0", "foreign-types", "libc", @@ -3544,6 +3635,15 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-src" +version = "300.5.0+3.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8ce546f549326b0e6052b649198487d91320875da901e7bd11a06d1ee3f9c2f" +dependencies = [ + "cc", +] + [[package]] name = "openssl-sys" version = "0.9.107" @@ -3552,6 +3652,7 @@ checksum = "8288979acd84749c744a9014b4382d42b8f7b2592847b5afb2ed29e5d16ede07" dependencies = [ "cc", "libc", + "openssl-src", "pkg-config", "vcpkg", ] @@ -3705,6 +3806,17 @@ dependencies = [ "serde", ] +[[package]] +name = "pgtemp" +version = "0.6.0" +source = "git+https://github.com/incrypto32/pgtemp?branch=initdb-args#929a9f96eab841d880c2ebf280e00054ca55ec0e" +dependencies = [ + "libc", + "tempfile", + "tokio", + "url", +] + [[package]] name = "phf" version = "0.11.2" @@ -4213,7 +4325,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c82cf8cff14456045f55ec4241383baeff27af886adb72ffb2162f99911de0fd" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.0", ] [[package]] @@ -4385,7 +4497,7 @@ version = "0.38.34" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.0", "errno", "libc", "linux-raw-sys", @@ -4528,7 +4640,7 @@ version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c627723fd09706bacdb5cf41499e95098555af3c3c29d014dc3c458ef6be11c0" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.0", "core-foundation 0.9.4", "core-foundation-sys", "libc", @@ -4541,7 +4653,7 @@ version = "3.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.0", "core-foundation 0.10.0", "core-foundation-sys", "libc", @@ -5156,7 +5268,7 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.0", "core-foundation 0.9.4", "system-configuration-sys", ] @@ -5700,7 +5812,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.0", "bytes", "http 1.1.0", "http-body 1.0.0", @@ -6896,7 +7008,7 @@ version = "0.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3268f3d866458b787f390cf61f4bbb563b922d091359f9608842999eaee3943c" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.0", ] [[package]] diff --git a/chain/ethereum/src/transport.rs b/chain/ethereum/src/transport.rs index a90a6b9720b..ef571efacb8 100644 --- a/chain/ethereum/src/transport.rs +++ b/chain/ethereum/src/transport.rs @@ -32,6 +32,11 @@ impl Transport { .expect("Failed to connect to Ethereum IPC") } + #[cfg(not(unix))] + pub async fn new_ipc(_ipc: &str) -> Self { + panic!("IPC connections are not supported on non-Unix platforms") + } + /// Creates a WebSocket transport. pub async fn new_ws(ws: &str) -> Self { ws::WebSocket::new(ws) diff --git a/chain/substreams/src/data_source.rs b/chain/substreams/src/data_source.rs index dff2cfa31c4..3969f83e373 100644 --- a/chain/substreams/src/data_source.rs +++ b/chain/substreams/src/data_source.rs @@ -705,6 +705,10 @@ mod test { unimplemented!() } + fn for_manifest(&self, _manifest_path: &str) -> Result, Error> { + unimplemented!() + } + async fn cat(&self, _logger: &Logger, _link: &Link) -> Result, Error> { Ok(gen_package().encode_to_vec()) } diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index 8c2b76e5b6c..f6e1e7ffbe1 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -287,7 +287,12 @@ impl SubgraphInstanceManager { let manifest = UnresolvedSubgraphManifest::parse(deployment.hash.cheap_clone(), manifest)?; // Allow for infinite retries for subgraph definition files. - let link_resolver = Arc::from(self.link_resolver.with_retries()); + let link_resolver = Arc::from( + self.link_resolver + .for_manifest(&deployment.hash.to_string()) + .map_err(SubgraphRegistrarError::Unknown)? + .with_retries(), + ); // Make sure the `raw_yaml` is present on both this subgraph and the graft base. self.subgraph_store diff --git a/core/src/subgraph/provider.rs b/core/src/subgraph/provider.rs index 00d379db01f..9ad50f43942 100644 --- a/core/src/subgraph/provider.rs +++ b/core/src/subgraph/provider.rs @@ -86,8 +86,12 @@ impl SubgraphAssignmentProviderTrait for SubgraphAss )); } - let file_bytes = self + let link_resolver = self .link_resolver + .for_manifest(&loc.hash.to_string()) + .map_err(SubgraphAssignmentProviderError::ResolveError)?; + + let file_bytes = link_resolver .cat(&logger, &loc.hash.to_ipfs_link()) .await .map_err(SubgraphAssignmentProviderError::ResolveError)?; diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 6f7ae17425f..fa0c31390e0 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -278,6 +278,7 @@ where start_block_override: Option, graft_block_override: Option, history_blocks: Option, + ignore_graft_base: bool, ) -> Result { // We don't have a location for the subgraph yet; that will be // assigned when we deploy for real. 
For logging purposes, make up a @@ -286,19 +287,33 @@ where .logger_factory .subgraph_logger(&DeploymentLocator::new(DeploymentId(0), hash.clone())); - let raw: serde_yaml::Mapping = { - let file_bytes = self - .resolver - .cat(&logger, &hash.to_ipfs_link()) - .await - .map_err(|e| { - SubgraphRegistrarError::ResolveError( - SubgraphManifestResolveError::ResolveError(e), - ) - })?; - - serde_yaml::from_slice(&file_bytes) - .map_err(|e| SubgraphRegistrarError::ResolveError(e.into()))? + let resolver: Arc = Arc::from( + self.resolver + .for_manifest(&hash.to_string()) + .map_err(SubgraphRegistrarError::Unknown)?, + ); + + let raw = { + let mut raw: serde_yaml::Mapping = { + let file_bytes = + resolver + .cat(&logger, &hash.to_ipfs_link()) + .await + .map_err(|e| { + SubgraphRegistrarError::ResolveError( + SubgraphManifestResolveError::ResolveError(e), + ) + })?; + + serde_yaml::from_slice(&file_bytes) + .map_err(|e| SubgraphRegistrarError::ResolveError(e.into()))? + }; + + if ignore_graft_base { + raw.remove("graft"); + } + + raw }; let kind = BlockchainKind::from_manifest(&raw).map_err(|e| { @@ -323,7 +338,7 @@ where node_id, debug_fork, self.version_switching_mode, - &self.resolver, + &resolver, history_blocks, ) .await? @@ -341,7 +356,7 @@ where node_id, debug_fork, self.version_switching_mode, - &self.resolver, + &resolver, history_blocks, ) .await? @@ -359,7 +374,7 @@ where node_id, debug_fork, self.version_switching_mode, - &self.resolver, + &resolver, history_blocks, ) .await? @@ -377,7 +392,7 @@ where node_id, debug_fork, self.version_switching_mode, - &self.resolver, + &resolver, history_blocks, ) .await? @@ -567,10 +582,11 @@ async fn create_subgraph_version( history_blocks_override: Option, ) -> Result { let raw_string = serde_yaml::to_string(&raw).unwrap(); + let unvalidated = UnvalidatedSubgraphManifest::::resolve( deployment.clone(), raw, - resolver, + &resolver, logger, ENV_VARS.max_spec_version.clone(), ) diff --git a/graph/src/components/link_resolver/file.rs b/graph/src/components/link_resolver/file.rs new file mode 100644 index 00000000000..3d78bb9244d --- /dev/null +++ b/graph/src/components/link_resolver/file.rs @@ -0,0 +1,307 @@ +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::time::Duration; + +use anyhow::anyhow; +use async_trait::async_trait; +use slog::Logger; + +use crate::data::subgraph::Link; +use crate::prelude::{Error, JsonValueStream, LinkResolver as LinkResolverTrait}; + +#[derive(Clone, Debug)] +pub struct FileLinkResolver { + base_dir: Option, + timeout: Duration, + // This is a hashmap that maps the alias name to the path of the file that is aliased + aliases: HashMap, +} + +impl Default for FileLinkResolver { + fn default() -> Self { + Self { + base_dir: None, + timeout: Duration::from_secs(30), + aliases: HashMap::new(), + } + } +} + +impl FileLinkResolver { + /// Create a new FileLinkResolver + /// + /// All paths are treated as absolute paths. + pub fn new(base_dir: Option, aliases: HashMap) -> Self { + Self { + base_dir: base_dir, + timeout: Duration::from_secs(30), + aliases, + } + } + + /// Create a new FileLinkResolver with a base directory + /// + /// All paths that are not absolute will be considered + /// relative to this base directory. 
+ pub fn with_base_dir>(base_dir: P) -> Self { + Self { + base_dir: Some(base_dir.as_ref().to_owned()), + timeout: Duration::from_secs(30), + aliases: HashMap::new(), + } + } + + fn resolve_path(&self, link: &str) -> PathBuf { + let path = Path::new(link); + + // If the path is an alias, use the aliased path + if let Some(aliased) = self.aliases.get(link) { + return aliased.clone(); + } + + // Return the path as is if base_dir is None, or join with base_dir if present. + // if "link" is an absolute path, join will simply return that path. + self.base_dir + .as_ref() + .map_or_else(|| path.to_owned(), |base_dir| base_dir.join(link)) + } + + /// This method creates a new resolver that is scoped to a specific subgraph + /// It will set the base directory to the parent directory of the manifest path + /// This is required because paths mentioned in the subgraph manifest are relative paths + /// and we need a new resolver with the right base directory for the specific subgraph + fn clone_for_manifest(&self, manifest_path_str: &str) -> Result { + let mut resolver = self.clone(); + + // Create a path to the manifest based on the current resolver's + // base directory or default to using the deployment string as path + // If the deployment string is an alias, use the aliased path + let manifest_path = if let Some(aliased) = self.aliases.get(&manifest_path_str.to_string()) + { + aliased.clone() + } else { + match &resolver.base_dir { + Some(dir) => dir.join(&manifest_path_str), + None => PathBuf::from(manifest_path_str), + } + }; + + let canonical_manifest_path = manifest_path + .canonicalize() + .map_err(|e| Error::from(anyhow!("Failed to canonicalize manifest path: {}", e)))?; + + // The manifest path is the path of the subgraph manifest file in the build directory + // We use the parent directory as the base directory for the new resolver + let base_dir = canonical_manifest_path + .parent() + .ok_or_else(|| Error::from(anyhow!("Manifest path has no parent directory")))? + .to_path_buf(); + + resolver.base_dir = Some(base_dir); + Ok(resolver) + } +} + +pub fn remove_prefix(link: &str) -> &str { + const IPFS: &str = "/ipfs/"; + if link.starts_with(IPFS) { + &link[IPFS.len()..] 
+ } else { + link + } +} + +#[async_trait] +impl LinkResolverTrait for FileLinkResolver { + fn with_timeout(&self, timeout: Duration) -> Box { + let mut resolver = self.clone(); + resolver.timeout = timeout; + Box::new(resolver) + } + + fn with_retries(&self) -> Box { + Box::new(self.clone()) + } + + async fn cat(&self, logger: &Logger, link: &Link) -> Result, Error> { + let link = remove_prefix(&link.link); + let path = self.resolve_path(&link); + + slog::debug!(logger, "File resolver: reading file"; + "path" => path.to_string_lossy().to_string()); + + match tokio::fs::read(&path).await { + Ok(data) => Ok(data), + Err(e) => { + slog::error!(logger, "Failed to read file"; + "path" => path.to_string_lossy().to_string(), + "error" => e.to_string()); + Err(anyhow!("Failed to read file {}: {}", path.display(), e).into()) + } + } + } + + fn for_manifest(&self, manifest_path: &str) -> Result, Error> { + Ok(Box::new(self.clone_for_manifest(manifest_path)?)) + } + + async fn get_block(&self, _logger: &Logger, _link: &Link) -> Result, Error> { + Err(anyhow!("get_block is not implemented for FileLinkResolver").into()) + } + + async fn json_stream(&self, _logger: &Logger, _link: &Link) -> Result { + Err(anyhow!("json_stream is not implemented for FileLinkResolver").into()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::env; + use std::fs; + use std::io::Write; + + #[tokio::test] + async fn test_file_resolver_absolute() { + // Test the resolver without a base directory (absolute paths only) + + // Create a temporary directory for test files + let temp_dir = env::temp_dir().join("file_resolver_test"); + let _ = fs::create_dir_all(&temp_dir); + + // Create a test file in the temp directory + let test_file_path = temp_dir.join("test.txt"); + let test_content = b"Hello, world!"; + let mut file = fs::File::create(&test_file_path).unwrap(); + file.write_all(test_content).unwrap(); + + // Create a resolver without a base directory + let resolver = FileLinkResolver::default(); + let logger = slog::Logger::root(slog::Discard, slog::o!()); + + // Test valid path resolution + let link = Link { + link: test_file_path.to_string_lossy().to_string(), + }; + let result = resolver.cat(&logger, &link).await.unwrap(); + assert_eq!(result, test_content); + + // Test path with leading slash that likely doesn't exist + let link = Link { + link: "/test.txt".to_string(), + }; + let result = resolver.cat(&logger, &link).await; + assert!( + result.is_err(), + "Reading /test.txt should fail as it doesn't exist" + ); + + // Clean up + let _ = fs::remove_file(test_file_path); + let _ = fs::remove_dir(temp_dir); + } + + #[tokio::test] + async fn test_file_resolver_with_base_dir() { + // Test the resolver with a base directory + + // Create a temporary directory for test files + let temp_dir = env::temp_dir().join("file_resolver_test_base_dir"); + let _ = fs::create_dir_all(&temp_dir); + + // Create a test file in the temp directory + let test_file_path = temp_dir.join("test.txt"); + let test_content = b"Hello from base dir!"; + let mut file = fs::File::create(&test_file_path).unwrap(); + file.write_all(test_content).unwrap(); + + // Create a resolver with a base directory + let resolver = FileLinkResolver::with_base_dir(&temp_dir); + let logger = slog::Logger::root(slog::Discard, slog::o!()); + + // Test relative path (no leading slash) + let link = Link { + link: "test.txt".to_string(), + }; + let result = resolver.cat(&logger, &link).await.unwrap(); + assert_eq!(result, test_content); + + // Test absolute path + 
let link = Link { + link: test_file_path.to_string_lossy().to_string(), + }; + let result = resolver.cat(&logger, &link).await.unwrap(); + assert_eq!(result, test_content); + + // Test missing file + let link = Link { + link: "missing.txt".to_string(), + }; + let result = resolver.cat(&logger, &link).await; + assert!(result.is_err()); + + // Clean up + let _ = fs::remove_file(test_file_path); + let _ = fs::remove_dir(temp_dir); + } + + #[tokio::test] + async fn test_file_resolver_with_aliases() { + // Create a temporary directory for test files + let temp_dir = env::temp_dir().join("file_resolver_test_aliases"); + let _ = fs::create_dir_all(&temp_dir); + + // Create two test files with different content + let test_file1_path = temp_dir.join("file.txt"); + let test_content1 = b"This is the file content"; + let mut file1 = fs::File::create(&test_file1_path).unwrap(); + file1.write_all(test_content1).unwrap(); + + let test_file2_path = temp_dir.join("another_file.txt"); + let test_content2 = b"This is another file content"; + let mut file2 = fs::File::create(&test_file2_path).unwrap(); + file2.write_all(test_content2).unwrap(); + + // Create aliases mapping + let mut aliases = HashMap::new(); + aliases.insert("alias1".to_string(), test_file1_path.clone()); + aliases.insert("alias2".to_string(), test_file2_path.clone()); + aliases.insert("deployment-id".to_string(), test_file1_path.clone()); + + // Create resolver with aliases + let resolver = FileLinkResolver::new(Some(temp_dir.clone()), aliases); + let logger = slog::Logger::root(slog::Discard, slog::o!()); + + // Test resolving by aliases + let link1 = Link { + link: "alias1".to_string(), + }; + let result1 = resolver.cat(&logger, &link1).await.unwrap(); + assert_eq!(result1, test_content1); + + let link2 = Link { + link: "alias2".to_string(), + }; + let result2 = resolver.cat(&logger, &link2).await.unwrap(); + assert_eq!(result2, test_content2); + + // Test that the alias works in for_deployment as well + let deployment_resolver = resolver.clone_for_manifest("deployment-id").unwrap(); + + let expected_dir = test_file1_path.parent().unwrap(); + let deployment_base_dir = deployment_resolver.base_dir.clone().unwrap(); + + let canonical_expected_dir = expected_dir.canonicalize().unwrap(); + let canonical_deployment_dir = deployment_base_dir.canonicalize().unwrap(); + + assert_eq!( + canonical_deployment_dir, canonical_expected_dir, + "Build directory paths don't match" + ); + + // Clean up + let _ = fs::remove_file(test_file1_path); + let _ = fs::remove_file(test_file2_path); + let _ = fs::remove_dir(temp_dir); + } +} diff --git a/graph/src/components/link_resolver/ipfs.rs b/graph/src/components/link_resolver/ipfs.rs index 9ecf4ff02e3..d81944ab70f 100644 --- a/graph/src/components/link_resolver/ipfs.rs +++ b/graph/src/components/link_resolver/ipfs.rs @@ -74,6 +74,10 @@ impl LinkResolverTrait for IpfsResolver { Box::new(s) } + fn for_manifest(&self, _manifest_path: &str) -> Result, Error> { + Ok(Box::new(self.cheap_clone())) + } + async fn cat(&self, logger: &Logger, link: &Link) -> Result, Error> { let path = ContentPath::new(&link.link)?; let timeout = self.timeout; diff --git a/graph/src/components/link_resolver/mod.rs b/graph/src/components/link_resolver/mod.rs index 1115b59cdc3..4788a9bd51f 100644 --- a/graph/src/components/link_resolver/mod.rs +++ b/graph/src/components/link_resolver/mod.rs @@ -7,10 +7,12 @@ use crate::prelude::Error; use std::fmt::Debug; mod arweave; +mod file; mod ipfs; pub use arweave::*; use async_trait::async_trait; 
+pub use file::*; pub use ipfs::*; /// Resolves links to subgraph manifests and resources referenced by them. @@ -28,6 +30,20 @@ pub trait LinkResolver: Send + Sync + 'static + Debug { /// Fetches the IPLD block contents as bytes. async fn get_block(&self, logger: &Logger, link: &Link) -> Result, Error>; + /// Creates a new resolver scoped to a specific subgraph manifest. + /// + /// For FileLinkResolver, this sets the base directory to the manifest's parent directory. + /// Note the manifest here is the manifest in the build directory, not the manifest in the source directory + /// to properly resolve relative paths referenced in the manifest (schema, mappings, etc.). + /// For other resolvers (IPFS/Arweave), this simply returns a clone since they use + /// absolute content identifiers. + /// + /// The `manifest_path` parameter can be a filesystem path or an alias. Aliases are used + /// in development environments (via `gnd --sources`) to map user-defined + /// aliases to actual subgraph paths, enabling local development with file-based + /// subgraphs that reference each other. + fn for_manifest(&self, manifest_path: &str) -> Result, Error>; + /// Read the contents of `link` and deserialize them into a stream of JSON /// values. The values must each be on a single line; newlines are significant /// as they are used to split the file contents and each line is deserialized diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 063ca4aa010..9f066bb62f6 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -108,6 +108,8 @@ pub trait SubgraphStore: Send + Sync + 'static { node_id: &NodeId, ) -> Result<(), StoreError>; + fn unassign_subgraph(&self, deployment: &DeploymentLocator) -> Result<(), StoreError>; + fn pause_subgraph(&self, deployment: &DeploymentLocator) -> Result<(), StoreError>; fn resume_subgraph(&self, deployment: &DeploymentLocator) -> Result<(), StoreError>; diff --git a/graph/src/components/subgraph/registrar.rs b/graph/src/components/subgraph/registrar.rs index 691c341e38b..361a704e754 100644 --- a/graph/src/components/subgraph/registrar.rs +++ b/graph/src/components/subgraph/registrar.rs @@ -45,6 +45,7 @@ pub trait SubgraphRegistrar: Send + Sync + 'static { start_block_block: Option, graft_block_override: Option, history_blocks: Option, + ignore_graft_base: bool, ) -> Result; async fn remove_subgraph(&self, name: SubgraphName) -> Result<(), SubgraphRegistrarError>; diff --git a/graph/src/data/subgraph/mod.rs b/graph/src/data/subgraph/mod.rs index 77c8ba67d36..10c4e471e38 100644 --- a/graph/src/data/subgraph/mod.rs +++ b/graph/src/data/subgraph/mod.rs @@ -116,20 +116,23 @@ impl DeploymentHash { pub fn new(s: impl Into) -> Result { let s = s.into(); - // Enforce length limit - if s.len() > 46 { - return Err(s); - } + // When the disable_deployment_hash_validation flag is set, we skip the validation + if !ENV_VARS.disable_deployment_hash_validation { + // Enforce length limit + if s.len() > 46 { + return Err(s); + } - // Check that the ID contains only allowed characters. - if !s.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') { - return Err(s); - } + // Check that the ID contains only allowed characters. + if !s.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') { + return Err(s); + } - // Allow only deployment id's for 'real' subgraphs, not the old - // metadata subgraph. 
- if s == "subgraphs" { - return Err(s); + // Allow only deployment id's for 'real' subgraphs, not the old + // metadata subgraph. + if s == "subgraphs" { + return Err(s); + } } Ok(DeploymentHash(s)) @@ -397,12 +400,65 @@ impl From> for DataSourceContext { } /// IPLD link. -#[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Deserialize)] +#[derive(Clone, Debug, Default, Hash, Eq, PartialEq)] pub struct Link { - #[serde(rename = "/")] pub link: String, } +/// Custom deserializer for Link +/// This handles both formats: +/// 1. Simple string: "schema.graphql" or "subgraph.yaml" which is used in [`FileLinkResolver`] +/// FileLinkResolver is used in local development environments +/// 2. IPLD format: { "/": "Qm..." } which is used in [`IpfsLinkResolver`] +impl<'de> de::Deserialize<'de> for Link { + fn deserialize(deserializer: D) -> Result + where + D: de::Deserializer<'de>, + { + struct LinkVisitor; + + impl<'de> de::Visitor<'de> for LinkVisitor { + type Value = Link; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("string or map with '/' key") + } + + fn visit_str(self, value: &str) -> Result + where + E: de::Error, + { + Ok(Link { + link: value.to_string(), + }) + } + + fn visit_map(self, mut map: A) -> Result + where + A: de::MapAccess<'de>, + { + let mut link = None; + + while let Some(key) = map.next_key::()? { + if key == "/" { + if link.is_some() { + return Err(de::Error::duplicate_field("/")); + } + link = Some(map.next_value()?); + } else { + return Err(de::Error::unknown_field(&key, &["/"])); + } + } + + link.map(|l: String| Link { link: l }) + .ok_or_else(|| de::Error::missing_field("/")) + } + } + + deserializer.deserialize_any(LinkVisitor) + } +} + impl From for Link { fn from(s: S) -> Self { Self { diff --git a/graph/src/data_source/subgraph.rs b/graph/src/data_source/subgraph.rs index 87b44e66174..b1b83a5137b 100644 --- a/graph/src/data_source/subgraph.rs +++ b/graph/src/data_source/subgraph.rs @@ -259,6 +259,8 @@ impl UnresolvedDataSource { resolver: &Arc, logger: &Logger, ) -> Result>, Error> { + let resolver: Arc = + Arc::from(resolver.for_manifest(&self.source.address.to_string())?); let source_raw = resolver .cat(logger, &self.source.address.to_ipfs_link()) .await @@ -281,8 +283,10 @@ impl UnresolvedDataSource { self.source.address ))?; + let resolver: Arc = + Arc::from(resolver.for_manifest(&self.source.address.to_string())?); source_manifest - .resolve(resolver, logger, LATEST_VERSION.clone()) + .resolve(&resolver, logger, LATEST_VERSION.clone()) .await .context(format!( "Failed to resolve source subgraph [{}] manifest", diff --git a/graph/src/env/mod.rs b/graph/src/env/mod.rs index eff0ebea16e..9611bd2726a 100644 --- a/graph/src/env/mod.rs +++ b/graph/src/env/mod.rs @@ -225,6 +225,12 @@ pub struct EnvVars { /// if no genesis hash can be retrieved from an adapter. If enabled, the adapter is /// ignored if unable to produce a genesis hash or produces a different an unexpected hash. pub genesis_validation_enabled: bool, + /// Whether to enforce deployment hash validation rules. + /// When disabled, any string can be used as a deployment hash. + /// When enabled, deployment hashes must meet length and character constraints. + /// + /// Set by the flag `GRAPH_NODE_DISABLE_DEPLOYMENT_HASH_VALIDATION`. Enabled by default. + pub disable_deployment_hash_validation: bool, /// How long do we wait for a response from the provider before considering that it is unavailable. /// Default is 30s. 
pub genesis_validation_timeout: Duration, @@ -332,6 +338,7 @@ impl EnvVars { section_map: inner.section_map, firehose_grpc_max_decode_size_mb: inner.firehose_grpc_max_decode_size_mb, genesis_validation_enabled: inner.genesis_validation_enabled.0, + disable_deployment_hash_validation: inner.disable_deployment_hash_validation.0, genesis_validation_timeout: Duration::from_secs(inner.genesis_validation_timeout), graphman_server_auth_token: inner.graphman_server_auth_token, firehose_disable_extended_blocks_for_chains: @@ -528,6 +535,11 @@ struct Inner { firehose_block_fetch_timeout: u64, #[envconfig(from = "GRAPH_FIREHOSE_FETCH_BLOCK_BATCH_SIZE", default = "10")] firehose_block_fetch_batch_size: usize, + #[envconfig( + from = "GRAPH_NODE_DISABLE_DEPLOYMENT_HASH_VALIDATION", + default = "false" + )] + disable_deployment_hash_validation: EnvVarBoolean, } #[derive(Clone, Debug)] diff --git a/node/Cargo.toml b/node/Cargo.toml index 4e2f4ddbbfb..16393c36af5 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -12,6 +12,10 @@ path = "src/main.rs" name = "graphman" path = "src/bin/manager.rs" +[[bin]] +name = "gnd" +path = "src/bin/dev.rs" + [dependencies] anyhow = { workspace = true } env_logger = "0.11.3" @@ -40,3 +44,8 @@ termcolor = "1.4.1" diesel = { workspace = true } prometheus = { version = "0.13.4", features = ["push"] } json-structural-diff = { version = "0.2", features = ["colorize"] } +globset = "0.4.16" +notify = "8.0.0" + +[target.'cfg(unix)'.dependencies] +pgtemp = { git = "https://github.com/incrypto32/pgtemp", branch = "initdb-args" } diff --git a/node/src/bin/dev.rs b/node/src/bin/dev.rs new file mode 100644 index 00000000000..27020ee3eb3 --- /dev/null +++ b/node/src/bin/dev.rs @@ -0,0 +1,230 @@ +use std::{mem, path::Path, sync::Arc}; + +use anyhow::{Context, Result}; +use clap::Parser; +use git_testament::{git_testament, render_testament}; +use graph::{ + components::link_resolver::FileLinkResolver, + env::EnvVars, + log::logger, + prelude::{CheapClone, DeploymentHash, LinkResolver, SubgraphName}, + slog::{error, info, Logger}, + tokio::{self, sync::mpsc}, +}; +use graph_core::polling_monitor::ipfs_service; +use graph_node::{ + dev::watcher::{parse_manifest_args, watch_subgraphs}, + launcher, + opt::Opt, +}; +use lazy_static::lazy_static; + +#[cfg(unix)] +use pgtemp::PgTempDBBuilder; + +git_testament!(TESTAMENT); +lazy_static! { + static ref RENDERED_TESTAMENT: String = render_testament!(TESTAMENT); +} + +#[derive(Clone, Debug, Parser)] +#[clap( + name = "gnd", + about = "Graph Node Dev", + author = "Graph Protocol, Inc.", + version = RENDERED_TESTAMENT.as_str() +)] +pub struct DevOpt { + #[clap( + long, + help = "Start a graph-node in dev mode watching a build directory for changes" + )] + pub watch: bool, + + #[clap( + long, + value_name = "MANIFEST:[BUILD_DIR]", + help = "The location of the subgraph manifest file. If no build directory is provided, the default is 'build'. The file can be an alias, in the format '[BUILD_DIR:]manifest' where 'manifest' is the path to the manifest file, and 'BUILD_DIR' is the path to the build directory relative to the manifest file.", + default_value = "./subgraph.yaml", + value_delimiter = ',' + )] + pub manifests: Vec, + + #[clap( + long, + value_name = "ALIAS:MANIFEST:[BUILD_DIR]", + value_delimiter = ',', + help = "The location of the source subgraph manifest files. This is used to resolve aliases in the manifest files for subgraph data sources. 
The format is ALIAS:MANIFEST:[BUILD_DIR], where ALIAS is the alias name, BUILD_DIR is the build directory relative to the manifest file, and MANIFEST is the manifest file location." + )] + pub sources: Vec, + + #[clap( + long, + help = "The location of the database directory.", + default_value = "./build" + )] + pub database_dir: String, + + #[clap( + long, + value_name = "URL", + env = "POSTGRES_URL", + help = "Location of the Postgres database used for storing entities" + )] + pub postgres_url: Option, + + #[clap( + long, + allow_negative_numbers = false, + value_name = "NETWORK_NAME:[CAPABILITIES]:URL", + env = "ETHEREUM_RPC", + help = "Ethereum network name (e.g. 'mainnet'), optional comma-seperated capabilities (eg 'full,archive'), and an Ethereum RPC URL, separated by a ':'" + )] + pub ethereum_rpc: Vec, + + #[clap( + long, + value_name = "HOST:PORT", + env = "IPFS", + help = "HTTP addresses of IPFS servers (RPC, Gateway)", + default_value = "https://api.thegraph.com/ipfs" + )] + pub ipfs: Vec, +} + +/// Builds the Graph Node options from DevOpt +fn build_args(dev_opt: &DevOpt, db_url: &str) -> Result { + let mut args = vec!["gnd".to_string()]; + + if !dev_opt.ipfs.is_empty() { + args.push("--ipfs".to_string()); + args.push(dev_opt.ipfs.join(",")); + } + + if !dev_opt.ethereum_rpc.is_empty() { + args.push("--ethereum-rpc".to_string()); + args.push(dev_opt.ethereum_rpc.join(",")); + } + + args.push("--postgres-url".to_string()); + args.push(db_url.to_string()); + + let opt = Opt::parse_from(args); + + Ok(opt) +} + +async fn run_graph_node( + logger: &Logger, + opt: Opt, + link_resolver: Arc, + subgraph_updates_channel: Option>, +) -> Result<()> { + let env_vars = Arc::new(EnvVars::from_env().context("Failed to load environment variables")?); + + let ipfs_client = graph::ipfs::new_ipfs_client(&opt.ipfs, &logger) + .await + .unwrap_or_else(|err| panic!("Failed to create IPFS client: {err:#}")); + + let ipfs_service = ipfs_service( + ipfs_client.cheap_clone(), + env_vars.mappings.max_ipfs_file_bytes, + env_vars.mappings.ipfs_timeout, + env_vars.mappings.ipfs_request_limit, + ); + + launcher::run( + logger.clone(), + opt, + env_vars, + ipfs_service, + link_resolver, + subgraph_updates_channel, + ) + .await; + Ok(()) +} + +/// Get the database URL, either from the provided option or by creating a temporary database +fn get_database_url(postgres_url: Option<&String>, database_dir: &Path) -> Result { + if let Some(url) = postgres_url { + Ok(url.clone()) + } else { + #[cfg(unix)] + { + // Check the database directory exists + if !database_dir.exists() { + anyhow::bail!( + "Database directory does not exist: {}", + database_dir.display() + ); + } + + let db = PgTempDBBuilder::new() + .with_data_dir_prefix(database_dir) + .with_initdb_param("-E", "UTF8") + .with_initdb_param("--locale", "C") + .start(); + let url = db.connection_uri().to_string(); + // Prevent the database from being dropped by forgetting it + mem::forget(db); + Ok(url) + } + + #[cfg(not(unix))] + { + anyhow::bail!( + "Please provide a postgres_url manually using the --postgres-url option." 
+ ); + } + } +} + +#[tokio::main] +async fn main() -> Result<()> { + env_logger::init(); + let dev_opt = DevOpt::parse(); + + let database_dir = Path::new(&dev_opt.database_dir); + + let logger = logger(true); + + info!(logger, "Starting Graph Node Dev"); + info!(logger, "Database directory: {}", database_dir.display()); + + // Get the database URL + let db_url = get_database_url(dev_opt.postgres_url.as_ref(), database_dir)?; + + let opt = build_args(&dev_opt, &db_url)?; + + let (manifests_paths, source_subgraph_aliases) = + parse_manifest_args(dev_opt.manifests, dev_opt.sources, &logger)?; + let file_link_resolver = Arc::new(FileLinkResolver::new(None, source_subgraph_aliases.clone())); + + let (tx, rx) = dev_opt.watch.then(|| mpsc::channel(1)).unzip(); + + let logger_clone = logger.clone(); + graph::spawn(async move { + let _ = run_graph_node(&logger_clone, opt, file_link_resolver, rx).await; + }); + + if let Some(tx) = tx { + graph::spawn_blocking(async move { + if let Err(e) = watch_subgraphs( + &logger, + manifests_paths, + source_subgraph_aliases, + vec!["pgtemp-*".to_string()], + tx, + ) + .await + { + error!(logger, "Error watching subgraphs"; "error" => e.to_string()); + std::process::exit(1); + } + }); + } + + graph::futures03::future::pending::<()>().await; + Ok(()) +} diff --git a/node/src/dev/helpers.rs b/node/src/dev/helpers.rs new file mode 100644 index 00000000000..19af9d23382 --- /dev/null +++ b/node/src/dev/helpers.rs @@ -0,0 +1,160 @@ +use std::sync::Arc; + +use anyhow::{Context, Result}; +use graph::prelude::{ + BlockPtr, DeploymentHash, NodeId, SubgraphRegistrarError, SubgraphStore as SubgraphStoreTrait, +}; +use graph::slog::{error, info, Logger}; +use graph::tokio::sync::mpsc::Receiver; +use graph::{ + components::store::DeploymentLocator, + prelude::{SubgraphName, SubgraphRegistrar}, +}; +use graph_store_postgres::SubgraphStore; + +pub struct DevModeContext { + pub watch: bool, + pub updates_rx: Receiver<(DeploymentHash, SubgraphName)>, +} + +/// Cleanup a subgraph +/// This is used to remove a subgraph before redeploying it when using the watch flag +fn cleanup_dev_subgraph( + logger: &Logger, + subgraph_store: &SubgraphStore, + name: &SubgraphName, + locator: &DeploymentLocator, +) -> Result<()> { + info!(logger, "Removing subgraph"; "name" => name.to_string(), "id" => locator.id.to_string(), "hash" => locator.hash.to_string()); + subgraph_store.remove_subgraph(name.clone())?; + subgraph_store.unassign_subgraph(locator)?; + subgraph_store.remove_deployment(locator.id.into())?; + info!(logger, "Subgraph removed"; "name" => name.to_string(), "id" => locator.id.to_string(), "hash" => locator.hash.to_string()); + Ok(()) +} + +async fn deploy_subgraph( + logger: &Logger, + subgraph_registrar: Arc, + name: SubgraphName, + subgraph_id: DeploymentHash, + node_id: NodeId, + debug_fork: Option, + start_block: Option, +) -> Result { + info!(logger, "Re-deploying subgraph"; "name" => name.to_string(), "id" => subgraph_id.to_string()); + subgraph_registrar.create_subgraph(name.clone()).await?; + subgraph_registrar + .create_subgraph_version( + name.clone(), + subgraph_id.clone(), + node_id, + debug_fork, + start_block, + None, + None, + true, + ) + .await + .and_then(|locator| { + info!(logger, "Subgraph deployed"; "name" => name.to_string(), "id" => subgraph_id.to_string(), "locator" => locator.to_string()); + Ok(locator) + }) +} + +pub async fn drop_and_recreate_subgraph( + logger: &Logger, + subgraph_store: Arc, + subgraph_registrar: Arc, + name: SubgraphName, + 
subgraph_id: DeploymentHash, + node_id: NodeId, + hash: DeploymentHash, +) -> Result { + let locator = subgraph_store.active_locator(&hash)?; + if let Some(locator) = locator.clone() { + cleanup_dev_subgraph(logger, &subgraph_store, &name, &locator)?; + } + + deploy_subgraph( + logger, + subgraph_registrar, + name, + subgraph_id, + node_id, + None, + None, + ) + .await + .map_err(|e| anyhow::anyhow!("Failed to deploy subgraph: {}", e)) +} + +/// Watch for subgraph updates, drop and recreate them +/// This is used to listen to file changes in the subgraph directory +/// And drop and recreate the subgraph when it changes +pub async fn watch_subgraph_updates( + logger: &Logger, + subgraph_store: Arc, + subgraph_registrar: Arc, + node_id: NodeId, + mut rx: Receiver<(DeploymentHash, SubgraphName)>, +) { + while let Some((hash, name)) = rx.recv().await { + let res = drop_and_recreate_subgraph( + logger, + subgraph_store.clone(), + subgraph_registrar.clone(), + name.clone(), + hash.clone(), + node_id.clone(), + hash.clone(), + ) + .await; + + if let Err(e) = res { + error!(logger, "Failed to drop and recreate subgraph"; + "name" => name.to_string(), + "hash" => hash.to_string(), + "error" => e.to_string() + ); + std::process::exit(1); + } + } + + error!(logger, "Subgraph watcher terminated unexpectedly"; "action" => "exiting"); + std::process::exit(1); +} + +/// Parse an alias string into a tuple of (alias_name, manifest, Option) +pub fn parse_alias(alias: &str) -> anyhow::Result<(String, String, Option)> { + let mut split = alias.split(':'); + let alias_name = split.next(); + let alias_value = split.next(); + + if alias_name.is_none() || alias_value.is_none() || split.next().is_some() { + return Err(anyhow::anyhow!( + "Invalid alias format: expected 'alias=[BUILD_DIR:]manifest', got '{}'", + alias + )); + } + + let alias_name = alias_name.unwrap().to_owned(); + let (manifest, build_dir) = parse_manifest_arg(alias_value.unwrap()) + .with_context(|| format!("While parsing alias '{}'", alias))?; + + Ok((alias_name, manifest, build_dir)) +} + +/// Parse a manifest string into a tuple of (manifest, Option) +pub fn parse_manifest_arg(value: &str) -> anyhow::Result<(String, Option)> { + match value.split_once(':') { + Some((manifest, build_dir)) if !manifest.is_empty() => { + Ok((manifest.to_owned(), Some(build_dir.to_owned()))) + } + Some(_) => Err(anyhow::anyhow!( + "Invalid manifest arg: missing manifest in '{}'", + value + )), + None => Ok((value.to_owned(), None)), + } +} diff --git a/node/src/dev/mod.rs b/node/src/dev/mod.rs new file mode 100644 index 00000000000..20d269524b3 --- /dev/null +++ b/node/src/dev/mod.rs @@ -0,0 +1,2 @@ +pub mod helpers; +pub mod watcher; diff --git a/node/src/dev/watcher.rs b/node/src/dev/watcher.rs new file mode 100644 index 00000000000..9436db9ac2c --- /dev/null +++ b/node/src/dev/watcher.rs @@ -0,0 +1,334 @@ +use anyhow::{anyhow, Context, Result}; +use globset::{Glob, GlobSet, GlobSetBuilder}; +use graph::prelude::{DeploymentHash, SubgraphName}; +use graph::slog::{self, error, info, Logger}; +use graph::tokio::sync::mpsc::Sender; +use notify::{recommended_watcher, Event, RecursiveMode, Watcher}; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::sync::mpsc; +use std::time::Duration; + +use super::helpers::{parse_alias, parse_manifest_arg}; + +const WATCH_DELAY: Duration = Duration::from_secs(5); +const DEFAULT_BUILD_DIR: &str = "build"; + +// Parses manifest arguments and returns a vector of paths to the manifest files +pub fn 
parse_manifest_args( + manifests: Vec, + subgraph_sources: Vec, + logger: &Logger, +) -> Result<(Vec, HashMap)> { + let mut manifests_paths = Vec::new(); + let mut source_subgraph_aliases = HashMap::new(); + + for subgraph_source in subgraph_sources { + let (alias_name, manifest_path_str, build_dir_opt) = parse_alias(&subgraph_source)?; + let manifest_path = + process_manifest(build_dir_opt, &manifest_path_str, Some(&alias_name), logger)?; + + manifests_paths.push(manifest_path.clone()); + source_subgraph_aliases.insert(alias_name, manifest_path); + } + + for manifest_str in manifests { + let (manifest_path_str, build_dir_opt) = parse_manifest_arg(&manifest_str) + .with_context(|| format!("While parsing manifest '{}'", manifest_str))?; + + let built_manifest_path = + process_manifest(build_dir_opt, &manifest_path_str, None, logger)?; + + manifests_paths.push(built_manifest_path); + } + + Ok((manifests_paths, source_subgraph_aliases)) +} + +/// Helper function to process a manifest +fn process_manifest( + build_dir_opt: Option, + manifest_path_str: &str, + alias_name: Option<&String>, + logger: &Logger, +) -> Result { + let build_dir_str = build_dir_opt.unwrap_or_else(|| DEFAULT_BUILD_DIR.to_owned()); + + info!(logger, "Validating manifest: {}", manifest_path_str); + + let manifest_path = Path::new(manifest_path_str); + let manifest_path = manifest_path + .canonicalize() + .with_context(|| format!("Manifest path does not exist: {}", manifest_path_str))?; + + // Get the parent directory of the manifest + let parent_dir = manifest_path + .parent() + .ok_or_else(|| { + anyhow!( + "Failed to get parent directory for manifest: {}", + manifest_path_str + ) + })? + .canonicalize() + .with_context(|| { + format!( + "Parent directory does not exist for manifest: {}", + manifest_path_str + ) + })?; + + // Create the build directory path by joining the parent directory with the build_dir_str + let build_dir = parent_dir.join(build_dir_str); + let build_dir = build_dir + .canonicalize() + .with_context(|| format!("Build directory does not exist: {}", build_dir.display()))?; + + let manifest_file_name = manifest_path.file_name().ok_or_else(|| { + anyhow!( + "Failed to get file name for manifest: {}", + manifest_path_str + ) + })?; + + let built_manifest_path = build_dir.join(manifest_file_name); + + info!( + logger, + "Watching manifest: {}", + built_manifest_path.display() + ); + + if let Some(name) = alias_name { + info!( + logger, + "Using build directory for {}: {}", + name, + build_dir.display() + ); + } else { + info!(logger, "Using build directory: {}", build_dir.display()); + } + + Ok(built_manifest_path) +} + +/// Sets up a watcher for the given directory with optional exclusions. +/// Exclusions can include glob patterns like "pgtemp-*". +pub async fn watch_subgraphs( + logger: &Logger, + manifests_paths: Vec, + source_subgraph_aliases: HashMap, + exclusions: Vec, + sender: Sender<(DeploymentHash, SubgraphName)>, +) -> Result<()> { + let logger = logger.new(slog::o!("component" => "Watcher")); + + watch_subgraph_dirs( + &logger, + manifests_paths, + source_subgraph_aliases, + exclusions, + sender, + ) + .await?; + Ok(()) +} + +/// Sets up a watcher for the given directories with optional exclusions. +/// Exclusions can include glob patterns like "pgtemp-*". 
+pub async fn watch_subgraph_dirs( + logger: &Logger, + manifests_paths: Vec, + source_subgraph_aliases: HashMap, + exclusions: Vec, + sender: Sender<(DeploymentHash, SubgraphName)>, +) -> Result<()> { + if manifests_paths.is_empty() { + info!(logger, "No directories to watch"); + return Ok(()); + } + + info!( + logger, + "Watching for changes in {} directories", + manifests_paths.len() + ); + + if !exclusions.is_empty() { + info!(logger, "Excluding patterns: {}", exclusions.join(", ")); + } + + // Create exclusion matcher + let exclusion_set = build_glob_set(&exclusions, logger); + + // Create a channel to receive the events + let (tx, rx) = mpsc::channel(); + + let mut watcher = match recommended_watcher(tx) { + Ok(w) => w, + Err(e) => { + error!(logger, "Error creating file watcher: {}", e); + return Err(anyhow!("Error creating file watcher")); + } + }; + + for manifest_path in manifests_paths.iter() { + let dir = manifest_path.parent().unwrap(); + if let Err(e) = watcher.watch(dir, RecursiveMode::Recursive) { + error!(logger, "Error watching directory {}: {}", dir.display(), e); + std::process::exit(1); + } + info!(logger, "Watching directory: {}", dir.display()); + } + + // Process file change events + process_file_events( + logger, + rx, + &exclusion_set, + &manifests_paths, + &source_subgraph_aliases, + sender, + ) + .await +} + +/// Processes file change events and triggers redeployments +async fn process_file_events( + logger: &Logger, + rx: mpsc::Receiver>, + exclusion_set: &GlobSet, + manifests_paths: &Vec, + source_subgraph_aliases: &HashMap, + sender: Sender<(DeploymentHash, SubgraphName)>, +) -> Result<()> { + loop { + // Wait for an event + let event = match rx.recv() { + Ok(Ok(e)) => e, + Ok(_) => continue, + Err(_) => { + error!(logger, "Error receiving file change event"); + return Err(anyhow!("Error receiving file change event")); + } + }; + + if !is_relevant_event( + &event, + manifests_paths + .iter() + .map(|p| p.parent().unwrap().to_path_buf()) + .collect(), + exclusion_set, + ) { + continue; + } + + // Once we receive an event, wait for a short period of time to allow for multiple events to be received + // This is because running graph build writes multiple files at once + // Which triggers multiple events, we only need to react to it once + let start = std::time::Instant::now(); + while start.elapsed() < WATCH_DELAY { + match rx.try_recv() { + // Discard all events until the time window has passed + Ok(_) => continue, + Err(_) => break, + } + } + + // Redeploy all subgraphs + deploy_all_subgraphs(logger, manifests_paths, source_subgraph_aliases, &sender).await?; + } +} + +/// Checks if an event is relevant for any of the watched directories +fn is_relevant_event(event: &Event, watched_dirs: Vec, exclusion_set: &GlobSet) -> bool { + for path in event.paths.iter() { + for dir in watched_dirs.iter() { + if path.starts_with(dir) && should_process_event(event, dir, exclusion_set) { + return true; + } + } + } + false +} + +/// Redeploys all subgraphs in the order it appears in the manifests_paths +async fn deploy_all_subgraphs( + logger: &Logger, + manifests_paths: &Vec, + source_subgraph_aliases: &HashMap, + sender: &Sender<(DeploymentHash, SubgraphName)>, +) -> Result<()> { + info!(logger, "File change detected, redeploying all subgraphs"); + let mut count = 0; + for manifest_path in manifests_paths { + let alias_name = source_subgraph_aliases + .iter() + .find(|(_, path)| path == &manifest_path) + .map(|(name, _)| name); + + let id = alias_name + .map(|s| 
+
+/// Checks if an event is relevant for any of the watched directories
+fn is_relevant_event(event: &Event, watched_dirs: Vec<PathBuf>, exclusion_set: &GlobSet) -> bool {
+    for path in event.paths.iter() {
+        for dir in watched_dirs.iter() {
+            if path.starts_with(dir) && should_process_event(event, dir, exclusion_set) {
+                return true;
+            }
+        }
+    }
+    false
+}
+
+/// Redeploys all subgraphs in the order they appear in `manifests_paths`
+async fn deploy_all_subgraphs(
+    logger: &Logger,
+    manifests_paths: &Vec<PathBuf>,
+    source_subgraph_aliases: &HashMap<String, PathBuf>,
+    sender: &Sender<(DeploymentHash, SubgraphName)>,
+) -> Result<()> {
+    info!(logger, "File change detected, redeploying all subgraphs");
+    let mut count = 0;
+    for manifest_path in manifests_paths {
+        let alias_name = source_subgraph_aliases
+            .iter()
+            .find(|(_, path)| path == &manifest_path)
+            .map(|(name, _)| name);
+
+        let id = alias_name
+            .map(|s| s.to_owned())
+            .unwrap_or_else(|| manifest_path.display().to_string());
+
+        let _ = sender
+            .send((
+                DeploymentHash::new(id).map_err(|_| anyhow!("Failed to create deployment hash"))?,
+                SubgraphName::new(format!("subgraph-{}", count))
+                    .map_err(|_| anyhow!("Failed to create subgraph name"))?,
+            ))
+            .await;
+        count += 1;
+    }
+    Ok(())
+}
+
+/// Build a GlobSet from the provided patterns
+fn build_glob_set(patterns: &[String], logger: &Logger) -> GlobSet {
+    let mut builder = GlobSetBuilder::new();
+
+    for pattern in patterns {
+        match Glob::new(pattern) {
+            Ok(glob) => {
+                builder.add(glob);
+            }
+            Err(e) => error!(logger, "Invalid glob pattern '{}': {}", pattern, e),
+        }
+    }
+
+    match builder.build() {
+        Ok(set) => set,
+        Err(e) => {
+            error!(logger, "Failed to build glob set: {}", e);
+            GlobSetBuilder::new().build().unwrap()
+        }
+    }
+}
+
+/// Determines if an event should be processed based on exclusion patterns
+fn should_process_event(event: &Event, base_dir: &Path, exclusion_set: &GlobSet) -> bool {
+    // Check each path in the event
+    for path in event.paths.iter() {
+        // Get the relative path from the base directory
+        if let Ok(rel_path) = path.strip_prefix(base_dir) {
+            let path_str = rel_path.to_string_lossy();
+
+            // Check if path matches any exclusion pattern
+            if exclusion_set.is_match(path_str.as_ref()) {
+                return false;
+            }
+
+            // Also check against the file name for basename patterns
+            if let Some(file_name) = rel_path.file_name() {
+                let name_str = file_name.to_string_lossy();
+                if exclusion_set.is_match(name_str.as_ref()) {
+                    return false;
+                }
+            }
+        }
+    }
+
+    true
+}
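A self-contained sketch of the `globset` behavior that `build_glob_set` and `should_process_event` rely on; the patterns here are illustrative:

```rust
use globset::{Glob, GlobSetBuilder};

fn main() -> Result<(), globset::Error> {
    // Build a matcher for exclusion patterns such as "pgtemp-*".
    let mut builder = GlobSetBuilder::new();
    builder.add(Glob::new("pgtemp-*")?);
    builder.add(Glob::new("*.log")?);
    let set = builder.build()?;

    // Matching is attempted on both the relative path and the bare file
    // name, so both of these are excluded...
    assert!(set.is_match("pgtemp-1234"));
    assert!(set.is_match("debug.log"));
    // ...while a manifest in the watched directory is not.
    assert!(!set.is_match("subgraph.yaml"));
    Ok(())
}
```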
diff --git a/node/src/launcher.rs b/node/src/launcher.rs
new file mode 100644
index 00000000000..8f7ee385135
--- /dev/null
+++ b/node/src/launcher.rs
@@ -0,0 +1,771 @@
+use anyhow::Result;
+
+use git_testament::{git_testament, render_testament};
+use graph::futures01::Future as _;
+use graph::futures03::compat::Future01CompatExt;
+use graph::futures03::future::TryFutureExt;
+
+use crate::config::Config;
+use crate::dev::helpers::watch_subgraph_updates;
+use crate::network_setup::Networks;
+use crate::opt::Opt;
+use crate::store_builder::StoreBuilder;
+use graph::blockchain::{Blockchain, BlockchainKind, BlockchainMap};
+use graph::components::link_resolver::{ArweaveClient, FileSizeLimit};
+use graph::components::subgraph::Settings;
+use graph::data::graphql::load_manager::LoadManager;
+use graph::endpoint::EndpointMetrics;
+use graph::env::EnvVars;
+use graph::prelude::*;
+use graph::prometheus::Registry;
+use graph::url::Url;
+use graph_core::polling_monitor::{arweave_service, ArweaveService, IpfsService};
+use graph_core::{
+    SubgraphAssignmentProvider as IpfsSubgraphAssignmentProvider, SubgraphInstanceManager,
+    SubgraphRegistrar as IpfsSubgraphRegistrar,
+};
+use graph_graphql::prelude::GraphQlRunner;
+use graph_server_http::GraphQLServer as GraphQLQueryServer;
+use graph_server_index_node::IndexNodeServer;
+use graph_server_json_rpc::JsonRpcServer;
+use graph_server_metrics::PrometheusMetricsServer;
+use graph_store_postgres::{
+    register_jobs as register_store_jobs, ChainHeadUpdateListener, ConnectionPool,
+    NotificationSender, Store, SubgraphStore, SubscriptionManager,
+};
+use graphman_server::GraphmanServer;
+use graphman_server::GraphmanServerConfig;
+use std::io::{BufRead, BufReader};
+use std::path::Path;
+use std::time::Duration;
+use tokio::sync::mpsc;
+
+git_testament!(TESTAMENT);
+
+/// Sets up metrics and monitoring
+fn setup_metrics(logger: &Logger) -> (Arc<Registry>, Arc<MetricsRegistry>) {
+    // Set up Prometheus registry
+    let prometheus_registry = Arc::new(Registry::new());
+    let metrics_registry = Arc::new(MetricsRegistry::new(
+        logger.clone(),
+        prometheus_registry.clone(),
+    ));
+
+    (prometheus_registry, metrics_registry)
+}
+
+/// Sets up the store and database connections
+async fn setup_store(
+    logger: &Logger,
+    node_id: &NodeId,
+    config: &Config,
+    fork_base: Option<Url>,
+    metrics_registry: Arc<MetricsRegistry>,
+) -> (
+    ConnectionPool,
+    Arc<SubscriptionManager>,
+    Arc<ChainHeadUpdateListener>,
+    Arc<Store>,
+) {
+    let store_builder = StoreBuilder::new(
+        logger,
+        node_id,
+        config,
+        fork_base,
+        metrics_registry.cheap_clone(),
+    )
+    .await;
+
+    let primary_pool = store_builder.primary_pool();
+    let subscription_manager = store_builder.subscription_manager();
+    let chain_head_update_listener = store_builder.chain_head_update_listener();
+    let network_store = store_builder.network_store(config.chain_ids());
+
+    (
+        primary_pool,
+        subscription_manager,
+        chain_head_update_listener,
+        network_store,
+    )
+}
+
+async fn build_blockchain_map(
+    logger: &Logger,
+    config: &Config,
+    env_vars: &Arc<EnvVars>,
+    node_id: &NodeId,
+    network_store: Arc<Store>,
+    metrics_registry: Arc<MetricsRegistry>,
+    endpoint_metrics: Arc<EndpointMetrics>,
+    chain_head_update_listener: Arc<ChainHeadUpdateListener>,
+    logger_factory: &LoggerFactory,
+) -> Arc<BlockchainMap> {
+    use graph::components::network_provider;
+    let block_store = network_store.block_store();
+
+    let mut provider_checks: Vec<Arc<dyn network_provider::ProviderCheck>> = Vec::new();
+
+    if env_vars.genesis_validation_enabled {
+        provider_checks.push(Arc::new(network_provider::GenesisHashCheck::new(
+            block_store.clone(),
+        )));
+    }
+
+    provider_checks.push(Arc::new(network_provider::ExtendedBlocksCheck::new(
+        env_vars
+            .firehose_disable_extended_blocks_for_chains
+            .iter()
+            .map(|x| x.as_str().into()),
+    )));
+
+    let network_adapters = Networks::from_config(
+        logger.cheap_clone(),
+        &config,
+        metrics_registry.cheap_clone(),
+        endpoint_metrics,
+        &provider_checks,
+    )
+    .await
+    .expect("unable to parse network configuration");
+
+    let blockchain_map = network_adapters
+        .blockchain_map(
+            &env_vars,
+            &node_id,
+            &logger,
+            block_store,
+            &logger_factory,
+            metrics_registry.cheap_clone(),
+            chain_head_update_listener,
+        )
+        .await;
+
+    Arc::new(blockchain_map)
+}
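`build_blockchain_map` assembles its provider checks as a vector of trait objects. The following is a hypothetical, simplified illustration of that pattern only — the real `ProviderCheck` trait in `graph::components::network_provider` has a different, richer shape:

```rust
use std::sync::Arc;

// Hypothetical stand-in for graph's ProviderCheck trait.
trait Check: Send + Sync {
    fn name(&self) -> &'static str;
}

struct GenesisHashCheck;
impl Check for GenesisHashCheck {
    fn name(&self) -> &'static str {
        "genesis-hash"
    }
}

struct ExtendedBlocksCheck;
impl Check for ExtendedBlocksCheck {
    fn name(&self) -> &'static str {
        "extended-blocks"
    }
}

fn main() {
    let genesis_validation_enabled = true; // stand-in for the env flag
    let mut checks: Vec<Arc<dyn Check>> = Vec::new();
    // Checks are registered conditionally, mirroring the launcher code.
    if genesis_validation_enabled {
        checks.push(Arc::new(GenesisHashCheck));
    }
    checks.push(Arc::new(ExtendedBlocksCheck));
    for c in &checks {
        println!("registered provider check: {}", c.name());
    }
}
```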
+
+fn cleanup_ethereum_shallow_blocks(blockchain_map: &BlockchainMap, network_store: &Arc<Store>) {
+    match blockchain_map
+        .get_all_by_kind::<graph_chain_ethereum::Chain>(BlockchainKind::Ethereum)
+        .ok()
+        .map(|chains| {
+            chains
+                .iter()
+                .flat_map(|c| {
+                    if !c.chain_client().is_firehose() {
+                        Some(c.name.to_string())
+                    } else {
+                        None
+                    }
+                })
+                .collect()
+        }) {
+        Some(eth_network_names) => {
+            network_store
+                .block_store()
+                .cleanup_ethereum_shallow_blocks(eth_network_names)
+                .unwrap();
+        }
+        // This code path only happens if the downcast on the blockchain map fails;
+        // that probably means there is a problem with the chain loading logic, so
+        // it is safest to just refuse to start.
+        None => unreachable!(
+            "If you are seeing this message just use a different version of graph-node"
+        ),
+    }
+}
+
+async fn spawn_block_ingestor(
+    logger: &Logger,
+    blockchain_map: &Arc<BlockchainMap>,
+    network_store: &Arc<Store>,
+    primary_pool: ConnectionPool,
+    metrics_registry: &Arc<MetricsRegistry>,
+) {
+    let logger = logger.clone();
+    let ingestors = Networks::block_ingestors(&logger, &blockchain_map)
+        .await
+        .expect("unable to start block ingestors");
+
+    ingestors.into_iter().for_each(|ingestor| {
+        let logger = logger.clone();
+        info!(logger,"Starting block ingestor for network";"network_name" => &ingestor.network_name().as_str(), "kind" => ingestor.kind().to_string());
+
+        graph::spawn(ingestor.run());
+    });
+
+    // Start a task runner
+    let mut job_runner = graph::util::jobs::Runner::new(&logger);
+    register_store_jobs(
+        &mut job_runner,
+        network_store.clone(),
+        primary_pool,
+        metrics_registry.clone(),
+    );
+    graph::spawn_blocking(job_runner.start());
+}
+
+fn deploy_subgraph_from_flag(
+    subgraph: String,
+    opt: &Opt,
+    subgraph_registrar: Arc<impl SubgraphRegistrar>,
+    node_id: NodeId,
+) {
+    let (name, hash) = if subgraph.contains(':') {
+        let mut split = subgraph.split(':');
+        (split.next().unwrap(), split.next().unwrap().to_owned())
+    } else {
+        ("cli", subgraph)
+    };
+
+    let name = SubgraphName::new(name)
+        .expect("Subgraph name must contain only a-z, A-Z, 0-9, '-' and '_'");
+    let subgraph_id = DeploymentHash::new(hash).expect("Subgraph hash must be a valid IPFS hash");
+    let debug_fork = opt
+        .debug_fork
+        .clone()
+        .map(DeploymentHash::new)
+        .map(|h| h.expect("Debug fork hash must be a valid IPFS hash"));
+    let start_block = opt
+        .start_block
+        .clone()
+        .map(|block| {
+            let mut split = block.split(':');
+            (
+                // BlockHash
+                split.next().unwrap().to_owned(),
+                // BlockNumber
+                split.next().unwrap().parse::<i64>().unwrap(),
+            )
+        })
+        .map(|(hash, number)| BlockPtr::try_from((hash.as_str(), number)))
+        .map(Result::unwrap);
+
+    graph::spawn(
+        async move {
+            subgraph_registrar.create_subgraph(name.clone()).await?;
+            subgraph_registrar
+                .create_subgraph_version(
+                    name,
+                    subgraph_id,
+                    node_id,
+                    debug_fork,
+                    start_block,
+                    None,
+                    None,
+                    false,
+                )
+                .await
+        }
+        .map_err(|e| panic!("Failed to deploy subgraph from `--subgraph` flag: {}", e)),
+    );
+}
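`deploy_subgraph_from_flag` splits `--start-block` arguments of the form `hash:number`. A standalone sketch of that split with `Option`-based error handling instead of the launcher's `unwrap`s (the argument value is made up):

```rust
/// Parse a "hash:number" start-block argument.
fn parse_start_block(arg: &str) -> Option<(String, i64)> {
    let mut split = arg.split(':');
    let hash = split.next()?.to_owned(); // block hash
    let number = split.next()?.parse().ok()?; // block number
    Some((hash, number))
}

fn main() {
    let (hash, number) = parse_start_block("0xabc123:17000000").unwrap();
    assert_eq!(hash, "0xabc123");
    assert_eq!(number, 17_000_000);
}
```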
+
+fn build_subgraph_registrar(
+    metrics_registry: Arc<MetricsRegistry>,
+    network_store: &Arc<Store>,
+    logger_factory: &LoggerFactory,
+    env_vars: &Arc<EnvVars>,
+    blockchain_map: Arc<BlockchainMap>,
+    node_id: NodeId,
+    subgraph_settings: Settings,
+    link_resolver: Arc<dyn LinkResolver>,
+    subscription_manager: Arc<SubscriptionManager>,
+    arweave_service: ArweaveService,
+    ipfs_service: IpfsService,
+) -> Arc<
+    IpfsSubgraphRegistrar<
+        IpfsSubgraphAssignmentProvider<SubgraphInstanceManager<SubgraphStore>>,
+        SubgraphStore,
+        SubscriptionManager,
+    >,
+> {
+    let static_filters = ENV_VARS.experimental_static_filters;
+    let sg_count = Arc::new(SubgraphCountMetric::new(metrics_registry.cheap_clone()));
+
+    let subgraph_instance_manager = SubgraphInstanceManager::new(
+        &logger_factory,
+        env_vars.cheap_clone(),
+        network_store.subgraph_store(),
+        blockchain_map.cheap_clone(),
+        sg_count.cheap_clone(),
+        metrics_registry.clone(),
+        link_resolver.clone(),
+        ipfs_service,
+        arweave_service,
+        static_filters,
+    );
+
+    // Create IPFS-based subgraph provider
+    let subgraph_provider = IpfsSubgraphAssignmentProvider::new(
+        &logger_factory,
+        link_resolver.clone(),
+        subgraph_instance_manager,
+        sg_count,
+    );
+
+    // Check version switching mode environment variable
+    let version_switching_mode = ENV_VARS.subgraph_version_switching_mode;
+
+    // Create named subgraph provider for resolving subgraph name->ID mappings
+    let subgraph_registrar = Arc::new(IpfsSubgraphRegistrar::new(
+        &logger_factory,
+        link_resolver,
+        Arc::new(subgraph_provider),
+        network_store.subgraph_store(),
+        subscription_manager,
+        blockchain_map,
+        node_id.clone(),
+        version_switching_mode,
+        Arc::new(subgraph_settings),
+    ));
+
+    subgraph_registrar
+}
+
+fn build_graphql_server(
+    config: &Config,
+    logger: &Logger,
+    expensive_queries: Vec<Arc<q::Document<'static, String>>>,
+    metrics_registry: Arc<MetricsRegistry>,
+    network_store: &Arc<Store>,
+    logger_factory: &LoggerFactory,
+) -> GraphQLQueryServer<GraphQlRunner<Store>> {
+    let shards: Vec<_> = config.stores.keys().cloned().collect();
+    let load_manager = Arc::new(LoadManager::new(
+        &logger,
+        shards,
+        expensive_queries,
+        metrics_registry.clone(),
+    ));
+    let graphql_runner = Arc::new(GraphQlRunner::new(
+        &logger,
+        network_store.clone(),
+        load_manager,
+        metrics_registry,
+    ));
+    let graphql_server = GraphQLQueryServer::new(&logger_factory, graphql_runner.clone());
+
+    graphql_server
+}
+
+/// Runs the Graph Node by initializing all components and starting all required services.
+/// This function is the main entry point for running a Graph Node instance.
+///
+/// # Arguments
+///
+/// * `opt` - Command line options controlling node behavior and configuration
+/// * `env_vars` - Environment variables for configuring the node
+/// * `ipfs_service` - Service for interacting with IPFS for subgraph deployments
+/// * `link_resolver` - Resolver for IPFS links in subgraph manifests and files
+/// * `dev_updates` - Optional channel for receiving subgraph update notifications in development mode
+pub async fn run(
+    logger: Logger,
+    opt: Opt,
+    env_vars: Arc<EnvVars>,
+    ipfs_service: IpfsService,
+    link_resolver: Arc<dyn LinkResolver>,
+    dev_updates: Option<mpsc::Receiver<(DeploymentHash, SubgraphName)>>,
+) {
+    // Log version information
+    info!(
+        logger,
+        "Graph Node version: {}",
+        render_testament!(TESTAMENT)
+    );
+
+    if !graph_server_index_node::PoiProtection::from_env(&ENV_VARS).is_active() {
+        warn!(
+            logger,
+            "GRAPH_POI_ACCESS_TOKEN not set; might leak POIs to the public via GraphQL"
+        );
+    }
+
+    // Get configuration
+    let (config, subgraph_settings, fork_base) = setup_configuration(&opt, &logger, &env_vars);
+
+    let node_id = NodeId::new(opt.node_id.clone())
+        .expect("Node ID must be between 1 and 63 characters in length");
+
+    // Obtain subgraph related command-line arguments
+    let subgraph = opt.subgraph.clone();
+
+    // Obtain ports to use for the GraphQL server(s)
+    let http_port = opt.http_port;
+
+    // Obtain JSON-RPC server port
+    let json_rpc_port = opt.admin_port;
+
+    // Obtain index node server port
+    let index_node_port = opt.index_node_port;
+
+    // Obtain metrics server port
+    let metrics_port = opt.metrics_port;
+
+    info!(logger, "Starting up");
+
+    // Set up metrics
+    let (prometheus_registry, metrics_registry) = setup_metrics(&logger);
+
+    // Optionally, identify the Elasticsearch logging configuration
+    let elastic_config = opt
+        .elasticsearch_url
+        .clone()
+        .map(|endpoint| ElasticLoggingConfig {
+            endpoint,
+            username: opt.elasticsearch_user.clone(),
+            password: opt.elasticsearch_password.clone(),
+            client: reqwest::Client::new(),
+        });
+
+    // Create a component and subgraph logger factory
+    let logger_factory =
+        LoggerFactory::new(logger.clone(), elastic_config, metrics_registry.clone());
+
+    let arweave_resolver = Arc::new(ArweaveClient::new(
+        logger.cheap_clone(),
+        opt.arweave
+            .parse()
+            .expect("unable to parse arweave gateway address"),
+    ));
+
+    let arweave_service = arweave_service(
+        arweave_resolver.cheap_clone(),
+        env_vars.mappings.ipfs_request_limit,
+        match env_vars.mappings.max_ipfs_file_bytes {
+            0 => FileSizeLimit::Unlimited,
+            n => FileSizeLimit::MaxBytes(n as u64),
+        },
+    );
+
+    let metrics_server = PrometheusMetricsServer::new(&logger_factory, prometheus_registry.clone());
+
+    let endpoint_metrics = Arc::new(EndpointMetrics::new(
+        logger.clone(),
+        &config.chains.providers(),
+        metrics_registry.cheap_clone(),
+    ));
+
+    // TODO: make option loadable from configuration TOML and environment:
+    let expensive_queries =
+        read_expensive_queries(&logger, opt.expensive_queries_filename.clone()).unwrap();
+
+    let (primary_pool, subscription_manager, chain_head_update_listener, network_store) =
+        setup_store(
+            &logger,
+            &node_id,
+            &config,
+            fork_base,
+            metrics_registry.cheap_clone(),
+        )
+        .await;
+
+    let graphman_server_config = make_graphman_server_config(
+        primary_pool.clone(),
+        network_store.cheap_clone(),
+        metrics_registry.cheap_clone(),
+        &env_vars,
+        &logger,
+        &logger_factory,
+    );
+
+    start_graphman_server(opt.graphman_port, graphman_server_config).await;
+
+    let launch_services = |logger: Logger, env_vars: Arc<EnvVars>| async move {
+        let blockchain_map = build_blockchain_map(
+            &logger,
+            &config,
+            &env_vars,
+            &node_id,
+            network_store.clone(),
+            metrics_registry.clone(),
+            endpoint_metrics,
+            chain_head_update_listener,
+            &logger_factory,
+        )
+        .await;
+
+        // see comment on cleanup_ethereum_shallow_blocks
+        if !opt.disable_block_ingestor {
+            cleanup_ethereum_shallow_blocks(&blockchain_map, &network_store);
+        }
+
+        let graphql_server = build_graphql_server(
+            &config,
+            &logger,
+            expensive_queries,
+            metrics_registry.clone(),
+            &network_store,
+            &logger_factory,
+        );
+
+        let index_node_server = IndexNodeServer::new(
+            &logger_factory,
+            blockchain_map.clone(),
+            network_store.clone(),
+            link_resolver.clone(),
+        );
+
+        if !opt.disable_block_ingestor {
+            spawn_block_ingestor(
+                &logger,
+                &blockchain_map,
+                &network_store,
+                primary_pool,
+                &metrics_registry,
+            )
+            .await;
+        }
+
+        let subgraph_registrar = build_subgraph_registrar(
+            metrics_registry.clone(),
+            &network_store,
+            &logger_factory,
+            &env_vars,
+            blockchain_map.clone(),
+            node_id.clone(),
+            subgraph_settings,
+            link_resolver.clone(),
+            subscription_manager,
+            arweave_service,
+            ipfs_service,
+        );
+
+        graph::spawn(
+            subgraph_registrar
+                .start()
+                .map_err(|e| panic!("failed to initialize subgraph provider {}", e))
+                .compat(),
+        );
+
+        // Start admin JSON-RPC server.
+        let json_rpc_server = JsonRpcServer::serve(
+            json_rpc_port,
+            http_port,
+            subgraph_registrar.clone(),
+            node_id.clone(),
+            logger.clone(),
+        )
+        .await
+        .expect("failed to start JSON-RPC admin server");
+
+        // Let the server run forever.
+        std::mem::forget(json_rpc_server);
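The `std::mem::forget` call above is deliberate: leaking the handle means its destructor never runs, so the admin server is never shut down. A tiny demonstration of those semantics:

```rust
struct Server;

impl Drop for Server {
    fn drop(&mut self) {
        println!("server shut down");
    }
}

fn main() {
    let kept = Server;
    // Drop never runs for a forgotten value, so nothing is printed here
    // and the underlying resource lives until the process exits.
    std::mem::forget(kept);

    let dropped = Server;
    drop(dropped); // prints "server shut down"
}
```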
+
+        // Add the CLI subgraph with a REST request to the admin server.
+        if let Some(subgraph) = subgraph {
+            deploy_subgraph_from_flag(subgraph, &opt, subgraph_registrar.clone(), node_id.clone());
+        }
+
+        // Serve GraphQL queries over HTTP
+        graph::spawn(async move { graphql_server.start(http_port).await });
+
+        // Run the index node server
+        graph::spawn(async move { index_node_server.start(index_node_port).await });
+
+        graph::spawn(async move {
+            metrics_server
+                .start(metrics_port)
+                .await
+                .expect("Failed to start metrics server")
+        });
+
+        // If we are in dev mode, watch for subgraph updates and drop and
+        // recreate the subgraph when it changes
+        if let Some(dev_updates) = dev_updates {
+            graph::spawn(async move {
+                watch_subgraph_updates(
+                    &logger,
+                    network_store.subgraph_store(),
+                    subgraph_registrar.clone(),
+                    node_id.clone(),
+                    dev_updates,
+                )
+                .await;
+            });
+        }
+    };
+
+    graph::spawn(launch_services(logger.clone(), env_vars.cheap_clone()));
+
+    spawn_contention_checker(logger.clone());
+
+    graph::futures03::future::pending::<()>().await;
+}
+
+fn spawn_contention_checker(logger: Logger) {
+    // Periodically check for contention in the tokio threadpool. First spawn a
+    // task that simply responds to "ping" requests. Then spawn a separate
+    // thread to periodically ping it and check responsiveness.
+    let (ping_send, mut ping_receive) = mpsc::channel::<std::sync::mpsc::SyncSender<()>>(1);
+    graph::spawn(async move {
+        while let Some(pong_send) = ping_receive.recv().await {
+            let _ = pong_send.clone().send(());
+        }
+        panic!("ping sender dropped");
+    });
+    std::thread::spawn(move || loop {
+        std::thread::sleep(Duration::from_secs(1));
+        let (pong_send, pong_receive) = std::sync::mpsc::sync_channel(1);
+        if graph::futures03::executor::block_on(ping_send.clone().send(pong_send)).is_err() {
+            debug!(logger, "Shutting down contention checker thread");
+            break;
+        }
+        let mut timeout = Duration::from_millis(10);
+        while pong_receive.recv_timeout(timeout) == Err(std::sync::mpsc::RecvTimeoutError::Timeout)
+        {
+            debug!(logger, "Possible contention in tokio threadpool";
+                   "timeout_ms" => timeout.as_millis(),
+                   "code" => LogCode::TokioContention);
+            if timeout < ENV_VARS.kill_if_unresponsive_timeout {
+                timeout *= 10;
+            } else if ENV_VARS.kill_if_unresponsive {
+                // The node is unresponsive, kill it in hopes it will be restarted.
+                crit!(logger, "Node is unresponsive, killing process");
+                std::process::abort()
+            }
+        }
+    });
+}
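`spawn_contention_checker` pairs a tokio task with a plain OS thread. A condensed, runnable sketch of that probe pattern, with shortened timings and the `futures` crate standing in for `graph::futures03`:

```rust
use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() {
    // Async task that answers pings as long as the runtime is responsive.
    let (ping_send, mut ping_receive) =
        mpsc::channel::<std::sync::mpsc::SyncSender<()>>(1);
    tokio::spawn(async move {
        while let Some(pong) = ping_receive.recv().await {
            let _ = pong.send(());
        }
    });

    // OS thread that pings the runtime and times the answer.
    let prober = std::thread::spawn(move || {
        let (pong_send, pong_receive) = std::sync::mpsc::sync_channel(1);
        futures::executor::block_on(ping_send.send(pong_send)).unwrap();
        match pong_receive.recv_timeout(Duration::from_millis(100)) {
            Ok(()) => println!("runtime responsive"),
            Err(_) => println!("possible contention"),
        }
    });
    prober.join().unwrap();
}
```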
+
+/// Sets up and loads configuration based on command line options
+fn setup_configuration(
+    opt: &Opt,
+    logger: &Logger,
+    env_vars: &Arc<EnvVars>,
+) -> (Config, Settings, Option<Url>) {
+    let config = match Config::load(logger, &opt.clone().into()) {
+        Err(e) => {
+            eprintln!("configuration error: {}", e);
+            std::process::exit(1);
+        }
+        Ok(config) => config,
+    };
+
+    let subgraph_settings = match env_vars.subgraph_settings {
+        Some(ref path) => {
+            info!(logger, "Reading subgraph configuration file `{}`", path);
+            match Settings::from_file(path) {
+                Ok(rules) => rules,
+                Err(e) => {
+                    eprintln!("configuration error in subgraph settings {}: {}", path, e);
+                    std::process::exit(1);
+                }
+            }
+        }
+        None => Settings::default(),
+    };
+
+    if opt.check_config {
+        match config.to_json() {
+            Ok(txt) => println!("{}", txt),
+            Err(e) => eprintln!("error serializing config: {}", e),
+        }
+        eprintln!("Successfully validated configuration");
+        std::process::exit(0);
+    }
+
+    // Obtain the fork base URL
+    let fork_base = match &opt.fork_base {
+        Some(url) => {
+            // Make sure the endpoint ends with a terminating slash.
+            let url = if !url.ends_with('/') {
+                let mut url = url.clone();
+                url.push('/');
+                Url::parse(&url)
+            } else {
+                Url::parse(url)
+            };
+
+            Some(url.expect("Failed to parse the fork base URL"))
+        }
+        None => {
+            warn!(
+                logger,
+                "No fork base URL specified, subgraph forking is disabled"
+            );
+            None
+        }
+    };
+
+    (config, subgraph_settings, fork_base)
+}
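The trailing-slash normalization in `setup_configuration` matters because `Url::join` resolves relative to the last path segment. A short demonstration with the `url` crate (the example.com URLs are illustrative):

```rust
use url::Url;

fn main() {
    // Without a trailing slash, join() replaces the last path segment...
    let base = Url::parse("http://example.com/fork/api").unwrap();
    assert_eq!(
        base.join("Qm123").unwrap().as_str(),
        "http://example.com/fork/Qm123"
    );

    // ...with a trailing slash it appends below the path, which is what
    // a fork base URL needs.
    let base = Url::parse("http://example.com/fork/api/").unwrap();
    assert_eq!(
        base.join("Qm123").unwrap().as_str(),
        "http://example.com/fork/api/Qm123"
    );
}
```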
+
+async fn start_graphman_server(port: u16, config: Option<GraphmanServerConfig<'_>>) {
+    let Some(config) = config else {
+        return;
+    };
+
+    let server = GraphmanServer::new(config)
+        .unwrap_or_else(|err| panic!("Invalid graphman server configuration: {err:#}"));
+
+    server
+        .start(port)
+        .await
+        .unwrap_or_else(|err| panic!("Failed to start graphman server: {err:#}"));
+}
+
+fn make_graphman_server_config<'a>(
+    pool: ConnectionPool,
+    store: Arc<Store>,
+    metrics_registry: Arc<MetricsRegistry>,
+    env_vars: &EnvVars,
+    logger: &Logger,
+    logger_factory: &'a LoggerFactory,
+) -> Option<GraphmanServerConfig<'a>> {
+    let Some(auth_token) = &env_vars.graphman_server_auth_token else {
+        warn!(
+            logger,
+            "Missing graphman server auth token; graphman server will not start",
+        );
+
+        return None;
+    };
+
+    let notification_sender = Arc::new(NotificationSender::new(metrics_registry.clone()));
+
+    Some(GraphmanServerConfig {
+        pool,
+        notification_sender,
+        store,
+        logger_factory,
+        auth_token: auth_token.to_owned(),
+    })
+}
+
+fn read_expensive_queries(
+    logger: &Logger,
+    expensive_queries_filename: String,
+) -> Result<Vec<Arc<q::Document<'static, String>>>, std::io::Error> {
+    // A file with a list of expensive queries, one query per line
+    // Attempts to run these queries will return a
+    // QueryExecutionError::TooExpensive to clients
+    let path = Path::new(&expensive_queries_filename);
+    let mut queries = Vec::new();
+    if path.exists() {
+        info!(
+            logger,
+            "Reading expensive queries file: {}", expensive_queries_filename
+        );
+        let file = std::fs::File::open(path)?;
+        let reader = BufReader::new(file);
+        for line in reader.lines() {
+            let line = line?;
+            let query = q::parse_query(&line)
+                .map_err(|e| {
+                    let msg = format!(
+                        "invalid GraphQL query in {}: {}\n{}",
+                        expensive_queries_filename, e, line
+                    );
+                    std::io::Error::new(std::io::ErrorKind::InvalidData, msg)
+                })?
+                .into_static();
+            queries.push(Arc::new(query));
+        }
+    } else {
+        warn!(
+            logger,
+            "Expensive queries file not set to a valid file: {}", expensive_queries_filename
+        );
+    }
+    Ok(queries)
+}
diff --git a/node/src/lib.rs b/node/src/lib.rs
index f65ffc1be8f..7b3d4347ee8 100644
--- a/node/src/lib.rs
+++ b/node/src/lib.rs
@@ -7,12 +7,12 @@ extern crate diesel;
 pub mod chain;
 pub mod config;
+pub mod dev;
+pub mod launcher;
+pub mod manager;
 pub mod network_setup;
 pub mod opt;
 pub mod store_builder;
-
-pub mod manager;
-
 pub struct MetricsContext {
     pub prometheus: Arc<Registry>,
     pub registry: Arc<MetricsRegistry>,
diff --git a/node/src/main.rs b/node/src/main.rs
index 6cd892079c1..e2445ddf587 100644
--- a/node/src/main.rs
+++ b/node/src/main.rs
@@ -1,82 +1,13 @@
 use clap::Parser as _;
-use git_testament::{git_testament, render_testament};
-use graph::futures01::Future as _;
-use graph::futures03::compat::Future01CompatExt;
-use graph::futures03::future::TryFutureExt;
+use git_testament::git_testament;
-use graph::blockchain::{Blockchain, BlockchainKind};
-use graph::components::link_resolver::{ArweaveClient, FileSizeLimit};
-use graph::components::subgraph::Settings;
-use graph::data::graphql::load_manager::LoadManager;
-use graph::endpoint::EndpointMetrics;
-use graph::env::EnvVars;
-use graph::log::logger;
 use graph::prelude::*;
-use graph::prometheus::Registry;
-use graph::url::Url;
-use graph_core::polling_monitor::{arweave_service, ipfs_service};
-use graph_core::{
-    SubgraphAssignmentProvider as IpfsSubgraphAssignmentProvider, SubgraphInstanceManager,
-    SubgraphRegistrar as IpfsSubgraphRegistrar,
-};
-use graph_graphql::prelude::GraphQlRunner;
-use graph_node::config::Config;
-use graph_node::network_setup::Networks;
-use graph_node::opt;
-use graph_node::store_builder::StoreBuilder;
-use graph_server_http::GraphQLServer as GraphQLQueryServer;
-use graph_server_index_node::IndexNodeServer;
-use graph_server_json_rpc::JsonRpcServer;
-use graph_server_metrics::PrometheusMetricsServer;
-use graph_store_postgres::{
-    register_jobs as register_store_jobs, ConnectionPool, NotificationSender, Store,
-};
-use graphman_server::GraphmanServer;
-use graphman_server::GraphmanServerConfig;
-use std::io::{BufRead, BufReader};
-use std::path::Path;
-use std::time::Duration;
-use tokio::sync::mpsc;
+use graph::{env::EnvVars, log::logger};
-git_testament!(TESTAMENT);
+use graph_core::polling_monitor::ipfs_service;
+use graph_node::{launcher, opt};
-fn read_expensive_queries(
-    logger: &Logger,
-    expensive_queries_filename: String,
-) -> Result<Vec<Arc<q::Document<'static, String>>>, std::io::Error> {
-    // A file with a list of expensive queries, one query per line
-    // Attempts to run these queries will return a
-    // QueryExecutionError::TooExpensive to clients
-    let path = Path::new(&expensive_queries_filename);
-    let mut queries = Vec::new();
-    if path.exists() {
-        info!(
-            logger,
-            "Reading expensive queries file: {}", expensive_queries_filename
-        );
-        let file = std::fs::File::open(path)?;
-        let reader = BufReader::new(file);
-        for line in reader.lines() {
-            let line = line?;
-            let query = q::parse_query(&line)
-                .map_err(|e| {
-                    let msg = format!(
-                        "invalid GraphQL query in {}: {}\n{}",
-                        expensive_queries_filename, e, line
-                    );
-                    std::io::Error::new(std::io::ErrorKind::InvalidData, msg)
-                })?
- .into_static(); - queries.push(Arc::new(query)); - } - } else { - warn!( - logger, - "Expensive queries file not set to a valid file: {}", expensive_queries_filename - ); - } - Ok(queries) -} +git_testament!(TESTAMENT); fn main() { let max_blocking: usize = std::env::var("GRAPH_MAX_BLOCKING_THREADS") @@ -94,516 +25,24 @@ fn main() { async fn main_inner() { env_logger::init(); - let env_vars = Arc::new(EnvVars::from_env().unwrap()); let opt = opt::Opt::parse(); // Set up logger let logger = logger(opt.debug); - // Log version information - info!( - logger, - "Graph Node version: {}", - render_testament!(TESTAMENT) - ); - - if !graph_server_index_node::PoiProtection::from_env(&ENV_VARS).is_active() { - warn!( - logger, - "GRAPH_POI_ACCESS_TOKEN not set; might leak POIs to the public via GraphQL" - ); - } - - let config = match Config::load(&logger, &opt.clone().into()) { - Err(e) => { - eprintln!("configuration error: {}", e); - std::process::exit(1); - } - Ok(config) => config, - }; - - let subgraph_settings = match env_vars.subgraph_settings { - Some(ref path) => { - info!(logger, "Reading subgraph configuration file `{}`", path); - match Settings::from_file(path) { - Ok(rules) => rules, - Err(e) => { - eprintln!("configuration error in subgraph settings {}: {}", path, e); - std::process::exit(1); - } - } - } - None => Settings::default(), - }; - - if opt.check_config { - match config.to_json() { - Ok(txt) => println!("{}", txt), - Err(e) => eprintln!("error serializing config: {}", e), - } - eprintln!("Successfully validated configuration"); - std::process::exit(0); - } - - let node_id = NodeId::new(opt.node_id.clone()) - .expect("Node ID must be between 1 and 63 characters in length"); - - // Obtain subgraph related command-line arguments - let subgraph = opt.subgraph.clone(); - - // Obtain ports to use for the GraphQL server(s) - let http_port = opt.http_port; - - // Obtain JSON-RPC server port - let json_rpc_port = opt.admin_port; - - // Obtain index node server port - let index_node_port = opt.index_node_port; - - // Obtain metrics server port - let metrics_port = opt.metrics_port; - - // Obtain the fork base URL - let fork_base = match &opt.fork_base { - Some(url) => { - // Make sure the endpoint ends with a terminating slash. 
- let url = if !url.ends_with('/') { - let mut url = url.clone(); - url.push('/'); - Url::parse(&url) - } else { - Url::parse(url) - }; - - Some(url.expect("Failed to parse the fork base URL")) - } - None => { - warn!( - logger, - "No fork base URL specified, subgraph forking is disabled" - ); - None - } - }; - - info!(logger, "Starting up"); - - // Optionally, identify the Elasticsearch logging configuration - let elastic_config = opt - .elasticsearch_url - .clone() - .map(|endpoint| ElasticLoggingConfig { - endpoint, - username: opt.elasticsearch_user.clone(), - password: opt.elasticsearch_password.clone(), - client: reqwest::Client::new(), - }); - - // Set up Prometheus registry - let prometheus_registry = Arc::new(Registry::new()); - let metrics_registry = Arc::new(MetricsRegistry::new( - logger.clone(), - prometheus_registry.clone(), - )); - - // Create a component and subgraph logger factory - let logger_factory = - LoggerFactory::new(logger.clone(), elastic_config, metrics_registry.clone()); - let ipfs_client = graph::ipfs::new_ipfs_client(&opt.ipfs, &logger) .await .unwrap_or_else(|err| panic!("Failed to create IPFS client: {err:#}")); let ipfs_service = ipfs_service( ipfs_client.cheap_clone(), - ENV_VARS.mappings.max_ipfs_file_bytes, - ENV_VARS.mappings.ipfs_timeout, - ENV_VARS.mappings.ipfs_request_limit, - ); - - let arweave_resolver = Arc::new(ArweaveClient::new( - logger.cheap_clone(), - opt.arweave - .parse() - .expect("unable to parse arweave gateway address"), - )); - - let arweave_service = arweave_service( - arweave_resolver.cheap_clone(), + env_vars.mappings.max_ipfs_file_bytes, + env_vars.mappings.ipfs_timeout, env_vars.mappings.ipfs_request_limit, - match env_vars.mappings.max_ipfs_file_bytes { - 0 => FileSizeLimit::Unlimited, - n => FileSizeLimit::MaxBytes(n as u64), - }, ); - // Convert the clients into a link resolver. 
Since we want to get past - // possible temporary DNS failures, make the resolver retry let link_resolver = Arc::new(IpfsResolver::new(ipfs_client, env_vars.cheap_clone())); - let metrics_server = PrometheusMetricsServer::new(&logger_factory, prometheus_registry.clone()); - - let endpoint_metrics = Arc::new(EndpointMetrics::new( - logger.clone(), - &config.chains.providers(), - metrics_registry.cheap_clone(), - )); - - let graphql_metrics_registry = metrics_registry.clone(); - - let contention_logger = logger.clone(); - - // TODO: make option loadable from configuration TOML and environment: - let expensive_queries = - read_expensive_queries(&logger, opt.expensive_queries_filename).unwrap(); - - let store_builder = StoreBuilder::new( - &logger, - &node_id, - &config, - fork_base, - metrics_registry.cheap_clone(), - ) - .await; - - let primary_pool = store_builder.primary_pool(); - let subscription_manager = store_builder.subscription_manager(); - let chain_head_update_listener = store_builder.chain_head_update_listener(); - let network_store = store_builder.network_store(config.chain_ids()); - - let graphman_server_config = make_graphman_server_config( - primary_pool.clone(), - network_store.cheap_clone(), - metrics_registry.cheap_clone(), - &env_vars, - &logger, - &logger_factory, - ); - - start_graphman_server(opt.graphman_port, graphman_server_config).await; - - let launch_services = |logger: Logger, env_vars: Arc| async move { - use graph::components::network_provider; - - let block_store = network_store.block_store(); - - let mut provider_checks: Vec> = Vec::new(); - - if env_vars.genesis_validation_enabled { - provider_checks.push(Arc::new(network_provider::GenesisHashCheck::new( - block_store.clone(), - ))); - } - - provider_checks.push(Arc::new(network_provider::ExtendedBlocksCheck::new( - env_vars - .firehose_disable_extended_blocks_for_chains - .iter() - .map(|x| x.as_str().into()), - ))); - - let network_adapters = Networks::from_config( - logger.cheap_clone(), - &config, - metrics_registry.cheap_clone(), - endpoint_metrics, - &provider_checks, - ) - .await - .expect("unable to parse network configuration"); - - let blockchain_map = network_adapters - .blockchain_map( - &env_vars, - &node_id, - &logger, - block_store, - &logger_factory, - metrics_registry.cheap_clone(), - chain_head_update_listener, - ) - .await; - - // see comment on cleanup_ethereum_shallow_blocks - if !opt.disable_block_ingestor { - match blockchain_map - .get_all_by_kind::(BlockchainKind::Ethereum) - .ok() - .map(|chains| { - chains - .iter() - .flat_map(|c| { - if !c.chain_client().is_firehose() { - Some(c.name.to_string()) - } else { - None - } - }) - .collect() - }) { - Some(eth_network_names) => { - network_store - .block_store() - .cleanup_ethereum_shallow_blocks(eth_network_names) - .unwrap(); - } - // This code path only happens if the downcast on the blockchain map fails, that - // probably means we have a problem with the chain loading logic so it's probably - // safest to just refuse to start. 
- None => unreachable!( - "If you are seeing this message just use a different version of graph-node" - ), - } - } - - let blockchain_map = Arc::new(blockchain_map); - - let shards: Vec<_> = config.stores.keys().cloned().collect(); - let load_manager = Arc::new(LoadManager::new( - &logger, - shards, - expensive_queries, - metrics_registry.clone(), - )); - let graphql_runner = Arc::new(GraphQlRunner::new( - &logger, - network_store.clone(), - load_manager, - graphql_metrics_registry, - )); - let graphql_server = GraphQLQueryServer::new(&logger_factory, graphql_runner.clone()); - - let index_node_server = IndexNodeServer::new( - &logger_factory, - blockchain_map.clone(), - network_store.clone(), - link_resolver.clone(), - ); - - if !opt.disable_block_ingestor { - let logger = logger.clone(); - let ingestors = Networks::block_ingestors(&logger, &blockchain_map) - .await - .expect("unable to start block ingestors"); - - ingestors.into_iter().for_each(|ingestor| { - let logger = logger.clone(); - info!(logger,"Starting block ingestor for network";"network_name" => &ingestor.network_name().as_str(), "kind" => ingestor.kind().to_string()); - - graph::spawn(ingestor.run()); - }); - - // Start a task runner - let mut job_runner = graph::util::jobs::Runner::new(&logger); - register_store_jobs( - &mut job_runner, - network_store.clone(), - primary_pool, - metrics_registry.clone(), - ); - graph::spawn_blocking(job_runner.start()); - } - let static_filters = ENV_VARS.experimental_static_filters; - - let sg_count = Arc::new(SubgraphCountMetric::new(metrics_registry.cheap_clone())); - - let subgraph_instance_manager = SubgraphInstanceManager::new( - &logger_factory, - env_vars.cheap_clone(), - network_store.subgraph_store(), - blockchain_map.cheap_clone(), - sg_count.cheap_clone(), - metrics_registry.clone(), - link_resolver.clone(), - ipfs_service, - arweave_service, - static_filters, - ); - - // Create IPFS-based subgraph provider - let subgraph_provider = IpfsSubgraphAssignmentProvider::new( - &logger_factory, - link_resolver.clone(), - subgraph_instance_manager, - sg_count, - ); - - // Check version switching mode environment variable - let version_switching_mode = ENV_VARS.subgraph_version_switching_mode; - - // Create named subgraph provider for resolving subgraph name->ID mappings - let subgraph_registrar = Arc::new(IpfsSubgraphRegistrar::new( - &logger_factory, - link_resolver, - Arc::new(subgraph_provider), - network_store.subgraph_store(), - subscription_manager, - blockchain_map, - node_id.clone(), - version_switching_mode, - Arc::new(subgraph_settings), - )); - graph::spawn( - subgraph_registrar - .start() - .map_err(|e| panic!("failed to initialize subgraph provider {}", e)) - .compat(), - ); - - // Start admin JSON-RPC server. - let json_rpc_server = JsonRpcServer::serve( - json_rpc_port, - http_port, - subgraph_registrar.clone(), - node_id.clone(), - logger.clone(), - ) - .await - .expect("failed to start JSON-RPC admin server"); - - // Let the server run forever. - std::mem::forget(json_rpc_server); - - // Add the CLI subgraph with a REST request to the admin server. 
- if let Some(subgraph) = subgraph { - let (name, hash) = if subgraph.contains(':') { - let mut split = subgraph.split(':'); - (split.next().unwrap(), split.next().unwrap().to_owned()) - } else { - ("cli", subgraph) - }; - - let name = SubgraphName::new(name) - .expect("Subgraph name must contain only a-z, A-Z, 0-9, '-' and '_'"); - let subgraph_id = - DeploymentHash::new(hash).expect("Subgraph hash must be a valid IPFS hash"); - let debug_fork = opt - .debug_fork - .map(DeploymentHash::new) - .map(|h| h.expect("Debug fork hash must be a valid IPFS hash")); - let start_block = opt - .start_block - .map(|block| { - let mut split = block.split(':'); - ( - // BlockHash - split.next().unwrap().to_owned(), - // BlockNumber - split.next().unwrap().parse::().unwrap(), - ) - }) - .map(|(hash, number)| BlockPtr::try_from((hash.as_str(), number))) - .map(Result::unwrap); - - graph::spawn( - async move { - subgraph_registrar.create_subgraph(name.clone()).await?; - subgraph_registrar - .create_subgraph_version( - name, - subgraph_id, - node_id, - debug_fork, - start_block, - None, - None, - ) - .await - } - .map_err(|e| panic!("Failed to deploy subgraph from `--subgraph` flag: {}", e)), - ); - } - - // Serve GraphQL queries over HTTP - graph::spawn(async move { graphql_server.start(http_port).await }); - - // Run the index node server - graph::spawn(async move { index_node_server.start(index_node_port).await }); - - graph::spawn(async move { - metrics_server - .start(metrics_port) - .await - .expect("Failed to start metrics server") - }); - }; - - graph::spawn(launch_services(logger.clone(), env_vars.cheap_clone())); - - // Periodically check for contention in the tokio threadpool. First spawn a - // task that simply responds to "ping" requests. Then spawn a separate - // thread to periodically ping it and check responsiveness. - let (ping_send, mut ping_receive) = mpsc::channel::>(1); - graph::spawn(async move { - while let Some(pong_send) = ping_receive.recv().await { - let _ = pong_send.clone().send(()); - } - panic!("ping sender dropped"); - }); - std::thread::spawn(move || loop { - std::thread::sleep(Duration::from_secs(1)); - let (pong_send, pong_receive) = std::sync::mpsc::sync_channel(1); - if graph::futures03::executor::block_on(ping_send.clone().send(pong_send)).is_err() { - debug!(contention_logger, "Shutting down contention checker thread"); - break; - } - let mut timeout = Duration::from_millis(10); - while pong_receive.recv_timeout(timeout) == Err(std::sync::mpsc::RecvTimeoutError::Timeout) - { - debug!(contention_logger, "Possible contention in tokio threadpool"; - "timeout_ms" => timeout.as_millis(), - "code" => LogCode::TokioContention); - if timeout < ENV_VARS.kill_if_unresponsive_timeout { - timeout *= 10; - } else if ENV_VARS.kill_if_unresponsive { - // The node is unresponsive, kill it in hopes it will be restarted. 
-            crit!(contention_logger, "Node is unresponsive, killing process");
-            std::process::abort()
-        }
-    }
-    });
-
-    graph::futures03::future::pending::<()>().await;
-}
-
-async fn start_graphman_server(port: u16, config: Option<GraphmanServerConfig<'_>>) {
-    let Some(config) = config else {
-        return;
-    };
-
-    let server = GraphmanServer::new(config)
-        .unwrap_or_else(|err| panic!("Invalid graphman server configuration: {err:#}"));
-
-    server
-        .start(port)
-        .await
-        .unwrap_or_else(|err| panic!("Failed to start graphman server: {err:#}"));
-}
-
-fn make_graphman_server_config<'a>(
-    pool: ConnectionPool,
-    store: Arc<Store>,
-    metrics_registry: Arc<MetricsRegistry>,
-    env_vars: &EnvVars,
-    logger: &Logger,
-    logger_factory: &'a LoggerFactory,
-) -> Option<GraphmanServerConfig<'a>> {
-    let Some(auth_token) = &env_vars.graphman_server_auth_token else {
-        warn!(
-            logger,
-            "Missing graphman server auth token; graphman server will not start",
-        );
-
-        return None;
-    };
-
-    let notification_sender = Arc::new(NotificationSender::new(metrics_registry.clone()));
-    Some(GraphmanServerConfig {
-        pool,
-        notification_sender,
-        store,
-        logger_factory,
-        auth_token: auth_token.to_owned(),
-    })
+    launcher::run(logger, opt, env_vars, ipfs_service, link_resolver, None).await;
 }
diff --git a/node/src/manager/commands/run.rs b/node/src/manager/commands/run.rs
index 2c6bfdcb148..1892353c6a9 100644
--- a/node/src/manager/commands/run.rs
+++ b/node/src/manager/commands/run.rs
@@ -213,6 +213,7 @@ pub async fn run(
             None,
             None,
             None,
+            false,
         )
         .await?;
diff --git a/server/json-rpc/src/lib.rs b/server/json-rpc/src/lib.rs
index 103d36f806c..970bb3959d3 100644
--- a/server/json-rpc/src/lib.rs
+++ b/server/json-rpc/src/lib.rs
@@ -133,6 +133,7 @@ impl ServerState {
                 None,
                 None,
                 params.history_blocks,
+                false,
             )
             .await
         {
diff --git a/store/postgres/Cargo.toml b/store/postgres/Cargo.toml
index 027a46414d9..82a12d823c2 100644
--- a/store/postgres/Cargo.toml
+++ b/store/postgres/Cargo.toml
@@ -21,7 +21,7 @@ lazy_static = "1.5"
 lru_time_cache = "0.11"
 maybe-owned = "0.3.4"
 postgres = "0.19.1"
-openssl = "0.10.72"
+openssl = { version = "0.10.72", features = ["vendored"] }
 postgres-openssl = "0.5.1"
 rand.workspace = true
 serde = { workspace = true }
diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs
index d19cc68f44a..54bbafbc124 100644
--- a/store/postgres/src/subgraph_store.rs
+++ b/store/postgres/src/subgraph_store.rs
@@ -1415,6 +1415,16 @@ impl SubgraphStoreTrait for SubgraphStore {
         })
     }
+    fn unassign_subgraph(&self, deployment: &DeploymentLocator) -> Result<(), StoreError> {
+        let site = self.find_site(deployment.id.into())?;
+        let mut pconn = self.primary_conn()?;
+        pconn.transaction(|conn| -> Result<_, StoreError> {
+            let mut pconn = primary::Connection::new(conn);
+            let changes = pconn.unassign_subgraph(site.as_ref())?;
+            pconn.send_store_event(&self.sender, &StoreEvent::new(changes))
+        })
+    }
+
     fn pause_subgraph(&self, deployment: &DeploymentLocator) -> Result<(), StoreError> {
         let site = self.find_site(deployment.id.into())?;
         let mut pconn = self.primary_conn()?;
diff --git a/store/test-store/tests/chain/ethereum/manifest.rs b/store/test-store/tests/chain/ethereum/manifest.rs
index f025be2e626..084398502bb 100644
--- a/store/test-store/tests/chain/ethereum/manifest.rs
+++ b/store/test-store/tests/chain/ethereum/manifest.rs
@@ -91,6 +91,13 @@ impl LinkResolverTrait for TextResolver {
         Box::new(self.clone())
     }
+    fn for_manifest(
+        &self,
+        _manifest_path: &str,
+    ) -> Result<Box<dyn LinkResolverTrait>, anyhow::Error> {
+        Ok(Box::new(self.clone()))
+    }
+
     async fn cat(&self, _logger: &Logger, link: &Link) -> Result<Vec<u8>, anyhow::Error> {
         self.texts
             .get(&link.link)
diff --git a/tests/runner-tests/file-link-resolver/abis/Contract.abi b/tests/runner-tests/file-link-resolver/abis/Contract.abi
new file mode 100644
index 00000000000..9d9f56b9263
--- /dev/null
+++ b/tests/runner-tests/file-link-resolver/abis/Contract.abi
@@ -0,0 +1,15 @@
+[
+  {
+    "anonymous": false,
+    "inputs": [
+      {
+        "indexed": false,
+        "internalType": "string",
+        "name": "testCommand",
+        "type": "string"
+      }
+    ],
+    "name": "TestEvent",
+    "type": "event"
+  }
+]
diff --git a/tests/runner-tests/file-link-resolver/package.json b/tests/runner-tests/file-link-resolver/package.json
new file mode 100644
index 00000000000..11e796fc72c
--- /dev/null
+++ b/tests/runner-tests/file-link-resolver/package.json
@@ -0,0 +1,13 @@
+{
+  "name": "file-link-resolver",
+  "version": "0.1.0",
+  "scripts": {
+    "codegen": "graph codegen --skip-migrations",
+    "create:test": "graph create test/file-link-resolver --node $GRAPH_NODE_ADMIN_URI",
+    "deploy:test": "graph deploy test/file-link-resolver --version-label v0.0.1 --ipfs $IPFS_URI --node $GRAPH_NODE_ADMIN_URI"
+  },
+  "devDependencies": {
+    "@graphprotocol/graph-cli": "0.60.0",
+    "@graphprotocol/graph-ts": "0.31.0"
+  }
+}
diff --git a/tests/runner-tests/file-link-resolver/schema.graphql b/tests/runner-tests/file-link-resolver/schema.graphql
new file mode 100644
index 00000000000..2eec3606b65
--- /dev/null
+++ b/tests/runner-tests/file-link-resolver/schema.graphql
@@ -0,0 +1,5 @@
+type Block @entity {
+  id: ID!
+  number: BigInt!
+  hash: Bytes!
+}
\ No newline at end of file
diff --git a/tests/runner-tests/file-link-resolver/src/mapping.ts b/tests/runner-tests/file-link-resolver/src/mapping.ts
new file mode 100644
index 00000000000..ecce2ff9de5
--- /dev/null
+++ b/tests/runner-tests/file-link-resolver/src/mapping.ts
@@ -0,0 +1,11 @@
+import { ethereum, log } from "@graphprotocol/graph-ts";
+import { Block } from "../generated/schema";
+
+export function handleBlock(block: ethereum.Block): void {
+  log.info("Processing block: {}", [block.number.toString()]);
+
+  let blockEntity = new Block(block.number.toString());
+  blockEntity.number = block.number;
+  blockEntity.hash = block.hash;
+  blockEntity.save();
+}
diff --git a/tests/runner-tests/file-link-resolver/subgraph.yaml b/tests/runner-tests/file-link-resolver/subgraph.yaml
new file mode 100644
index 00000000000..4a50915beb4
--- /dev/null
+++ b/tests/runner-tests/file-link-resolver/subgraph.yaml
@@ -0,0 +1,22 @@
+specVersion: 0.0.8
+schema:
+  file: ./schema.graphql
+dataSources:
+  - kind: ethereum/contract
+    name: Contract
+    network: test
+    source:
+      address: "0x0000000000000000000000000000000000000000"
+      abi: Contract
+    mapping:
+      kind: ethereum/events
+      apiVersion: 0.0.7
+      language: wasm/assemblyscript
+      entities:
+        - Block
+      abis:
+        - name: Contract
+          file: ./abis/Contract.abi
+      blockHandlers:
+        - handler: handleBlock
+      file: ./src/mapping.ts
diff --git a/tests/runner-tests/yarn.lock b/tests/runner-tests/yarn.lock
index 2f9f1287bec..ee3d9e5202a 100644
--- a/tests/runner-tests/yarn.lock
+++ b/tests/runner-tests/yarn.lock
@@ -349,40 +349,6 @@
     which "2.0.2"
     yaml "1.10.2"
-"@graphprotocol/graph-cli@0.79.0-alpha-20240711124603-49edf22":
-  version "0.79.0-alpha-20240711124603-49edf22"
-  resolved "https://registry.yarnpkg.com/@graphprotocol/graph-cli/-/graph-cli-0.79.0-alpha-20240711124603-49edf22.tgz#4e3f6201932a0b68ce64d6badd8432cf2bead3c2"
-  integrity
sha512-fZrdPiFbbbBVMnvsjfKA+j48WzzquaHQIpozBqnUKRPCV1n1NenIaq2nH16mlMwovRIS7AAIVCpa0QYQuPzw7Q== - dependencies: - "@float-capital/float-subgraph-uncrashable" "^0.0.0-alpha.4" - "@oclif/core" "2.8.6" - "@oclif/plugin-autocomplete" "^2.3.6" - "@oclif/plugin-not-found" "^2.4.0" - "@whatwg-node/fetch" "^0.8.4" - assemblyscript "0.19.23" - binary-install-raw "0.0.13" - chalk "3.0.0" - chokidar "3.5.3" - debug "4.3.4" - docker-compose "0.23.19" - dockerode "2.5.8" - fs-extra "9.1.0" - glob "9.3.5" - gluegun "5.1.6" - graphql "15.5.0" - immutable "4.2.1" - ipfs-http-client "55.0.0" - jayson "4.0.0" - js-yaml "3.14.1" - open "8.4.2" - prettier "3.0.3" - semver "7.4.0" - sync-request "6.1.0" - tmp-promise "3.0.3" - web3-eth-abi "1.7.0" - which "2.0.2" - yaml "1.10.2" - "@graphprotocol/graph-ts@0.30.0": version "0.30.0" resolved "https://registry.npmjs.org/@graphprotocol/graph-ts/-/graph-ts-0.30.0.tgz" @@ -1507,11 +1473,6 @@ defaults@^1.0.3: dependencies: clone "^1.0.2" -define-lazy-prop@^2.0.0: - version "2.0.0" - resolved "https://registry.yarnpkg.com/define-lazy-prop/-/define-lazy-prop-2.0.0.tgz#3f7ae421129bcaaac9bc74905c98a0009ec9ee7f" - integrity sha512-Ds09qNh8yw3khSjiJjiUInaGX9xlqZDY7JVryGxdxV7NPeuqQfplOpQ66yJFZut3jLa5zOwkXw1g9EI2uKh4Og== - delay@^5.0.0: version "5.0.0" resolved "https://registry.npmjs.org/delay/-/delay-5.0.0.tgz" @@ -1584,13 +1545,6 @@ ejs@3.1.6: dependencies: jake "^10.6.1" -ejs@3.1.8: - version "3.1.8" - resolved "https://registry.yarnpkg.com/ejs/-/ejs-3.1.8.tgz#758d32910c78047585c7ef1f92f9ee041c1c190b" - integrity sha512-/sXZeMlhS0ArkfX2Aw780gJzXSMPnKjtspYZv+f3NiKLlubezAHDU5+9xz6gd3/NhG3txQCo6xlglmTS+oTGEQ== - dependencies: - jake "^10.8.5" - ejs@^3.1.8: version "3.1.9" resolved "https://registry.npmjs.org/ejs/-/ejs-3.1.9.tgz" @@ -2055,42 +2009,6 @@ gluegun@5.1.2: which "2.0.2" yargs-parser "^21.0.0" -gluegun@5.1.6: - version "5.1.6" - resolved "https://registry.yarnpkg.com/gluegun/-/gluegun-5.1.6.tgz#74ec13193913dc610f5c1a4039972c70c96a7bad" - integrity sha512-9zbi4EQWIVvSOftJWquWzr9gLX2kaDgPkNR5dYWbM53eVvCI3iKuxLlnKoHC0v4uPoq+Kr/+F569tjoFbA4DSA== - dependencies: - apisauce "^2.1.5" - app-module-path "^2.2.0" - cli-table3 "0.6.0" - colors "1.4.0" - cosmiconfig "7.0.1" - cross-spawn "7.0.3" - ejs "3.1.8" - enquirer "2.3.6" - execa "5.1.1" - fs-jetpack "4.3.1" - lodash.camelcase "^4.3.0" - lodash.kebabcase "^4.1.1" - lodash.lowercase "^4.3.0" - lodash.lowerfirst "^4.3.1" - lodash.pad "^4.5.1" - lodash.padend "^4.6.1" - lodash.padstart "^4.6.1" - lodash.repeat "^4.1.0" - lodash.snakecase "^4.1.1" - lodash.startcase "^4.4.0" - lodash.trim "^4.5.1" - lodash.trimend "^4.5.1" - lodash.trimstart "^4.5.1" - lodash.uppercase "^4.3.0" - lodash.upperfirst "^4.3.1" - ora "4.0.2" - pluralize "^8.0.0" - semver "7.3.5" - which "2.0.2" - yargs-parser "^21.0.0" - graceful-fs@^4.1.6, graceful-fs@^4.2.0: version "4.2.11" resolved "https://registry.yarnpkg.com/graceful-fs/-/graceful-fs-4.2.11.tgz#4183e4e8bf08bb6e05bbb2f7d2e0c8f712ca40e3" @@ -2377,7 +2295,7 @@ is-binary-path@~2.1.0: dependencies: binary-extensions "^2.0.0" -is-docker@^2.0.0, is-docker@^2.1.1: +is-docker@^2.0.0: version "2.2.1" resolved "https://registry.npmjs.org/is-docker/-/is-docker-2.2.1.tgz" integrity sha512-F+i2BKsFrH66iaUFc0woD8sLy8getkwTwtOBjvs56Cx4CgJDeKQeqfz8wAYiSb8JOprWhHH5p77PbmYCvvUuXQ== @@ -3022,15 +2940,6 @@ onetime@^5.1.0, onetime@^5.1.2: dependencies: mimic-fn "^2.1.0" -open@8.4.2: - version "8.4.2" - resolved "https://registry.yarnpkg.com/open/-/open-8.4.2.tgz#5b5ffe2a8f793dcd2aad73e550cb87b59cb084f9" - 
integrity sha512-7x81NCL719oNbsq/3mh+hVrAWmFuEYUqrq/Iw3kUzH8ReypT9QQ0BLoJS7/G9k6N81XjW4qHWtjWwe/9eLy1EQ==
-  dependencies:
-    define-lazy-prop "^2.0.0"
-    is-docker "^2.1.1"
-    is-wsl "^2.2.0"
-
 ora@4.0.2:
   version "4.0.2"
   resolved "https://registry.npmjs.org/ora/-/ora-4.0.2.tgz"
@@ -3151,11 +3060,6 @@ prettier@1.19.1:
   resolved "https://registry.npmjs.org/prettier/-/prettier-1.19.1.tgz"
   integrity sha512-s7PoyDv/II1ObgQunCbB9PdLmUcBZcnWOcxDh7O0N/UwDEsHyqkW+Qh28jW+mVuCdx7gLB0BotYI1Y6uI9iyew==
-prettier@3.0.3:
-  version "3.0.3"
-  resolved "https://registry.yarnpkg.com/prettier/-/prettier-3.0.3.tgz#432a51f7ba422d1469096c0fdc28e235db8f9643"
-  integrity sha512-L/4pUDMxcNa8R/EthV08Zt42WBO4h1rarVtK0K+QJG0X187OLo7l699jWw0GKuwzkPQ//jMFA/8Xm6Fh3J/DAg==
-
 process-nextick-args@~2.0.0:
   version "2.0.1"
   resolved "https://registry.npmjs.org/process-nextick-args/-/process-nextick-args-2.0.1.tgz"
diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs
index 217c7f705b6..b9c07a41e7d 100644
--- a/tests/src/fixture/mod.rs
+++ b/tests/src/fixture/mod.rs
@@ -3,6 +3,7 @@ pub mod substreams;
 use std::collections::{BTreeSet, HashMap};
 use std::marker::PhantomData;
+use std::path::PathBuf;
 use std::sync::Mutex;
 use std::time::{Duration, Instant};
@@ -17,7 +18,9 @@ use graph::blockchain::{
     TriggerFilterWrapper, TriggersAdapter, TriggersAdapterSelector,
 };
 use graph::cheap_clone::CheapClone;
-use graph::components::link_resolver::{ArweaveClient, ArweaveResolver, FileSizeLimit};
+use graph::components::link_resolver::{
+    ArweaveClient, ArweaveResolver, FileLinkResolver, FileSizeLimit,
+};
 use graph::components::metrics::MetricsRegistry;
 use graph::components::network_provider::ChainName;
 use graph::components::store::{BlockStore, DeploymentLocator, EthereumCallCache, SourceableStore};
@@ -38,7 +41,7 @@ use graph::prelude::ethabi::ethereum_types::H256;
 use graph::prelude::serde_json::{self, json};
 use graph::prelude::{
     async_trait, lazy_static, q, r, ApiVersion, BigInt, BlockNumber, DeploymentHash,
-    GraphQlRunner as _, IpfsResolver, LoggerFactory, NodeId, QueryError,
+    GraphQlRunner as _, IpfsResolver, LinkResolver, LoggerFactory, NodeId, QueryError,
     SubgraphAssignmentProvider, SubgraphCountMetric, SubgraphName, SubgraphRegistrar,
     SubgraphStore as _, SubgraphVersionSwitchingMode, TriggerProcessor,
 };
@@ -455,6 +458,38 @@ pub async fn setup<C: Blockchain>(
     test_info: &TestInfo,
     stores: &Stores,
     chain: &impl TestChainTrait<C>,
     graft_block: Option<BlockPtr>,
     env_vars: Option<EnvVars>,
+) -> TestContext {
+    setup_inner(test_info, stores, chain, graft_block, env_vars, None).await
+}
+
+pub async fn setup_with_file_link_resolver<C: Blockchain>(
+    test_info: &TestInfo,
+    stores: &Stores,
+    chain: &impl TestChainTrait<C>,
+    graft_block: Option<BlockPtr>,
+    env_vars: Option<EnvVars>,
+) -> TestContext {
+    let mut base_dir = PathBuf::from(test_info.test_dir.clone());
+    base_dir.push("build");
+    let link_resolver = Arc::new(FileLinkResolver::with_base_dir(base_dir));
+    setup_inner(
+        test_info,
+        stores,
+        chain,
+        graft_block,
+        env_vars,
+        Some(link_resolver),
+    )
+    .await
+}
+
+pub async fn setup_inner<C: Blockchain>(
+    test_info: &TestInfo,
+    stores: &Stores,
+    chain: &impl TestChainTrait<C>,
+    graft_block: Option<BlockPtr>,
+    env_vars: Option<EnvVars>,
+    link_resolver: Option<Arc<dyn LinkResolver>>,
 ) -> TestContext {
     let env_vars = Arc::new(match env_vars {
         Some(ev) => ev,
@@ -483,10 +518,13 @@
         .unwrap(),
     );
-    let link_resolver = Arc::new(IpfsResolver::new(
-        ipfs_client.cheap_clone(),
-        Default::default(),
-    ));
+    let link_resolver = match link_resolver {
+        Some(link_resolver) => link_resolver,
+        None => Arc::new(IpfsResolver::new(
+            ipfs_client.cheap_clone(),
+
Default::default(), + )), + }; let ipfs_service = ipfs_service( ipfs_client.cheap_clone(), @@ -574,6 +612,7 @@ pub async fn setup( None, graft_block, None, + false, ) .await .expect("failed to create subgraph version"); diff --git a/tests/src/lib.rs b/tests/src/lib.rs index c89168d7003..2b67fc4dc44 100644 --- a/tests/src/lib.rs +++ b/tests/src/lib.rs @@ -4,6 +4,7 @@ pub mod fixture; pub mod helpers; #[macro_use] pub mod macros; +pub mod recipe; pub mod subgraph; pub use config::{Config, DbConfig, EthConfig, CONFIG}; diff --git a/tests/src/recipe.rs b/tests/src/recipe.rs new file mode 100644 index 00000000000..de7f0ebe0c1 --- /dev/null +++ b/tests/src/recipe.rs @@ -0,0 +1,133 @@ +use crate::{ + fixture::{stores, Stores, TestInfo}, + helpers::run_cmd, +}; +use graph::ipfs; +use graph::prelude::{DeploymentHash, SubgraphName}; +use std::process::Command; +pub struct RunnerTestRecipe { + pub stores: Stores, + pub test_info: TestInfo, +} + +impl RunnerTestRecipe { + pub async fn new(test_name: &str, subgraph_name: &str) -> Self { + let subgraph_name = SubgraphName::new(subgraph_name).unwrap(); + let test_dir = format!("./runner-tests/{}", subgraph_name); + + let (stores, hash) = tokio::join!( + stores(test_name, "./runner-tests/config.simple.toml"), + build_subgraph(&test_dir, None) + ); + + Self { + stores, + test_info: TestInfo { + test_dir, + test_name: test_name.to_string(), + subgraph_name, + hash, + }, + } + } + + /// Builds a new test subgraph with a custom deploy command. + pub async fn new_with_custom_cmd(name: &str, subgraph_name: &str, deploy_cmd: &str) -> Self { + let subgraph_name = SubgraphName::new(subgraph_name).unwrap(); + let test_dir = format!("./runner-tests/{}", subgraph_name); + + let (stores, hash) = tokio::join!( + stores(name, "./runner-tests/config.simple.toml"), + build_subgraph(&test_dir, Some(deploy_cmd)) + ); + + Self { + stores, + test_info: TestInfo { + test_dir, + test_name: name.to_string(), + subgraph_name, + hash, + }, + } + } + + pub async fn new_with_file_link_resolver( + name: &str, + subgraph_name: &str, + manifest: &str, + ) -> Self { + let subgraph_name = SubgraphName::new(subgraph_name).unwrap(); + let test_dir = format!("./runner-tests/{}", subgraph_name); + + let stores = stores(name, "./runner-tests/config.simple.toml").await; + build_subgraph(&test_dir, None).await; + let hash = DeploymentHash::new(manifest).unwrap(); + Self { + stores, + test_info: TestInfo { + test_dir, + test_name: name.to_string(), + subgraph_name, + hash, + }, + } + } +} + +/// deploy_cmd is the command to run to deploy the subgraph. If it is None, the +/// default `yarn deploy:test` is used. +async fn build_subgraph(dir: &str, deploy_cmd: Option<&str>) -> DeploymentHash { + build_subgraph_with_yarn_cmd(dir, deploy_cmd.unwrap_or("deploy:test")).await +} + +async fn build_subgraph_with_yarn_cmd(dir: &str, yarn_cmd: &str) -> DeploymentHash { + build_subgraph_with_yarn_cmd_and_arg(dir, yarn_cmd, None).await +} + +pub async fn build_subgraph_with_yarn_cmd_and_arg( + dir: &str, + yarn_cmd: &str, + arg: Option<&str>, +) -> DeploymentHash { + // Test that IPFS is up. + ipfs::IpfsRpcClient::new(ipfs::ServerAddress::local_rpc_api(), &graph::log::discard()) + .await + .expect("Could not connect to IPFS, make sure it's running at port 5001"); + + // Make sure dependencies are present. + + run_cmd( + Command::new("yarn") + .arg("install") + .arg("--mutex") + .arg("file:.yarn-mutex") + .current_dir("./runner-tests/"), + ); + + // Run codegen. 
+ run_cmd(Command::new("yarn").arg("codegen").current_dir(dir)); + + let mut args = vec![yarn_cmd]; + args.extend(arg); + + // Run `deploy` for the side effect of uploading to IPFS, the graph node url + // is fake and the actual deploy call is meant to fail. + let deploy_output = run_cmd( + Command::new("yarn") + .args(&args) + .env("IPFS_URI", "http://127.0.0.1:5001") + .env("GRAPH_NODE_ADMIN_URI", "http://localhost:0") + .current_dir(dir), + ); + + // Hack to extract deployment id from `graph deploy` output. + const ID_PREFIX: &str = "Build completed: "; + let Some(mut line) = deploy_output.lines().find(|line| line.contains(ID_PREFIX)) else { + panic!("No deployment id found, graph deploy probably had an error") + }; + if !line.starts_with(ID_PREFIX) { + line = &line[5..line.len() - 5]; // workaround for colored output + } + DeploymentHash::new(line.trim_start_matches(ID_PREFIX)).unwrap() +} diff --git a/tests/tests/file_link_resolver.rs b/tests/tests/file_link_resolver.rs new file mode 100644 index 00000000000..1b12aef64c4 --- /dev/null +++ b/tests/tests/file_link_resolver.rs @@ -0,0 +1,62 @@ +use graph::object; +use graph_tests::{ + fixture::{ + self, + ethereum::{chain, empty_block, genesis}, + test_ptr, + }, + recipe::RunnerTestRecipe, +}; + +#[tokio::test] +async fn file_link_resolver() -> anyhow::Result<()> { + std::env::set_var("GRAPH_NODE_DISABLE_DEPLOYMENT_HASH_VALIDATION", "true"); + let RunnerTestRecipe { stores, test_info } = RunnerTestRecipe::new_with_file_link_resolver( + "file_link_resolver", + "file-link-resolver", + "subgraph.yaml", + ) + .await; + + let blocks = { + let block_0 = genesis(); + let block_1 = empty_block(block_0.ptr(), test_ptr(1)); + let block_2 = empty_block(block_1.ptr(), test_ptr(2)); + let block_3 = empty_block(block_2.ptr(), test_ptr(3)); + + vec![block_0, block_1, block_2, block_3] + }; + + let chain = chain(&test_info.test_name, blocks, &stores, None).await; + + let ctx = fixture::setup_with_file_link_resolver(&test_info, &stores, &chain, None, None).await; + ctx.start_and_sync_to(test_ptr(3)).await; + let query = r#"{ blocks(first: 4, orderBy: number) { id, hash } }"#; + let query_res = ctx.query(query).await.unwrap(); + + assert_eq!( + query_res, + Some(object! { + blocks: vec![ + object! { + id: test_ptr(0).number.to_string(), + hash: format!("0x{}", test_ptr(0).hash_hex()), + }, + object! { + id: test_ptr(1).number.to_string(), + hash: format!("0x{}", test_ptr(1).hash_hex()), + }, + object! { + id: test_ptr(2).number.to_string(), + hash: format!("0x{}", test_ptr(2).hash_hex()), + }, + object! 
diff --git a/tests/tests/runner_tests.rs b/tests/tests/runner_tests.rs
index 261c886dfea..48a1bb67ffe 100644
--- a/tests/tests/runner_tests.rs
+++ b/tests/tests/runner_tests.rs
@@ -1,5 +1,4 @@
 use std::marker::PhantomData;
-use std::process::Command;
 use std::str::FromStr;
 use std::sync::atomic::{self, AtomicBool};
 use std::sync::Arc;
@@ -13,14 +12,11 @@ use graph::data::subgraph::schema::{SubgraphError, SubgraphHealth};
 use graph::data::value::Word;
 use graph::data_source::CausalityRegion;
 use graph::env::{EnvVars, TEST_WITH_NO_REORG};
-use graph::ipfs;
 use graph::ipfs::test_utils::add_files_to_local_ipfs_node_for_testing;
 use graph::object;
 use graph::prelude::ethabi::ethereum_types::H256;
 use graph::prelude::web3::types::Address;
-use graph::prelude::{
-    hex, CheapClone, DeploymentHash, SubgraphAssignmentProvider, SubgraphName, SubgraphStore,
-};
+use graph::prelude::{hex, CheapClone, SubgraphAssignmentProvider, SubgraphName, SubgraphStore};
 use graph_tests::fixture::ethereum::{
     chain, empty_block, generate_empty_blocks_for_range, genesis, push_test_command, push_test_log,
     push_test_polling_trigger,
@@ -28,60 +24,12 @@ use graph_tests::fixture::ethereum::{
 use graph_tests::fixture::substreams::chain as substreams_chain;
 use graph_tests::fixture::{
-    self, stores, test_ptr, test_ptr_reorged, MockAdapterSelector, NoopAdapterSelector, Stores,
-    TestChainTrait, TestContext, TestInfo,
+    self, test_ptr, test_ptr_reorged, MockAdapterSelector, NoopAdapterSelector, TestChainTrait,
+    TestContext, TestInfo,
 };
-use graph_tests::helpers::run_cmd;
+use graph_tests::recipe::{build_subgraph_with_yarn_cmd_and_arg, RunnerTestRecipe};
 use slog::{o, Discard, Logger};
 
-struct RunnerTestRecipe {
-    pub stores: Stores,
-    pub test_info: TestInfo,
-}
-
-impl RunnerTestRecipe {
-    async fn new(test_name: &str, subgraph_name: &str) -> Self {
-        let subgraph_name = SubgraphName::new(subgraph_name).unwrap();
-        let test_dir = format!("./runner-tests/{}", subgraph_name);
-
-        let (stores, hash) = tokio::join!(
-            stores(test_name, "./runner-tests/config.simple.toml"),
-            build_subgraph(&test_dir, None)
-        );
-
-        Self {
-            stores,
-            test_info: TestInfo {
-                test_dir,
-                test_name: test_name.to_string(),
-                subgraph_name,
-                hash,
-            },
-        }
-    }
-
-    /// Builds a new test subgraph with a custom deploy command.
-    async fn new_with_custom_cmd(name: &str, subgraph_name: &str, deploy_cmd: &str) -> Self {
-        let subgraph_name = SubgraphName::new(subgraph_name).unwrap();
-        let test_dir = format!("./runner-tests/{}", subgraph_name);
-
-        let (stores, hash) = tokio::join!(
-            stores(name, "./runner-tests/config.simple.toml"),
-            build_subgraph(&test_dir, Some(deploy_cmd))
-        );
-
-        Self {
-            stores,
-            test_info: TestInfo {
-                test_dir,
-                test_name: name.to_string(),
-                subgraph_name,
-                hash,
-            },
-        }
-    }
-}
-
 fn assert_eq_ignore_backtrace(err: &SubgraphError, expected: &SubgraphError) {
     let equal = {
         if err.subgraph_id != expected.subgraph_id
@@ -1278,60 +1226,3 @@ async fn arweave_file_data_sources() {
         Some(object! { file: object!{ id: id, content: content.clone() } })
     );
 }
-
-/// deploy_cmd is the command to run to deploy the subgraph. If it is None, the
-/// default `yarn deploy:test` is used.
-async fn build_subgraph(dir: &str, deploy_cmd: Option<&str>) -> DeploymentHash {
-    build_subgraph_with_yarn_cmd(dir, deploy_cmd.unwrap_or("deploy:test")).await
-}
-
-async fn build_subgraph_with_yarn_cmd(dir: &str, yarn_cmd: &str) -> DeploymentHash {
-    build_subgraph_with_yarn_cmd_and_arg(dir, yarn_cmd, None).await
-}
-
-async fn build_subgraph_with_yarn_cmd_and_arg(
-    dir: &str,
-    yarn_cmd: &str,
-    arg: Option<&str>,
-) -> DeploymentHash {
-    // Test that IPFS is up.
-    ipfs::IpfsRpcClient::new(ipfs::ServerAddress::local_rpc_api(), &graph::log::discard())
-        .await
-        .expect("Could not connect to IPFS, make sure it's running at port 5001");
-
-    // Make sure dependencies are present.
-
-    run_cmd(
-        Command::new("yarn")
-            .arg("install")
-            .arg("--mutex")
-            .arg("file:.yarn-mutex")
-            .current_dir("./runner-tests/"),
-    );
-
-    // Run codegen.
-    run_cmd(Command::new("yarn").arg("codegen").current_dir(dir));
-
-    let mut args = vec![yarn_cmd];
-    args.extend(arg);
-
-    // Run `deploy` for the side effect of uploading to IPFS, the graph node url
-    // is fake and the actual deploy call is meant to fail.
-    let deploy_output = run_cmd(
-        Command::new("yarn")
-            .args(&args)
-            .env("IPFS_URI", "http://127.0.0.1:5001")
-            .env("GRAPH_NODE_ADMIN_URI", "http://localhost:0")
-            .current_dir(dir),
-    );
-
-    // Hack to extract deployment id from `graph deploy` output.
-    const ID_PREFIX: &str = "Build completed: ";
-    let Some(mut line) = deploy_output.lines().find(|line| line.contains(ID_PREFIX)) else {
-        panic!("No deployment id found, graph deploy probably had an error")
-    };
-    if !line.starts_with(ID_PREFIX) {
-        line = &line[5..line.len() - 5]; // workaround for colored output
-    }
-    DeploymentHash::new(line.trim_start_matches(ID_PREFIX)).unwrap()
-}
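With these helpers now living in tests/src/recipe.rs, other test crates reach them through `graph_tests::recipe`. A hypothetical call site for the one entry point made public, `build_subgraph_with_yarn_cmd_and_arg`; the subgraph directory and extra yarn argument are invented for the example:

    use graph_tests::recipe::build_subgraph_with_yarn_cmd_and_arg;

    #[tokio::main]
    async fn main() {
        // Hypothetical directory and extra argument appended after the yarn
        // command; requires a local IPFS node on port 5001, as above.
        let hash = build_subgraph_with_yarn_cmd_and_arg(
            "./runner-tests/some-subgraph",
            "deploy:test",
            Some("--network=test"),
        )
        .await;
        println!("uploaded build: {hash}");
    }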