Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,24 @@ def create_subscription_request():
- `failed = False`: exclude failed transactions to reduce noise.
- `PROCESSED` commitment: get updates as soon as transactions are processed (fastest possible).

#### Using `from_slot` for historical data

The `from_slot` parameter allows you to replay blockchain data from a specific historical slot:

```python
request = geyser_pb2.SubscribeRequest(
slots={"filter": geyser_pb2.SubscribeRequestFilterSlots()},
commitment=geyser_pb2.CommitmentLevel.PROCESSED,
from_slot=362520000 # Start from this historical slot
)
```

What is `from_slot`?
- Starts streaming from a specific slot instead of the current slot
- Useful for replaying missed events and backfilling after short downtime
- Limited by the endpoint's data retention window (usually only the last few minutes of slots)
- If the requested slot is older than that window, the stream fails with an error that reports the oldest available slot

### Step 7: Instruction data decoding

This is where the magic happens - extracting meaningful data from raw blockchain bytes:
Expand Down Expand Up @@ -331,6 +349,7 @@ Here are short summaries of each learning example file, from basic to advanced,
### Other subscriptions

- **`slots_subscription.py`**: this example shows how to subscribe to slot updates, giving you a real-time feed of when new slots are processed by the validator.
- **`historical_replay_with_from_slot.py`**: demonstrates using the `from_slot` parameter to replay historical blockchain data from a specific slot instead of starting from the current slot.
- **`blocks_subscription.py`**: this script demonstrates how to subscribe to entire blocks that contain transactions interacting with a specific account.
- **`blocks_meta_subscription.py`**: this example shows how to subscribe to just the metadata of blocks, which is a lightweight way to track block production.
- **`entries_subscription.py`**: this script demonstrates how to subscribe to ledger entries, which provides a low-level stream of the changes being written to the Solana ledger.
Expand Down
143 changes: 143 additions & 0 deletions learning-examples/historical_replay_with_from_slot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import asyncio
import os
import sys
import grpc

sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

from dotenv import load_dotenv
from generated import geyser_pb2, geyser_pb2_grpc

# Let python-dotenv populate the environment before the settings below are
# read (presumably from a local .env file — confirm against project setup).
load_dotenv()

# Connection settings for the Geyser gRPC endpoint; each is None when the
# corresponding environment variable is not set.
GEYSER_ENDPOINT = os.environ.get("GEYSER_ENDPOINT")
GEYSER_API_TOKEN = os.environ.get("GEYSER_API_TOKEN")


def _oldest_available_slot(details):
    """Return the slot number quoted after "last available: " in *details*.

    Returns None when the message does not contain that marker.
    """
    import re  # local import keeps this block self-contained

    match = re.search(r"last available: (\d+)", details or "")
    return int(match.group(1)) if match else None


def _replay_progress(current, first_slot, target_slot):
    """Return how far *current* is between *first_slot* and *target_slot*, in percent.

    A non-positive span (the first received slot is already at or past the
    target) is reported as 100% instead of dividing by zero.
    """
    span = target_slot - first_slot
    if span <= 0:
        return 100.0
    return ((current - first_slot) / span) * 100


def _report_replay_error(error, from_slot, current_slot):
    """Print a readable explanation for a gRPC error raised during replay."""
    details = str(error.details())
    oldest = _oldest_available_slot(details) if "not available" in details else None

    if oldest is None:
        # Also covers a "not available" message whose format we could not
        # parse: show the raw error rather than printing nothing.
        print(f"❌ Error: {error.code()} - {error.details()}")
        return

    print(f"❌ Requested slot {from_slot} is too old")
    print(f" Oldest available slot: {oldest}")
    print(f" That's {(current_slot - oldest) * 0.4 / 3600:.1f} hours of history")
    print("\n💡 Tip: Try using a more recent from_slot value")


async def _fetch_current_slot(stub):
    """Return the network's current slot, or None if it cannot be determined.

    Tries the unary GetSlot RPC first and falls back to reading the first
    slot update from a short-lived subscription.
    """
    try:
        slot_response = await stub.GetSlot(
            geyser_pb2.GetSlotRequest(
                commitment=geyser_pb2.CommitmentLevel.PROCESSED
            )
        )
        if slot_response.slot:
            print(f" Current slot from GetSlot: {slot_response.slot}")
            return slot_response.slot
    except (grpc.RpcError, AttributeError):
        # The endpoint may not serve unary RPCs, or the generated stubs may
        # predate GetSlot; either way the streaming fallback below applies.
        pass

    temp_request = geyser_pb2.SubscribeRequest(
        slots={"temp": geyser_pb2.SubscribeRequestFilterSlots()},
        commitment=geyser_pb2.CommitmentLevel.PROCESSED,
    )
    stream = stub.Subscribe(iter([temp_request]))
    try:
        async for update in stream:
            # A sub-message attribute is always truthy, so check the oneof
            # explicitly to skip non-slot updates such as pings.
            if update.HasField("slot"):
                print(f" Current slot from stream: {update.slot.slot}")
                return update.slot.slot
    finally:
        # Close the temporary stream so it does not stay open next to the
        # replay subscription.
        stream.cancel()
    return None


async def main():
    """Replay recent slots using the ``from_slot`` subscription parameter.

    The ``from_slot`` parameter starts the stream at a specific slot instead
    of the current one, which is useful for:
    - Replaying historical data
    - Backfilling missed events
    - Testing with past blockchain data
    - Analyzing historical patterns

    The demo looks up the current slot, subscribes starting 100 slots
    earlier, prints progress while streaming, and stops once it is within
    10 slots of the slot observed at startup.

    Note: Typically only recent slots (within a few minutes/hours) are available.

    Raises:
        RuntimeError: if GEYSER_ENDPOINT or GEYSER_API_TOKEN is not set.
    """
    if not GEYSER_ENDPOINT or not GEYSER_API_TOKEN:
        raise RuntimeError(
            "GEYSER_ENDPOINT and GEYSER_API_TOKEN must be set in the environment"
        )

    credentials = grpc.composite_channel_credentials(
        grpc.ssl_channel_credentials(),
        grpc.metadata_call_credentials(
            lambda context, callback: callback(
                (("x-token", GEYSER_API_TOKEN),), None
            )
        ),
    )

    async with grpc.aio.secure_channel(GEYSER_ENDPOINT, credentials) as channel:
        stub = geyser_pb2_grpc.GeyserStub(channel)

        print("📍 Getting current network slot...")
        current_slot = await _fetch_current_slot(stub)
        if not current_slot:
            print(" ⚠️ Could not get current slot")
            return

        # Each slot is ~400ms, so 100 slots = ~40 seconds: a conservative
        # distance that should stay inside the retention window.
        slots_back = 100
        # Clamp so a very young chain (e.g. a local test validator) never
        # produces a negative starting slot.
        from_slot = max(0, current_slot - slots_back)

        print(f"⏰ Replaying from slot {from_slot} ({slots_back} slots back)")
        print(f" This represents approximately {slots_back * 0.4 / 60:.1f} minutes of history")
        print("---")

        request = geyser_pb2.SubscribeRequest(
            slots={
                "historical": geyser_pb2.SubscribeRequestFilterSlots(
                    filter_by_commitment=True,
                )
            },
            commitment=geyser_pb2.CommitmentLevel.PROCESSED,
            from_slot=from_slot,  # Start from historical slot
        )

        print("🚀 Starting historical replay...")
        print("📡 Streaming slots from the past to present...")
        print("---")

        slot_count = 0
        first_slot = None
        stream = stub.Subscribe(iter([request]))

        try:
            async for update in stream:
                # Only count genuine slot updates; other update kinds would
                # otherwise read as slot 0.
                if not update.HasField("slot"):
                    continue

                slot = update.slot.slot
                slot_count += 1

                if first_slot is None:
                    first_slot = slot
                    print(f"✅ First historical slot received: {first_slot}")

                # Show progress every 100 slots
                if slot_count % 100 == 0:
                    progress = _replay_progress(slot, first_slot, current_slot)
                    print(f"📊 Progress: {slot_count} slots processed")
                    print(f" Current slot: {slot}")
                    print(f" Catching up: {progress:.1f}% complete")
                    print("---")

                # Stop after catching up to near-current
                if slot >= current_slot - 10:
                    print("🎉 Caught up to current slot!")
                    print(f" Processed {slot_count} historical slots")
                    print(f" From: {first_slot}")
                    print(f" To: {slot}")
                    break
        except grpc.RpcError as e:
            _report_replay_error(e, from_slot, current_slot)
        finally:
            # Release the server-side stream whether we caught up or failed.
            stream.cancel()


if __name__ == "__main__":
    # Script entry point: run the replay demo and turn failures into a short
    # message instead of a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the demo early.
        print("\n🛑 Historical replay stopped")
    except Exception as e:
        print(f"❌ Error: {e}")
        # Exit non-zero so shells and CI can tell the run failed.
        sys.exit(1)