Skip to main content

Documentation Index

Fetch the complete documentation index at: https://docs.octav.fi/llms.txt

Use this file to discover all available pages before exploring further.

AI agents are autonomous programs that independently monitor portfolios, track transactions, and provide insights without human intervention. This guide shows you how to build production-ready agents using the Octav API.
What is an AI agent? An autonomous program that makes decisions and takes actions based on blockchain data without constant human oversight.

Agent Types You Can Build

Portfolio Monitor

Track portfolio changes and send alerts
  • Balance threshold notifications
  • Significant movement alerts
  • Daily/weekly summaries
  • Performance tracking

Transaction Watcher

Monitor blockchain activity
  • New transaction alerts
  • Large transfer detection
  • DeFi interaction tracking
  • Gas fee optimization

Yield Optimizer

Analyze DeFi positions
  • Track yield rates
  • Compare protocols
  • Alert on better opportunities
  • Risk assessment

Tax Reporter

Automated tax tracking
  • Transaction categorization
  • Income/expense calculation
  • Quarterly reports
  • Export generation

Basic Portfolio Monitor Agent

Here’s a complete, production-ready portfolio monitoring agent:
import requests
import time
from datetime import datetime
from typing import Dict, List, Optional
import os

class PortfolioMonitorAgent:
    """Autonomous agent that monitors crypto portfolios using Octav API.

    The agent polls the portfolio endpoint for each address and reports
    findings through :meth:`send_alert`. State kept between polls:

    * ``last_values`` - last observed net worth per address, used as the
      baseline for change detection.
    * ``_summary_dates`` - ISO date of the last day a daily-summary decision
      was made per address, used to send at most one summary per day.
    """

    def __init__(self, api_key: str, check_interval: int = 300):
        """Create an agent.

        Args:
            api_key: Octav API key, sent as a Bearer token on every request.
            check_interval: Seconds to sleep between monitoring passes.

        Raises:
            ValueError: If ``api_key`` is empty or ``None`` (for example
                because ``os.getenv`` found no value). Without this check the
                agent would send the literal header ``Bearer None``.
        """
        if not api_key:
            raise ValueError("api_key is required (is OCTAV_API_KEY set?)")

        self.api_key = api_key
        self.base_url = 'https://api.octav.fi'
        self.check_interval = check_interval  # seconds
        self.headers = {'Authorization': f'Bearer {api_key}'}
        self.last_values: Dict[str, float] = {}
        self._summary_dates: Dict[str, str] = {}

    @staticmethod
    def _first_portfolio(payload) -> Optional[Dict]:
        """Return the first portfolio object of a response payload, or None.

        The caller expects a non-empty list of objects; anything else is
        reported and treated as "no data" instead of raising.
        """
        if isinstance(payload, list) and payload and isinstance(payload[0], dict):
            return payload[0]
        print("Error fetching portfolio: unexpected response format")
        return None

    def get_portfolio(self, address: str) -> Optional[Dict]:
        """Fetch current portfolio data.

        Args:
            address: Wallet address to query.

        Returns:
            The first portfolio object of the response, or ``None`` when the
            request fails, the body is not valid JSON, or the payload is not
            a non-empty list.
        """
        try:
            response = requests.get(
                f'{self.base_url}/v1/portfolio',
                # Let requests build and encode the query string.
                params={'addresses': address},
                headers=self.headers,
                timeout=30
            )
            response.raise_for_status()
            payload = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a body that is not valid JSON.
            print(f"Error fetching portfolio: {e}")
            return None

        return self._first_portfolio(payload)

    def check_balance_threshold(self, address: str, min_balance: float = 1000):
        """Alert if portfolio falls below threshold.

        Args:
            address: Wallet address to check.
            min_balance: Net worth below which an alert is sent.
        """
        portfolio = self.get_portfolio(address)
        if not portfolio:
            return

        current_value = float(portfolio['networth'])

        if current_value < min_balance:
            self.send_alert(
                f"⚠️ Low Balance Alert\n"
                f"Address: {address[:10]}...\n"
                f"Current: ${current_value:,.2f}\n"
                f"Threshold: ${min_balance:,.2f}"
            )

    def check_significant_change(self, address: str, threshold_pct: float = 5.0):
        """Alert on significant portfolio value changes.

        Compares the current net worth with the value stored by the previous
        call for the same address, then stores the current value as the new
        baseline. The first call for an address only records the baseline.

        Args:
            address: Wallet address to check.
            threshold_pct: Minimum absolute percentage change that triggers
                an alert.
        """
        portfolio = self.get_portfolio(address)
        if not portfolio:
            return

        current_value = float(portfolio['networth'])
        last_value = self.last_values.get(address)

        # A missing or zero baseline is skipped: a percentage change relative
        # to zero is undefined (and would divide by zero).
        if last_value:
            change = current_value - last_value
            change_pct = (change / last_value) * 100

            if abs(change_pct) >= threshold_pct:
                direction = "📈" if change > 0 else "📉"
                self.send_alert(
                    f"{direction} Significant Change Detected\n"
                    f"Address: {address[:10]}...\n"
                    f"Change: {change_pct:+.2f}%\n"
                    f"From: ${last_value:,.2f}\n"
                    f"To: ${current_value:,.2f}"
                )

        self.last_values[address] = current_value

    def generate_daily_summary(self, address: str):
        """Generate and send daily portfolio summary.

        Reads ``networth``, ``chains`` and ``assetByProtocols`` from the
        portfolio and sends one formatted message through ``send_alert``.
        Does nothing when the portfolio cannot be fetched.
        """
        portfolio = self.get_portfolio(address)
        if not portfolio:
            return

        # Calculate chain distribution
        chains_summary = [
            f"  {chain['name']}: ${float(chain['value']):,.2f}"
            for chain in portfolio['chains'].values()
        ]

        # Calculate protocol distribution
        protocols_summary = [
            f"  {protocol['name']}: ${float(protocol['value']):,.2f}"
            for protocol in portfolio['assetByProtocols'].values()
        ]

        summary = f"""
📊 Daily Portfolio Summary
Address: {address[:10]}...
Total Value: ${float(portfolio['networth']):,.2f}

Chains:
{chr(10).join(chains_summary)}

Protocols:
{chr(10).join(protocols_summary)}

Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
        """

        self.send_alert(summary.strip())

    def _summary_due(self, address: str, now: datetime) -> bool:
        """Decide whether the daily summary should be sent now, and record it.

        A summary is due at most once per calendar day per address:

        * Once an address has been seen, the first check on a later date is
          due, so a pass that lands after 00:05 no longer skips the day.
        * The first time an address is seen, the summary is only due inside
          the 00:00-00:05 window; otherwise today is recorded and the first
          summary goes out after the next midnight.

        The decision is recorded before the summary is generated, so a
        failed fetch is not retried on the same day.
        """
        today = now.date().isoformat()
        last_seen = self._summary_dates.get(address)
        if last_seen == today:
            return False

        self._summary_dates[address] = today
        if last_seen is None:
            return now.hour == 0 and now.minute < 5
        return True

    def _run_checks(self, address: str, config: Dict):
        """Run every check enabled in ``config`` for a single address."""
        if config.get('check_threshold'):
            self.check_balance_threshold(
                address,
                config.get('min_balance', 1000)
            )

        if config.get('check_changes'):
            self.check_significant_change(
                address,
                config.get('change_threshold', 5.0)
            )

        # Daily summary, once per day shortly after midnight
        if config.get('daily_summary') and self._summary_due(address, datetime.now()):
            self.generate_daily_summary(address)

    def monitor(self, addresses: List[str], config: Dict):
        """Main monitoring loop.

        Runs until interrupted with Ctrl+C.

        Args:
            addresses: Wallet addresses to monitor on every pass.
            config: Options read with ``config.get``: ``check_threshold``,
                ``min_balance``, ``check_changes``, ``change_threshold`` and
                ``daily_summary``.
        """
        print(f"Starting portfolio monitor for {len(addresses)} addresses")
        print(f"Check interval: {self.check_interval}s")

        while True:
            try:
                for address in addresses:
                    try:
                        self._run_checks(address, config)
                    except Exception as e:
                        # Isolate failures per address: otherwise one bad
                        # address aborts the pass and every address after it
                        # is never checked.
                        print(f"Error in monitoring loop: {e}")

                    # Respect rate limits
                    time.sleep(2)

                time.sleep(self.check_interval)

            except KeyboardInterrupt:
                print("\nMonitoring stopped by user")
                break
            except Exception as e:
                print(f"Error in monitoring loop: {e}")
                time.sleep(60)  # Wait before retrying

    def send_alert(self, message: str):
        """Send alert notification (implement your preferred method)"""
        print(f"\n{'='*50}")
        print(message)
        print(f"{'='*50}\n")

        # TODO: Implement your notification method:
        # - Email via SendGrid/Mailgun
        # - SMS via Twilio
        # - Slack/Discord webhook
        # - Telegram bot
        # - Push notification service

# Usage Example
if __name__ == "__main__":
    # Fail fast when the key is missing: os.getenv() returns None for an
    # unset variable, which would otherwise be sent as "Bearer None".
    api_key = os.getenv('OCTAV_API_KEY')
    if not api_key:
        raise SystemExit("OCTAV_API_KEY environment variable is not set")

    agent = PortfolioMonitorAgent(
        api_key=api_key,
        check_interval=300  # Check every 5 minutes
    )

    # Configure monitoring
    config = {
        'check_threshold': True,
        'min_balance': 5000,
        'check_changes': True,
        'change_threshold': 3.0,  # Alert on 3%+ changes
        'daily_summary': True
    }

    # Start monitoring
    addresses = [
        '0x123...',  # Your addresses
        '0x456...',
    ]

    agent.monitor(addresses, config)

Transaction Monitoring Agent

Monitor and categorize new transactions:
class TransactionMonitorAgent:
    """Monitor new transactions and categorize them.

    Hashes of transactions that were already processed are kept in
    ``seen_hashes`` so each transaction is reported once per agent instance.
    """

    def __init__(self, api_key: str):
        """Create an agent that authenticates with ``api_key`` (Bearer token)."""
        self.api_key = api_key
        self.base_url = 'https://api.octav.fi'
        self.headers = {'Authorization': f'Bearer {api_key}'}
        self.seen_hashes = set()

    def get_transactions(self, address: str, limit: int = 50) -> List[Dict]:
        """Fetch recent transactions.

        Args:
            address: Wallet address to query.
            limit: Maximum number of transactions requested.

        Returns:
            The decoded JSON response, or an empty list when the request
            fails for any reason (the error is printed).
        """
        try:
            response = requests.get(
                f'{self.base_url}/v1/transactions',
                # Let requests build and encode the query string.
                params={'addresses': address, 'limit': limit, 'sort': 'DESC'},
                headers=self.headers,
                # Without a timeout a stalled connection blocks the agent forever.
                timeout=30
            )
            response.raise_for_status()
            return response.json()
        except Exception as e:
            print(f"Error fetching transactions: {e}")
            return []

    def check_new_transactions(self, address: str):
        """Check for and process new transactions.

        Each hash is tested right before processing, so a hash that appears
        more than once in the same response is still processed only once.
        """
        for tx in self.get_transactions(address):
            tx_hash = tx['hash']
            if tx_hash in self.seen_hashes:
                continue
            self.process_transaction(tx, address)
            # Marked only after successful processing, so a transaction whose
            # processing raised is picked up again on the next check.
            self.seen_hashes.add(tx_hash)

    def process_transaction(self, tx: Dict, address: str):
        """Process and categorize a transaction.

        Sends an alert for transactions whose summed asset value exceeds
        1000, and a second alert depending on the transaction type.

        Args:
            tx: Transaction object; ``txType``, ``hash`` and ``chainKey`` are
                read directly, ``assets`` is optional.
            address: Address the transaction was fetched for.
        """
        tx_type = tx['txType']
        # `or 0` treats a missing or null asset value as zero.
        value = sum(float(asset.get('value') or 0) for asset in tx.get('assets') or [])

        # Alert on large transactions
        if value > 1000:
            self.send_alert(
                f"💰 Large Transaction Detected\n"
                f"Type: {tx_type}\n"
                f"Value: ${value:,.2f}\n"
                f"Hash: {tx['hash'][:16]}...\n"
                f"Chain: {tx['chainKey']}"
            )

        # Alert on specific transaction types
        if tx_type in ['TRANSFEROUT', 'SWAP']:
            self.send_alert(
                f"📤 Outgoing Transaction\n"
                f"Type: {tx_type}\n"
                f"Value: ${value:,.2f}"
            )

        elif tx_type in ['CLAIM', 'AIRDROP']:
            self.send_alert(
                f"🎁 Reward Claimed\n"
                f"Type: {tx_type}\n"
                f"Value: ${value:,.2f}"
            )

    def send_alert(self, message: str):
        """Send alert notification (console output; replace with your channel).

        process_transaction calls this method, so the class must provide it;
        without it the first alert raised AttributeError.
        """
        print(f"\n{'='*50}")
        print(message)
        print(f"{'='*50}\n")

Adding Notification Channels

Implement real notifications using popular services:
import os
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail

def send_email_alert(self, message: str):
    """Deliver ``message`` as a plain-text email through SendGrid.

    The sender, recipient and subject are fixed placeholders; the API key is
    read from the ``SENDGRID_API_KEY`` environment variable. Any failure
    while sending is printed rather than raised.
    """
    mail_fields = {
        'from_email': 'alerts@yourdomain.com',
        'to_emails': 'your@email.com',
        'subject': 'Portfolio Alert',
        'plain_text_content': message,
    }
    mail = Mail(**mail_fields)

    try:
        client = SendGridAPIClient(os.getenv('SENDGRID_API_KEY'))
        status = client.send(mail).status_code
        print(f"Email sent: {status}")
    except Exception as exc:
        print(f"Error sending email: {exc}")

Deployment Options

AWS Lambda, Google Cloud Functions, Vercel Functions — perfect for scheduled monitoring:
# handler.py - AWS Lambda example
import json
from portfolio_monitor import PortfolioMonitorAgent

# Kept at module level so warm invocations reuse the same agent. A fresh agent
# has no stored baseline, so check_significant_change could never alert if the
# agent were rebuilt on every invocation. Cold starts still begin empty.
_agent = None


def lambda_handler(event, context):
    """Run the threshold and change checks for every address in the event.

    Args:
        event: Invocation payload; ``event['addresses']`` is the list of
            wallet addresses to check (missing or null means none).
        context: Lambda context object (unused).

    Returns:
        A dict with ``statusCode`` 200 and a JSON-encoded confirmation body.
    """
    import os  # local import: this snippet's import list does not include os

    global _agent
    if _agent is None:
        _agent = PortfolioMonitorAgent(os.getenv('OCTAV_API_KEY'))

    # `or []` also covers an explicit null in the payload.
    addresses = event.get('addresses') or []

    for address in addresses:
        _agent.check_balance_threshold(address, 1000)
        _agent.check_significant_change(address, 5.0)

    return {
        'statusCode': 200,
        'body': json.dumps('Monitoring complete')
    }
Schedule with CloudWatch Events (every 5 minutes)
Run agent as a container
FROM python:3.11-slim

# The agent reports through print(); unbuffered output makes alerts show up
# in the container logs immediately instead of when the buffer fills.
ENV PYTHONUNBUFFERED=1

WORKDIR /app

# Install dependencies first so this layer is cached across code changes.
COPY requirements.txt .
# --no-cache-dir keeps pip's download cache out of the image.
RUN pip install --no-cache-dir -r requirements.txt

COPY agent.py .

CMD ["python", "agent.py"]
Deploy to:
  • AWS ECS/Fargate
  • Google Cloud Run
  • Digital Ocean App Platform
  • Fly.io
Traditional server deployment — use a systemd service:
# /etc/systemd/system/portfolio-monitor.service
[Unit]
Description=Portfolio Monitor Agent
# The agent needs working connectivity, not just a started network stack,
# so order it after network-online.target and pull that target in.
Wants=network-online.target
After=network-online.target

[Service]
Type=simple
User=ubuntu
WorkingDirectory=/home/ubuntu/portfolio-monitor
ExecStart=/usr/bin/python3 agent.py
Restart=always
RestartSec=10
Environment="OCTAV_API_KEY=your_key_here"
# The agent reports through print(); unbuffered output reaches the journal
# immediately instead of when the buffer fills.
Environment="PYTHONUNBUFFERED=1"

[Install]
WantedBy=multi-user.target
Start with: systemctl start portfolio-monitor

Best Practices

Respect Octav’s 360 requests/minute limit:
from time import sleep
from collections import deque
from datetime import datetime, timedelta

class RateLimiter:
    """Sliding-window rate limiter.

    Keeps the timestamps of recent requests in ``requests`` and blocks in
    :meth:`wait_if_needed` until fewer than ``max_requests`` of them fall
    inside the last ``window`` seconds.
    """

    def __init__(self, max_requests=360, window=60):
        """Create a limiter.

        Args:
            max_requests: Maximum number of requests allowed per window.
            window: Window length in seconds.

        Raises:
            ValueError: If ``max_requests`` is smaller than 1 (no request
                could ever be allowed).
        """
        if max_requests < 1:
            raise ValueError("max_requests must be at least 1")

        self.max_requests = max_requests
        self.window = window
        self.requests = deque()

    def wait_if_needed(self):
        """Block until a request is allowed, then record it.

        The recorded timestamp is taken after any sleep, so it reflects when
        the request actually goes out. Recording the pre-sleep time would
        make the entry expire early and let later calls exceed the limit.
        """
        window = timedelta(seconds=self.window)

        while True:
            now = datetime.now()
            cutoff = now - window

            # Remove requests that have left the window
            while self.requests and self.requests[0] <= cutoff:
                self.requests.popleft()

            if len(self.requests) < self.max_requests:
                break

            # Sleep until the oldest request leaves the window, then re-check.
            sleep_time = (self.requests[0] + window - now).total_seconds()
            if sleep_time > 0:
                sleep(sleep_time)

        self.requests.append(now)
Implement robust error handling:
import time
from requests.exceptions import RequestException

def fetch_with_retry(self, url, max_retries=3):
    """Fetch with exponential backoff.

    Args:
        url: URL to GET, using ``self.headers``.
        max_retries: Total number of attempts (must be at least 1).

    Returns:
        The decoded JSON body of the first successful response.

    Raises:
        ValueError: If ``max_retries`` is smaller than 1; the loop would
            otherwise never run and the function would return ``None``.
        RequestException: The last error once attempts are exhausted, or
            immediately for a client error that a retry cannot fix.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        try:
            response = requests.get(url, headers=self.headers, timeout=30)
            response.raise_for_status()
            return response.json()
        except RequestException as e:
            # Errors without a response (connection, timeout) have no status.
            status = getattr(getattr(e, 'response', None), 'status_code', None)
            # 4xx means the request itself is wrong (bad key, bad URL);
            # only 408/429 are worth repeating.
            retryable = status is None or status in (408, 429) or status >= 500
            if not retryable or attempt == max_retries - 1:
                raise
            wait_time = 2 ** attempt
            print(f"Retry {attempt + 1}/{max_retries} in {wait_time}s")
            time.sleep(wait_time)
Monitor API credit usage:
def check_credits(self):
    """Check remaining credits before heavy operations.

    Sends an alert through ``self.send_alert`` when fewer than 100 credits
    remain.

    Returns:
        The decoded response body, which is compared against 100 and so is
        presumably a bare number (confirm against the API reference).

    Raises:
        requests.exceptions.RequestException: On network failure, timeout or
            an HTTP error status, instead of comparing an error payload
            against the threshold.
    """
    response = requests.get(
        f'{self.base_url}/v1/credits',
        headers=self.headers,
        # Without a timeout a stalled connection blocks the caller forever.
        timeout=30
    )
    response.raise_for_status()
    # Named `remaining` because `credits` shadows the interactive builtin.
    remaining = response.json()

    if remaining < 100:
        self.send_alert(f"⚠️ Low API credits: {remaining} remaining")

    return remaining
Implement comprehensive logging:
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        # Explicit UTF-8: alert messages contain emoji, which the platform
        # default encoding may not be able to write.
        logging.FileHandler('agent.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)
# Pass values as arguments so formatting only happens if the record is emitted.
logger.info("Monitoring %d addresses", len(addresses))

Next Steps

Quick Start Prompts

Ready-to-use prompts for AI coding

LLMs Integration

Integrate with ChatGPT and Claude

API Reference

Complete API documentation

Pricing

Understand API costs