Python SDK
The official async Python SDK for Lichen. Built on httpx and websockets with
full type hints.
Installation
pip install ./sdk/python
Requires Python 3.9+. Dependencies: httpx, websockets,
base58, cryptography, dilithium-py.
The import package is lichen. The examples below use public testnet endpoints; swap
to https://rpc.lichen.network and wss://rpc.lichen.network/ws only
when targeting mainnet.
Quick Start
import asyncio
from lichen import Connection, Keypair, PublicKey, TransactionBuilder

async def main():
    # Connect to public testnet
    conn = Connection("https://testnet-rpc.lichen.network", ws_url="wss://testnet-rpc.lichen.network/ws")
    # Generate a keypair
    kp = Keypair.generate()
    print(f"Address: {kp.pubkey()}")
    # Check balance
    balance = await conn.get_balance(kp.pubkey())
    print(f"Balance: {balance['licn']:.9f} LICN ({balance['spores']} spores)")
    # Build and send a transfer
    recipient = PublicKey("RecipientBase58Address...")
    blockhash = await conn.get_recent_blockhash()
    tx = (TransactionBuilder()
          .add(TransactionBuilder.transfer(kp.pubkey(), recipient, 1_000_000_000))
          .set_recent_blockhash(blockhash)
          .build_and_sign(kp))
    signature = await conn.send_transaction(tx)
    print(f"TX Signature: {signature}")
    await conn.close()

asyncio.run(main())
LichenID helper: import LichenIdClient for profile and metadata
reads, skills, vouches, premium-name auctions, delegate lifecycle and delegated profile
writes, recovery approvals, and skill attestation flows. It resolves the YID
program via the symbol registry and handles the ABI byte layout for the wrapped LichenID
methods.
SporePay helper: import SporePayClient for stream creation,
stream/cliff reads, withdrawable balance checks, withdrawals, cancellations, transfers, and
aggregated stats. Drop to raw call_contract() for admin-only or newly added
contract methods that are not wrapped yet.
LichenSwap helper: import LichenSwapClient for symbol-registry
resolution, pool/quote/liquidity reads, TWAP and protocol-fee reads, aggregate stats, and the
common create-pool, add-liquidity, and swap flows. Flash loans,
remove_liquidity, get_reserves, and admin methods stay on the raw
contract-call path.
ThallLend helper: import ThallLendClient for symbol-registry
resolution, account/protocol/interest reads, counter reads, aggregate stats, and the common
deposit, withdraw, borrow, repay, and liquidate flows. Flash loans and admin methods stay on
raw call_contract().
SporeVault helper: import SporeVaultClient for symbol-registry
resolution, vault/user/strategy reads, aggregate stats, and the common deposit, withdraw, and
harvest flow. Admin-only fee, cap, risk-tier, and strategy-management paths stay on raw
call_contract().
BountyBoard helper: import BountyBoardClient for symbol-registry
resolution, bounty reads (single + count + platform stats), aggregate stats, and the
full bounty lifecycle: create, submit work, approve, and cancel. Admin-only identity-gate,
token-address, fee, and pause controls stay on raw call_contract().
import asyncio
from lichen import (
    AddSkillParams,
    Connection,
    Keypair,
    LICHEN_ID_DELEGATE_PERMISSIONS,
    LichenIdClient,
    LichenSwapClient,
    RegisterIdentityParams,
    SetDelegateParams,
    SetMetadataParams,
    SetAvailabilityParams,
    SporePayClient,
    SporeVaultClient,
    ThallLendClient,
)

async def main():
    # await is only valid inside a coroutine, so the calls live in main()
    conn = Connection("https://testnet-rpc.lichen.network", ws_url="wss://testnet-rpc.lichen.network/ws")
    lichen_id = LichenIdClient(conn)
    lichenswap = LichenSwapClient(conn)
    sporepay = SporePayClient(conn)
    sporevault = SporeVaultClient(conn)
    thalllend = ThallLendClient(conn)
    kp = Keypair.generate()
    delegate_kp = Keypair.generate()
    # LichenID writes: identity, skills, metadata, delegate, availability
    await lichen_id.register_identity(kp, RegisterIdentityParams(agent_type=3, name="Research Agent"))
    await lichen_id.add_skill(kp, AddSkillParams(name="research", proficiency=95))
    await lichen_id.set_metadata(kp, SetMetadataParams(metadata={"region": "eu-west", "role": "research"}))
    await lichen_id.set_delegate(kp, SetDelegateParams(
        delegate=delegate_kp.pubkey(),
        permissions=LICHEN_ID_DELEGATE_PERMISSIONS["PROFILE"],
        expires_at_ms=1_700_000_000_000,
    ))
    await lichen_id.set_availability(kp, SetAvailabilityParams(status="available"))
    # Reads across the helper clients
    profile = await lichen_id.get_profile(kp.pubkey())
    delegate = await lichen_id.get_delegate(kp.pubkey(), delegate_kp.pubkey())
    pool_info = await lichenswap.get_pool_info()
    quote = await lichenswap.get_quote(1_000_000_000, True)
    swap_stats = await lichenswap.get_stats()
    stream = await sporepay.get_stream_info(0)
    vault_stats = await sporevault.get_vault_stats()
    vault_position = await sporevault.get_user_position(kp.pubkey())
    vault_summary = await sporevault.get_stats()
    lending_account = await thalllend.get_account_info(kp.pubkey())
    lending_stats = await thalllend.get_stats()
    print(profile, delegate, pool_info, quote, swap_stats, stream, vault_stats, vault_position, vault_summary, lending_account, lending_stats)
    await conn.close()

asyncio.run(main())
Connection Class
Connection(rpc_url, ws_url=None)
Create an async connection to a Lichen validator node.
Parameters
| Name | Type | Required | Description |
|---|---|---|---|
| rpc_url | str | Yes | HTTP JSON-RPC endpoint URL |
| ws_url | Optional[str] | No | WebSocket endpoint URL (required for subscriptions) |
conn = Connection(
    "https://testnet-rpc.lichen.network",
    ws_url="wss://testnet-rpc.lichen.network/ws",
)
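Both endpoint pairs shown on this page follow the same shape: the WebSocket URL is the RPC host with a wss scheme and a /ws path. The sketch below derives one from the other under that assumption. default_ws_url is a hypothetical helper, not part of the SDK, and a self-hosted node may expose its WebSocket endpoint elsewhere.

```python
from urllib.parse import urlsplit, urlunsplit

# Assumed mapping, based only on the two endpoint pairs shown in this document.
_WS_SCHEMES = {"https": "wss", "http": "ws"}


def default_ws_url(rpc_url: str) -> str:
    """Guess the WebSocket URL for an RPC URL (hypothetical helper)."""
    parts = urlsplit(rpc_url)
    if parts.scheme not in _WS_SCHEMES:
        raise ValueError(f"unsupported RPC scheme: {parts.scheme!r}")
    # Keep the host (and port, if any); replace the path with /ws.
    return urlunsplit((_WS_SCHEMES[parts.scheme], parts.netloc, "/ws", "", ""))


print(default_ws_url("https://testnet-rpc.lichen.network"))
# wss://testnet-rpc.lichen.network/ws
```

The result can be passed as ws_url when constructing a Connection; passing the URL explicitly remains the safer choice when the node's layout is known.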
Chain Queries
await get_slot() -> int
Get the current slot number.
slot = await conn.get_slot() # 42
await get_latest_block() -> Dict[str, Any]
Get the most recently produced block.
await get_block(slot: int) -> Dict[str, Any]
Get a block by slot number.
await get_recent_blockhash() -> str
Get a recent blockhash for signing transactions.
await get_metrics() -> Dict[str, Any]
Get chain performance metrics — TPS, total transactions, total blocks, average block time.
await get_chain_status() -> Dict[str, Any]
Get comprehensive chain status.
await health() -> Dict[str, str]
Liveness probe. Returns {"status": "ok"}.
await get_network_info() -> Dict[str, Any]
Get network information — chain ID, network ID, version, slot, validator/peer count.
await get_peers() -> List[Dict[str, Any]]
Get connected P2P peers.
await get_total_burned() -> Dict[str, int]
Get total burned LICN — {"spores": ..., "licn": ...}.
Account Methods
await get_balance(pubkey: PublicKey) -> Dict[str, int]
Get account balance in spores and LICN.
balance = await conn.get_balance(PublicKey("7xKXtg2CW87d..."))
print(balance["spores"]) # 5000000000
print(balance["licn"]) # 5.0
await get_account(pubkey: PublicKey) -> Dict[str, Any]
Get raw account data — spores, owner, executable, data.
await get_account_info(pubkey: PublicKey) -> Dict[str, Any]
Get enhanced account information with metadata.
await get_transaction_history(pubkey: PublicKey, limit: int = 10) -> Dict[str, Any]
Get paginated transaction history for an account.
Transaction Methods
await send_transaction(transaction: Transaction) -> str
Serialize and submit a signed transaction. Returns the signature string.
blockhash = await conn.get_recent_blockhash()
tx = (TransactionBuilder()
      .add(TransactionBuilder.transfer(kp.pubkey(), recipient, 1_000_000_000))
      .set_recent_blockhash(blockhash)
      .build_and_sign(kp))
sig = await conn.send_transaction(tx)
print(f"Signature: {sig}")
await get_transaction(signature: str) -> Dict[str, Any]
Look up a transaction by its signature hash.
await confirm_transaction(signature: str, timeout: float = 30.0) -> Optional[Dict[str, Any]]
Poll until a transaction is observed or the timeout expires. Returns the
transaction payload when confirmed, otherwise None.
await get_transaction_proof(signature: str) -> Dict[str, Any]
Get a binary Merkle inclusion proof for a confirmed transaction. Returns
slot, tx_index, tx_hash, root, and
proof (list of dicts with hash and direction).
proof = await conn.get_transaction_proof(signature)
print(f"Merkle root: {proof['root']}")
print(f"Proof depth: {len(proof['proof'])}")
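A client can check such a proof locally by folding the sibling hashes back up to the root. The sketch below shows the general technique for a binary Merkle tree. Several details are assumptions not stated in this document: SHA-256 as the node hash, hex-encoded hashes, parent = H(left || right), and direction values of "left" or "right" naming the side the sibling sits on. Check the node's actual hashing rules before relying on it.

```python
import hashlib
from typing import Dict, List


def _hash_pair(left: bytes, right: bytes) -> bytes:
    # Assumed node rule: SHA-256 over the concatenated children.
    return hashlib.sha256(left + right).digest()


def verify_inclusion(tx_hash: str, proof: List[Dict[str, str]], root: str) -> bool:
    """Recompute the root from a leaf hash and its sibling path."""
    node = bytes.fromhex(tx_hash)
    for step in proof:
        sibling = bytes.fromhex(step["hash"])
        if step["direction"] == "left":   # sibling is the left child
            node = _hash_pair(sibling, node)
        else:                             # sibling is the right child
            node = _hash_pair(node, sibling)
    return node.hex() == root


# Build a four-leaf demo tree and prove inclusion of leaf 2.
leaves = [hashlib.sha256(f"tx{i}".encode()).digest() for i in range(4)]
n01 = _hash_pair(leaves[0], leaves[1])
n23 = _hash_pair(leaves[2], leaves[3])
demo_root = _hash_pair(n01, n23).hex()
demo_proof = [
    {"hash": leaves[3].hex(), "direction": "right"},
    {"hash": n01.hex(), "direction": "left"},
]
print(verify_inclusion(leaves[2].hex(), demo_proof, demo_root))  # True
```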
await transfer(from_keypair: Keypair, to: PublicKey, amount: int) -> str
Build, sign, and send a native LICN transfer. Amount in spores (1 LICN = 1,000,000,000 spores).
recipient = PublicKey("RecipientAddr...")
sig = await conn.transfer(kp, recipient, 5_000_000_000) # 5 LICN
print(f"Transfer: {sig}")
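Because amounts are integer spores, converting user-facing LICN values through float can introduce rounding errors. The following sketch uses Decimal for the conversion, based on the stated ratio of 1 LICN = 1,000,000,000 spores. licn_to_spores and spores_to_licn are hypothetical helpers, not SDK functions.

```python
from decimal import Decimal

SPORES_PER_LICN = 1_000_000_000  # from the text: 1 LICN = 1,000,000,000 spores


def licn_to_spores(amount: str) -> int:
    """Convert a decimal LICN string to integer spores (hypothetical helper)."""
    spores = Decimal(amount) * SPORES_PER_LICN
    if spores < 0:
        raise ValueError("amount must not be negative")
    if spores != spores.to_integral_value():
        raise ValueError("amount has more than 9 decimal places")
    return int(spores)


def spores_to_licn(spores: int) -> Decimal:
    """Convert integer spores back to LICN without float rounding."""
    return Decimal(spores) / SPORES_PER_LICN


print(licn_to_spores("5"))             # 5000000000
print(licn_to_spores("0.25"))          # 250000000
print(spores_to_licn(1_500_000_000))   # 1.5
```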
Validator & Staking
await get_validators() -> List[Dict[str, Any]]
Get all registered validators.
await get_validator_info(pubkey: PublicKey) -> Dict[str, Any]
Get detailed validator information.
await get_validator_performance(pubkey: PublicKey) -> Dict[str, Any]
Get validator performance metrics.
await stake(from_keypair: Keypair, validator: PublicKey, amount: int) -> str
Build, sign, and send a stake transaction. Amount in spores.
validator = PublicKey("ValidatorPubkey...")
sig = await conn.stake(kp, validator, 10_000_000_000) # 10 LICN
await unstake(from_keypair: Keypair, validator: PublicKey, amount: int) -> str
Build, sign, and send an unstake request.
await get_staking_status(pubkey: PublicKey) -> Dict[str, Any]
Get staking status for an account.
await get_staking_rewards(pubkey: PublicKey) -> Dict[str, Any]
Get accumulated staking rewards.
Contract Methods
await deploy_contract(deployer: Keypair, code: bytes, init_data: bytes = b"") -> str
Deploy a WASM smart contract. The code must start with the \0asm magic
header and be under 512 KB.
with open("my_contract.wasm", "rb") as f:
    wasm = f.read()
sig = await conn.deploy_contract(kp, wasm)
print(f"Deploy tx: {sig}")
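The two deployment constraints can be checked locally before spending a transaction. The sketch below is a hypothetical pre-flight helper, not part of the SDK, and it assumes "512 KB" means 512 * 1024 bytes; the node's exact limit may differ.

```python
WASM_MAGIC = b"\x00asm"
MAX_CODE_BYTES = 512 * 1024  # assumption: "512 KB" means 512 * 1024 bytes


def check_wasm(code: bytes) -> None:
    """Raise ValueError if the bytes would fail the documented deploy checks."""
    if not code.startswith(WASM_MAGIC):
        raise ValueError("not a WASM module: missing \\0asm magic header")
    if len(code) >= MAX_CODE_BYTES:
        raise ValueError(f"module is {len(code)} bytes; must be under {MAX_CODE_BYTES}")


# A minimal valid module is the magic header followed by version 1.
check_wasm(b"\x00asm\x01\x00\x00\x00")
print("ok")  # ok
```

Calling check_wasm(wasm) just before deploy_contract turns a rejected transaction into an immediate local error.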
await call_contract(caller: Keypair, contract: PublicKey, function_name: str, args: bytes = b"", value: int = 0) -> str
Call a function on a deployed WASM contract. Optionally send LICN (spores) with the call.
import json
contract = PublicKey("ContractAddr...")
args = json.dumps({"recipient": "Addr...", "amount": 100}).encode()
sig = await conn.call_contract(kp, contract, "transfer", args)
await call_readonly_contract(contract_id: PublicKey, function_name: str, args: bytes = b"", from_pubkey: Optional[PublicKey] = None) -> Dict[str, Any]
Execute a read-only contract call with no state change. This is the transport
primitive used by LichenSwapClient and the fallback path for helper gaps.
entry = await conn.get_symbol_registry("LICHENSWAP")
program = PublicKey(entry["program"])
result = await conn.call_readonly_contract(program, "get_pool_info")
print(result["returnCode"], result["returnData"])
await upgrade_contract(owner: Keypair, contract: PublicKey, code: bytes) -> str
Upgrade a deployed contract with new WASM bytecode (owner only).
await get_contract_info(contract_id: PublicKey) -> Dict[str, Any]
Get information about a deployed smart contract.
await get_contract_logs(contract_id: PublicKey) -> Dict[str, Any]
Get execution logs emitted by a contract.
await get_all_contracts() -> Dict[str, Any]
List all deployed contracts.
Program Methods
await get_program(program_id: PublicKey) -> Dict[str, Any]
Get program information.
await get_symbol_registry(symbol: str) -> Dict[str, Any]
Resolve a registry symbol such as YID, SPOREPAY, or
LICHENSWAP to its deployed program entry.
entry = await conn.get_symbol_registry("LICHENSWAP")
print(entry["program"])
await get_programs() -> Dict[str, Any]
List all deployed programs.
await get_program_stats(program_id: PublicKey) -> Dict[str, Any]
Get execution statistics for a program.
await get_program_calls(program_id: PublicKey) -> Dict[str, Any]
Get recent call history for a program.
await get_program_storage(program_id: PublicKey) -> Dict[str, Any]
Get on-chain storage summary for a program.
await get_sporepay_stats() -> Dict[str, Any] / await get_lichenswap_stats() -> Dict[str, Any] / await get_thalllend_stats() -> Dict[str, Any] / await get_sporevault_stats() -> Dict[str, Any] / await get_bountyboard_stats() -> Dict[str, Any]
Fetch aggregated stats from the public SporePay, LichenSwap, ThallLend, SporeVault, and BountyBoard RPC routes. Helper clients wrap SporeVault, ThallLend, SporePay, and LichenSwap for typed access.
sporepay_stats = await conn.get_sporepay_stats()
lichenswap_stats = await conn.get_lichenswap_stats()
thalllend_stats = await conn.get_thalllend_stats()
sporevault_stats = await conn.get_sporevault_stats()
bountyboard_stats = await conn.get_bountyboard_stats()
print(sporepay_stats, lichenswap_stats, thalllend_stats, sporevault_stats, bountyboard_stats)
NFT Methods
await get_collection(collection_id: PublicKey) -> Dict[str, Any]
Get NFT collection metadata.
await get_nft(collection_id: PublicKey, token_id: int) -> Dict[str, Any]
Get a specific NFT by collection and token ID.
await get_nfts_by_owner(owner: PublicKey) -> Dict[str, Any]
Get all NFTs owned by an address.
await get_nfts_by_collection(collection_id: PublicKey) -> Dict[str, Any]
Get all NFTs in a collection.
await get_nft_activity(collection_id: PublicKey, token_id: int) -> Dict[str, Any]
Get activity history for a specific NFT.
WebSocket Subscriptions
All subscription methods are async and require ws_url in the Connection
constructor. Each on_* method returns a subscription ID for unsubscribing with the
corresponding off_* method. Callbacks can be sync or async.
await on_slot(callback: Callable[[int], None]) -> int
Subscribe to slot updates. Unsubscribe with off_slot(sub_id).
sub_id = await conn.on_slot(lambda slot: print(f"Slot: {slot}"))
# ... later
await conn.off_slot(sub_id)
await on_block(callback) -> int
Subscribe to new blocks. Unsubscribe with off_block(sub_id).
await on_transaction(callback) -> int
Subscribe to transactions observed in canonical blocks. Use signature status
subscriptions when you need explicit processed / confirmed / finalized commitment tracking.
Unsubscribe with off_transaction(sub_id).
await on_account_change(pubkey: PublicKey, callback) -> int
Watch a specific account for balance changes. Unsubscribe with
off_account_change(sub_id).
await on_logs(callback, contract_id: Optional[PublicKey] = None) -> int
Stream contract logs. Optional contract filter. Unsubscribe with
off_logs(sub_id).
await on_program_updates(callback) / on_program_calls(callback, program_id?)
Program deploy/upgrade events and invocations. Unsubscribe with
off_program_updates / off_program_calls.
await on_nft_mints(callback, collection_id?) / on_nft_transfers(callback, collection_id?)
NFT mint and transfer events. Optional collection filter. Unsubscribe with
off_nft_mints / off_nft_transfers.
await on_market_listings(callback) / on_market_sales(callback)
Marketplace listing and sale events. Unsubscribe with
off_market_listings / off_market_sales.
await close()
Close the WebSocket connection, cancel background tasks, and clear subscriptions.
Keypair Class
ML-DSA-65 keypair backed by dilithium_py. Implements signing and seed-based derivation.
Keypair.generate() -> Keypair
Generate a new random keypair.
kp = Keypair.generate()
print(f"Address: {kp.pubkey()}")
Keypair.from_seed(seed: bytes) -> Keypair
Derive a keypair from a 32-byte seed. Raises ValueError if seed is
not 32 bytes.
seed = b'\x00' * 32
kp = Keypair.from_seed(seed)
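The all-zero seed above is only suitable for demonstrations. For a real wallet, the seed should come from a cryptographically secure source. A minimal sketch using the standard library follows; new_seed and seed_from_hex are hypothetical helpers, and hex is just one possible backup encoding.

```python
import secrets

SEED_LEN = 32  # from the text: from_seed expects exactly 32 bytes


def new_seed() -> bytes:
    """Return 32 random bytes from the OS CSPRNG."""
    return secrets.token_bytes(SEED_LEN)


def seed_from_hex(text: str) -> bytes:
    """Parse a hex-encoded backup and enforce the 32-byte length."""
    seed = bytes.fromhex(text)
    if len(seed) != SEED_LEN:
        raise ValueError(f"seed must be {SEED_LEN} bytes, got {len(seed)}")
    return seed


seed = new_seed()
backup = seed.hex()                  # store this somewhere safe
print(seed_from_hex(backup) == seed)  # True
```

The resulting bytes can then be passed to Keypair.from_seed(seed).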
Keypair.load(path: Path) -> Keypair / keypair.save(path: Path)
Load a keypair from a JSON file or save it to one. The JSON stores the 32-byte seed plus derived address and PQ verifying-key metadata.
from pathlib import Path
kp = Keypair.generate()
kp.save(Path("my-wallet.json"))
# Later
kp2 = Keypair.load(Path("my-wallet.json"))
assert kp.pubkey() == kp2.pubkey()
keypair.sign(message: bytes) -> PqSignature
Sign an arbitrary message. Returns a self-contained PQ signature.
keypair.public_key() -> PqPublicKey
Get the ML-DSA-65 public key object used for signature verification.
keypair.pubkey() -> PublicKey
Get the native 32-byte address used for balances, transfers, staking, and contract calls.
keypair.seed() -> bytes
Get the 32-byte seed for backup/restore.
PublicKey Class
A 32-byte public key with base58 encoding. Uses the base58 library.
PublicKey(value: Union[str, bytes, list])
Create from a base58 string, bytes, or list of ints. Raises
ValueError if not 32 bytes.
pk1 = PublicKey("7xKXtg2CW87d97TXJSDpbD5jBkheTqA83TZRuJosgAsU")
pk2 = PublicKey(b'\x00' * 32)
pk3 = PublicKey([0] * 32)
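To make the base58 and 32-byte rules concrete, the sketch below implements base58 in pure Python with the Bitcoin alphabet, which is the default alphabet of the base58 library. It assumes Lichen addresses use that alphabet; parse_address is a hypothetical helper that mirrors the length check described above, not the SDK's code.

```python
ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"


def b58encode(data: bytes) -> str:
    """Encode bytes as base58; each leading zero byte becomes a '1'."""
    n = int.from_bytes(data, "big")
    out = ""
    while n > 0:
        n, rem = divmod(n, 58)
        out = ALPHABET[rem] + out
    pad = len(data) - len(data.lstrip(b"\x00"))
    return "1" * pad + out


def b58decode(text: str) -> bytes:
    """Decode base58; raises ValueError on characters outside the alphabet."""
    n = 0
    for ch in text:
        n = n * 58 + ALPHABET.index(ch)
    pad = len(text) - len(text.lstrip("1"))
    body = n.to_bytes((n.bit_length() + 7) // 8, "big")
    return b"\x00" * pad + body


def parse_address(text: str) -> bytes:
    """Decode an address string and enforce the 32-byte length."""
    raw = b58decode(text)
    if len(raw) != 32:
        raise ValueError(f"expected 32 bytes, got {len(raw)}")
    return raw


zero_key = b58encode(b"\x00" * 32)
print(zero_key)                                    # 32 '1' characters
print(parse_address(zero_key) == b"\x00" * 32)     # True
```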
Methods
| Method | Returns | Description |
|---|---|---|
| to_base58() | str | Base58-encoded string representation |
| to_bytes() | bytes | Raw 32 bytes |
| __str__() | str | Same as to_base58() |
| __eq__(other) | bool | Byte-level equality |
| __hash__() | int | Hashable for use in sets/dicts |
Class Methods
| Method | Returns | Description |
|---|---|---|
| PublicKey.from_base58(s) | PublicKey | Create from base58 string |
| PublicKey.from_bytes(b) | PublicKey | Create from bytes |
| PublicKey.new_unique() | PublicKey | Random key (testing only) |
Transaction Types & Builder
Dataclasses
from dataclasses import dataclass
from typing import List

@dataclass
class Instruction:
    program_id: PublicKey
    accounts: List[PublicKey]
    data: bytes

@dataclass
class Message:
    instructions: List[Instruction]
    recent_blockhash: str

@dataclass
class Transaction:
    signatures: List[str]  # hex-encoded
    message: Message
TransactionBuilder
Fluent builder for constructing and signing transactions.
| Method | Returns | Description |
|---|---|---|
| add(instruction) | TransactionBuilder | Append an instruction |
| set_recent_blockhash(hash) | TransactionBuilder | Set the recent blockhash |
| build() | Message | Build unsigned message |
| build_and_sign(keypair) | Transaction | Build, sign, and return a Transaction |
Static Instruction Builders
| Method | Description |
|---|---|
| TransactionBuilder.transfer(from_pubkey, to_pubkey, amount) | Create a LICN transfer instruction (type 0). Amount in spores. |
| TransactionBuilder.stake(from_pubkey, validator, amount) | Create a stake instruction (type 9) |
| TransactionBuilder.unstake(from_pubkey, validator, amount) | Create an unstake instruction (type 10) |
| TransactionBuilder.deploy_contract(deployer, code, init_data) | Create a contract deploy instruction (CONTRACT_PROGRAM_ID) |
| TransactionBuilder.call_contract(caller, contract, func, args, value) | Create a contract call instruction |
| TransactionBuilder.upgrade_contract(owner, contract, code) | Create a contract upgrade instruction (owner only) |
Serialization helpers: TransactionBuilder.message_to_bincode(msg)
and TransactionBuilder.transaction_to_bincode(tx) are available for custom
serialization workflows.
Error Handling
All RPC methods raise a standard Python Exception whose message carries the descriptive error text from the server.
try:
    balance = await conn.get_balance(pubkey)
except Exception as e:
    if "Account not found" in str(e):
        print("Account does not exist yet")
    elif "Rate limit" in str(e):
        await asyncio.sleep(1)
    else:
        raise
Common error codes:
| Code | Meaning |
|---|---|
| -32601 | Method not found |
| -32602 | Invalid params |
| -32003 | Admin auth required |
| -32005 | Rate limit / subscription limit exceeded |