1use crate::config::AptosConfig;
33use crate::error::{AptosError, AptosResult};
34use crate::retry::{RetryConfig, RetryExecutor};
35use crate::types::AccountAddress;
36use reqwest::Client;
37use serde::{Deserialize, Serialize};
38use std::sync::Arc;
39use url::Url;
40
/// Upper bound, in bytes (10 MiB), passed to `read_response_bounded` when reading an
/// indexer response body.
const MAX_INDEXER_RESPONSE_SIZE: usize = 10 * 1024 * 1024;
43
44#[derive(Debug, Clone)]
65pub struct IndexerClient {
66 indexer_url: Url,
67 client: Client,
68 retry_config: Arc<RetryConfig>,
69}
70
71#[derive(Debug, Serialize)]
73struct GraphQLRequest {
74 query: String,
75 #[serde(skip_serializing_if = "Option::is_none")]
76 variables: Option<serde_json::Value>,
77}
78
79#[derive(Debug, Deserialize)]
81struct GraphQLResponse<T> {
82 data: Option<T>,
83 errors: Option<Vec<GraphQLError>>,
84}
85
86#[derive(Debug, Deserialize)]
88struct GraphQLError {
89 message: String,
90}
91
92impl IndexerClient {
93 pub fn new(config: &AptosConfig) -> AptosResult<Self> {
106 let indexer_url = config
107 .indexer_url()
108 .cloned()
109 .ok_or_else(|| AptosError::Config("indexer URL not configured".into()))?;
110
111 let pool = config.pool_config();
112
113 let mut builder = Client::builder()
115 .timeout(config.timeout)
116 .pool_max_idle_per_host(pool.max_idle_per_host.unwrap_or(usize::MAX))
117 .pool_idle_timeout(pool.idle_timeout)
118 .tcp_nodelay(pool.tcp_nodelay);
119
120 if let Some(keepalive) = pool.tcp_keepalive {
121 builder = builder.tcp_keepalive(keepalive);
122 }
123
124 let client = builder.build().map_err(AptosError::Http)?;
125
126 let retry_config = Arc::new(config.retry_config().clone());
127
128 Ok(Self {
129 indexer_url,
130 client,
131 retry_config,
132 })
133 }
134
135 pub fn with_url(url: &str) -> AptosResult<Self> {
141 let indexer_url = Url::parse(url)?;
142 crate::config::validate_url_scheme(&indexer_url)?;
144 let client = Client::new();
145 Ok(Self {
146 indexer_url,
147 client,
148 retry_config: Arc::new(RetryConfig::default()),
149 })
150 }
151
152 pub async fn query<T: for<'de> Deserialize<'de> + Send + 'static>(
160 &self,
161 query: &str,
162 variables: Option<serde_json::Value>,
163 ) -> AptosResult<T> {
164 let request = GraphQLRequest {
165 query: query.to_string(),
166 variables,
167 };
168
169 let client = self.client.clone();
170 let url = self.indexer_url.clone();
171 let retry_config = self.retry_config.clone();
172
173 let executor = RetryExecutor::from_shared(retry_config);
174 executor
175 .execute(|| {
176 let client = client.clone();
177 let url = url.clone();
178 let request = GraphQLRequest {
179 query: request.query.clone(),
180 variables: request.variables.clone(),
181 };
182 async move {
183 let response = client.post(url.as_str()).json(&request).send().await?;
184
185 if response.status().is_success() {
186 let bytes = crate::config::read_response_bounded(
189 response,
190 MAX_INDEXER_RESPONSE_SIZE,
191 )
192 .await?;
193 let graphql_response: GraphQLResponse<T> = serde_json::from_slice(&bytes)?;
194
195 if let Some(errors) = graphql_response.errors {
196 let mut message = String::new();
198 for (i, e) in errors.iter().enumerate() {
199 if i > 0 {
200 message.push_str("; ");
201 }
202 message.push_str(&e.message);
203 }
204 return Err(AptosError::Api {
205 status_code: 400,
206 message,
207 error_code: Some("GRAPHQL_ERROR".into()),
208 vm_error_code: None,
209 });
210 }
211
212 graphql_response.data.ok_or_else(|| {
213 AptosError::Internal("no data in GraphQL response".into())
214 })
215 } else {
216 let status = response.status();
217 let body = response.text().await.unwrap_or_default();
218 Err(AptosError::api(status.as_u16(), body))
219 }
220 }
221 })
222 .await
223 }
224
225 pub async fn get_fungible_asset_balances(
231 &self,
232 address: AccountAddress,
233 ) -> AptosResult<Vec<FungibleAssetBalance>> {
234 #[derive(Deserialize)]
235 struct Response {
236 current_fungible_asset_balances: Vec<FungibleAssetBalance>,
237 }
238
239 let query = r"
240 query GetFungibleAssetBalances($address: String!) {
241 current_fungible_asset_balances(
242 where: { owner_address: { _eq: $address } }
243 ) {
244 asset_type
245 amount
246 metadata {
247 name
248 symbol
249 decimals
250 }
251 }
252 }
253 ";
254
255 let variables = serde_json::json!({
256 "address": address.to_string()
257 });
258
259 let response: Response = self.query(query, Some(variables)).await?;
260 Ok(response.current_fungible_asset_balances)
261 }
262
263 pub async fn get_account_tokens(
269 &self,
270 address: AccountAddress,
271 ) -> AptosResult<Vec<TokenBalance>> {
272 #[derive(Deserialize)]
273 struct Response {
274 current_token_ownerships_v2: Vec<TokenBalance>,
275 }
276
277 let query = r"
278 query GetAccountTokens($address: String!) {
279 current_token_ownerships_v2(
280 where: { owner_address: { _eq: $address }, amount: { _gt: 0 } }
281 ) {
282 token_data_id
283 amount
284 current_token_data {
285 token_name
286 description
287 token_uri
288 current_collection {
289 collection_name
290 }
291 }
292 }
293 }
294 ";
295
296 let variables = serde_json::json!({
297 "address": address.to_string()
298 });
299
300 let response: Response = self.query(query, Some(variables)).await?;
301 Ok(response.current_token_ownerships_v2)
302 }
303
304 pub async fn get_account_transactions(
310 &self,
311 address: AccountAddress,
312 limit: Option<u32>,
313 ) -> AptosResult<Vec<Transaction>> {
314 #[derive(Deserialize)]
315 struct Response {
316 account_transactions: Vec<Transaction>,
317 }
318
319 let query = r"
320 query GetAccountTransactions($address: String!, $limit: Int!) {
321 account_transactions(
322 where: { account_address: { _eq: $address } }
323 order_by: { transaction_version: desc }
324 limit: $limit
325 ) {
326 transaction_version
327 coin_activities {
328 activity_type
329 amount
330 coin_type
331 }
332 }
333 }
334 ";
335
336 let variables = serde_json::json!({
337 "address": address.to_string(),
338 "limit": limit.unwrap_or(25)
339 });
340
341 let response: Response = self.query(query, Some(variables)).await?;
342 Ok(response.account_transactions)
343 }
344}
345
346#[derive(Debug, Clone, Deserialize)]
348pub struct FungibleAssetBalance {
349 pub asset_type: String,
351 pub amount: String,
353 pub metadata: Option<FungibleAssetMetadata>,
355}
356
357#[derive(Debug, Clone, Deserialize)]
359pub struct FungibleAssetMetadata {
360 pub name: String,
362 pub symbol: String,
364 pub decimals: u8,
366}
367
368#[derive(Debug, Clone, Deserialize)]
370pub struct TokenBalance {
371 pub token_data_id: String,
373 pub amount: String,
375 pub current_token_data: Option<TokenData>,
377}
378
379#[derive(Debug, Clone, Deserialize)]
381pub struct TokenData {
382 pub token_name: String,
384 pub description: String,
386 pub token_uri: String,
388 pub current_collection: Option<CollectionData>,
390}
391
392#[derive(Debug, Clone, Deserialize)]
394pub struct CollectionData {
395 pub collection_name: String,
397}
398
399#[derive(Debug, Clone, Deserialize)]
401pub struct Transaction {
402 pub transaction_version: String,
404 pub coin_activities: Vec<CoinActivity>,
406}
407
408#[derive(Debug, Clone, Deserialize)]
410pub struct CoinActivity {
411 pub activity_type: String,
413 pub amount: Option<String>,
415 pub coin_type: String,
417}
418
/// Limit/offset window for paginated indexer queries.
///
/// Note that the derived `Default` produces `limit: 0, offset: 0`.
#[derive(Debug, Clone, Default)]
pub struct PaginationParams {
    /// Value sent as the query's `limit` variable.
    pub limit: u32,
    /// Value sent as the query's `offset` variable.
    pub offset: u32,
}

impl PaginationParams {
    /// Creates parameters with an explicit `limit` and `offset`.
    pub fn new(limit: u32, offset: u32) -> Self {
        PaginationParams { limit, offset }
    }

    /// Creates parameters for the first page: the given `limit` with an offset of zero.
    pub fn first(limit: u32) -> Self {
        Self::new(limit, 0)
    }
}
439
/// One page of results from a paginated query.
#[derive(Debug, Clone)]
pub struct Page<T> {
    /// Rows contained in this page.
    pub items: Vec<T>,
    /// Whether further rows are expected beyond this page.
    pub has_more: bool,
    /// Total number of matching rows, when known.
    pub total_count: Option<u64>,
}
450
451#[derive(Debug, Clone, Deserialize)]
453pub struct Event {
454 pub sequence_number: String,
456 #[serde(rename = "type")]
458 pub event_type: String,
459 pub data: serde_json::Value,
461 pub transaction_version: Option<String>,
463 pub account_address: Option<String>,
465 pub creation_number: Option<String>,
467}
468
469#[derive(Debug, Clone, Deserialize)]
471pub struct Collection {
472 pub collection_id: String,
474 pub collection_name: String,
476 pub creator_address: String,
478 pub current_supply: String,
480 pub max_supply: Option<String>,
482 pub uri: String,
484 pub description: String,
486}
487
488#[derive(Debug, Clone, Deserialize)]
490pub struct CoinBalance {
491 pub coin_type: String,
493 pub amount: String,
495}
496
497#[derive(Debug, Clone, Deserialize)]
499pub struct ProcessorStatus {
500 pub processor: String,
502 pub last_success_version: u64,
504 pub last_updated: Option<String>,
506}
507
508impl IndexerClient {
509 pub async fn get_account_tokens_paginated(
517 &self,
518 address: AccountAddress,
519 pagination: Option<PaginationParams>,
520 ) -> AptosResult<Page<TokenBalance>> {
521 #[derive(Deserialize)]
522 struct AggregateCount {
523 count: u64,
524 }
525
526 #[derive(Deserialize)]
527 struct Aggregate {
528 aggregate: Option<AggregateCount>,
529 }
530
531 #[derive(Deserialize)]
532 struct Response {
533 current_token_ownerships_v2: Vec<TokenBalance>,
534 current_token_ownerships_v2_aggregate: Aggregate,
535 }
536
537 let pagination = pagination.unwrap_or(PaginationParams {
538 limit: 25,
539 offset: 0,
540 });
541
542 let query = r"
543 query GetAccountTokens($address: String!, $limit: Int!, $offset: Int!) {
544 current_token_ownerships_v2(
545 where: { owner_address: { _eq: $address }, amount: { _gt: 0 } }
546 limit: $limit
547 offset: $offset
548 ) {
549 token_data_id
550 amount
551 current_token_data {
552 token_name
553 description
554 token_uri
555 current_collection {
556 collection_name
557 }
558 }
559 }
560 current_token_ownerships_v2_aggregate(
561 where: { owner_address: { _eq: $address }, amount: { _gt: 0 } }
562 ) {
563 aggregate {
564 count
565 }
566 }
567 }
568 ";
569
570 let variables = serde_json::json!({
571 "address": address.to_string(),
572 "limit": pagination.limit,
573 "offset": pagination.offset
574 });
575
576 let response: Response = self.query(query, Some(variables)).await?;
577 let total_count = response
578 .current_token_ownerships_v2_aggregate
579 .aggregate
580 .map(|a| a.count);
581 let has_more = total_count.is_some_and(|total| {
582 (u64::from(pagination.offset) + response.current_token_ownerships_v2.len() as u64)
583 < total
584 });
585
586 Ok(Page {
587 items: response.current_token_ownerships_v2,
588 has_more,
589 total_count,
590 })
591 }
592
593 pub async fn get_account_transactions_paginated(
599 &self,
600 address: AccountAddress,
601 pagination: Option<PaginationParams>,
602 ) -> AptosResult<Page<Transaction>> {
603 #[derive(Deserialize)]
604 struct AggregateCount {
605 count: u64,
606 }
607
608 #[derive(Deserialize)]
609 struct Aggregate {
610 aggregate: Option<AggregateCount>,
611 }
612
613 #[derive(Deserialize)]
614 struct Response {
615 account_transactions: Vec<Transaction>,
616 account_transactions_aggregate: Aggregate,
617 }
618
619 let pagination = pagination.unwrap_or(PaginationParams {
620 limit: 25,
621 offset: 0,
622 });
623
624 let query = r"
625 query GetAccountTransactions($address: String!, $limit: Int!, $offset: Int!) {
626 account_transactions(
627 where: { account_address: { _eq: $address } }
628 order_by: { transaction_version: desc }
629 limit: $limit
630 offset: $offset
631 ) {
632 transaction_version
633 coin_activities {
634 activity_type
635 amount
636 coin_type
637 }
638 }
639 account_transactions_aggregate(
640 where: { account_address: { _eq: $address } }
641 ) {
642 aggregate {
643 count
644 }
645 }
646 }
647 ";
648
649 let variables = serde_json::json!({
650 "address": address.to_string(),
651 "limit": pagination.limit,
652 "offset": pagination.offset
653 });
654
655 let response: Response = self.query(query, Some(variables)).await?;
656 let total_count = response
657 .account_transactions_aggregate
658 .aggregate
659 .map(|a| a.count);
660 let has_more = total_count.is_some_and(|total| {
661 (u64::from(pagination.offset) + response.account_transactions.len() as u64) < total
662 });
663
664 Ok(Page {
665 items: response.account_transactions,
666 has_more,
667 total_count,
668 })
669 }
670
671 pub async fn get_events_by_type(
677 &self,
678 event_type: &str,
679 limit: Option<u32>,
680 ) -> AptosResult<Vec<Event>> {
681 #[derive(Deserialize)]
682 struct Response {
683 events: Vec<Event>,
684 }
685
686 let query = r"
687 query GetEventsByType($type: String!, $limit: Int!) {
688 events(
689 where: { type: { _eq: $type } }
690 order_by: { transaction_version: desc }
691 limit: $limit
692 ) {
693 sequence_number
694 type
695 data
696 transaction_version
697 account_address
698 creation_number
699 }
700 }
701 ";
702
703 let variables = serde_json::json!({
704 "type": event_type,
705 "limit": limit.unwrap_or(25)
706 });
707
708 let response: Response = self.query(query, Some(variables)).await?;
709 Ok(response.events)
710 }
711
712 pub async fn get_events_by_account(
718 &self,
719 address: AccountAddress,
720 limit: Option<u32>,
721 ) -> AptosResult<Vec<Event>> {
722 #[derive(Deserialize)]
723 struct Response {
724 events: Vec<Event>,
725 }
726
727 let query = r"
728 query GetEventsByAccount($address: String!, $limit: Int!) {
729 events(
730 where: { account_address: { _eq: $address } }
731 order_by: { transaction_version: desc }
732 limit: $limit
733 ) {
734 sequence_number
735 type
736 data
737 transaction_version
738 account_address
739 creation_number
740 }
741 }
742 ";
743
744 let variables = serde_json::json!({
745 "address": address.to_string(),
746 "limit": limit.unwrap_or(25)
747 });
748
749 let response: Response = self.query(query, Some(variables)).await?;
750 Ok(response.events)
751 }
752
753 pub async fn get_collection(
760 &self,
761 collection_address: AccountAddress,
762 ) -> AptosResult<Collection> {
763 #[derive(Deserialize)]
764 struct Response {
765 current_collections_v2: Vec<Collection>,
766 }
767
768 let query = r"
769 query GetCollection($address: String!) {
770 current_collections_v2(
771 where: { collection_id: { _eq: $address } }
772 limit: 1
773 ) {
774 collection_id
775 collection_name
776 creator_address
777 current_supply
778 max_supply
779 uri
780 description
781 }
782 }
783 ";
784
785 let variables = serde_json::json!({
786 "address": collection_address.to_string()
787 });
788
789 let response: Response = self.query(query, Some(variables)).await?;
790 response
791 .current_collections_v2
792 .into_iter()
793 .next()
794 .ok_or_else(|| {
795 AptosError::NotFound(format!("Collection not found: {collection_address}"))
796 })
797 }
798
799 pub async fn get_collection_tokens(
805 &self,
806 collection_address: AccountAddress,
807 pagination: Option<PaginationParams>,
808 ) -> AptosResult<Page<TokenBalance>> {
809 #[derive(Deserialize)]
810 struct Response {
811 current_token_ownerships_v2: Vec<TokenBalance>,
812 }
813
814 let pagination = pagination.unwrap_or(PaginationParams {
815 limit: 25,
816 offset: 0,
817 });
818
819 let query = r"
820 query GetCollectionTokens($address: String!, $limit: Int!, $offset: Int!) {
821 current_token_ownerships_v2(
822 where: {
823 current_token_data: {
824 current_collection: {
825 collection_id: { _eq: $address }
826 }
827 }
828 amount: { _gt: 0 }
829 }
830 limit: $limit
831 offset: $offset
832 ) {
833 token_data_id
834 amount
835 current_token_data {
836 token_name
837 description
838 token_uri
839 current_collection {
840 collection_name
841 }
842 }
843 }
844 }
845 ";
846
847 let variables = serde_json::json!({
848 "address": collection_address.to_string(),
849 "limit": pagination.limit,
850 "offset": pagination.offset
851 });
852
853 let response: Response = self.query(query, Some(variables)).await?;
854 let items_count = response.current_token_ownerships_v2.len();
855
856 Ok(Page {
857 items: response.current_token_ownerships_v2,
858 has_more: items_count == pagination.limit as usize,
859 total_count: None,
860 })
861 }
862
863 pub async fn get_coin_balances(
869 &self,
870 address: AccountAddress,
871 ) -> AptosResult<Vec<CoinBalance>> {
872 #[derive(Deserialize)]
873 struct Response {
874 current_coin_balances: Vec<CoinBalance>,
875 }
876
877 let query = r"
878 query GetCoinBalances($address: String!) {
879 current_coin_balances(
880 where: { owner_address: { _eq: $address } }
881 ) {
882 coin_type
883 amount
884 }
885 }
886 ";
887
888 let variables = serde_json::json!({
889 "address": address.to_string()
890 });
891
892 let response: Response = self.query(query, Some(variables)).await?;
893 Ok(response.current_coin_balances)
894 }
895
896 pub async fn get_coin_activities(
902 &self,
903 address: AccountAddress,
904 limit: Option<u32>,
905 ) -> AptosResult<Vec<CoinActivity>> {
906 #[derive(Deserialize)]
907 struct Response {
908 coin_activities: Vec<CoinActivity>,
909 }
910
911 let query = r"
912 query GetCoinActivities($address: String!, $limit: Int!) {
913 coin_activities(
914 where: { owner_address: { _eq: $address } }
915 order_by: { transaction_version: desc }
916 limit: $limit
917 ) {
918 activity_type
919 amount
920 coin_type
921 }
922 }
923 ";
924
925 let variables = serde_json::json!({
926 "address": address.to_string(),
927 "limit": limit.unwrap_or(25)
928 });
929
930 let response: Response = self.query(query, Some(variables)).await?;
931 Ok(response.coin_activities)
932 }
933
934 pub async fn get_processor_status(&self) -> AptosResult<Vec<ProcessorStatus>> {
940 #[derive(Deserialize)]
941 struct Response {
942 processor_status: Vec<ProcessorStatus>,
943 }
944
945 let query = r"
946 query GetProcessorStatus {
947 processor_status {
948 processor
949 last_success_version
950 last_updated
951 }
952 }
953 ";
954
955 let response: Response = self.query(query, None).await?;
956 Ok(response.processor_status)
957 }
958
959 pub async fn get_indexer_version(&self) -> AptosResult<u64> {
966 let statuses = self.get_processor_status().await?;
967 statuses
968 .into_iter()
969 .map(|s| s.last_success_version)
970 .max()
971 .ok_or_else(|| AptosError::Internal("No processor status available".into()))
972 }
973
974 pub async fn check_indexer_lag(
980 &self,
981 reference_version: u64,
982 max_lag: u64,
983 ) -> AptosResult<bool> {
984 let indexer_version = self.get_indexer_version().await?;
985 Ok(reference_version.saturating_sub(indexer_version) <= max_lag)
986 }
987}
988
#[cfg(test)]
mod tests {
    use super::*;

    /// A client can be built from the testnet configuration.
    #[test]
    fn test_indexer_client_creation() {
        let config = AptosConfig::testnet();
        assert!(IndexerClient::new(&config).is_ok());
    }

    /// Both constructors store the values they are given; `first` starts at offset zero.
    #[test]
    fn test_pagination_params() {
        let PaginationParams { limit, offset } = PaginationParams::new(10, 20);
        assert_eq!(limit, 10);
        assert_eq!(offset, 20);

        let PaginationParams { limit, offset } = PaginationParams::first(50);
        assert_eq!(limit, 50);
        assert_eq!(offset, 0);
    }

    /// `Page` is a plain container: fields read back as written.
    #[test]
    fn test_page_has_more() {
        let page: Page<u32> = Page {
            items: vec![1, 2, 3],
            has_more: true,
            total_count: Some(100),
        };
        assert!(page.has_more);
        assert_eq!(page.items.len(), 3);
        assert_eq!(page.total_count, Some(100));
    }

    /// A client can be built from an explicit HTTPS endpoint.
    #[test]
    fn test_custom_url() {
        let endpoint = "https://custom-indexer.example.com/v1/graphql";
        assert!(IndexerClient::with_url(endpoint).is_ok());
    }
}