diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml new file mode 100644 index 0000000..b352d0d --- /dev/null +++ b/.github/workflows/build-and-test.yml @@ -0,0 +1,48 @@ +name: Rust + +on: + push: + branches: + - main + - staging + pull_request: + branches: + - main + - staging + +jobs: + build: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v2 + + - name: Install Rust + uses: actions-rs/toolchain@v1 + with: + toolchain: stable + profile: minimal + override: true + + - name: Cache cargo registry + uses: actions/cache@v2 + with: + path: ~/.cargo/registry + key: ${{ runner.os }}-cargo-registry-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo-registry- + + - name: Cache cargo index + uses: actions/cache@v2 + with: + path: ~/.cargo/git + key: ${{ runner.os }}-cargo-index-${{ hashFiles('**/Cargo.lock') }} + restore-keys: | + ${{ runner.os }}-cargo-index- + + - name: Build + run: cargo build --verbose + + - name: Run tests + run: cargo test --verbose diff --git a/.github/workflows/build-production.yml b/.github/workflows/build-production.yml new file mode 100644 index 0000000..f721c49 --- /dev/null +++ b/.github/workflows/build-production.yml @@ -0,0 +1,54 @@ +name: PRODUCTION -- Build & Push Docker Image +on: + push: + branches: [main] + paths: + - "**" + +jobs: + build_and_push: + name: Push Docker image to Docker Hub + runs-on: ubuntu-latest + steps: + - name: Check out the repo + uses: actions/checkout@v3 + with: + submodules: "true" + + - name: Log in to Docker Hub + uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + + - name: Extract metadata (tags, labels) for Docker + id: meta + uses: docker/metadata-action@v4 + with: + images: lukasdeco/indexer + + - name: Build and push Docker image + uses: docker/build-push-action@v4 + with: + context: . 
+ file: Dockerfile + push: true + tags: themetadao/asset-watcher:main + labels: ${{ steps.meta.outputs.labels }} + + # redeploy_app: + # name: Redeploy app on Railway + # needs: build_and_push + # runs-on: ubuntu-latest + # steps: + # - name: Call Redeploy API + # env: + # RAILWAY_TOKEN: ${{ secrets.RAILWAY_TOKEN }} + # run: | + # echo railway toke: $RAILWAY_TOKEN + # echo "Authorization: Bearer ${RAILWAY_TOKEN}" + # curl https://backboard.railway.app/graphql/v2 \ + # -X POST \ + # -H "Authorization: Bearer ${RAILWAY_TOKEN}" \ + # -H "Content-Type: application/json" \ + # --data '{"query": "mutation serviceInstanceDeploy($serviceId: String!, $environmentId: String!) {\n serviceInstanceDeploy(serviceId: $serviceId, environmentId: $environmentId)\n}\n", "variables": { "environmentId": "0942e3fe-8ec3-49b4-b8fb-26eb10b6e08f", "serviceId": "783719dc-3c30-437d-a3a9-b1aeb1d5c487" } }' diff --git a/.github/workflows/build-staging.yml b/.github/workflows/build-staging.yml new file mode 100644 index 0000000..a7e6269 --- /dev/null +++ b/.github/workflows/build-staging.yml @@ -0,0 +1,54 @@ +name: STAGING -- Build & Push Docker Image +on: + push: + branches: [staging] + paths: + - "**" + +jobs: + build_and_push: + name: Push Docker image to Docker Hub + runs-on: ubuntu-latest + steps: + - name: Check out the repo + uses: actions/checkout@v3 + with: + submodules: "true" + + - name: Log in to Docker Hub + uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + + - name: Extract metadata (tags, labels) for Docker + id: meta + uses: docker/metadata-action@v4 + with: + images: lukasdeco/indexer + + - name: Build and push Docker image + uses: docker/build-push-action@v4 + with: + context: . 
+          file: Dockerfile
+          push: true
+          tags: themetadao/asset-watcher:staging
+          labels: ${{ steps.meta.outputs.labels }}
+
+  redeploy_app:
+    name: Redeploy app on Railway
+    needs: build_and_push
+    runs-on: ubuntu-latest
+    steps:
+      - name: Call Redeploy API
+        env:
+          RAILWAY_TOKEN: ${{ secrets.RAILWAY_TOKEN }}
+        run: |
+          echo "Triggering Railway redeploy"
+          # NOTE: never echo RAILWAY_TOKEN -- printing the secret leaks it into CI logs
+          curl https://backboard.railway.app/graphql/v2 \
+            -X POST \
+            -H "Authorization: Bearer ${RAILWAY_TOKEN}" \
+            -H "Content-Type: application/json" \
+            --data '{"query": "mutation serviceInstanceDeploy($serviceId: String!, $environmentId: String!) {\n serviceInstanceDeploy(serviceId: $serviceId, environmentId: $environmentId)\n}\n", "variables": { "environmentId": "4015588d-3c82-4413-9484-314539aecd39", "serviceId": "c1221eaa-e011-4ed7-9bbb-5597692f5532" } }'
diff --git a/Cargo.lock b/Cargo.lock
index aa4230b..21483b4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -135,6 +135,55 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "anstream"
+version = "0.6.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "418c75fa768af9c03be99d17643f93f79bbba589895012a80e3452a19ddda15b"
+dependencies = [
+ "anstyle",
+ "anstyle-parse",
+ "anstyle-query",
+ "anstyle-wincon",
+ "colorchoice",
+ "is_terminal_polyfill",
+ "utf8parse",
+]
+
+[[package]]
+name = "anstyle"
+version = "1.0.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "038dfcf04a5feb68e9c60b21c9625a54c2c0616e79b72b0fd87075a056ae1d1b"
+
+[[package]]
+name = "anstyle-parse"
+version = "0.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c03a11a9034d92058ceb6ee011ce58af4a9bf61491aa7e1e59ecd24bd40d22d4"
+dependencies = [
+ "utf8parse",
+]
+
+[[package]]
+name = "anstyle-query"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ad186efb764318d35165f1758e7dcef3b10628e26d41a44bc5550652e6804391"
+dependencies = [ + "windows-sys 0.52.0", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61a38449feb7068f52bb06c12759005cf459ee52bb4adc1d5a7c4322d716fb19" +dependencies = [ + "anstyle", + "windows-sys 0.52.0", +] + [[package]] name = "anyhow" version = "1.0.86" @@ -327,10 +376,13 @@ version = "0.1.0" dependencies = [ "async-trait", "bs58 0.5.1", + "bytes", + "chrono", "deadpool", "deadpool-diesel", "diesel", "dotenv", + "env_logger 0.11.3", "futures", "futures-channel", "futures-util", @@ -339,8 +391,10 @@ dependencies = [ "hyper 1.3.1", "hyper-util", "postgres", + "reqwest 0.12.4", "serde", "serde_json", + "serde_urlencoded", "solana-account-decoder", "solana-client", "solana-program", @@ -350,6 +404,7 @@ dependencies = [ "tokio-postgres", "tokio-tungstenite 0.21.0", "url", + "warp", ] [[package]] @@ -397,6 +452,12 @@ dependencies = [ "syn 2.0.65", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "atty" version = "0.2.14" @@ -447,6 +508,12 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "base64ct" version = "1.6.0" @@ -820,6 +887,12 @@ dependencies = [ "os_str_bytes", ] +[[package]] +name = "colorchoice" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422" + [[package]] name = "combine" version = "3.8.1" @@ -1167,6 +1240,7 @@ checksum = 
"ff236accb9a5069572099f0b350a92e9560e8e63a9b8d546162f4a5e03026bb2" dependencies = [ "bitflags 2.5.0", "byteorder", + "chrono", "diesel_derives", "itoa", "pq-sys", @@ -1335,6 +1409,16 @@ dependencies = [ "syn 2.0.65", ] +[[package]] +name = "env_filter" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a009aa4810eb158359dda09d0c87378e4bbb89b5a801f016885a4707ba24f7ea" +dependencies = [ + "log", + "regex", +] + [[package]] name = "env_logger" version = "0.9.3" @@ -1348,6 +1432,19 @@ dependencies = [ "termcolor", ] +[[package]] +name = "env_logger" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38b35839ba51819680ba087cd351788c9a3c476841207e0b8cee0b04722343b9" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "humantime", + "log", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -1600,6 +1697,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "h2" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa82e28a107a8cc405f0839610bdc9b15f1e25ec7d696aa5cf173edbcb1486ab" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http 1.1.0", + "indexmap 2.2.6", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "hash32" version = "0.2.1" @@ -1639,6 +1755,30 @@ version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +[[package]] +name = "headers" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270" +dependencies = [ + "base64 0.21.7", + "bytes", + "headers-core", + "http 0.2.12", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" +dependencies = [ + "http 0.2.12", +] + [[package]] name = "hermit-abi" version = "0.1.19" @@ -1739,6 +1879,19 @@ dependencies = [ "http 1.1.0", ] +[[package]] +name = "http-body-util" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" +dependencies = [ + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "pin-project-lite", +] + [[package]] name = "httparse" version = "1.8.0" @@ -1767,7 +1920,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "httparse", @@ -1788,9 +1941,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d" dependencies = [ "bytes", + "futures-channel", + "futures-util", + "h2 0.4.5", "http 1.1.0", "http-body 1.0.0", + "httparse", + "itoa", + "pin-project-lite", + "smallvec", "tokio", + "want", ] [[package]] @@ -1807,6 +1968,22 @@ dependencies = [ "tokio-rustls", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper 1.3.1", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.3" @@ -1814,6 +1991,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca38ef113da30126bbff9cd1705f9273e15d45498615d138b0c20279ac7a76aa" dependencies = [ "bytes", + "futures-channel", "futures-util", "http 1.1.0", "http-body 1.0.0", @@ -1821,6 +1999,9 @@ dependencies = [ "pin-project-lite", "socket2", "tokio", + "tower", + "tower-service", + "tracing", ] [[package]] @@ -1926,6 +2107,12 @@ 
version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" +[[package]] +name = "is_terminal_polyfill" +version = "1.70.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8478577c03552c21db0e2724ffb8986a5ce7af88107e6be5d2ee6e158c12800" + [[package]] name = "itertools" version = "0.10.5" @@ -2138,6 +2325,16 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "mime_guess" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -2164,6 +2361,24 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "multer" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http 0.2.12", + "httparse", + "log", + "memchr", + "mime", + "spin 0.9.8", + "version_check", +] + [[package]] name = "native-tls" version = "0.2.11" @@ -2546,6 +2761,26 @@ dependencies = [ "siphasher", ] +[[package]] +name = "pin-project" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.65", +] + [[package]] name = "pin-project-lite" 
version = "0.2.14" @@ -2971,7 +3206,7 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "hyper 0.14.28", @@ -2984,7 +3219,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "rustls", - "rustls-pemfile", + "rustls-pemfile 1.0.4", "serde", "serde_json", "serde_urlencoded", @@ -2999,7 +3234,49 @@ dependencies = [ "wasm-bindgen-futures", "web-sys", "webpki-roots 0.25.4", - "winreg", + "winreg 0.50.0", +] + +[[package]] +name = "reqwest" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "566cafdd92868e0939d3fb961bd0dc25fcfaaed179291093b3d43e6b3150ea10" +dependencies = [ + "base64 0.22.1", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2 0.4.5", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.3.1", + "hyper-tls", + "hyper-util", + "ipnet", + "js-sys", + "log", + "mime", + "native-tls", + "once_cell", + "percent-encoding", + "pin-project-lite", + "rustls-pemfile 2.1.2", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "system-configuration", + "tokio", + "tokio-native-tls", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg 0.52.0", ] [[package]] @@ -3115,7 +3392,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" dependencies = [ "openssl-probe", - "rustls-pemfile", + "rustls-pemfile 1.0.4", "schannel", "security-framework", ] @@ -3129,6 +3406,22 @@ dependencies = [ "base64 0.21.7", ] +[[package]] +name = "rustls-pemfile" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29993a25686778eb88d4189742cd713c9bce943bc54251a33509dc63cbacf73d" +dependencies = [ + "base64 0.22.1", + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.7.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" + [[package]] name = "rustls-webpki" version = "0.101.7" @@ -3160,6 +3453,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.2.0" @@ -3572,7 +3871,7 @@ version = "1.18.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bee2daf61ae582edf9634adf8e5021faf002df0d3f69078ecbcd6c7b41bdf833" dependencies = [ - "env_logger", + "env_logger 0.9.3", "lazy_static", "log", ] @@ -3597,7 +3896,7 @@ dependencies = [ "gethostname", "lazy_static", "log", - "reqwest", + "reqwest 0.11.27", "solana-sdk", "thiserror", ] @@ -3745,7 +4044,7 @@ dependencies = [ "crossbeam-channel", "futures-util", "log", - "reqwest", + "reqwest 0.11.27", "semver", "serde", "serde_derive", @@ -3829,7 +4128,7 @@ dependencies = [ "bs58 0.4.0", "indicatif", "log", - "reqwest", + "reqwest 0.11.27", "semver", "serde", "serde_derive", @@ -3852,7 +4151,7 @@ dependencies = [ "base64 0.21.7", "bs58 0.4.0", "jsonrpc-core", - "reqwest", + "reqwest 0.11.27", "semver", "serde", "serde_derive", @@ -4774,6 +5073,28 @@ dependencies = [ "winnow", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + [[package]] name = "tower-service" 
version = "0.3.2" @@ -4865,6 +5186,15 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "unicase" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.15" @@ -4962,6 +5292,12 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "vcpkg" version = "0.2.15" @@ -4995,6 +5331,35 @@ dependencies = [ "try-lock", ] +[[package]] +name = "warp" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4378d202ff965b011c64817db11d5829506d3404edeadb61f190d111da3f231c" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "headers", + "http 0.2.12", + "hyper 0.14.28", + "log", + "mime", + "mime_guess", + "multer", + "percent-encoding", + "pin-project", + "scoped-tls", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-tungstenite 0.21.0", + "tokio-util", + "tower-service", + "tracing", +] + [[package]] name = "wasi" version = "0.9.0+wasi-snapshot-preview1" @@ -5313,6 +5678,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "winreg" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a277a57398d4bfa075df44f501a17cfdf8542d224f0d36095a2adc7aee4ef0a5" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + [[package]] name = "x509-parser" version = "0.14.0" diff --git a/Cargo.toml 
b/Cargo.toml index 779c6f2..db8dca8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ async-trait = "0.1.58" bs58 = "0.5.1" deadpool = "0.12.1" deadpool-diesel = {version="0.6.1", features=["postgres"]} -diesel = {version= "2.1.6", features=["postgres"]} +diesel = {version= "2.1.6", features=["postgres", "chrono"]} dotenv = "0.15.0" futures-util = "0.3.30" hex = "0.4.3" @@ -31,3 +31,9 @@ postgres = "0.19.7" tokio-postgres = "0.7.10" futures-channel = "0.3.30" futures = "0.3.30" +chrono = "0.4.38" +reqwest = {version="0.12.0", features=["json"]} +warp = "0.3.7" +bytes = "1.1.0" +serde_urlencoded = "0.7.1" +env_logger = "0.11.3" diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..46cf4ca --- /dev/null +++ b/Dockerfile @@ -0,0 +1,27 @@ +FROM lukemathwalker/cargo-chef:latest-rust-1 AS chef +WORKDIR /asset-watcher + +FROM chef AS planner +COPY . . +RUN cargo chef prepare --recipe-path recipe.json + +FROM chef AS builder +COPY --from=planner /asset-watcher/recipe.json recipe.json +# Build dependencies - this is the caching Docker layer! +RUN cargo chef cook --release --recipe-path recipe.json +# Build application +COPY . . + +RUN cargo build --release --bin asset-watcher + +# We do not need the Rust toolchain to run the binary! 
+FROM debian:bookworm-slim AS runtime +RUN mkdir /run/sshd +RUN apt-get update +RUN apt-get install ca-certificates libpq5 -y +RUN update-ca-certificates +EXPOSE 8080 +WORKDIR /asset-watcher +COPY --from=builder /asset-watcher/target/release/asset-watcher /usr/local/bin + +ENTRYPOINT ["/usr/local/bin/asset-watcher"] \ No newline at end of file diff --git a/src/adapters/rpc.rs b/src/adapters/rpc.rs index 1205120..d131cce 100644 --- a/src/adapters/rpc.rs +++ b/src/adapters/rpc.rs @@ -1,69 +1,10 @@ -// use futures_util::{SinkExt, StreamExt}; -// use http::Request; -// use serde_json::json; -// use tokio::net::TcpStream; -// use tokio_tungstenite::tungstenite::handshake::client::generate_key; -// use tokio_tungstenite::tungstenite::protocol::Message; -// use tokio_tungstenite::WebSocketStream; -// use tokio_tungstenite::{connect_async, MaybeTlsStream}; - -// pub struct SolanaRpcClient { -// socket: WebSocketStream>, -// } - -// impl SolanaRpcClient { -// pub async fn new(url: &str) -> Result { -// let parsed_url = url::Url::parse(url).unwrap(); -// let request = Request::builder() -// .uri(parsed_url.to_string()) -// .header("sec-websocket-key", generate_key()) -// .header("host", "devnet-local.themetadao-org.workers.dev") -// .header("upgrade", "websocket") -// .header("connection", "upgrade") -// .header("sec-websocket-version", 13) -// .body(()) -// .unwrap(); -// let connection_res = connect_async(request).await; -// match connection_res { -// Ok((socket, _)) => Ok(SolanaRpcClient { socket }), -// Err(e) => Err(format!("error connecting RPC {:?}", e)), -// } -// } - -// pub async fn on_account_change(&mut self, acct: String, callback: F) -// where -// F: Fn(String) + Send + 'static, -// { -// let message = serde_json::to_string(&json!({ -// "jsonrpc": "2.0", -// "id": 1, -// "method": "accountSubscribe", -// "params": [ -// acct, -// { -// "encoding": "jsonParsed", -// "commitment": "finalized" -// } -// ] -// })) -// .unwrap(); - -// 
self.socket.send(Message::Text(message)).await.unwrap(); - -// while let Some(msg) = self.socket.next().await { -// match msg { -// Ok(Message::Text(text)) => { -// callback(text); -// } -// Ok(Message::Binary(bin)) => { -// eprintln!("Received binary data: {:?}", bin); -// } -// Err(e) => { -// eprintln!("WebSocket error: {}", e); -// break; -// } -// _ => {} -// } -// } -// } -// } +use std::{env, sync::Arc}; + +pub async fn get_pubsub_client( +) -> Result, Box> +{ + let rpc_endpoint_ws = env::var("RPC_ENDPOINT_WSS").expect("RPC_ENDPOINT_WSS must be set"); + let pub_sub_client = + solana_client::nonblocking::pubsub_client::PubsubClient::new(&rpc_endpoint_ws).await?; + Ok(Arc::new(pub_sub_client)) +} diff --git a/src/entities/auth.rs b/src/entities/auth.rs new file mode 100644 index 0000000..5335cce --- /dev/null +++ b/src/entities/auth.rs @@ -0,0 +1,39 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct AuthSessionResponse { + pub session_id: String, + pub was_logged_in: bool, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct AuthMessageResponse { + pub message: String, + pub session_id: Option, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct AuthErrorResponse { + pub error: String, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct AuthPostRequest { + pub pub_key: String, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct AuthPutRequest { + pub id: String, + pub signature: String, + pub pub_key: String, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct AuthGetRequest { + pub pubkey: String, +} diff --git a/src/entities/conditional_vaults.rs b/src/entities/conditional_vaults.rs index 6541eae..77e4498 100644 --- a/src/entities/conditional_vaults.rs +++ b/src/entities/conditional_vaults.rs @@ -13,7 +13,7 @@ table! 
{ } } -#[derive(Queryable)] +#[derive(Queryable, Clone)] pub struct ConditionalVault { pub cond_vault_acct: String, pub status: Option, diff --git a/src/entities/markets.rs b/src/entities/markets.rs new file mode 100644 index 0000000..08112d3 --- /dev/null +++ b/src/entities/markets.rs @@ -0,0 +1,27 @@ +use chrono::{DateTime, Utc}; + +table! { + use diesel::sql_types::*; + + markets (market_acct) { + market_acct -> Varchar, + market_type -> Varchar, + create_tx_sig -> Varchar, + proposal_acct -> Nullable, + base_mint_acct -> Varchar, + quote_mint_acct -> Varchar, + created_at -> Timestamptz, + } +} + +#[derive(Queryable, Insertable, Debug, Clone, Selectable)] +#[diesel(table_name = markets)] +pub struct Market { + pub market_acct: String, + pub market_type: String, + pub create_tx_sig: String, + pub proposal_acct: Option, + pub base_mint_acct: String, + pub quote_mint_acct: String, + pub created_at: DateTime, +} diff --git a/src/entities/mod.rs b/src/entities/mod.rs index ead55bc..110081a 100644 --- a/src/entities/mod.rs +++ b/src/entities/mod.rs @@ -1,4 +1,7 @@ +pub mod auth; pub mod conditional_vaults; +pub mod markets; pub mod token_acct_balances; pub mod token_accts; +pub mod tokens; pub mod transactions; diff --git a/src/entities/token_acct_balances.rs b/src/entities/token_acct_balances.rs index 7b0a50d..5b4d430 100644 --- a/src/entities/token_acct_balances.rs +++ b/src/entities/token_acct_balances.rs @@ -1,30 +1,28 @@ -use std::time::SystemTime; +use chrono::{DateTime, Utc}; table! 
{ - token_acct_balances (token_acct) { + token_acct_balances (token_acct, mint_acct, amount, created_at) { token_acct -> Varchar, mint_acct -> Varchar, owner_acct -> Varchar, amount -> BigInt, - created_at -> Timestamp, + created_at -> Timestamptz, + slot -> BigInt, + tx_sig -> Nullable, + delta -> BigInt, } } -#[derive(Queryable)] -pub struct TokenAcctBalances { - pub token_acct: String, - pub mint_acct: String, - pub owner_acct: String, - pub amount: i64, - pub created_at: SystemTime, -} - -#[derive(Insertable)] +#[derive(Queryable, Clone, Insertable, Selectable)] #[diesel(table_name = token_acct_balances)] -pub struct TokenAcctBalancesRecord { +#[diesel(check_for_backend(diesel::pg::Pg))] +pub struct TokenAcctBalances { pub token_acct: String, pub mint_acct: String, pub owner_acct: String, pub amount: i64, - pub created_at: SystemTime, + pub created_at: DateTime, + pub slot: i64, + pub tx_sig: Option, + pub delta: i64, } diff --git a/src/entities/token_accts.rs b/src/entities/token_accts.rs index 9834551..d6c6958 100644 --- a/src/entities/token_accts.rs +++ b/src/entities/token_accts.rs @@ -1,3 +1,4 @@ +use chrono::{DateTime, Utc}; use diesel::pg::Pg; use diesel::serialize::IsNull; use diesel::{ @@ -8,7 +9,6 @@ use diesel::{ }; use serde::{Deserialize, Serialize}; use std::io::Write; -use std::time::SystemTime; table! { token_accts (token_acct) { @@ -16,7 +16,7 @@ table! 
{ mint_acct -> Varchar, owner_acct -> Varchar, amount -> BigInt, - updated_at -> Nullable, + updated_at -> Nullable, status -> crate::entities::token_accts::TokenAcctStatusType, } } @@ -27,7 +27,7 @@ pub struct TokenAcct { pub mint_acct: String, pub owner_acct: String, pub amount: i64, - pub updated_at: Option, + pub updated_at: Option>, pub status: TokenAcctStatus, } @@ -35,14 +35,27 @@ pub struct TokenAcct { #[diesel(postgres_type(name = "token_acct_status"))] pub struct TokenAcctStatusType; -#[derive(Debug, PartialEq, FromSqlRow, AsExpression, Eq, Clone, Hash, serde::Deserialize)] +#[derive( + Debug, PartialEq, FromSqlRow, AsExpression, Eq, Clone, Hash, serde::Deserialize, Serialize, +)] #[diesel(sql_type = TokenAcctStatusType)] +#[serde(rename_all = "lowercase")] pub enum TokenAcctStatus { Watching, Enabled, Disabled, } +impl ToString for TokenAcctStatus { + fn to_string(&self) -> String { + match self { + TokenAcctStatus::Watching => "watching".to_string(), + TokenAcctStatus::Enabled => "enabled".to_string(), + TokenAcctStatus::Disabled => "disabled".to_string(), + } + } +} + impl ToSql for TokenAcctStatus { fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> serialize::Result { match *self { @@ -77,3 +90,29 @@ impl TokenAcctsInsertChannelPayload { serde_json::from_str(json_str) } } + +// todo setup serialization +#[derive(Serialize, Deserialize, Debug)] +pub struct TokenAcctsStatusUpdateChannelPayload { + pub status: TokenAcctStatus, + pub token_acct: String, +} + +impl TokenAcctsStatusUpdateChannelPayload { + pub fn parse_payload( + json_str: &str, + ) -> Result { + serde_json::from_str(json_str) + } +} + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct WatchTokenBalancePayload { + pub token_acct: String, +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct WatchTokenBalanceResponse { + pub message: String, +} diff --git a/src/entities/tokens.rs b/src/entities/tokens.rs new file mode 100644 index 
0000000..ebae163 --- /dev/null +++ b/src/entities/tokens.rs @@ -0,0 +1,25 @@ +use chrono::{DateTime, Utc}; + +table! { + tokens (mint_acct) { + mint_acct -> Varchar, + name -> Varchar, + symbol -> Varchar, + supply -> BigInt, + decimals -> SmallInt, + updated_at -> Timestamptz, + image_url -> Nullable, + } +} + +#[derive(Queryable, Clone, Selectable)] +#[diesel(table_name = tokens)] +pub struct Token { + pub mint_acct: String, + pub name: String, + pub symbol: String, + pub supply: i64, + pub decimals: i16, + pub updated_at: DateTime, + pub image_url: Option, +} diff --git a/src/entities/transactions.rs b/src/entities/transactions.rs index 37247c4..0ed1cad 100644 --- a/src/entities/transactions.rs +++ b/src/entities/transactions.rs @@ -1,3 +1,4 @@ +use chrono::{DateTime, Utc}; use diesel::pg::{Pg, PgValue}; use diesel::{ backend::Backend, @@ -6,20 +7,31 @@ use diesel::{ sql_types::Text, }; use serde::{Deserialize, Serialize}; -use std::time::SystemTime; table! { transactions (tx_sig) { tx_sig -> Varchar, slot -> BigInt, - block_time -> Timestamp, + block_time -> Timestamptz, failed -> Bool, payload -> Text, serializer_logic_version -> SmallInt, - main_ix_type -> Varchar, + main_ix_type -> Nullable, } } +#[derive(Queryable, Clone, Selectable)] +#[diesel(table_name = transactions)] +pub struct Transaction { + pub tx_sig: String, + pub slot: i64, + pub block_time: DateTime, + pub failed: bool, + pub payload: String, + pub serializer_logic_version: i16, + pub main_ix_type: Option, +} + #[derive(Debug, Clone, Copy, AsExpression, FromSqlRow)] #[sql_type = "Text"] pub enum InstructionType { @@ -31,6 +43,8 @@ pub enum InstructionType { OpenbookCancelOrder, AutocratInitializeProposal, AutocratFinalizeProposal, + VaultMergeConditionalTokens, + VaultRedeemConditionalTokensForUnderlyingTokens, } impl ToSql for InstructionType @@ -52,6 +66,12 @@ where "autocrat_initialize_proposal".to_sql(out) } InstructionType::AutocratFinalizeProposal => 
"autocrat_finalize_proposal".to_sql(out), + InstructionType::VaultMergeConditionalTokens => { + "vault_merge_conditional_tokens".to_sql(out) + } + InstructionType::VaultRedeemConditionalTokensForUnderlyingTokens => { + "vault_redeem_conditional_tokens_for_underlying_tokens".to_sql(out) + } } } } @@ -67,28 +87,20 @@ impl FromSql for InstructionType { b"openbook_cancel_order" => Ok(InstructionType::OpenbookCancelOrder), b"autocrat_initialize_proposal" => Ok(InstructionType::AutocratInitializeProposal), b"autocrat_finalize_proposal" => Ok(InstructionType::AutocratFinalizeProposal), + b"vault_merge_conditional_tokens" => Ok(InstructionType::VaultMergeConditionalTokens), + b"vault_redeem_conditional_tokens_for_underlying_tokens" => { + Ok(InstructionType::VaultRedeemConditionalTokensForUnderlyingTokens) + } x => Err(format!("Unrecognized variant {:?}", x).into()), } } } -#[derive(Queryable, Clone, Selectable)] -#[diesel(table_name = transactions)] -pub struct Transaction { - pub tx_sig: String, - pub slot: i64, - pub block_time: SystemTime, - pub failed: bool, - pub payload: String, - pub serializer_logic_version: i16, - pub main_ix_type: InstructionType, -} - -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct Payload { - pub block_time: u64, - pub slot: u64, + pub block_time: i64, + pub slot: i64, pub recent_blockhash: String, pub compute_units_consumed: String, pub fee: String, @@ -116,6 +128,12 @@ impl Payload { } "initializeProposal" => return Some(InstructionType::AutocratInitializeProposal), "finalizeProposal" => return Some(InstructionType::AutocratFinalizeProposal), + "mergeConditionalTokensForUnderlyingTokens" => { + return Some(InstructionType::VaultMergeConditionalTokens) + } + "redeemConditionalTokensForUnderlyingTokens" => { + return Some(InstructionType::VaultRedeemConditionalTokensForUnderlyingTokens) + } _ => continue, } } @@ -123,7 +141,7 @@ impl Payload { } } 
-#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct Account { pub name: String, @@ -136,7 +154,7 @@ pub struct Account { pub post_token_balance: Option, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct TokenBalance { pub mint: String, @@ -145,7 +163,7 @@ pub struct TokenBalance { pub decimals: u8, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct Instruction { pub name: String, @@ -157,7 +175,7 @@ pub struct Instruction { pub args: Vec, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct AccountWithData { pub name: String, @@ -166,7 +184,7 @@ pub struct AccountWithData { pub is_writeable: bool, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct Arg { pub name: String, #[serde(rename = "type")] diff --git a/src/events/mod.rs b/src/entrypoints/events/mod.rs similarity index 64% rename from src/events/mod.rs rename to src/entrypoints/events/mod.rs index b6310d2..0b59d72 100644 --- a/src/events/mod.rs +++ b/src/entrypoints/events/mod.rs @@ -1,3 +1,5 @@ pub mod rpc_token_acct_updates; +pub mod setup; pub mod token_accts_insert; +pub mod token_accts_status_update; pub mod transactions_insert; diff --git a/src/entrypoints/events/rpc_token_acct_updates.rs b/src/entrypoints/events/rpc_token_acct_updates.rs new file mode 100644 index 0000000..f6e9468 --- /dev/null +++ b/src/entrypoints/events/rpc_token_acct_updates.rs @@ -0,0 +1,226 @@ +use std::env; +use std::sync::{Arc, MutexGuard}; + +use chrono::Utc; +use deadpool::managed::Object; +use deadpool_diesel::Manager; +use diesel::{update, ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl}; +use futures::StreamExt; +use 
solana_account_decoder::{UiAccount, UiAccountData}; +use solana_client::{nonblocking::pubsub_client::PubsubClient, rpc_config::RpcAccountInfoConfig}; +use solana_sdk::program_pack::Pack; +use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey}; +use std::sync::Mutex; +use tokio::task; + +use crate::entities::token_accts::token_accts::{self}; +use crate::entities::token_accts::{TokenAcct, TokenAcctStatus}; +use crate::entities::transactions::transactions::{self, tx_sig}; +use crate::entities::transactions::Transaction; +use crate::services::balances; +use crate::services::transactions::handle_token_acct_balance_tx; +use diesel::OptionalExtension; + +pub async fn new_handler( + pub_sub_client: Arc, + conn_manager: Arc>>, + token_acct_pubkey: Pubkey, + token_acct_record: TokenAcct, +) { + let rpc_endpoint = env::var("RPC_ENDPOINT_HTTP").expect("RPC_ENDPOINT_HTTP must be set"); + if let Err(e) = check_and_update_initial_balance( + rpc_endpoint, + Arc::clone(&conn_manager), + &token_acct_pubkey, + &token_acct_record, + ) + .await + { + eprintln!("Error during initial balance check: {:?}", e); + } + + let timeout_flag = Arc::new(Mutex::new(true)); + let timeout_flag_arc = Arc::clone(&timeout_flag); + + let account_subscribe_res = pub_sub_client + .account_subscribe( + &token_acct_pubkey, + Some(RpcAccountInfoConfig { + encoding: Some(solana_account_decoder::UiAccountEncoding::JsonParsed), + data_slice: None, + commitment: Some(CommitmentConfig::confirmed()), + min_context_slot: None, + }), + ) + .await; + + if account_subscribe_res.is_err() { + eprintln!( + "error when subscribing to account, {:?}", + account_subscribe_res.err().unwrap() + ); + return; + } + + println!( + "successfully subscribed to token acct: {}", + token_acct_pubkey.to_string() + ); + + let (mut subscription, _) = account_subscribe_res.ok().unwrap(); + + let conn_manager_clone_sub = Arc::clone(&conn_manager); + while let Some(val) = subscription.next().await { + let mut timeout_flag_val 
= timeout_flag.lock().unwrap(); + *timeout_flag_val = false; + let ui_account: UiAccount = val.value; + let context = val.context; + println!("account subscribe context: {:?}", context); + match ui_account.data { + UiAccountData::Binary(data, encoding) => { + println!("Binary data: {:?}, Encoding: {:?}", data, encoding); + } + UiAccountData::Json(data) => { + println!("account subscribe notification: {:?}", data); + let record_clone = token_acct_record.clone(); + let token_acct_clone = record_clone.token_acct.clone(); + let conn_clone_for_task = Arc::clone(&conn_manager_clone_sub); + task::spawn(async move { + let token_acct_update_res = balances::handle_token_acct_change( + conn_clone_for_task, + record_clone, + data, + context, + ) + .await; + match token_acct_update_res { + Ok(_) => { + println!("successfully updated token balance: {:?}", token_acct_clone) + } + Err(e) => println!("error kind: {:?}", e), + } + }); + } + UiAccountData::LegacyBinary(data) => { + println!("Parsed LegacyBinary data: {:?}", data); + } + } + } + println!( + "end of rpc account subscriber scope: {}", + token_acct_pubkey.to_string() + ); + + // enabling this seems questionable now... 
since when this function fires we will not reset things to + // update_token_acct_with_status( + // token_acct_pubkey.to_string(), + // TokenAcctStatus::Enabled, + // conn_manager, + // ) + // .await; +} + +async fn check_and_update_initial_balance( + rpc_endpoint: String, + conn_manager: Arc>>, + token_acct_pubkey: &Pubkey, + token_acct_record: &TokenAcct, +) -> Result<(), Box> { + let rpc_client = solana_client::nonblocking::rpc_client::RpcClient::new(rpc_endpoint); + let account_data = rpc_client + .get_account_with_commitment(token_acct_pubkey, CommitmentConfig::confirmed()) + .await?; + + if let Some(account) = account_data.value { + let token_account: spl_token::state::Account = + spl_token::state::Account::unpack(&account.data)?; + let balance = token_account.amount as i64; + + if token_acct_record.amount != balance { + if token_acct_record.amount != balance { + let latest_tx: Vec< + solana_client::rpc_response::RpcConfirmedTransactionStatusWithSignature, + > = rpc_client + .get_signatures_for_address(token_acct_pubkey) + .await? 
+ .into_iter() + .filter(|tx| tx.err.is_none()) + .collect(); + + if let Some(latest_tx_info) = latest_tx.first() { + let transaction_sig = latest_tx_info.signature.clone(); + let transaction_sig_2 = latest_tx_info.signature.clone(); + let transaction_exists: Option = conn_manager + .interact(move |db: &mut PgConnection| { + transactions::table + .filter(tx_sig.eq(transaction_sig.clone())) + .first::(db) + .optional() + }) + .await??; + + let slot = latest_tx_info.slot as i64; + let mint_acct = token_account.mint.to_string(); + let owner_acct = token_account.owner.to_string(); + + let transaction_sig_option = if transaction_exists.is_some() { + Some(transaction_sig_2) + } else { + None + }; + + handle_token_acct_balance_tx( + conn_manager, + token_acct_pubkey.to_string(), + balance, + transaction_sig_option, + slot, + mint_acct, + owner_acct, + ) + .await?; + } + } + } + } + + Ok(()) +} + +// TODO this should return a result and be handled appropriately.. +async fn update_token_acct_with_status( + token_acct: String, + status: TokenAcctStatus, + conn_manager: Arc>>, +) { + let token_acct_query = token_acct.clone(); + let status_for_set = status.clone(); + let res = conn_manager + .interact(move |db| { + update( + token_accts::table.filter(token_accts::token_acct.eq(token_acct_query.to_string())), + ) + .set(( + token_accts::dsl::status.eq(status_for_set), + token_accts::dsl::updated_at.eq(Utc::now()), + )) + .get_result::(db) + }) + .await; + + match res { + Ok(Ok(_)) => println!( + "updated token acct to {:?} status: {}", + status, + token_acct.to_string() + ), + Err(e) => eprintln!( + "error updating token acct [{}] to {:?} status: {}", + token_acct, status, e + ), + Ok(Err(e)) => eprintln!( + "error updating token acct [{}] to {:?} status: {}", + token_acct, status, e + ), + } +} diff --git a/src/entrypoints/events/setup.rs b/src/entrypoints/events/setup.rs new file mode 100644 index 0000000..3b34b37 --- /dev/null +++ b/src/entrypoints/events/setup.rs @@ -0,0 
+1,111 @@ +use crate::entities::token_accts::{token_accts, TokenAcct, TokenAcctStatus}; +use deadpool::managed::Object; +use deadpool_diesel::Manager; +use diesel::prelude::*; +use diesel::PgConnection; +use futures::{stream, FutureExt, StreamExt, TryStreamExt}; +use postgres::NoTls; +use solana_client::nonblocking::pubsub_client::PubsubClient; +use solana_sdk::pubkey::Pubkey; +use std::str::FromStr; +use std::sync::Arc; +use tokio::task; +use tokio_postgres::{connect, AsyncMessage}; + +// TODO this should return a result +pub async fn setup_event_listeners( + db_url: &str, + managed_connection: Arc>>, + pub_sub_client: Arc, +) { + // account subscribe for token_accts already in Watching status + let token_accts_res = managed_connection + .clone() + .interact(|conn| { + return token_accts::table + .filter(token_accts::status.eq(TokenAcctStatus::Watching)) + .load::(conn) + .expect("Error loading token_accts"); + }) + .await; + + match token_accts_res { + Ok(token_accts_vec) => { + for record in token_accts_vec { + match Pubkey::from_str(&record.token_acct) { + Ok(token_acct_pubkey) => { + let conn_manager_arg_clone = Arc::clone(&managed_connection); + let pub_sub_client_clone = Arc::clone(&pub_sub_client); + println!( + "spawning task for token acct subscription: {}", + token_acct_pubkey.to_string() + ); + task::spawn(async move { + println!( + "task running for token acct subscription: {}", + token_acct_pubkey.to_string() + ); + super::rpc_token_acct_updates::new_handler( + pub_sub_client_clone, + conn_manager_arg_clone, + token_acct_pubkey, + record, + ) + .await + }); + } + Err(e) => eprintln!("Error with token acct pubkey parsing: {}", e), + } + } + } + Err(e) => eprintln!("Error with subscribing to token accts: {}", e), + } + + // listen to postgres notifications + let (client, mut connection) = connect(db_url, NoTls).await.unwrap(); + // Make transmitter and receiver. 
+ let (tx, mut rx) = futures_channel::mpsc::unbounded(); + let stream = + stream::poll_fn(move |cx| connection.poll_message(cx)).map_err(|e| panic!("{}", e)); + let connection = stream.forward(tx).map(|r| r.unwrap()); + tokio::spawn(connection); + + client + .batch_execute( + " + LISTEN token_accts_insert_channel; + LISTEN token_accts_status_update_channel; + LISTEN transactions_insert_channel; + ", + ) + .await + .unwrap(); + + while let Some(m) = rx.next().await { + let connect_clone = Arc::clone(&managed_connection); + match m { + AsyncMessage::Notification(n) => match n.channel() { + "token_accts_insert_channel" => { + task::spawn(super::token_accts_insert::new_handler( + n, + connect_clone, + Arc::clone(&pub_sub_client), + )); + } + "transactions_insert_channel" => { + task::spawn(super::transactions_insert::new_handler(n, connect_clone)); + } + "token_accts_status_update_channel" => { + task::spawn(super::token_accts_status_update::new_handler( + n, + connect_clone, + Arc::clone(&pub_sub_client), + )); + } + _ => (), + }, + AsyncMessage::Notice(notice) => println!("async message error: {:?}", notice), + _ => println!("fallthrough handler of async message from postgres listener"), + } + } +} diff --git a/src/events/token_accts_insert.rs b/src/entrypoints/events/token_accts_insert.rs similarity index 93% rename from src/events/token_accts_insert.rs rename to src/entrypoints/events/token_accts_insert.rs index 1c96f07..5826cf9 100644 --- a/src/events/token_accts_insert.rs +++ b/src/entrypoints/events/token_accts_insert.rs @@ -1,7 +1,7 @@ use crate::entities::token_accts::token_accts; use crate::entities::token_accts::TokenAcct; use crate::entities::token_accts::TokenAcctsInsertChannelPayload; -use crate::events::rpc_token_acct_updates; +use crate::entrypoints::events::rpc_token_acct_updates; use deadpool::managed::Object; use deadpool_diesel::Manager; use diesel::prelude::*; @@ -54,17 +54,16 @@ async fn handle_new_token_acct_notification( }) .await?; let 
token_acct_pubkey = Pubkey::from_str(&token_acct_string)?; - let conn_manager_arg_clone = Arc::clone(&pool_connection); let pub_sub_client_clone = Arc::clone(&pub_sub_rpc_client); tokio::spawn(async move { rpc_token_acct_updates::new_handler( pub_sub_client_clone, - conn_manager_arg_clone, + cloned_connection, token_acct_pubkey, token_acct_record.clone(), ) - .await + .await; }) .await?; diff --git a/src/entrypoints/events/token_accts_status_update.rs b/src/entrypoints/events/token_accts_status_update.rs new file mode 100644 index 0000000..d2ff57e --- /dev/null +++ b/src/entrypoints/events/token_accts_status_update.rs @@ -0,0 +1,75 @@ +use crate::entities::token_accts::token_accts; +use crate::entities::token_accts::TokenAcct; +use crate::entities::token_accts::TokenAcctStatus; +use crate::entities::token_accts::TokenAcctsStatusUpdateChannelPayload; +use crate::entrypoints::events::rpc_token_acct_updates; +use deadpool::managed::Object; +use deadpool_diesel::Manager; +use diesel::prelude::*; +use diesel::PgConnection; +use postgres::Notification; +use solana_client::nonblocking::pubsub_client::PubsubClient; +use solana_sdk::pubkey::Pubkey; +use std::str::FromStr; +use std::sync::Arc; + +use crate::entities::token_accts::token_accts::dsl::*; + +pub async fn new_handler( + notification: Notification, + pool_connection: Arc>>, + pub_sub_rpc_client: Arc, +) { + println!( + "new token_accts_status_update payload: {:?}", + notification.payload() + ); + match handle_update_token_acct_status_notification( + pool_connection, + notification, + Arc::clone(&pub_sub_rpc_client), + ) + .await + { + Ok(()) => println!("successfully handled token_acct status update notification"), + Err(e) => eprintln!("error token_acct status update notification: {:?}", e), + }; +} + +async fn handle_update_token_acct_status_notification( + pool_connection: Arc>>, + notification: Notification, + pub_sub_rpc_client: Arc, +) -> Result<(), Box> { + let cloned_connection = 
Arc::clone(&pool_connection); + let token_acct_payload = + TokenAcctsStatusUpdateChannelPayload::parse_payload(notification.payload())?; + if token_acct_payload.status != TokenAcctStatus::Watching { + return Ok(()); + } + let token_acct_string = token_acct_payload.token_acct; + let token_acct_clone = token_acct_string.clone(); + let token_acct_record: TokenAcct = cloned_connection + .clone() + .interact(move |conn| { + return token_accts + .filter(token_accts::dsl::token_acct.eq(&token_acct_clone)) + .first(conn) + .expect("could not find token record"); + }) + .await?; + let token_acct_pubkey = Pubkey::from_str(&token_acct_string)?; + let pub_sub_client_clone = Arc::clone(&pub_sub_rpc_client); + + tokio::spawn(async move { + rpc_token_acct_updates::new_handler( + pub_sub_client_clone, + Arc::clone(&cloned_connection), + token_acct_pubkey, + token_acct_record.clone(), + ) + .await + }); + + Ok(()) +} diff --git a/src/entrypoints/events/transactions_insert.rs b/src/entrypoints/events/transactions_insert.rs new file mode 100644 index 0000000..77772f3 --- /dev/null +++ b/src/entrypoints/events/transactions_insert.rs @@ -0,0 +1,189 @@ +use crate::entities::transactions::{InstructionType, Payload, TransactionsInsertChannelPayload}; +use crate::services; +use deadpool::managed::Object; +use deadpool_diesel::Manager; +use diesel::prelude::*; +use diesel::{ExpressionMethods, PgConnection}; +use futures::executor::block_on; +use postgres::Notification; +use std::sync::Arc; + +use crate::entities::transactions::{transactions::dsl::*, Transaction}; + +pub async fn new_handler( + notification: Notification, + pool_connection: Arc>>, +) { + println!( + "new transactions table payload: {:?}", + notification.payload() + ); + match TransactionsInsertChannelPayload::parse_payload(notification.payload()) { + Ok(tx_payload) => match handle_new_transaction(tx_payload.tx_sig, pool_connection).await { + Ok(()) => println!("successfully handled new transaction notification"), + Err(e) => 
eprintln!("error handling new transaction notification: {:?}", e), + }, + Err(e) => eprintln!("error parsing new transaction notification: {:?}", e), + }; +} + +async fn handle_new_transaction( + transaction_signature: String, + connection: Arc>>, +) -> Result<(), Box> { + let txn_result = connection + .clone() + .interact(|conn| { + return transactions + .filter(tx_sig.eq(transaction_signature)) + .limit(1) + .select(Transaction::as_select()) + .load(conn); + }) + .await?; + + let txn_vec: Vec = txn_result?; + let txn = &txn_vec[0]; + + index_tx_record(txn.clone(), connection).await?; + + Ok(()) +} + +pub async fn index_tx_record( + tx: Transaction, + connection: Arc>>, +) -> Result<(), Box> { + let payload_parsed = Payload::parse_payload(&tx.payload)?; + + match payload_parsed.get_main_ix_type() { + Some(ix_type) => match ix_type { + InstructionType::VaultMintConditionalTokens => { + let mint_handler_res = services::new_mint::handle_mint_tx( + connection, + payload_parsed.clone(), + tx.tx_sig.clone(), + ); + + let mint_handler_res_awaited = block_on(mint_handler_res); + match mint_handler_res_awaited { + Ok(_) => println!( + "handled new mint tx: {:?}, {:?}", + payload_parsed.signatures, + payload_parsed.get_main_ix_type() + ), + Err(e) => eprintln!( + "error tracking new mint: {:?}. payload instructions: {:?}", + e, payload_parsed.instructions + ), + } + } + InstructionType::AmmSwap => { + let swap_res = services::swaps::handle_swap_tx( + connection, + payload_parsed.clone(), + tx.tx_sig.clone(), + ); + + let swap_res_awaited = block_on(swap_res); + match swap_res_awaited { + Ok(_) => println!( + "handled swap tx: {:?}, {:?}", + payload_parsed.signatures, + payload_parsed.get_main_ix_type() + ), + Err(e) => eprintln!( + "error tracking swap: {:?}. 
payload: {:?}", + e, payload_parsed + ), + } + } + InstructionType::AmmDeposit => { + let amm_deposit_res = services::liquidity_adding::handle_lp_deposit_tx( + connection, + payload_parsed.clone(), + tx.tx_sig.clone(), + ); + + let amm_deposit_res_awaited = block_on(amm_deposit_res); + match amm_deposit_res_awaited { + Ok(_) => println!( + "handled amm deposit tx: {:?}, {:?}", + payload_parsed.signatures, + payload_parsed.get_main_ix_type() + ), + Err(e) => eprintln!( + "error tracking amm deposit: {:?}. payload: {:?}", + e, payload_parsed + ), + } + } + InstructionType::AmmWithdraw => { + let amm_withdrawal_res = services::liquidity_removing::handle_lp_withdrawal_tx( + connection, + payload_parsed.clone(), + tx.tx_sig.clone(), + ); + + let amm_withdrawal_res_awaited = block_on(amm_withdrawal_res); + match amm_withdrawal_res_awaited { + Ok(_) => println!( + "handled amm withdrawal tx: {:?}, {:?}", + payload_parsed.signatures, + payload_parsed.get_main_ix_type() + ), + Err(e) => eprintln!( + "error tracking amm withdrawal: {:?}. payload: {:?}", + e, payload_parsed + ), + } + } + InstructionType::VaultMergeConditionalTokens => { + let merge_conditionals_res = + services::merge_conditionals_for_underlying::handle_merge_conditional_tokens_tx( + connection, + payload_parsed.clone(), + tx.tx_sig.clone(), + ); + + let merge_conditionals_res_awaited = block_on(merge_conditionals_res); + match merge_conditionals_res_awaited { + Ok(_) => println!( + "handled merge conditionals tx: {:?}, {:?}", + payload_parsed.signatures, + payload_parsed.get_main_ix_type() + ), + Err(e) => eprintln!( + "error tracking merge conditionals: {:?}. 
payload: {:?}", + e, payload_parsed + ), + } + } + InstructionType::VaultRedeemConditionalTokensForUnderlyingTokens => { + let redeem_conditionals_res = + services::redeem_conditionals::handle_redeem_conditional_tokens_tx( + connection, + payload_parsed.clone(), + tx.tx_sig.clone(), + ); + + let redeem_conditionals_res_awaited = block_on(redeem_conditionals_res); + match redeem_conditionals_res_awaited { + Ok(_) => println!( + "handled redeem conditionals tx: {:?}, {:?}", + payload_parsed.signatures, + payload_parsed.get_main_ix_type() + ), + Err(e) => eprintln!( + "error tracking redeem conditionals: {:?}. payload: {:?}", + e, payload_parsed + ), + } + } + x => println!("unhandled ix type: {:?}", x), + }, + None => println!("tx has no ix type we care about"), + } + + Ok(()) +} diff --git a/src/entrypoints/http/mod.rs b/src/entrypoints/http/mod.rs new file mode 100644 index 0000000..2ec48d2 --- /dev/null +++ b/src/entrypoints/http/mod.rs @@ -0,0 +1,2 @@ +pub mod post_watch_token_acct; +pub mod routes; diff --git a/src/entrypoints/http/post_watch_token_acct.rs b/src/entrypoints/http/post_watch_token_acct.rs new file mode 100644 index 0000000..b0e7de4 --- /dev/null +++ b/src/entrypoints/http/post_watch_token_acct.rs @@ -0,0 +1,155 @@ +use std::sync::Arc; + +use crate::entities::token_accts::token_accts; +use crate::entities::token_accts::token_accts::dsl::*; +use crate::entities::token_accts::TokenAcct; +use crate::entities::token_accts::TokenAcctStatus; +use crate::entities::token_accts::WatchTokenBalancePayload; +use crate::entities::token_accts::WatchTokenBalanceResponse; +use chrono::Utc; +use deadpool::managed::Object; +use deadpool_diesel::Manager; +use diesel::prelude::*; +use diesel::update; +use diesel::PgConnection; +use warp::{reject::Reject, Reply}; + +#[derive(Debug)] +struct ParseError; +impl Reject for ParseError {} + +pub async fn handler( + reply_with_status: warp::reply::WithStatus<&'static str>, + message: WatchTokenBalancePayload, + conn_manager: 
Arc>>, +) -> Result { + let response = reply_with_status.into_response(); + if !response.status().is_success() { + return Ok(warp::reply::with_status( + warp::reply::json(&WatchTokenBalanceResponse { + message: "unsuccessful response status".to_string(), + }), + response.status(), + )); + } + + let token_acct_pubkey = message.token_acct.clone(); + let token_acct_for_query = token_acct_pubkey.clone(); + + let token_acct_res: Result, _>, deadpool_diesel::InteractError> = + conn_manager + .interact(move |db| { + token_accts + .filter(token_accts::dsl::token_acct.eq(&token_acct_for_query)) + .first::(db) + .optional() + }) + .await; + + let token_acct_for_update = token_acct_pubkey.clone(); + match token_acct_res { + Ok(Ok(Some(token_acct_record))) => { + // if already watching, we need to switch back to enabled and then back to make sure account subscribe reinits + if token_acct_record.status == TokenAcctStatus::Watching { + let enabled_update_res = conn_manager + .interact(move |db| { + update_token_acct_with_status( + token_acct_for_update, + TokenAcctStatus::Enabled, + db, + ) + }) + .await; + + match enabled_update_res { + Ok(_) => (), + Err(e) => { + return Ok(warp::reply::with_status( + warp::reply::json(&WatchTokenBalanceResponse { + message: format!( + "error updating token acct [{}] to {:?} status: {}", + token_acct_pubkey.to_string(), + status, + e + ), + }), + warp::http::StatusCode::BAD_REQUEST, + )); + } + } + } + } + Err(e) => { + return Ok(warp::reply::with_status( + warp::reply::json(&WatchTokenBalanceResponse { + message: format!("error interacting with postgres pool: {:?}", e), + }), + response.status(), + )); + } + Ok(Err(e)) => { + return Ok(warp::reply::with_status( + warp::reply::json(&WatchTokenBalanceResponse { + message: format!("error fetching token_acct to update: {:?}", e), + }), + response.status(), + )); + } + Ok(Ok(None)) => { + return Ok(warp::reply::with_status( + warp::reply::json(&WatchTokenBalanceResponse { + message: format!("could 
not find token_acct to update"), + }), + response.status(), + )); + } + } + + let token_acct_for_watching_update = token_acct_pubkey.clone(); + let res = conn_manager + .interact(move |db| { + update_token_acct_with_status( + token_acct_for_watching_update, + TokenAcctStatus::Watching, + db, + ) + }) + .await; + + match res { + Ok(_) => Ok(warp::reply::with_status( + warp::reply::json(&WatchTokenBalanceResponse { + message: format!( + "updated token acct to {:?} status: {}", + status, + token_acct_pubkey.to_string() + ), + }), + warp::http::StatusCode::OK, + )), + Err(e) => Ok(warp::reply::with_status( + warp::reply::json(&WatchTokenBalanceResponse { + message: format!( + "error updating token acct [{}] to {:?} status: {}", + token_acct_pubkey.to_string(), + status, + e + ), + }), + warp::http::StatusCode::BAD_REQUEST, + )), + } +} + +fn update_token_acct_with_status( + token_acct_pubkey: String, + token_acct_status: TokenAcctStatus, + db: &mut PgConnection, +) -> Result { + update(token_accts::table.filter(token_accts::token_acct.eq(token_acct_pubkey.to_string()))) + .set(( + token_accts::dsl::status.eq(token_acct_status.clone()), + token_accts::dsl::updated_at.eq(Utc::now()), + )) + .get_result::(db) +} diff --git a/src/entrypoints/http/routes.rs b/src/entrypoints/http/routes.rs new file mode 100644 index 0000000..49f43fb --- /dev/null +++ b/src/entrypoints/http/routes.rs @@ -0,0 +1,114 @@ +use std::{env, sync::Arc}; + +use deadpool::managed::Object; +use deadpool_diesel::Manager; +use diesel::PgConnection; +use solana_client::nonblocking::pubsub_client::PubsubClient; +use tokio::sync::Mutex; +use warp::Filter; + +use crate::{entities::token_accts::WatchTokenBalancePayload, services::auth::AuthClient}; + +use super::post_watch_token_acct; + +pub async fn listen_and_serve(db: Arc>>) { + let port = match env::var("PORT") + .unwrap_or("8080".to_string()) + .parse::() + { + Ok(port) => port, + Err(_) => 8080, + }; + let auth_service_url = 
env::var("AUTH_SERVICE_URL").expect("AUTH_SERVICE_URL must be set"); + let auth_client = Arc::new(Mutex::new(AuthClient::new(&auth_service_url))); + + let auth_filter = warp::any() + .and(warp::header::("authorization")) + .and(with_auth_client(auth_client.clone())) + .and_then(validate_token); + + let watch_balance_route = warp::post() + .and(warp::path("watch-token-balance")) + .and(auth_filter) + .and(watch_token_json_body()) + .and(with_db(db)) + .and_then(post_watch_token_acct::handler); + + let cors = warp::cors() + .allow_any_origin() + .allow_headers(vec![ + "User-Agent", + "Sec-Fetch-Mode", + "Referer", + "Origin", + "Access-Control-Request-Method", + "Access-Control-Request-Headers", + "Content-Type", + "Authorization", + "Sec-Ch-Ua", + "Sec-Ch-Ua-Mobile", + "Sec-Ch-Ua-Platform", + ]) + .allow_methods(vec!["POST", "GET", "OPTIONS"]); + + let routes = watch_balance_route.with(cors); + + warp::serve(routes).run(([0, 0, 0, 0], port)).await +} + +fn watch_token_json_body( +) -> impl Filter + Clone { + warp::body::content_length_limit(1024 * 16).and(warp::body::json()) +} + +fn with_auth_client( + auth_client: Arc>, +) -> impl Filter>,), Error = std::convert::Infallible> + Clone { + warp::any().map(move || auth_client.clone()) +} +fn with_db( + db: Arc>>, +) -> impl Filter>>,), Error = std::convert::Infallible> + Clone +{ + warp::any().map(move || db.clone()) +} +fn with_rpc_pub_sub( + rpc_pub_sub: Arc, +) -> impl Filter,), Error = std::convert::Infallible> + Clone { + warp::any().map(move || rpc_pub_sub.clone()) +} + +async fn validate_token( + token: String, + auth_client: Arc>, +) -> Result, warp::Rejection> { + let auth_client = auth_client.lock().await; + + // Check if the token starts with "Bearer " + if !token.starts_with("Bearer ") { + return Ok(warp::reply::with_status( + "Invalid authorization header", + warp::http::StatusCode::BAD_REQUEST, + )); + } + + // Extract the actual token by removing the "Bearer " prefix + let token_trimmed = 
token.trim_start_matches("Bearer "); + + match auth_client.post_session(&token_trimmed).await { + Ok(response) => { + println!("Valid token! Session ID: {}", response.session_id); + Ok(warp::reply::with_status( + "Token is valid", + warp::http::StatusCode::OK, + )) + } + Err(err) => { + println!("Invalid token: {}", err); + Ok(warp::reply::with_status( + "Token is invalid", + warp::http::StatusCode::UNAUTHORIZED, + )) + } + } +} diff --git a/src/entrypoints/jobs/mod.rs b/src/entrypoints/jobs/mod.rs new file mode 100644 index 0000000..6d1b427 --- /dev/null +++ b/src/entrypoints/jobs/mod.rs @@ -0,0 +1 @@ +pub mod transaction_indexing; diff --git a/src/entrypoints/jobs/transaction_indexing.rs b/src/entrypoints/jobs/transaction_indexing.rs new file mode 100644 index 0000000..87de2f1 --- /dev/null +++ b/src/entrypoints/jobs/transaction_indexing.rs @@ -0,0 +1,49 @@ +use std::sync::Arc; + +use chrono::{Duration, NaiveDateTime, Utc}; +use deadpool::managed::Object; +use deadpool_diesel::Manager; +use diesel::{ExpressionMethods, PgConnection}; + +use crate::entities::transactions::transactions; +use crate::entities::transactions::transactions::main_ix_type; +use crate::entities::transactions::{transactions::block_time, Transaction}; +use crate::entrypoints::events; +use diesel::prelude::*; + +pub async fn run_job( + pg_connection: Arc>>, +) -> Result<(), Box> { + let thirty_days_ago = Utc::now().naive_utc() - Duration::minutes(50); + + // Run the query + let transactions = + get_recent_transactions_with_main_ix_type(thirty_days_ago, Arc::clone(&pg_connection)) + .await?; + + // Process each transaction + for transaction in transactions { + let pg_clone = Arc::clone(&pg_connection); + events::transactions_insert::index_tx_record(transaction, pg_clone).await?; + } + Ok(()) +} + +async fn get_recent_transactions_with_main_ix_type( + thirty_days_ago: NaiveDateTime, + connection: Arc>>, +) -> Result, Box> { + let res = connection + .interact(move |conn| { + transactions::table + 
.filter( + main_ix_type + .is_not_null() + .and(block_time.ge(thirty_days_ago)), + ) + .load::(conn) + }) + .await?; + + Ok(res?) +} diff --git a/src/entrypoints/mod.rs b/src/entrypoints/mod.rs new file mode 100644 index 0000000..a3128a9 --- /dev/null +++ b/src/entrypoints/mod.rs @@ -0,0 +1,3 @@ +pub mod events; +pub mod http; +pub mod jobs; diff --git a/src/events/rpc_token_acct_updates.rs b/src/events/rpc_token_acct_updates.rs deleted file mode 100644 index 77a2228..0000000 --- a/src/events/rpc_token_acct_updates.rs +++ /dev/null @@ -1,70 +0,0 @@ -use std::sync::Arc; - -use deadpool::managed::Object; -use deadpool_diesel::Manager; -use diesel::PgConnection; -use futures::StreamExt; -use solana_account_decoder::{UiAccount, UiAccountData}; -use solana_client::{nonblocking::pubsub_client::PubsubClient, rpc_config::RpcAccountInfoConfig}; -use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey}; - -use crate::entities::token_accts::TokenAcct; -use crate::services::balances_handler; - -pub async fn new_handler( - pub_sub_client: Arc, - db: Arc>>, - token_acct_pubkey: Pubkey, - token_acct_record: TokenAcct, -) { - println!("subscribing to account {}", token_acct_pubkey.to_string()); - let (mut subscription, _) = pub_sub_client - .account_subscribe( - &token_acct_pubkey, - Some(RpcAccountInfoConfig { - encoding: Some(solana_account_decoder::UiAccountEncoding::JsonParsed), - data_slice: None, - commitment: Some(CommitmentConfig::confirmed()), - min_context_slot: None, - }), - ) - .await - .expect("Failed to subscribe to account"); - - while let Some(val) = subscription.next().await { - let ui_account: UiAccount = val.value; - match ui_account.data { - UiAccountData::Binary(data, encoding) => { - println!("Binary data: {:?}, Encoding: {:?}", data, encoding); - // Process binary data here - } - UiAccountData::Json(data) => { - let record_clone = token_acct_record.clone(); - let token_acct_clone = record_clone.token_acct.clone(); - let token_acct_update_res = 
db - .interact(move |conn| { - return balances_handler::handle_token_acct_change( - conn, - record_clone, - data, - ); - }) - .await; - match token_acct_update_res { - Ok(res) => match res { - Ok(_) => { - println!("successfully updated token balance: {:?}", token_acct_clone) - } - Err(e) => println!("error kind: {:?}", e), - }, - Err(e) => println!("interact error: {:?}", e), - } - // Process JSON data here - } - UiAccountData::LegacyBinary(data) => { - println!("Parsed LegacyBinary data: {:?}", data); - // Process parsed JSON data here - } - } - } -} diff --git a/src/events/transactions_insert.rs b/src/events/transactions_insert.rs deleted file mode 100644 index b4f538d..0000000 --- a/src/events/transactions_insert.rs +++ /dev/null @@ -1,68 +0,0 @@ -use crate::entities::transactions::{InstructionType, Payload, TransactionsInsertChannelPayload}; -use crate::services; -use deadpool::managed::Object; -use deadpool_diesel::Manager; -use diesel::prelude::*; -use diesel::{ExpressionMethods, PgConnection}; -use postgres::Notification; -use std::sync::Arc; - -use crate::entities::transactions::{transactions::dsl::*, Transaction}; - -pub async fn new_handler( - notification: Notification, - pool_connection: Arc>>, -) { - println!( - "new transactions table payload: {:?}", - notification.payload() - ); - match TransactionsInsertChannelPayload::parse_payload(notification.payload()) { - Ok(tx_payload) => match handle_new_transaction(tx_payload.tx_sig, pool_connection).await { - Ok(()) => println!("successfully handled new transaction notification"), - Err(e) => eprintln!("error handling new transaction notification: {:?}", e), - }, - Err(e) => eprintln!("error parsing new transaction notification: {:?}", e), - }; -} - -async fn handle_new_transaction( - transaction_signature: String, - connection: Arc>>, -) -> Result<(), Box> { - let txn_result = connection - .clone() - .interact(|conn| { - return transactions - .filter(tx_sig.eq(transaction_signature)) - .limit(1) - 
.select(Transaction::as_select()) - .load(conn); - }) - .await?; - - let txn = txn_result?; - - let payload_parsed = Payload::parse_payload(&txn[0].payload)?; - - match payload_parsed.get_main_ix_type() { - Some(ix_type) => match ix_type { - InstructionType::VaultMintConditionalTokens => { - connection - .interact(|conn| { - let mint_handler_res = - services::new_mint_handlers::handle_mint_tx(conn, payload_parsed); - match mint_handler_res { - Ok(_) => println!("handled new mint tx"), - Err(e) => eprintln!("error tracking new mint: {:?}", e), - } - }) - .await?; - } - x => println!("unhandled ix type: {:?}", x), - }, - None => println!("tx has no ix type we care about"), - } - - Ok(()) -} diff --git a/src/main.rs b/src/main.rs index 4e23d04..7c3ab1b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,23 +3,15 @@ extern crate diesel; extern crate dotenv; use diesel::prelude::*; -use dotenv::dotenv; -use entities::token_accts::TokenAcctStatus; -use futures::{stream, FutureExt, StreamExt, TryStreamExt}; -use postgres::NoTls; -use solana_program::pubkey::Pubkey; use std::{env, sync::Arc}; use tokio::signal; -use tokio_postgres::{connect, AsyncMessage}; -mod entities; -use entities::token_accts::{token_accts::dsl::*, TokenAcct}; mod adapters; -mod events; +mod entities; +mod entrypoints; mod services; use deadpool::managed::Object; use deadpool_diesel::postgres::{Pool, Runtime}; use deadpool_diesel::Manager; -use std::str::FromStr; use tokio::task::{self}; async fn get_database_pool( @@ -31,105 +23,36 @@ async fn get_database_pool( Ok(Arc::new(conn_manager)) } -async fn get_pubsub_client( -) -> Result, Box> -{ - let rpc_endpoint_ws = env::var("RPC_ENDPOINT_WSS").expect("RPC_ENDPOINT_WSS must be set"); - let pub_sub_client = - solana_client::nonblocking::pubsub_client::PubsubClient::new(&rpc_endpoint_ws).await?; - Ok(Arc::new(pub_sub_client)) -} - -// TODO this should return a result -async fn setup_event_listeners( - db_url: &str, - managed_connection: Arc>>, +async fn 
run_jobs( + pg_connection: Arc>>, ) -> Result<(), Box> { - let pub_sub_client = get_pubsub_client().await?; - - let results = managed_connection - .clone() - .interact(|conn| { - return token_accts - .filter(status.eq(TokenAcctStatus::Watching)) - .load::(conn) - .expect("Error loading token_accts"); - }) - .await?; - - for record in results { - match Pubkey::from_str(&record.token_acct) { - Ok(token_acct_pubkey) => { - let conn_manager_arg_clone = Arc::clone(&managed_connection); - let pub_sub_client_clone = Arc::clone(&pub_sub_client); - tokio::spawn(async move { - events::rpc_token_acct_updates::new_handler( - pub_sub_client_clone, - conn_manager_arg_clone, - token_acct_pubkey, - record, - ) - .await - }); - } - Err(e) => eprintln!("Error with token acct pubkey parsing: {}", e), - } - } - - let (client, mut connection) = connect(db_url, NoTls).await.unwrap(); - // Make transmitter and receiver. - let (tx, mut rx) = futures_channel::mpsc::unbounded(); - let stream = - stream::poll_fn(move |cx| connection.poll_message(cx)).map_err(|e| panic!("{}", e)); - let connection = stream.forward(tx).map(|r| r.unwrap()); - tokio::spawn(connection); - - client - .batch_execute( - " - LISTEN transactions_insert_channel; - LISTEN token_accts_insert_channel; - ", - ) - .await - .unwrap(); - - while let Some(m) = rx.next().await { - let connect_clone = Arc::clone(&managed_connection); - match m { - AsyncMessage::Notification(n) => match n.channel() { - "token_accts_insert_channel" => { - task::spawn(events::token_accts_insert::new_handler( - n, - connect_clone, - Arc::clone(&pub_sub_client), - )); - } - "transactions_insert_channel" => { - task::spawn(events::transactions_insert::new_handler(n, connect_clone)); - } - _ => (), - }, - AsyncMessage::Notice(notice) => println!("async message error: {:?}", notice), - _ => println!("fallthrough handler of async message from postgres listener"), - } - } - - // TODO: event handlers should not return results themselves.. 
but things they call will return results and they should handle that internally - + entrypoints::jobs::transaction_indexing::run_job(pg_connection).await?; Ok(()) } #[tokio::main] async fn main() -> Result<(), Box> { - dotenv().ok(); - + env_logger::init(); let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set"); - let conn_manager_arc = get_database_pool(&database_url).await?; - setup_event_listeners(&database_url, Arc::clone(&conn_manager_arc)).await?; + let pub_sub_client = adapters::rpc::get_pubsub_client().await?; + let db = get_database_pool(&database_url).await?; + + let db_clone = Arc::clone(&db); + let db_clone_2 = Arc::clone(&db); + + let database_url_copy = database_url.clone(); + task::spawn(async move { + entrypoints::events::setup::setup_event_listeners(&database_url_copy, db, pub_sub_client) + .await + }); + + // run the API + task::spawn(async move { entrypoints::http::routes::listen_and_serve(db_clone).await }); + + // TODO setup API and watchers before running backfill... 
+ run_jobs(db_clone_2).await?; - // Block the main function and handle CTRL+C signal::ctrl_c().await?; println!("Received CTRL+C, shutting down."); Ok(()) diff --git a/src/services/auth.rs b/src/services/auth.rs new file mode 100644 index 0000000..4cfd6a3 --- /dev/null +++ b/src/services/auth.rs @@ -0,0 +1,83 @@ +use reqwest::Client; +use std::error::Error; + +use crate::entities::auth::{ + AuthErrorResponse, AuthGetRequest, AuthMessageResponse, AuthPostRequest, AuthPutRequest, + AuthSessionResponse, +}; + +pub struct AuthClient { + client: Client, + base_url: String, +} + +impl AuthClient { + pub fn new(base_url: &str) -> Self { + Self { + client: Client::new(), + base_url: base_url.to_string(), + } + } + + pub async fn post_session(&self, pub_key: &str) -> Result> { + let url = format!("{}/auth", self.base_url); + let req_body = AuthPostRequest { + pub_key: pub_key.to_string(), + }; + let resp = self.client.post(&url).json(&req_body).send().await?; + println!("{}", resp.status()); + if resp.status().is_success() { + let session_response = resp.json::().await?; + Ok(session_response) + } else { + let error_response = resp.json::().await?; + Err(Box::new(std::io::Error::new( + std::io::ErrorKind::Other, + error_response.error, + ))) + } + } + + pub async fn put_session( + &self, + id: &str, + signature: &str, + pub_key: &str, + ) -> Result> { + let url = format!("{}/auth", self.base_url); + let req_body = AuthPutRequest { + id: id.to_string(), + signature: signature.to_string(), + pub_key: pub_key.to_string(), + }; + let resp = self.client.put(&url).json(&req_body).send().await?; + if resp.status().is_success() { + let message_response = resp.json::().await?; + Ok(message_response) + } else { + let error_response = resp.json::().await?; + Err(Box::new(std::io::Error::new( + std::io::ErrorKind::Other, + error_response.error, + ))) + } + } + + pub async fn get_session(&self, pubkey: &str) -> Result> { + let url = format!("{}/auth", self.base_url); + let req_body = 
AuthGetRequest { + pubkey: pubkey.to_string(), + }; + let resp = self.client.get(&url).json(&req_body).send().await?; + if resp.status().is_success() { + let message_response = resp.json::().await?; + Ok(message_response) + } else { + let error_response = resp.json::().await?; + Err(Box::new(std::io::Error::new( + std::io::ErrorKind::Other, + error_response.error, + ))) + } + } +} diff --git a/src/services/balances.rs b/src/services/balances.rs new file mode 100644 index 0000000..6f0603a --- /dev/null +++ b/src/services/balances.rs @@ -0,0 +1,235 @@ +use crate::entities::token_acct_balances::token_acct_balances; +use crate::entities::token_acct_balances::TokenAcctBalances; +use crate::entities::token_accts::token_accts; + +use crate::entities::token_accts::TokenAcct; +use crate::entities::token_accts::TokenAcctStatus; +use crate::entities::tokens; +use crate::entities::transactions::Payload; +use chrono::Utc; +use deadpool::managed::Object; +use deadpool_diesel::Manager; +use diesel::prelude::*; +use diesel::PgConnection; +use serde_json::Value; +use solana_account_decoder::parse_account_data::ParsedAccount; +use solana_client::rpc_response::RpcResponseContext; +use std::io; +use std::io::ErrorKind; +use std::sync::Arc; + +use super::transactions; + +pub async fn handle_token_acct_change( + conn_manager: Arc>>, + record: TokenAcct, + updated_token_account: ParsedAccount, + ctx: RpcResponseContext, +) -> Result<(), Box> { + // Parse the object + let parsed_object = updated_token_account.parsed.as_object(); + + // Extract the token amount information + let token_amount = parsed_object + .and_then(|object| object.get("info")) + .and_then(|info| info.get("tokenAmount")) + .and_then(|token_amount| token_amount.as_object()); + + // Bail out if token amount information is missing + if token_amount.is_none() { + println!("Could not parse token acct change"); + return Ok(()); + } + + let token_amount_unwrapped = token_amount.unwrap(); + + // Extract amount and decimals, 
bail out if any are missing or not in expected format + let new_amount_str = token_amount_unwrapped + .get("amount") + .and_then(Value::as_str) + .ok_or_else(|| { + println!("amount not found or invalid"); + Box::new(io::Error::new( + ErrorKind::InvalidData, + "amount not found or invalid", + )) + })?; + + let new_amount: i64 = new_amount_str.parse().map_err(|_| { + println!("Failed to parse amount as i64"); + Box::new(io::Error::new( + ErrorKind::InvalidData, + "Failed to parse amount as i64", + )) + })?; + + // Query the most recent value for the given token_acct to calculate the delta + let record_clone = record.clone(); + let previous_balance = conn_manager + .interact(move |conn| { + token_acct_balances::table + .filter(token_acct_balances::dsl::token_acct.eq(record_clone.token_acct.clone())) + .order_by(token_acct_balances::dsl::slot.desc()) + .select(token_acct_balances::dsl::amount) + .first::(conn) + .optional() + }) + .await??; + + let previous_balance = match previous_balance { + Some(prev) => prev, + None => 0, + }; + + let new_delta = new_amount - previous_balance; + + let new_balance = TokenAcctBalances { + token_acct: record.token_acct.clone(), + mint_acct: record.mint_acct.clone(), + owner_acct: record.owner_acct.clone(), + amount: new_amount, + delta: new_delta, + slot: ctx.slot as i64, + created_at: Utc::now(), + tx_sig: None, + }; + + let conn_manager_clone = conn_manager.clone(); + conn_manager_clone + .interact(move |conn| { + diesel::insert_into(token_acct_balances::table) + .values(new_balance) + .execute(conn) + }) + .await??; + + let now = Utc::now(); + let mut token_balance: TokenAcct = record; + token_balance.amount = new_amount; + token_balance.updated_at = Some(now); + + conn_manager + .interact(move |conn| { + diesel::update( + token_accts::table + .filter(token_accts::token_acct.eq(token_balance.token_acct.clone())), + ) + .set(( + token_accts::amount.eq(new_amount), + token_accts::dsl::updated_at.eq(Utc::now()), + )) + .execute(conn) 
+ }) + .await??; + + Ok(()) +} + +// TODO make this be able to run without updating token_acct to watching +pub async fn handle_token_acct_in_tx( + conn_manager: Arc>>, + transaction_payload: Payload, + transaction_sig: String, + mint_acct_value: &str, + token_account: &str, + authority_account: &str, +) -> Result<(), Box> { + let mint_acct_value_str = mint_acct_value.to_string(); + let token_account_str = token_account.to_string(); + let authority_account_str = authority_account.to_string(); + let transaction_sig_str = transaction_sig.clone(); + + // Check if the token record exists + let mint_acct_value_clone = mint_acct_value_str.clone(); + let token_record_exists = conn_manager + .interact(move |db| { + tokens::tokens::table + .filter(tokens::tokens::dsl::mint_acct.eq(mint_acct_value_clone)) + .count() + .get_result::(db) + }) + .await?? + > 0; + + if !token_record_exists { + println!( + "Token table record not found for account: {}", + token_account_str + ); + return Ok(()); + } + + // Find the matching account in the root accounts array and extract postBalance + let account_with_balance = transaction_payload + .accounts + .iter() + .find(|acc| acc.pubkey == token_account_str) + .ok_or("Matching account not found in transaction payload")?; + + let account_balance = match &account_with_balance.post_token_balance { + Some(token_balance) => token_balance + .amount + .split(':') + .nth(1) + .ok_or("Invalid postBalance format")? 
+ .parse::()?, + None => 0, + }; + + // Check if the token account already exists + let token_account_clone = token_account_str.clone(); + let token_acct_record: Vec = conn_manager + .interact(move |db| { + token_accts::table + .filter(token_accts::dsl::token_acct.eq(token_account_clone)) + .load::(db) + }) + .await??; + + if token_acct_record.is_empty() { + let new_token_acct = TokenAcct { + token_acct: token_account_str.clone(), + owner_acct: authority_account_str.clone(), + amount: account_balance, + status: TokenAcctStatus::Watching, + mint_acct: mint_acct_value_str.clone(), + updated_at: Some(Utc::now()), + }; + + let new_token_acct_clone = new_token_acct.clone(); + conn_manager + .interact(move |db| { + diesel::insert_into(token_accts::table) + .values(&new_token_acct_clone) + .execute(db) + }) + .await??; + } else { + let token_account_update = token_account_str.clone(); + conn_manager + .interact(move |db| { + diesel::update( + token_accts::table.filter(token_accts::token_acct.eq(token_account_update)), + ) + .set(( + token_accts::status.eq(TokenAcctStatus::Watching), + token_accts::dsl::updated_at.eq(Utc::now()), + )) + .execute(db) + }) + .await??; + } + + transactions::handle_token_acct_balance_tx( + conn_manager.clone(), + token_account_str.clone(), + account_balance, + Some(transaction_sig_str), + transaction_payload.slot, + mint_acct_value_str, + authority_account_str, + ) + .await?; + + Ok(()) +} diff --git a/src/services/balances_handler.rs b/src/services/balances_handler.rs deleted file mode 100644 index a42eb0f..0000000 --- a/src/services/balances_handler.rs +++ /dev/null @@ -1,71 +0,0 @@ -use crate::entities::token_acct_balances::token_acct_balances; -use crate::entities::token_acct_balances::TokenAcctBalancesRecord; -use crate::entities::token_accts::token_accts; -use crate::entities::token_accts::token_accts::dsl::*; -use crate::entities::token_accts::TokenAcct; -use diesel::prelude::*; -use diesel::PgConnection; -use serde_json::Value; -use 
solana_account_decoder::parse_account_data::ParsedAccount; -use std::io::ErrorKind; -use std::time::SystemTime; - -pub fn handle_token_acct_change( - connection: &mut PgConnection, - record: TokenAcct, - updated_token_account: ParsedAccount, -) -> Result<(), ErrorKind> { - // Parse the object - let parsed_object = updated_token_account.parsed.as_object(); - - // Extract the token amount information - let token_amount = parsed_object - .and_then(|object| object.get("info")) - .and_then(|info| info.get("tokenAmount")) - .and_then(|token_amount| token_amount.as_object()); - - // Bail out if token amount information is missing - if token_amount.is_none() { - println!("Could not parse token acct change"); - return Ok(()); - } - - let token_amount_unwrapped = token_amount.unwrap(); - - // Extract amount and decimals, bail out if any are missing or not in expected format - let new_amount_str = token_amount_unwrapped - .get("amount") - .and_then(Value::as_str) - .ok_or_else(|| { - println!("amount not found or invalid"); - ErrorKind::InvalidData - })?; - - let new_amount: i64 = new_amount_str.parse().map_err(|_| { - println!("Failed to parse amount as i64"); - ErrorKind::InvalidData - })?; - - let new_balance = TokenAcctBalancesRecord { - token_acct: record.token_acct.clone(), - mint_acct: record.mint_acct.clone(), - owner_acct: record.owner_acct.clone(), - amount: new_amount, - created_at: SystemTime::now(), - }; - - diesel::insert_into(token_acct_balances::table) - .values(new_balance) - .execute(connection) - .expect("Error inserting into token_acct_balances"); - - let now = SystemTime::now(); - let mut token_balance: TokenAcct = record; - token_balance.amount = new_amount; - token_balance.updated_at = Some(now); - diesel::update(token_accts::table.filter(token_acct.eq(token_balance.token_acct.clone()))) - .set(amount.eq(new_amount)) - .execute(connection) - .expect("Error updating token_accts"); - Ok(()) -} diff --git a/src/services/liquidity_adding.rs 
b/src/services/liquidity_adding.rs new file mode 100644 index 0000000..d530558 --- /dev/null +++ b/src/services/liquidity_adding.rs @@ -0,0 +1,143 @@ +use std::io; +use std::sync::Arc; + +use crate::entities::markets::markets; +use crate::entities::markets::markets::market_acct; +use crate::entities::markets::Market; +use crate::entities::transactions::Instruction; +use crate::entities::transactions::Payload; +use deadpool::managed::Object; +use deadpool_diesel::Manager; +use diesel::prelude::*; +use diesel::PgConnection; + +use super::balances; + +pub async fn handle_lp_deposit_tx( + conn_manager: Arc>>, + transaction_payload: Payload, + transaction_sig: String, +) -> Result<(), Box> { + let lp_deposit_instruction = find_lp_deposit_instruction(&transaction_payload)?; + let authority_account = find_authority_account(&lp_deposit_instruction)?; + let (lp_ata, lp_mint) = find_lp_mint_and_ata_account(&lp_deposit_instruction)?; + let mut lp_account_vec = vec![(lp_ata.as_str(), lp_mint)]; + let amm_acct = lp_deposit_instruction + .accounts_with_data + .iter() + .find(|account| account.name == "amm") + .map(|account| account.pubkey.clone()); + + let amm_acct_str = match amm_acct { + Some(acct) => acct, + None => "".to_string(), + }; + + if amm_acct_str == "" { + return Err(Box::new(io::Error::new( + io::ErrorKind::Other, + "no amm_acct_str", + ))); + } + + let (base_mint, quote_mint) = + find_base_and_quote_mint(amm_acct_str, Arc::clone(&conn_manager)).await?; + + let mut relevant_accounts = + get_relevant_accounts_from_ix_and_mints(&lp_deposit_instruction, base_mint, quote_mint); + relevant_accounts.append(&mut lp_account_vec); + + for (token_account, mint_acct_value) in relevant_accounts { + balances::handle_token_acct_in_tx( + Arc::clone(&conn_manager), + transaction_payload.clone(), + transaction_sig.clone(), + &mint_acct_value, + token_account, + &authority_account, + ) + .await? 
+ } + + Ok(()) +} + +fn find_lp_deposit_instruction( + transaction_payload: &Payload, +) -> Result> { + transaction_payload + .instructions + .iter() + .find(|instruction| instruction.name == "addLiquidity") + .cloned() + .ok_or_else(|| "addLiquidity instruction not found".into()) +} + +fn find_authority_account( + lp_deposit_instruction: &Instruction, +) -> Result> { + lp_deposit_instruction + .accounts_with_data + .iter() + .find(|account| account.name == "user") + .map(|account| account.pubkey.clone()) + .ok_or_else(|| "Authority account not found in addLiquidity instruction".into()) +} +fn find_lp_mint_and_ata_account( + lp_deposit_instruction: &Instruction, +) -> Result<(String, String), Box> { + let mint_res: Result = lp_deposit_instruction + .accounts_with_data + .iter() + .find(|account| account.name == "lpMint") + .map(|account| account.pubkey.clone()) + .ok_or_else(|| "lpMint account not found in addLiquidity instruction"); + let ata_res: Result = lp_deposit_instruction + .accounts_with_data + .iter() + .find(|account| account.name == "userLpAccount") + .map(|account| account.pubkey.clone()) + .ok_or_else(|| "lpMint account not found in addLiquidity instruction"); + + match (mint_res, ata_res) { + (Ok(mint), Ok(ata)) => Ok((ata, mint)), + _ => Err(Box::new(io::Error::new( + io::ErrorKind::Other, + "could not find lp accounts", + ))), + } +} + +async fn find_base_and_quote_mint( + amm_acct: String, + conn_manager: Arc>>, +) -> Result<(String, String), Box> { + let amm_market: Market = conn_manager + .interact(|connection| { + markets::table + .filter(market_acct.eq(amm_acct)) + .first(connection) + }) + .await??; + + Ok((amm_market.base_mint_acct, amm_market.quote_mint_acct)) +} + +fn get_relevant_accounts_from_ix_and_mints( + lp_deposit_instruction: &crate::entities::transactions::Instruction, + base_mint: String, + quote_mint: String, +) -> Vec<(&str, String)> { + // Collect the necessary "user" accounts to insert into token_accts + + let 
relevant_accounts: Vec<(&str, String)> = lp_deposit_instruction + .accounts_with_data + .iter() + .filter_map(|account| match account.name.as_str() { + "userBaseAccount" => Some((account.pubkey.as_str(), base_mint.clone())), + "userQuoteAccount" => Some((account.pubkey.as_str(), quote_mint.clone())), + _ => None, + }) + .collect(); + relevant_accounts +} diff --git a/src/services/liquidity_removing.rs b/src/services/liquidity_removing.rs new file mode 100644 index 0000000..6cc35cf --- /dev/null +++ b/src/services/liquidity_removing.rs @@ -0,0 +1,143 @@ +use std::io; +use std::sync::Arc; + +use crate::entities::markets::markets; +use crate::entities::markets::markets::market_acct; +use crate::entities::markets::Market; +use crate::entities::transactions::Instruction; +use crate::entities::transactions::Payload; +use deadpool::managed::Object; +use deadpool_diesel::Manager; +use diesel::prelude::*; +use diesel::PgConnection; + +use super::balances; + +pub async fn handle_lp_withdrawal_tx( + conn_manager: Arc>>, + transaction_payload: Payload, + transaction_sig: String, +) -> Result<(), Box> { + let lp_withdrawal_instruction = find_lp_withdrawal_instruction(&transaction_payload)?; + let authority_account = find_authority_account(&lp_withdrawal_instruction)?; + let (lp_ata, lp_mint) = find_lp_mint_and_ata_account(&lp_withdrawal_instruction)?; + let mut lp_account_vec = vec![(lp_ata.as_str(), lp_mint)]; + let amm_acct = lp_withdrawal_instruction + .accounts_with_data + .iter() + .find(|account| account.name == "amm") + .map(|account| account.pubkey.clone()); + + let amm_acct_str = match amm_acct { + Some(acct) => acct, + None => "".to_string(), + }; + + if amm_acct_str == "" { + return Err(Box::new(io::Error::new( + io::ErrorKind::Other, + "no amm_acct_str", + ))); + } + + let (base_mint, quote_mint) = + find_base_and_quote_mint(amm_acct_str, Arc::clone(&conn_manager)).await?; + + let mut relevant_accounts = + 
get_relevant_accounts_from_ix_and_mints(&lp_withdrawal_instruction, base_mint, quote_mint); + relevant_accounts.append(&mut lp_account_vec); + + for (token_account, mint_acct_value) in relevant_accounts { + balances::handle_token_acct_in_tx( + Arc::clone(&conn_manager), + transaction_payload.clone(), + transaction_sig.clone(), + &mint_acct_value, + token_account, + &authority_account, + ) + .await? + } + + Ok(()) +} + +fn find_lp_withdrawal_instruction( + transaction_payload: &Payload, +) -> Result> { + transaction_payload + .instructions + .iter() + .find(|instruction| instruction.name == "removeLiquidity") + .cloned() + .ok_or_else(|| "removeLiquidity instruction not found".into()) +} + +fn find_authority_account( + lp_withdrawal_instruction: &Instruction, +) -> Result> { + lp_withdrawal_instruction + .accounts_with_data + .iter() + .find(|account| account.name == "user") + .map(|account| account.pubkey.clone()) + .ok_or_else(|| "Authority account not found in removeLiquidity instruction".into()) +} +fn find_lp_mint_and_ata_account( + lp_withdrawal_instruction: &Instruction, +) -> Result<(String, String), Box> { + let mint_res: Result = lp_withdrawal_instruction + .accounts_with_data + .iter() + .find(|account| account.name == "lpMint") + .map(|account| account.pubkey.clone()) + .ok_or_else(|| "lpMint account not found in removeLiquidity instruction"); + let ata_res: Result = lp_withdrawal_instruction + .accounts_with_data + .iter() + .find(|account| account.name == "userLpAccount") + .map(|account| account.pubkey.clone()) + .ok_or_else(|| "userLpAccount account not found in removeLiquidity instruction"); + + match (mint_res, ata_res) { + (Ok(mint), Ok(ata)) => Ok((ata, mint)), + _ => Err(Box::new(io::Error::new( + io::ErrorKind::Other, + "could not find lp accounts", + ))), + } +} + +async fn find_base_and_quote_mint( + amm_acct: String, + conn_manager: Arc>>, +) -> Result<(String, String), Box> { + let amm_market: Market = conn_manager + .interact(|connection| { + markets::table + 
.filter(market_acct.eq(amm_acct)) + .first(connection) + }) + .await??; + + Ok((amm_market.base_mint_acct, amm_market.quote_mint_acct)) +} + +fn get_relevant_accounts_from_ix_and_mints( + lp_withdrawal_instruction: &crate::entities::transactions::Instruction, + base_mint: String, + quote_mint: String, +) -> Vec<(&str, String)> { + // Collect the necessary "user" accounts to insert into token_accts + + let relevant_accounts: Vec<(&str, String)> = lp_withdrawal_instruction + .accounts_with_data + .iter() + .filter_map(|account| match account.name.as_str() { + "userBaseAccount" => Some((account.pubkey.as_str(), base_mint.clone())), + "userQuoteAccount" => Some((account.pubkey.as_str(), quote_mint.clone())), + _ => None, + }) + .collect(); + relevant_accounts +} diff --git a/src/services/merge_conditionals_for_underlying.rs b/src/services/merge_conditionals_for_underlying.rs new file mode 100644 index 0000000..780897d --- /dev/null +++ b/src/services/merge_conditionals_for_underlying.rs @@ -0,0 +1,126 @@ +use std::sync::Arc; + +use crate::entities::conditional_vaults::conditional_vaults::dsl::*; +use crate::entities::conditional_vaults::ConditionalVault; +use crate::entities::transactions::Instruction; +use crate::entities::transactions::Payload; +use deadpool::managed::Object; +use deadpool_diesel::Manager; +use diesel::prelude::*; +use diesel::PgConnection; + +use super::balances; + +pub async fn handle_merge_conditional_tokens_tx( + conn_manager: Arc>>, + transaction_payload: Payload, + transaction_sig: String, +) -> Result<(), Box> { + let mint_instruction = find_mint_instruction(&transaction_payload)?; + let authority_account = find_authority_account(&mint_instruction)?; + let vault_account = find_vault_account(&mint_instruction)?; + let conditional_vault = + get_conditional_vault(Arc::clone(&conn_manager), &vault_account).await?; + + let relevant_accounts = + get_relevant_accounts_from_mint_and_vault(&mint_instruction, conditional_vault); + + for (token_account, 
mint_acct_value) in relevant_accounts { + balances::handle_token_acct_in_tx( + Arc::clone(&conn_manager), + transaction_payload.clone(), + transaction_sig.clone(), + &mint_acct_value, + token_account, + &authority_account, + ) + .await? + } + + Ok(()) +} + +fn find_mint_instruction( + transaction_payload: &Payload, +) -> Result> { + transaction_payload + .instructions + .iter() + .find(|instruction| instruction.name == "mergeConditionalTokensForUnderlyingTokens") + .cloned() + .ok_or_else(|| "mergeConditionalTokensForUnderlyingTokens instruction not found".into()) +} + +fn find_authority_account( + mint_instruction: &Instruction, +) -> Result> { + mint_instruction + .accounts_with_data + .iter() + .find(|account| account.name == "authority") + .map(|account| account.pubkey.clone()) + .ok_or_else(|| { + "Authority account not found in mergeConditionalTokensForUnderlyingTokens instruction" + .into() + }) +} + +fn find_vault_account( + mint_instruction: &Instruction, +) -> Result> { + mint_instruction + .accounts_with_data + .iter() + .find(|account| account.name == "vault") + .map(|account| account.pubkey.clone()) + .ok_or_else(|| { + "Vault account not found in mergeConditionalTokensForUnderlyingTokens instruction" + .into() + }) +} + +async fn get_conditional_vault( + conn_manager: Arc>>, + vault_account: &str, +) -> Result> { + let vault_acct_clone = vault_account.to_string(); + let vault = conn_manager + .interact(move |connection| { + conditional_vaults + .filter(cond_vault_acct.eq(vault_acct_clone)) + .first(connection) + }) + .await??; + + Ok(vault) +} + +fn get_relevant_accounts_from_mint_and_vault( + mint_instruction: &crate::entities::transactions::Instruction, + conditional_vault: ConditionalVault, +) -> Vec<(&str, String)> { + // Collect the necessary "user" accounts to insert into token_accts + + let relevant_accounts: Vec<(&str, String)> = mint_instruction + .accounts_with_data + .iter() + .filter_map(|account| { + let vault_clone = 
conditional_vault.clone(); + match account.name.as_str() { + "userConditionalOnFinalizeTokenAccount" => Some(( + account.pubkey.as_str(), + vault_clone.cond_finalize_token_mint_acct, + )), + "userConditionalOnRevertTokenAccount" => Some(( + account.pubkey.as_str(), + vault_clone.cond_revert_token_mint_acct, + )), + "userUnderlyingTokenAccount" => { + Some((account.pubkey.as_str(), vault_clone.underlying_mint_acct)) + } + _ => None, + } + }) + .collect(); + relevant_accounts +} diff --git a/src/services/mod.rs b/src/services/mod.rs index 3cdf4eb..f1b739a 100644 --- a/src/services/mod.rs +++ b/src/services/mod.rs @@ -1,2 +1,9 @@ -pub mod balances_handler; -pub mod new_mint_handlers; +pub mod auth; +pub mod balances; +pub mod liquidity_adding; +pub mod liquidity_removing; +pub mod merge_conditionals_for_underlying; +pub mod new_mint; +pub mod redeem_conditionals; +pub mod swaps; +pub mod transactions; diff --git a/src/services/new_mint.rs b/src/services/new_mint.rs new file mode 100644 index 0000000..f998381 --- /dev/null +++ b/src/services/new_mint.rs @@ -0,0 +1,120 @@ +use std::sync::Arc; + +use crate::entities::conditional_vaults::conditional_vaults::dsl::*; +use crate::entities::conditional_vaults::ConditionalVault; +use crate::entities::transactions::Instruction; +use crate::entities::transactions::Payload; +use deadpool::managed::Object; +use deadpool_diesel::Manager; +use diesel::prelude::*; +use diesel::PgConnection; + +use super::balances; + +pub async fn handle_mint_tx( + conn_manager: Arc>>, + transaction_payload: Payload, + transaction_sig: String, +) -> Result<(), Box> { + let mint_instruction = find_mint_instruction(&transaction_payload)?; + let authority_account = find_authority_account(&mint_instruction)?; + let vault_account = find_vault_account(&mint_instruction)?; + let conditional_vault = + get_conditional_vault(Arc::clone(&conn_manager), &vault_account).await?; + + let relevant_accounts = + 
get_relevant_accounts_from_mint_and_vault(&mint_instruction, conditional_vault); + + for (token_account, mint_acct_value) in relevant_accounts { + balances::handle_token_acct_in_tx( + Arc::clone(&conn_manager), + transaction_payload.clone(), + transaction_sig.clone(), + &mint_acct_value, + token_account, + &authority_account, + ) + .await? + } + + Ok(()) +} + +fn find_mint_instruction( + transaction_payload: &Payload, +) -> Result> { + transaction_payload + .instructions + .iter() + .find(|instruction| instruction.name == "mintConditionalTokens") + .cloned() + .ok_or_else(|| "mintConditionalTokens instruction not found".into()) +} + +fn find_authority_account( + mint_instruction: &Instruction, +) -> Result> { + mint_instruction + .accounts_with_data + .iter() + .find(|account| account.name == "authority") + .map(|account| account.pubkey.clone()) + .ok_or_else(|| "Authority account not found in mintConditionalTokens instruction".into()) +} + +fn find_vault_account( + mint_instruction: &Instruction, +) -> Result> { + mint_instruction + .accounts_with_data + .iter() + .find(|account| account.name == "vault") + .map(|account| account.pubkey.clone()) + .ok_or_else(|| "Vault account not found in mintConditionalTokens instruction".into()) +} + +async fn get_conditional_vault( + conn_manager: Arc>>, + vault_account: &str, +) -> Result> { + let vault_acct_clone = vault_account.to_string(); + let vault = conn_manager + .interact(move |connection| { + conditional_vaults + .filter(cond_vault_acct.eq(vault_acct_clone)) + .first(connection) + }) + .await??; + + Ok(vault) +} + +fn get_relevant_accounts_from_mint_and_vault( + mint_instruction: &crate::entities::transactions::Instruction, + conditional_vault: ConditionalVault, +) -> Vec<(&str, String)> { + // Collect the necessary "user" accounts to insert into token_accts + + let relevant_accounts: Vec<(&str, String)> = mint_instruction + .accounts_with_data + .iter() + .filter_map(|account| { + let vault_clone = 
conditional_vault.clone(); + match account.name.as_str() { + "userConditionalOnFinalizeTokenAccount" => Some(( + account.pubkey.as_str(), + vault_clone.cond_finalize_token_mint_acct, + )), + "userConditionalOnRevertTokenAccount" => Some(( + account.pubkey.as_str(), + vault_clone.cond_revert_token_mint_acct, + )), + "userUnderlyingTokenAccount" => { + Some((account.pubkey.as_str(), vault_clone.underlying_mint_acct)) + } + _ => None, + } + }) + .collect(); + relevant_accounts +} diff --git a/src/services/new_mint_handlers.rs b/src/services/new_mint_handlers.rs deleted file mode 100644 index 06f64fa..0000000 --- a/src/services/new_mint_handlers.rs +++ /dev/null @@ -1,123 +0,0 @@ -use std::time::SystemTime; - -use crate::entities::conditional_vaults::conditional_vaults::dsl::*; -use crate::entities::conditional_vaults::ConditionalVault; -use crate::entities::token_acct_balances::token_acct_balances::dsl::*; -use crate::entities::token_acct_balances::TokenAcctBalancesRecord; -use crate::entities::token_accts::token_accts; -use crate::entities::token_accts::token_accts::dsl::*; -use crate::entities::token_accts::TokenAcct; -use crate::entities::token_accts::TokenAcctStatus; -use crate::entities::transactions::Payload; -use diesel::prelude::*; -use diesel::PgConnection; - -pub fn handle_mint_tx( - connection: &mut PgConnection, - transaction_payload: Payload, -) -> Result<(), Box> { - // Find the "mintConditionalTokens" instruction - let mint_instruction = transaction_payload - .instructions - .iter() - .find(|instruction| instruction.name == "mintConditionalTokens") - .ok_or("mintConditionalTokens instruction not found")?; - - // Find the "authority" account pubkey in the instruction - let authority_account = mint_instruction - .accounts_with_data - .iter() - .find(|account| account.name == "authority") - .ok_or("Authority account not found in mintConditionalTokens instruction")? 
- .pubkey - .clone(); - - // Find the "vault" account pubkey in the instruction - let vault_account = mint_instruction - .accounts_with_data - .iter() - .find(|account| account.name == "vault") - .ok_or("Vault account not found in mintConditionalTokens instruction")? - .pubkey - .clone(); - - let conditional_vault: ConditionalVault = conditional_vaults - .filter(cond_vault_acct.eq(vault_account.clone())) - .first(connection)?; - - // Collect the necessary "user" accounts to insert into token_accts - let relevant_accounts: Vec<(&str, &str)> = mint_instruction - .accounts_with_data - .iter() - .filter_map(|account| match account.name.as_str() { - "userConditionalOnFinalizeTokenAccount" => Some(( - account.pubkey.as_str(), - conditional_vault.cond_finalize_token_mint_acct.as_str(), - )), - "userConditionalOnRevertTokenAccount" => Some(( - account.pubkey.as_str(), - conditional_vault.cond_revert_token_mint_acct.as_str(), - )), - "userUnderlyingTokenAccount" => Some(( - account.pubkey.as_str(), - conditional_vault.underlying_mint_acct.as_str(), - )), - _ => None, - }) - .collect(); - - for (token_account, mint_acct_value) in relevant_accounts { - // Check if the token account already exists - let exists = token_accts - .filter(token_accts::dsl::token_acct.eq(token_account)) - .count() - .get_result::(connection)? - > 0; - - // If the token account already exists, skip the insert - if exists { - continue; - } - - // Find the matching account in the root accounts array and extract postBalance - let account_with_balance = transaction_payload - .accounts - .iter() - .find(|acc| acc.pubkey == token_account) - .ok_or("Matching account not found in transaction payload")?; - - let account_balance = match &account_with_balance.post_token_balance { - Some(token_balance) => token_balance - .amount - .split(':') - .nth(1) - .ok_or("Invalid postBalance format")? 
- .parse::()?, - None => 0, - }; - - let new_token_acct = TokenAcct { - token_acct: token_account.to_string(), - owner_acct: authority_account.clone(), - amount: account_balance, - status: TokenAcctStatus::Watching, - mint_acct: mint_acct_value.to_string(), - updated_at: Some(SystemTime::now()), - }; - let new_token_acct_balance = TokenAcctBalancesRecord { - token_acct: token_account.to_string(), - mint_acct: mint_acct_value.to_string(), - owner_acct: authority_account.clone(), - amount: account_balance, - created_at: SystemTime::now(), - }; - diesel::insert_into(token_accts) - .values(&new_token_acct) - .execute(connection)?; - diesel::insert_into(token_acct_balances) - .values(&new_token_acct_balance) - .execute(connection)?; - } - - Ok(()) -} diff --git a/src/services/redeem_conditionals.rs b/src/services/redeem_conditionals.rs new file mode 100644 index 0000000..7a23732 --- /dev/null +++ b/src/services/redeem_conditionals.rs @@ -0,0 +1,125 @@ +use std::sync::Arc; + +use super::balances; +use crate::entities::conditional_vaults::conditional_vaults::dsl::*; +use crate::entities::conditional_vaults::ConditionalVault; +use crate::entities::transactions::Instruction; +use crate::entities::transactions::Payload; +use deadpool::managed::Object; +use deadpool_diesel::Manager; +use diesel::prelude::*; +use diesel::PgConnection; + +pub async fn handle_redeem_conditional_tokens_tx( + conn_manager: Arc>>, + transaction_payload: Payload, + transaction_sig: String, +) -> Result<(), Box> { + let mint_instruction = find_mint_instruction(&transaction_payload)?; + let authority_account = find_authority_account(&mint_instruction)?; + let vault_account = find_vault_account(&mint_instruction)?; + let conditional_vault = + get_conditional_vault(Arc::clone(&conn_manager), &vault_account).await?; + + let relevant_accounts = + get_relevant_accounts_from_mint_and_vault(&mint_instruction, conditional_vault); + + for (token_account, mint_acct_value) in relevant_accounts { + 
balances::handle_token_acct_in_tx( + Arc::clone(&conn_manager), + transaction_payload.clone(), + transaction_sig.clone(), + &mint_acct_value, + token_account, + &authority_account, + ) + .await? + } + + Ok(()) +} + +fn find_mint_instruction( + transaction_payload: &Payload, +) -> Result> { + transaction_payload + .instructions + .iter() + .find(|instruction| instruction.name == "redeemConditionalTokensForUnderlyingTokens") + .cloned() + .ok_or_else(|| "redeemConditionalTokensForUnderlyingTokens instruction not found".into()) +} + +fn find_authority_account( + mint_instruction: &Instruction, +) -> Result> { + mint_instruction + .accounts_with_data + .iter() + .find(|account| account.name == "authority") + .map(|account| account.pubkey.clone()) + .ok_or_else(|| { + "Authority account not found in redeemConditionalTokensForUnderlyingTokens instruction" + .into() + }) +} + +fn find_vault_account( + mint_instruction: &Instruction, +) -> Result> { + mint_instruction + .accounts_with_data + .iter() + .find(|account| account.name == "vault") + .map(|account| account.pubkey.clone()) + .ok_or_else(|| { + "Vault account not found in redeemConditionalTokensForUnderlyingTokens instruction" + .into() + }) +} + +async fn get_conditional_vault( + conn_manager: Arc>>, + vault_account: &str, +) -> Result> { + let vault_acct_clone = vault_account.to_string(); + let vault = conn_manager + .interact(move |connection| { + conditional_vaults + .filter(cond_vault_acct.eq(vault_acct_clone)) + .first(connection) + }) + .await??; + + Ok(vault) +} + +fn get_relevant_accounts_from_mint_and_vault( + mint_instruction: &crate::entities::transactions::Instruction, + conditional_vault: ConditionalVault, +) -> Vec<(&str, String)> { + // Collect the necessary "user" accounts to insert into token_accts + + let relevant_accounts: Vec<(&str, String)> = mint_instruction + .accounts_with_data + .iter() + .filter_map(|account| { + let vault_clone = conditional_vault.clone(); + match account.name.as_str() { 
+ "userConditionalOnFinalizeTokenAccount" => Some(( + account.pubkey.as_str(), + vault_clone.cond_finalize_token_mint_acct, + )), + "userConditionalOnRevertTokenAccount" => Some(( + account.pubkey.as_str(), + vault_clone.cond_revert_token_mint_acct, + )), + "userUnderlyingTokenAccount" => { + Some((account.pubkey.as_str(), vault_clone.underlying_mint_acct)) + } + _ => None, + } + }) + .collect(); + relevant_accounts +} diff --git a/src/services/swaps.rs b/src/services/swaps.rs new file mode 100644 index 0000000..95821f9 --- /dev/null +++ b/src/services/swaps.rs @@ -0,0 +1,116 @@ +use std::io; +use std::sync::Arc; + +use crate::entities::markets::markets; +use crate::entities::markets::markets::market_acct; +use crate::entities::markets::Market; +use crate::entities::transactions::Instruction; +use crate::entities::transactions::Payload; +use deadpool::managed::Object; +use deadpool_diesel::Manager; +use diesel::prelude::*; +use diesel::PgConnection; + +use super::balances; + +pub async fn handle_swap_tx( + conn_manager: Arc>>, + transaction_payload: Payload, + transaction_sig: String, +) -> Result<(), Box> { + let swap_instruction = find_swap_instruction(&transaction_payload)?; + let authority_account = find_authority_account(&swap_instruction)?; + let amm_acct = swap_instruction + .accounts_with_data + .iter() + .find(|account| account.name == "amm") + .map(|account| account.pubkey.clone()); + + let amm_acct_str = match amm_acct { + Some(acct) => acct, + None => "".to_string(), + }; + + if amm_acct_str == "" { + return Err(Box::new(io::Error::new( + io::ErrorKind::Other, + "amm_acct not found", + ))); + } + + let (base_mint, quote_mint) = + find_base_and_quote_mint(amm_acct_str, Arc::clone(&conn_manager)).await?; + + let relevant_accounts = + get_relevant_accounts_from_ix_and_mints(&swap_instruction, base_mint, quote_mint); + + for (token_account, mint_acct_value) in relevant_accounts { + balances::handle_token_acct_in_tx( + Arc::clone(&conn_manager), + 
transaction_payload.clone(), + transaction_sig.clone(), + &mint_acct_value, + token_account, + &authority_account, + ) + .await? + } + + Ok(()) +} + +fn find_swap_instruction( + transaction_payload: &Payload, +) -> Result> { + transaction_payload + .instructions + .iter() + .find(|instruction| instruction.name == "swap") + .cloned() + .ok_or_else(|| "swap instruction not found".into()) +} + +fn find_authority_account( + mint_instruction: &Instruction, +) -> Result> { + mint_instruction + .accounts_with_data + .iter() + .find(|account| account.name == "user") + .map(|account| account.pubkey.clone()) + .ok_or_else(|| "Authority account not found in swap instruction".into()) +} + +async fn find_base_and_quote_mint( + amm_acct: String, + conn_manager: Arc>>, +) -> Result<(String, String), Box> { + let amm_market: Market = conn_manager + .interact(|connection| { + markets::table + .filter(market_acct.eq(amm_acct)) + .first(connection) + }) + .await??; + + Ok((amm_market.base_mint_acct, amm_market.quote_mint_acct)) +} + +fn get_relevant_accounts_from_ix_and_mints( + mint_instruction: &crate::entities::transactions::Instruction, + base_mint: String, + quote_mint: String, +) -> Vec<(&str, String)> { + // Collect the necessary "user" accounts to insert into token_accts + + let relevant_accounts: Vec<(&str, String)> = mint_instruction + .accounts_with_data + .iter() + .filter_map(|account| match account.name.as_str() { + "userBaseAccount" => Some((account.pubkey.as_str(), base_mint.clone())), + "userQuoteAccount" => Some((account.pubkey.as_str(), quote_mint.clone())), + _ => None, + }) + .collect(); + relevant_accounts +} diff --git a/src/services/transactions.rs b/src/services/transactions.rs new file mode 100644 index 0000000..8779daa --- /dev/null +++ b/src/services/transactions.rs @@ -0,0 +1,116 @@ +use std::sync::Arc; + +use chrono::Utc; +use deadpool::managed::Object; +use deadpool_diesel::Manager; +use diesel::prelude::*; +use diesel::PgConnection; + +use 
crate::entities::token_acct_balances::token_acct_balances; +use crate::entities::token_acct_balances::TokenAcctBalances; +use crate::entities::token_accts::token_accts; +// use crate::entrypoints::events; + +/** + * Handles updating our DB for a tx that affects a token acct balance. + * Will update both token_accts and token_acct_balances table with the new balance amount + */ +pub async fn handle_token_acct_balance_tx( + conn_manager: Arc>>, + token_acct: String, + new_balance: i64, + transaction_sig: Option, + slot: i64, + mint_acct: String, + owner_acct: String, +) -> Result<(), Box> { + // Query the most recent value for the given token_acct to calculate the delta + let token_acct_clone_1 = token_acct.clone(); + let previous_balance: Option<_> = conn_manager + .interact(move |db| { + token_acct_balances::table + .filter(token_acct_balances::token_acct.eq(token_acct_clone_1)) + .order(token_acct_balances::slot.desc()) + .select(token_acct_balances::amount) + .first::(db) + .optional() + }) + .await??; + + let delta = match previous_balance { + Some(prev_amount) => new_balance - prev_amount, + None => new_balance, + }; + + let token_acct_clone_2 = token_acct.clone(); + let existing_balance_res = conn_manager + .interact(move |db| { + token_acct_balances::table + .filter( + token_acct_balances::slot + .eq(slot) + .and(token_acct_balances::token_acct.eq(token_acct_clone_2)), + ) + .first::(db) + }) + .await?; + + let maybe_balance = existing_balance_res.ok(); + + if let Some(balance) = maybe_balance { + if balance.tx_sig.is_none() { + let token_acct_clone_3 = token_acct.clone(); + let tx_sig_clone = transaction_sig.clone(); + conn_manager + .interact(move |db| { + diesel::update( + token_acct_balances::table.filter( + token_acct_balances::token_acct + .eq(token_acct_clone_3) + .and(token_acct_balances::slot.eq(slot)), + ), + ) + .set(token_acct_balances::tx_sig.eq(tx_sig_clone)) + .execute(db) + }) + .await??; + } + // already has the correct tx_sig, no need to 
update anything + } else { + let new_token_acct_balance = TokenAcctBalances { + token_acct: token_acct.clone(), + mint_acct: mint_acct, + owner_acct: owner_acct, + amount: new_balance, + delta, + slot: slot, + tx_sig: transaction_sig, + created_at: Utc::now(), + }; + + conn_manager + .interact(move |db| { + diesel::insert_into(token_acct_balances::table) + .values(&new_token_acct_balance) + .execute(db) + }) + .await??; + } + + // Update the token_accts table with the new balance in the amount column + let token_acct_clone_4 = token_acct.clone(); + conn_manager + .interact(move |db| { + diesel::update( + token_accts::table.filter(token_accts::token_acct.eq(token_acct_clone_4)), + ) + .set(( + token_accts::amount.eq(new_balance), + token_accts::dsl::updated_at.eq(Utc::now()), + )) + .execute(db) + }) + .await??; + + Ok(()) +}