ExampleΒΆ

This code demonstrates usage of the APSW api. It gives you a good overview of all the things that can be done. Also included is output so you can see what gets printed when you run the code.

#!/usr/bin/env python3

import os
import sys
import time
import apsw

# Note: this code uses Python's optional typing annotations.  You can
# ignore them and do not need to use them
from typing import Optional, Iterator, Tuple

###
### Check we have the expected version of apsw and sqlite
###

print("      Using APSW file", apsw.__file__)  # from the extension module
print("         APSW version", apsw.apswversion())  # from the extension module
print("   SQLite lib version", apsw.sqlitelibversion())  # from the sqlite library code
print("SQLite header version", apsw.SQLITE_VERSION_NUMBER)  # from the sqlite header file at compile time
|       Using APSW file /space/apsw/apsw/__init__.cpython-310-x86_64-linux-gnu.so
|          APSW version 3.39.2.0
|    SQLite lib version 3.39.2
| SQLite header version 3039002
###
### Opening/creating database
###

connection = apsw.Connection("dbfile")
cursor = connection.cursor()
###
### simple statement
###

cursor.execute("create table foo(x,y,z)")

###
### using different types
###

cursor.execute("insert into foo values(?,?,?)", (1, 1.1, None))  # integer, float/real, Null
cursor.execute("insert into foo(x) values(?)", ("abc", ))  # string (note trailing comma to ensure tuple!)
cursor.execute(
    "insert into foo(x) values(?)",  # a blob (binary data)
    (b"abc\xff\xfe", ))

###
### multiple statements
###

cursor.execute(
    "delete from foo; insert into foo values(1,2,3); create table bar(a,b,c) ; insert into foo values(4, 'five', 6.0)")

###
### iterator
###

for x, y, z in cursor.execute("select x,y,z from foo"):
    print(cursor.getdescription())  # shows column names and declared types
    print(x, y, z)

###
### iterator - multiple statements
###

for m, n, o in cursor.execute("select x,y,z from foo ; select a,b,c from bar"):
    print(m, n, o)

###
### bindings - sequence
###

cursor.execute("insert into foo values(?,?,?)", (7, 'eight', False))
cursor.execute("insert into foo values(?,?,?1)", ('one', 'two'))  # nb sqlite does the numbers from 1

###
### bindings - dictionary
###

cursor.execute("insert into foo values(:alpha, :beta, :gamma)", {'alpha': 1, 'beta': 2, 'gamma': 'three'})
###
### tracing execution
###

def mytrace(cursor: apsw.Cursor, statement: str, bindings: Optional[apsw.Bindings]) -> bool:
    "Called just before executing each statement"
    print("SQL:", statement)
    if bindings:
        print("Bindings:", bindings)
    return True  # if you return False then execution is aborted

cursor.setexectrace(mytrace)
cursor.execute("drop table bar ; create table bar(x,y,z); select * from foo where x=?", (3, ))
| SQL: drop table bar ;
| SQL:  create table bar(x,y,z);
| SQL:  select * from foo where x=?
| Bindings: (3,)
###
### tracing results
###

def rowtrace(cursor: apsw.Cursor, row: apsw.SQLiteValues) -> apsw.SQLiteValues:
    """Called with each row of results before they are handed off.  You can return None to
    cause the row to be skipped or a different set of values to return"""
    print("Row:", row)
    return row

cursor.setrowtrace(rowtrace)
for row in cursor.execute("select x,y from foo where x>3"):
    pass
| SQL: select x,y from foo where x>3
| Row: (4, 'five')
| Row: (7, 'eight')
| Row: ('one', 'two')
# Clear tracers
cursor.setrowtrace(None)
cursor.setexectrace(None)

###
### executemany
###

# (This will work correctly with multiple statements, as well as statements that
# return data.  The second argument can be anything that is iterable.)
cursor.executemany("insert into foo (x) values(?)", ([1], [2], [3]))

# You can also use it for statements that return data
for row in cursor.executemany("select * from foo where x=?", ([1], [2], [3])):
    print(row)
###
### defining your own functions
###

def ilove7(*args: apsw.SQLiteValue) -> int:
    "a scalar function"
    print("ilove7 got", args, "but I love 7")
    return 7

connection.createscalarfunction("seven", ilove7)

for row in cursor.execute("select seven(x,y) from foo"):
    print(row)
| ilove7 got (1, 2) but I love 7
| (7,)
| ilove7 got (4, 'five') but I love 7
| (7,)
| ilove7 got (7, 'eight') but I love 7
| (7,)
| ilove7 got ('one', 'two') but I love 7
| (7,)
| ilove7 got (1, 2) but I love 7
| (7,)
| ilove7 got (1, None) but I love 7
| (7,)
| ilove7 got (2, None) but I love 7
| (7,)
| ilove7 got (3, None) but I love 7
| (7,)
###
### aggregate functions are more complex
###

# Here we return the longest item when represented as a string.

class longest:

    def __init__(self) -> None:
        self.longest = ""

    def step(self, *args: apsw.SQLiteValue) -> None:
        for arg in args:
            if len(str(arg)) > len(self.longest):
                self.longest = str(arg)

    def final(self) -> str:
        return self.longest

    @classmethod
    def factory(cls) -> apsw.AggregateCallbacks:
        return cls(), cls.step, cls.final

connection.createaggregatefunction("longest", longest.factory)
for row in cursor.execute("select longest(x,y) from foo"):
    print(row)
| ('eight',)
###
### Defining collations.
###

# The default sorting mechanisms don't understand numbers at the end of strings
# so here we define a collation that does

cursor.execute("create table s(str)")
cursor.executemany("insert into s values(?)", (["file1"], ["file7"], ["file17"], ["file20"], ["file3"]))

for row in cursor.execute("select * from s order by str"):
    print(row)
| ('file1',)
| ('file17',)
| ('file20',)
| ('file3',)
| ('file7',)
def strnumcollate(s1: apsw.SQLiteValue, s2: apsw.SQLiteValue) -> int:
    # return -1 if s1<s2, +1 if s1>s2 else 0

    # split values into two parts - the head and the numeric tail
    values: list[tuple[str, int]] = [(str(s1), 0), (str(s2), 0)]
    for vn, v in enumerate(values):
        i = len(v[0])
        for i in range(len(v[0]), 0, -1):
            if v[0][i - 1] not in "01234567890":
                break
        try:
            v = (v[0][:i], int(v[0][i:]))
            values[vn] = v
        except ValueError:
            pass
    # compare
    if values[0] < values[1]:
        return -1
    if values[0] > values[1]:
        return 1
    return 0

connection.createcollation("strnum", strnumcollate)

for row in cursor.execute("select * from s order by str collate strnum"):
    print(row)
| ('file1',)
| ('file3',)
| ('file7',)
| ('file17',)
| ('file20',)
###
### Authorizer (eg if you want to control what user supplied SQL can do)
###

def authorizer(operation: int, paramone: Optional[str], paramtwo: Optional[str], databasename: Optional[str], triggerorview: Optional[str]) -> int:
    """Called when each operation is prepared.  We can return SQLITE_OK, SQLITE_DENY or
    SQLITE_IGNORE"""
    # find the operation name
    print(apsw.mapping_authorizer_function[operation], paramone, paramtwo, databasename, triggerorview)
    if operation == apsw.SQLITE_CREATE_TABLE and paramone and paramone.startswith("private"):
        return apsw.SQLITE_DENY  # not allowed to create tables whose names start with private

    return apsw.SQLITE_OK  # always allow

connection.setauthorizer(authorizer)
cursor.execute("insert into s values('foo')")
cursor.execute("select str from s limit 1")
| SQLITE_INSERT s None main None
| SQLITE_SELECT None None None None
| SQLITE_READ s str main None
# Cancel authorizer
connection.setauthorizer(None)
###
### progress handler (SQLite 3 experimental feature)
###

# something to give us large numbers of random numbers
import random

def randomintegers(howmany: int) -> Iterator[Tuple[int]]:
    for i in range(howmany):
        yield (random.randint(0, 9999999999), )

# create a table with 100 random numbers
cursor.execute("begin ; create table bigone(x)")
cursor.executemany("insert into bigone values(?)", randomintegers(100))
cursor.execute("commit")

# display an ascii spinner
_phcount = 0
_phspinner = "|/-\\"

def progresshandler() -> int:
    global _phcount
    sys.stdout.write(_phspinner[_phcount % len(_phspinner)] + chr(8))  # chr(8) is backspace
    sys.stdout.flush()
    _phcount += 1
    time.sleep(0.1)  # deliberate delay so we can see the spinner (SQLite is too fast otherwise!)
    return 0  # returning non-zero aborts

# register progresshandler every 20 instructions
connection.setprogresshandler(progresshandler, 20)

# see it in action - sorting 100 numbers to find the biggest takes a while
print("spinny thing -> ", end="")
for i in cursor.execute("select max(x) from bigone"):
    print("\n", i, sep="", end="")
    sys.stdout.flush()

connection.setprogresshandler(None)
###
### commit hook (SQLite3 experimental feature)
###

def mycommithook() -> int:
    print("in commit hook")
    hour = time.localtime()[3]
    if hour < 8 or hour > 17:
        print("no commits out of hours")
        return 1  # abort commits outside of 8am through 6pm
    print("commits okay at this time")
    return 0  # let commit go ahead

connection.setcommithook(mycommithook)
try:
    cursor.execute("begin; create table example(x,y,z); insert into example values (3,4,5) ; commit")
except apsw.ConstraintError:
    print("commit was not allowed")

connection.setcommithook(None)
| in commit hook
| commits okay at this time
###
### update hook
###

def myupdatehook(type: int, databasename: str, tablename: str, rowid: int) -> None:
    print("Updated: %s database %s, table %s, row %d" %
          (apsw.mapping_authorizer_function[type], databasename, tablename, rowid))

connection.setupdatehook(myupdatehook)
cursor.execute("insert into s values(?)", ("file93", ))
cursor.execute("update s set str=? where str=?", ("file94", "file93"))
cursor.execute("delete from s where str=?", ("file94", ))
connection.setupdatehook(None)
| Updated: SQLITE_INSERT database main, table s, row 7
| Updated: SQLITE_UPDATE database main, table s, row 7
| Updated: SQLITE_DELETE database main, table s, row 7
###
### Blob I/O
###

cursor.execute("create table blobby(x,y)")
# Add a blob we will fill in later
cursor.execute("insert into blobby values(1,zeroblob(10000))")
# Or as a binding
cursor.execute("insert into blobby values(2,?)", (apsw.zeroblob(20000), ))
# Open a blob for writing.  We need to know the rowid
rowid = next(cursor.execute("select ROWID from blobby where x=1"))[0]
blob = connection.blobopen("main", "blobby", "y", rowid, True)
blob.write(b"hello world")
blob.seek(2000)
blob.write(b"hello world, again")
blob.close()
###
### Virtual tables
###

# This virtual table stores information about files in a set of
# directories so you can execute SQL queries

def getfiledata(directories):
    columns = None
    data = []
    counter = 1
    for directory in directories:
        for f in os.listdir(directory):
            if not os.path.isfile(os.path.join(directory, f)):
                continue
            counter += 1
            st = os.stat(os.path.join(directory, f))
            if columns is None:
                columns = ["rowid", "name", "directory"] + [x for x in dir(st) if x.startswith("st_")]
            data.append([counter, f, directory] + [getattr(st, x) for x in columns[3:]])
    return columns, data

# This gets registered with the Connection
class Source:

    def Create(self, db, modulename, dbname, tablename, *args):
        columns, data = getfiledata([eval(a.replace("\\", "\\\\")) for a in args])  # eval strips off layer of quotes
        schema = "create table foo(" + ','.join(["'%s'" % (x, ) for x in columns[1:]]) + ")"
        return schema, Table(columns, data)

    Connect = Create

# Represents a table
class Table:

    def __init__(self, columns, data):
        self.columns = columns
        self.data = data

    def BestIndex(self, *args):
        return None

    def Open(self):
        return Cursor(self)

    def Disconnect(self):
        pass

    Destroy = Disconnect

# Represents a cursor
class Cursor:

    def __init__(self, table):
        self.table = table

    def Filter(self, *args):
        self.pos = 0

    def Eof(self):
        return self.pos >= len(self.table.data)

    def Rowid(self):
        return self.table.data[self.pos][0]

    def Column(self, col):
        return self.table.data[self.pos][1 + col]

    def Next(self):
        self.pos += 1

    def Close(self):
        pass

# Register the module as filesource
connection.createmodule("filesource", Source())

# Arguments to module - all directories in sys.path
sysdirs = ",".join(["'%s'" % (x, ) for x in sys.path[1:] if len(x) and os.path.isdir(x)])
cursor.execute("create virtual table sysfiles using filesource(" + sysdirs + ")")

# Which 3 files are the biggest?
for size, directory, file in cursor.execute(
        "select st_size,directory,name from sysfiles order by st_size desc limit 3"):
    print(size, file, directory)
| 46928312 d9e93640ccdc3fbe1e95__mypyc.cpython-310-x86_64-linux-gnu.so /home/rogerb/.local/lib/python3.10/site-packages
| 1246656 unicodedata2.cpython-310-x86_64-linux-gnu.so /usr/lib/python3/dist-packages
| 765704 _brotli.cpython-310-x86_64-linux-gnu.so /usr/lib/python3/dist-packages
# Which 3 files are the oldest?
for ctime, directory, file in cursor.execute("select st_ctime,directory,name from sysfiles order by st_ctime limit 3"):
    print(ctime, file, directory)
| 1582056231.6234083 .style.yapf /space/apsw
| 1587233567.4374056 arandr-0.1.10.egg-info /usr/lib/python3/dist-packages
| 1604593216.8926702 pyparsing.py /usr/lib/python3/dist-packages
###
### A VFS that "obfuscates" the database file contents.  The scheme
### used is to xor all bytes with 0xa5.  This scheme honours that used
### for MAPI and SQL Server.
###

def encryptme(data):
    if not data: return data
    return bytes([x ^ 0xa5 for x in data])

# Inheriting from a base of "" means the default vfs
class ObfuscatedVFS(apsw.VFS):

    def __init__(self, vfsname="obfu", basevfs=""):
        self.vfsname = vfsname
        self.basevfs = basevfs
        apsw.VFS.__init__(self, self.vfsname, self.basevfs)

    # We want to return our own file implementation, but also
    # want it to inherit
    def xOpen(self, name, flags):
        # We can look at uri parameters
        if isinstance(name, apsw.URIFilename):
            print("fast is", name.uri_parameter("fast"))
            print("level is", name.uri_int("level", 3))
            print("warp is", name.uri_boolean("warp", False))
            print("notpresent is", name.uri_parameter("notpresent"))
| fast is speed
| level is 7
| warp is True
| notpresent is None
        return ObfuscatedVFSFile(self.basevfs, name, flags)

# The file implementation where we override xRead and xWrite to call our
# encryption routine
class ObfuscatedVFSFile(apsw.VFSFile):

    def __init__(self, inheritfromvfsname, filename, flags):
        apsw.VFSFile.__init__(self, inheritfromvfsname, filename, flags)

    def xRead(self, amount, offset):
        return encryptme(super(ObfuscatedVFSFile, self).xRead(amount, offset))

    def xWrite(self, data, offset):
        super(ObfuscatedVFSFile, self).xWrite(encryptme(data), offset)

# To register the VFS we just instantiate it
obfuvfs = ObfuscatedVFS()
# Lets see what vfs are now available?
print(apsw.vfsnames())
| ['unix', 'obf', 'memdb', 'unix-excl', 'unix-dotfile', 'unix-none']
# Make an obfuscated db, passing in some URI parameters
obfudb = apsw.Connection("file:myobfudb?fast=speed&level=7&warp=on",
                         flags=apsw.SQLITE_OPEN_READWRITE | apsw.SQLITE_OPEN_CREATE | apsw.SQLITE_OPEN_URI,
                         vfs=obfuvfs.vfsname)
# Check it works
obfudb.cursor().execute("create table foo(x,y); insert into foo values(1,2)")

# Check it really is obfuscated on disk
print(repr(open("myobfudb", "rb").read()[:20]))
| b'\xf6\xf4\xe9\xcc\xd1\xc0\x85\xc3\xca\xd7\xc8\xc4\xd1\x85\x96\xa5\xb5\xa5\xa4\xa4'
# And unobfuscating it
print(repr(encryptme(open("myobfudb", "rb").read()[:20])))
| b'SQLite format 3\x00\x10\x00\x01\x01'
# Tidy up
obfudb.close()
os.remove("myobfudb")
###
### Limits
###

# Print some limits
for limit in ("LENGTH", "COLUMN", "ATTACHED"):
    name = "SQLITE_LIMIT_" + limit
    maxname = "SQLITE_MAX_" + limit  # compile time
    orig = connection.limit(getattr(apsw, name))
    print(name, orig)
    # To get the maximum, set to 0x7fffffff and then read value back
    connection.limit(getattr(apsw, name), 0x7fffffff)
    max = connection.limit(getattr(apsw, name))
    print(maxname, max)

# Set limit for size of a string
cursor.execute("create table testlimit(s)")
cursor.execute("insert into testlimit values(?)", ("x" * 1024, ))  # 1024 char string
connection.limit(apsw.SQLITE_LIMIT_LENGTH, 1023)  # limit is now 1023
try:
    cursor.execute("insert into testlimit values(?)", ("y" * 1024, ))
    print("string exceeding limit was inserted")
except apsw.TooBigError:
    print("Caught toobig exception")
connection.limit(apsw.SQLITE_LIMIT_LENGTH, 0x7fffffff)
| SQLITE_LIMIT_LENGTH 1000000000
| SQLITE_MAX_LENGTH 1000000000
| SQLITE_LIMIT_COLUMN 2000
| SQLITE_MAX_COLUMN 2000
| SQLITE_LIMIT_ATTACHED 125
| SQLITE_MAX_ATTACHED 125
| Caught toobig exception
###
### Backup to memory
###

# We will copy the disk database into a memory database

memcon = apsw.Connection(":memory:")

# Copy into memory
with memcon.backup("main", connection, "main") as backup:
    backup.step()  # copy whole database in one go

# There will be no disk accesses for this query
for row in memcon.cursor().execute("select * from s"):
    pass
###
### Shell
###

# Here we use the shell to do a csv export providing the existing db
# connection

# Export to a StringIO
import io

output = io.StringIO()
shell = apsw.Shell(stdout=output, db=connection)
# How to execute a dot command
shell.process_command(".mode csv")
shell.process_command(".headers on")
# How to execute SQL
shell.process_sql(
    "create table csvtest(col1,col2); insert into csvtest values(3,4); insert into csvtest values('a b', NULL)")
# Let the shell figure out SQL vs dot command
shell.process_complete_line("select * from csvtest")

# Verify output
print(output.getvalue())
| col1,col2
| 3,4
| a b,
|
###
### Statistics
###

print("SQLite memory usage current %d max %d" % apsw.status(apsw.SQLITE_STATUS_MEMORY_USED))
| SQLite memory usage current 315008 max 326872
###
### Cleanup
###

# We can close connections manually (useful if you want to catch exceptions)
# but you don't have to
connection.close(True)  # force it since we want to exit

# Delete database - we don't need it any more
os.remove("dbfile")