
Memory-Efficient Streaming

For large systems, stream paths one at a time instead of collecting them all in memory.

Basic Streaming

Use stream() instead of analyze():

for path in spa.stream(sector=42, depth=12, threshold=1e-9):
    if path.contribution > 0.001:
        process(path)

Key Differences from analyze()

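| Aspect   | analyze()              | stream()          |
|----------|------------------------|-------------------|
| Returns  | PathCollection         | Iterator of Path  |
| Memory   | Loads all paths        | One path at a time |
| Order    | Sorted by contribution | Discovery order   |
| Features | Filtering, aggregation | Manual processing |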
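The "Memory" and "Order" rows can be illustrated without the library. The sketch below uses a hypothetical `fake_stream()` generator and `FakePath` class as stand-ins for `spa.stream()` and `Path` (neither is part of fastspa, and the numbers are made up). It only shows the general behavior of a lazy iterator: items are built on demand and arrive in whatever order the producer finds them.

```python
from dataclasses import dataclass
from typing import Iterator, List, Tuple


@dataclass
class FakePath:
    """Hypothetical stand-in for a Path; only the attributes used here."""
    contribution: float
    sectors: Tuple[int, ...]


produced: List[float] = []  # records which paths the generator has built so far


def fake_stream() -> Iterator[FakePath]:
    """Hypothetical stand-in for spa.stream(): yields lazily, in discovery order."""
    for contribution, sectors in [(0.02, (42, 7)), (0.30, (42,)), (0.005, (42, 7, 3))]:
        produced.append(contribution)
        yield FakePath(contribution, sectors)


paths = fake_stream()
first = next(paths)                 # only one path has been built at this point
built_after_first = len(produced)

remaining = list(paths)             # materializing pulls the rest into memory
discovery_order = [first.contribution] + [p.contribution for p in remaining]
sorted_order = sorted(discovery_order, reverse=True)  # the order a sorted collection would show

print(built_after_first, discovery_order, sorted_order)
```

If sorted output is needed, the sorting has to happen on whatever subset of paths you decide to keep.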

Use Cases

Large Systems

When the number of paths exceeds available memory:

# System with 10,000+ sectors
spa = SPA(large_A_matrix, intensities)

# Stream instead of loading all
significant_paths = []
for path in spa.stream(sector=1, depth=15, threshold=1e-12):
    if path.contribution > 0.0001:
        significant_paths.append(path)

print(f"Found {len(significant_paths)} significant paths")

Early Termination

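Stop processing once you have enough paths. Because stream() yields paths in discovery order, this collects the first 100 paths above the cutoff, not necessarily the 100 largest: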

top_paths = []
for path in spa.stream(sector=42, depth=10):
    if path.contribution > 0.01:
        top_paths.append(path)
        if len(top_paths) >= 100:
            break  # Stop early

print(f"Collected {len(top_paths)} paths with >1% contribution")
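When the largest paths are wanted rather than the first ones found, the standard library's `heapq.nlargest` can consume an iterator while keeping only a bounded number of candidates. The sketch below contrasts it with `itertools.islice`, which mirrors the break-based loop. `fake_stream()` and `FakePath` are hypothetical stand-ins with made-up values, not fastspa APIs.

```python
import heapq
from itertools import islice
from typing import Iterator, NamedTuple


class FakePath(NamedTuple):
    """Hypothetical stand-in for a Path."""
    contribution: float
    label: str


def fake_stream() -> Iterator[FakePath]:
    """Hypothetical stand-in for spa.stream(): discovery order, not sorted."""
    for i, c in enumerate([0.02, 0.30, 0.005, 0.11, 0.07]):
        yield FakePath(c, f"path-{i}")


# First two paths above 1%, in discovery order; stops reading the stream early.
above_cutoff = (p for p in fake_stream() if p.contribution > 0.01)
first_two = list(islice(above_cutoff, 2))

# Two largest paths overall; reads the whole stream but retains only two candidates.
largest_two = heapq.nlargest(2, fake_stream(), key=lambda p: p.contribution)

print([p.contribution for p in first_two])
print([p.contribution for p in largest_two])
```

The trade-off is that `nlargest` cannot stop early: it has to see every path to know which ones are largest, so it saves memory but not traversal time.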

Real-Time Processing

Process paths as they're discovered:

import csv

with open("paths.csv", "w", newline="", encoding="utf-8") as f:
    writer = csv.writer(f)
    writer.writerow(["contribution", "path", "depth"])

    for path in spa.stream(sector=42, depth=8):
        writer.writerow([
            f"{path.contribution:.6f}",
            # str() keeps the join working if sectors are integer indices
            " → ".join(str(s) for s in path.sectors),
            path.depth,
        ])
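If writing one row at a time is too slow for your sink (a database, a network socket), a common variation is to group the stream into small batches and write each batch at once. The sketch below is one way to do that under stated assumptions: `fake_stream()` is a hypothetical stand-in that yields `(contribution, sectors, depth)` tuples with arbitrary values, `batched()` is a local helper, and an in-memory buffer replaces the file so the example is self-contained.

```python
import csv
import io
from itertools import islice
from typing import Iterable, Iterator, List, Tuple

Row = Tuple[float, Tuple[int, ...], int]  # (contribution, sectors, depth)


def batched(items: Iterable[Row], size: int) -> Iterator[List[Row]]:
    """Yield lists of up to `size` items without materializing the whole input."""
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def fake_stream() -> Iterator[Row]:
    """Hypothetical stand-in for spa.stream(); values are arbitrary."""
    yield (0.30, (42,), 0)
    yield (0.02, (42, 7), 1)
    yield (0.005, (42, 7, 3), 2)


buffer = io.StringIO()
writer = csv.writer(buffer)
writer.writerow(["contribution", "path", "depth"])

batch_sizes = []
for batch in batched(fake_stream(), size=2):
    # One writerows() call per batch instead of one writerow() call per path.
    writer.writerows(
        [f"{c:.6f}", " → ".join(str(s) for s in sectors), depth]
        for c, sectors, depth in batch
    )
    batch_sizes.append(len(batch))

lines = buffer.getvalue().splitlines()
print(batch_sizes, lines)
```

Memory use stays bounded by the batch size rather than by the total number of paths.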

Custom Aggregation

Build custom aggregations without storing all paths:

from collections import defaultdict

# Count paths by depth
depth_counts = defaultdict(int)
depth_contributions = defaultdict(float)

for path in spa.stream(sector=42, depth=10):
    depth_counts[path.depth] += 1
    depth_contributions[path.depth] += path.contribution

for depth in sorted(depth_counts.keys()):
    print(f"Depth {depth}: {depth_counts[depth]} paths, "
          f"{depth_contributions[depth]:.2%} contribution")
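The same single-pass pattern works for other groupings. As a rough sketch, the example below totals contribution by the last sector in each path and tracks a running sum of everything seen. `FakePath` and `fake_stream()` are hypothetical stand-ins with made-up values, and treating the last entry of `sectors` as the originating sector is an assumption about the path layout, not something this guide confirms.

```python
from collections import defaultdict
from typing import Dict, Iterator, NamedTuple, Tuple


class FakePath(NamedTuple):
    """Hypothetical stand-in for a Path."""
    contribution: float
    sectors: Tuple[int, ...]


def fake_stream() -> Iterator[FakePath]:
    """Hypothetical stand-in for spa.stream(); values are arbitrary."""
    yield FakePath(0.30, (42,))
    yield FakePath(0.02, (42, 7))
    yield FakePath(0.005, (42, 7, 3))
    yield FakePath(0.01, (42, 9, 7))


by_source: Dict[int, float] = defaultdict(float)
path_count = 0
covered = 0.0

for path in fake_stream():
    # Assumption: the last entry is the sector where the path originates.
    by_source[path.sectors[-1]] += path.contribution
    path_count += 1
    covered += path.contribution

for sector, total in sorted(by_source.items(), key=lambda item: item[1], reverse=True):
    print(f"Sector {sector}: {total:.2%}")
print(f"{path_count} paths cover {covered:.2%}")
```

Only the per-sector totals and two counters are held in memory, regardless of how many paths the stream yields.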

Parameters

stream() accepts most of the same parameters as analyze():

spa.stream(
    sector=42,                    # Target sector (required)
    depth=8,                      # Maximum depth (default: 8)
    threshold=0.001,              # Minimum threshold (default: 0.001)
    threshold_type="percentage",  # or "absolute"
    satellite="ghg",              # Which satellite to analyze
)

Combining with PathCollection

If you need PathCollection features after streaming:

from fastspa import PathCollection

# Stream and collect significant paths
paths_list = [
    path for path in spa.stream(sector=42, depth=10)
    if path.contribution > 0.001
]

# Create PathCollection manually
paths = PathCollection(
    paths=paths_list,
    target_sector=42,
    total_intensity=spa.total_intensity()[42],
    satellite_name="intensity",
)

# Now use PathCollection features
df = paths.to_dataframe()