Skip to content

Commit a2b1f5e

Browse files
committed
Add from_slot example & note
1 parent c6fb70d commit a2b1f5e

File tree

2 files changed

+162
-0
lines changed

2 files changed

+162
-0
lines changed

README.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,24 @@ def create_subscription_request():
211211
- `failed = False`: exclude failed transactions to reduce noise.
212212
- `PROCESSED` commitment: get updates as soon as transactions are processed (fastest possible).
213213

214+
#### Using `from_slot` for historical data
215+
216+
The `from_slot` parameter allows you to replay blockchain data from a specific historical slot:
217+
218+
```python
219+
request = geyser_pb2.SubscribeRequest(
220+
slots={"filter": geyser_pb2.SubscribeRequestFilterSlots()},
221+
commitment=geyser_pb2.CommitmentLevel.PROCESSED,
222+
from_slot=362520000 # Start from this historical slot
223+
)
224+
```
225+
226+
What is `from_slot`?
227+
- Starts streaming from a specific slot instead of the current slot
228+
- Useful for replaying missed events and backfilling after short downtime
229+
- Limited by the data retention (usually a few minutes)
230+
- If the requested slot is too old, you'll get an error with the oldest available slot
231+
214232
### Step 7: Instruction data decoding
215233

216234
This is where the magic happens - extracting meaningful data from raw blockchain bytes:
@@ -331,6 +349,7 @@ Here are short summaries of each learning example file, from basic to advanced,
331349
### Other subscriptions
332350

333351
- **`slots_subscription.py`**: this example shows how to subscribe to slot updates, giving you a real-time feed of when new slots are processed by the validator.
352+
- **`historical_replay_with_from_slot.py`**: demonstrates using the `from_slot` parameter to replay historical blockchain data from a specific slot instead of starting from the current slot.
334353
- **`blocks_subscription.py`**: this script demonstrates how to subscribe to entire blocks that contain transactions interacting with a specific account.
335354
- **`blocks_meta_subscription.py`**: this example shows how to subscribe to just the metadata of blocks, which is a lightweight way to track block production.
336355
- **`entries_subscription.py`**: this script demonstrates how to subscribe to ledger entries, which provides a low-level stream of the changes being written to the Solana ledger.
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
import asyncio
2+
import os
3+
import sys
4+
import grpc
5+
6+
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
7+
8+
from dotenv import load_dotenv
9+
from generated import geyser_pb2, geyser_pb2_grpc
10+
11+
load_dotenv()
12+
13+
GEYSER_ENDPOINT = os.getenv("GEYSER_ENDPOINT")
14+
GEYSER_API_TOKEN = os.getenv("GEYSER_API_TOKEN")
15+
16+
17+
async def main():
18+
"""
19+
Demonstrates using from_slot parameter for historical data replay.
20+
21+
The from_slot parameter allows you to start streaming from a specific slot
22+
instead of the current slot. This is useful for:
23+
- Replaying historical data
24+
- Backfilling missed events
25+
- Testing with past blockchain data
26+
- Analyzing historical patterns
27+
28+
Note: Typically only recent slots (within a few minutes/hours) are available.
29+
"""
30+
async with grpc.aio.secure_channel(
31+
GEYSER_ENDPOINT,
32+
grpc.composite_channel_credentials(
33+
grpc.ssl_channel_credentials(),
34+
grpc.metadata_call_credentials(
35+
lambda context, callback: callback(
36+
(("x-token", GEYSER_API_TOKEN),), None
37+
)
38+
),
39+
),
40+
) as channel:
41+
stub = geyser_pb2_grpc.GeyserStub(channel)
42+
43+
# First, try to get a current slot by subscribing briefly
44+
print("📍 Getting current network slot...")
45+
current_slot = None
46+
47+
try:
48+
# Try Ping first
49+
ping_request = geyser_pb2.PingRequest()
50+
ping_response = await stub.Ping(ping_request)
51+
current_slot = ping_response.slot
52+
print(f" Current slot from Ping: {current_slot}")
53+
except:
54+
# If Ping doesn't work, get it from a brief subscription
55+
temp_request = geyser_pb2.SubscribeRequest(
56+
slots={"temp": geyser_pb2.SubscribeRequestFilterSlots()},
57+
commitment=geyser_pb2.CommitmentLevel.PROCESSED,
58+
)
59+
async for response in stub.Subscribe(iter([temp_request])):
60+
if response.slot:
61+
current_slot = response.slot.slot
62+
print(f" Current slot from stream: {current_slot}")
63+
break
64+
65+
if not current_slot:
66+
print(" ⚠️ Could not get current slot")
67+
return
68+
69+
# Calculate a starting point - use a safe value within retention window
70+
# Each slot is ~400ms, so 100 slots = ~40 seconds
71+
slots_back = 100 # Conservative value that should always work
72+
from_slot = current_slot - slots_back
73+
74+
print(f"⏰ Replaying from slot {from_slot} ({slots_back} slots back)")
75+
print(f" This represents approximately {slots_back * 0.4 / 60:.1f} minutes of history")
76+
print("---")
77+
78+
# Create subscription with from_slot parameter
79+
request = geyser_pb2.SubscribeRequest(
80+
slots={
81+
"historical": geyser_pb2.SubscribeRequestFilterSlots(
82+
filter_by_commitment=True,
83+
)
84+
},
85+
commitment=geyser_pb2.CommitmentLevel.PROCESSED,
86+
from_slot=from_slot # Start from historical slot
87+
)
88+
89+
print("🚀 Starting historical replay...")
90+
print("📡 Streaming slots from the past to present...")
91+
print("---")
92+
93+
slot_count = 0
94+
first_slot = None
95+
96+
try:
97+
async for response in stub.Subscribe(iter([request])):
98+
if response.slot:
99+
slot_count += 1
100+
101+
if first_slot is None:
102+
first_slot = response.slot.slot
103+
print(f"✅ First historical slot received: {first_slot}")
104+
105+
# Show progress every 100 slots
106+
if slot_count % 100 == 0:
107+
current = response.slot.slot
108+
progress = ((current - first_slot) / (current_slot - first_slot)) * 100
109+
print(f"📊 Progress: {slot_count} slots processed")
110+
print(f" Current slot: {current}")
111+
print(f" Catching up: {progress:.1f}% complete")
112+
print("---")
113+
114+
# Stop after catching up to near-current
115+
if response.slot.slot >= current_slot - 10:
116+
print(f"🎉 Caught up to current slot!")
117+
print(f" Processed {slot_count} historical slots")
118+
print(f" From: {first_slot}")
119+
print(f" To: {response.slot.slot}")
120+
break
121+
122+
except grpc.RpcError as e:
123+
if "not available" in str(e.details()):
124+
# Extract the oldest available slot from error message
125+
import re
126+
match = re.search(r"last available: (\d+)", str(e.details()))
127+
if match:
128+
oldest = int(match.group(1))
129+
print(f"❌ Requested slot {from_slot} is too old")
130+
print(f" Oldest available slot: {oldest}")
131+
print(f" That's {(current_slot - oldest) * 0.4 / 3600:.1f} hours of history")
132+
print("\n💡 Tip: Try using a more recent from_slot value")
133+
else:
134+
print(f"❌ Error: {e.code()} - {e.details()}")
135+
136+
137+
if __name__ == "__main__":
138+
try:
139+
asyncio.run(main())
140+
except KeyboardInterrupt:
141+
print("\n🛑 Historical replay stopped")
142+
except Exception as e:
143+
print(f"❌ Error: {e}")

0 commit comments

Comments
 (0)