Xata SDK for Python
The Python SDK is available as a PyPI package. It uses type annotations and requires Python 3.8 or higher. The API reference is located at https://xata-py.readthedocs.io.
To install the library enter the following command:
pip install xata
The xata package is a standalone SDK library that features XataClient. By installing the package, you can import the SDK into your Python project and start building on top of Xata.
The Python SDK is different from the Xata CLI, which can be used to manage your Xata databases from the command line or to import the TypeScript and JavaScript client.
To bootstrap the SDK we recommend you use the workspace URL. In the Web UI, navigate to Workspaces then Configuration. You will be directed to the workspace API base URL. Copy the URL and use it to configure routing.
The URL can be passed to the client as a parameter:
xata = XataClient(db_url="REDACTED_DB_URL")
Or you can set the environment variable XATA_DATABASE_URL. The parameter value takes precedence over the environment variable.
The database URL (db_url) parameter must follow this format: https://test-123456.us-east-1.xata.sh/db/mydb. The branch name is optional. It can be appended to the URL with a : as the separator, like this: https://test-123456.us-east-1.xata.sh/db/mydb:my-feature-branch. Alternatively, you can specify the branch with the branch_name parameter of XataClient.
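To make the URL format concrete, here is a minimal sketch that splits a database URL into its parts. The helper parse_db_url and the DatabaseUrl tuple are hypothetical names used for illustration only and are not part of the SDK. Reading the first two host labels as workspace and region is an assumption based on the example URLs above.

```python
from typing import NamedTuple, Optional
from urllib.parse import urlparse


class DatabaseUrl(NamedTuple):
    workspace: str
    region: str
    db_name: str
    branch_name: Optional[str]


def parse_db_url(db_url: str) -> DatabaseUrl:
    """Split a database URL into parts (hypothetical helper, not an SDK function)."""
    parsed = urlparse(db_url)
    # Assumed host layout: "<workspace>.<region>.xata.sh"
    host_parts = (parsed.hostname or "").split(".")
    if len(host_parts) < 4:
        raise ValueError(f"unexpected host in db_url: {db_url!r}")
    # Path layout: "/db/<name>" or "/db/<name>:<branch>"
    path_parts = parsed.path.strip("/").split("/")
    if len(path_parts) != 2 or path_parts[0] != "db" or not path_parts[1]:
        raise ValueError(f"unexpected path in db_url: {db_url!r}")
    db_name, _, branch = path_parts[1].partition(":")
    return DatabaseUrl(host_parts[0], host_parts[1], db_name, branch or None)


plain = parse_db_url("https://test-123456.us-east-1.xata.sh/db/mydb")
with_branch = parse_db_url("https://test-123456.us-east-1.xata.sh/db/mydb:my-feature-branch")
```

Here plain.branch_name is None because no branch was appended, while with_branch.branch_name carries the part after the : separator.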
There are multiple options to pass your Xata credentials to the client. Xata will check the following using this order of precedence:
- Parameters passed to the constructor
- Environment variables
- The .env file
- Via the .xatarc configuration file
The .xatarc file is generated by the Xata CLI in the current directory when running the command xata init.
If the previous options were empty, the client looks for a .env file in the project root directory. Visit the Authentication page to read more about best practices.
XATA_API_KEY="REDACTED_API_KEY"
XATA_DATABASE_URL="REDACTED_DB_URL"
The .xatarc configuration file is the final source to retrieve the API key and the workspace.
Refer to the authentication page to learn more about the .xatarc file and best practices.
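The order of precedence described above can be sketched as a simple "first non-empty value wins" lookup. This is an illustration of the documented order, not the SDK's actual implementation. The function resolve_setting is hypothetical, and treating the .env and .xatarc contents as plain key/value mappings is a simplifying assumption.

```python
from typing import Mapping, Optional


def resolve_setting(
    name: str,
    param: Optional[str],
    environ: Mapping[str, str],
    dotenv: Mapping[str, str],
    xatarc: Mapping[str, str],
) -> Optional[str]:
    """Return the first non-empty value: parameter, environment, .env, .xatarc."""
    candidates = (param, environ.get(name), dotenv.get(name), xatarc.get(name))
    for value in candidates:
        if value:
            return value
    return None


# No constructor parameter and no environment variable: the .env value wins
from_dotenv = resolve_setting(
    "XATA_API_KEY", None, {}, {"XATA_API_KEY": "from-dotenv"}, {"XATA_API_KEY": "from-xatarc"}
)
# A constructor parameter overrides every other source
from_param = resolve_setting(
    "XATA_API_KEY", "from-param", {"XATA_API_KEY": "from-env"}, {}, {}
)
```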
If no parameters are passed, the client probes the following environment variables for authentication:
export XATA_API_KEY="REDACTED_API_KEY"
export XATA_DATABASE_URL="REDACTED_DB_URL"
from xata.client import XataClient
client = XataClient()
Pass the API key and database URL directly to the constructor as parameters:
from xata.client import XataClient
client = XataClient(api_key="REDACTED_API_KEY", db_url="REDACTED_DB_URL")
There are multiple options to point the SDK to a specific branch. If none is selected, the SDK uses main as the current branch name.
The environment variable XATA_BRANCH can specify a branch. The SDK looks for the variable and initializes with that value.
You can also set the branch with the database URL (db_url) on SDK init. For example, to use the branch feature-042 on the database Planes, initialize the SDK with:
xata = XataClient(db_url="https://test-12345.us-east-1.xata.sh/db/Planes:feature-042")
Another option is to specify the branch_name parameter when creating the client. If you specify both db_url and branch_name, the latter will be ignored.
If you want to change the branch for a single request, you can do so with all workspace endpoints by setting the branch_name parameter:
xata.records().insert("Planes", record, branch_name="feature-042")
The API surface of the Python SDK is organized into namespaces, with each namespace associated with a specific set of APIs. For instance, the databases namespace provides access to all the available Xata endpoints for managing databases.
Alternatively, you can directly instantiate a namespace, as demonstrated in this example.
Authentication and API key management | client.authentication() |
Branch management | client.branch() |
Database operations | client.databases() |
User invites management | client.invites() |
Branch schema migrations and history | client.migrations() |
User management | client.users() |
Workspace management | client.workspaces() |
Table records access operations | client.records() |
APIs for searching, querying, filtering, and aggregating records | client.data() |
Database table management | client.table() |
SQL over HTTP | client.sql() |
The endpoints and namespaces are generated from the OpenAPI specification.
The following examples assume the client variable is an instance of XataClient with the correct credentials. To learn more about initializing the SDK, refer to the Configuration page.
Each endpoint returns an instance of requests.Response. If your query reaches the threshold of concurrent connections, the server will respond with a 429 status code and the SDK will throw a RateLimitingException.
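One common way to react to a 429 response is to retry with exponential backoff. The sketch below shows the idea with standard-library code only. The RateLimited class is a local stand-in for the SDK's rate limiting exception, and call_with_backoff is a hypothetical helper; neither is part of the SDK.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


class RateLimited(Exception):
    """Local stand-in for the SDK's rate limiting exception (hypothetical name)."""


def call_with_backoff(
    fn: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 0.1,
    sleep: Callable[[float], object] = time.sleep,
) -> T:
    """Call fn, doubling the wait after every rate limited attempt."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return fn()
        except RateLimited:
            if attempt == max_attempts - 1:
                raise  # out of attempts, surface the error to the caller
            sleep(base_delay * 2 ** attempt)
    raise AssertionError("unreachable")


# Demo: the first two calls are rate limited, the third one succeeds
outcomes = [RateLimited(), RateLimited(), "ok"]
delays: List[float] = []


def flaky_call() -> str:
    outcome = outcomes.pop(0)
    if isinstance(outcome, Exception):
        raise outcome
    return outcome


# Record the delays instead of actually sleeping
result = call_with_backoff(flaky_call, sleep=delays.append)
```

In real code, fn would wrap an SDK call and the except clause would catch the exception class exported by the SDK version in use.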
You can refer to the API Reference for the relevant parameters to each method.
More examples are available in the GitHub repository of the SDK, including runnable demo apps for the BulkProcessor, pagination of query results, and leveraging transactions.
Below are the parameters passed to the methods in the following examples:
- xata.records().insert(): table name (str), record (dictionary)
- xata.records().insert_with_id(): table name (str), record id (str), record (dictionary)
- xata.records().update(): table name (str), record id (str), record (dictionary)
- xata.records().upsert(): table name (str), record id (str), record (dictionary)
- xata.records().get(): table name (str), record id (str)
- xata.data().query(): table name (str), object with "columns", "filter", and "sort"
- xata.records().delete(): table name (str), record id (str)
- xata.records().bulk_insert(): table name (str), records (dictionary with records array)
- xata.users().get()
- xata.data().ask(): table name (str), question (str)
- xata.data().ask_follow_up(): table name (str), session id (str), question (str)
- to_rfc3339(): dt (datetime), tz (timezone, default: utc)
- xata.table().create(): table name (str)
- xata.table().set_schema(): table name (str), table schema (dictionary)
from xata.client import XataClient
xata = XataClient(db_name="my_db")
record = {
"name": "Peter Parker",
"job": "Spiderman",
}
# Insert record to table "Avengers" and let Xata generate a record Id
resp = xata.records().insert("Avengers", record)
assert resp.is_success()
print("Record Id: %s" % resp["id"])
# Insert record to table "Avengers" with your own record Id "spidey-1"
resp = xata.records().insert_with_id("Avengers", "spidey-1", record)
assert resp.is_success()
# Update the record with Id "spidey-1" in table "Avengers"
record["job"] = "your friendly neighborhood spider man"
resp = xata.records().update("Avengers", "spidey-1", record)
assert resp.is_success()
# Upsert: Update or insert a record
record = {
"name": "Bruce Banner",
"job": "Hulk",
}
resp = xata.records().upsert("Avengers", "hulk-1", record)
# On insert status code = 201
assert resp.status_code == 201
assert resp.is_success()
record["job"] = "the incredible hulk"
resp = xata.records().upsert("Avengers", "spidey-1", record)
# On update status code = 200
assert resp.status_code == 200
assert resp.is_success()
from xata.client import XataClient
client = XataClient(db_name="my_db")
record = {
"name": "Peter Parker",
"job": "Spiderman",
}
# Insert record to table "Avengers" and let Xata generate a record ID
resp = client.records().insertRecord("Avengers", record)
assert resp.status_code == 201
print("Record Id: %s" % resp.json()["id"])
# Insert record to table "Avengers" with your own record ID "spidey-1"
resp = client.records().insertRecordWithID("Avengers", "spidey-1", record)
assert resp.status_code == 201
# Update the record with Id "spidey-1" in table "Avengers"
record["job"] = "your friendly neighborhood spider man"
resp = client.records().updateRecordWithID("Avengers", "spidey-1", record)
assert resp.status_code == 200
# Upsert: Update or insert a record
record = {
"name": "Bruce Banner",
"job": "Hulk",
}
resp = client.records().upsertRecordWithID("Avengers", "hulk-1", record)
# On insert status code = 201
assert resp.status_code == 201
record["job"] = "the incredible hulk"
resp = client.records().upsertRecordWithID("Avengers", "spidey-1", record)
# On update status code = 200
assert resp.status_code == 200
The following example shows how to retrieve a record with the id spidey from the table Avengers and how to handle records that do not exist.
from xata.client import XataClient
records = XataClient().records()
spiderman = records.get("Avengers", "spidey")
print(spiderman.is_success()) # True
print(spiderman)
# {"id": "spidey", "name": "Peter Parker", "job": "spiderman"}
# If the record with the Id does not exist, the status code will be 404
batman = records.get("Avengers", "bruce-wayne")
print(batman.status_code) # 404
print(batman.is_success()) # False
from xata.client import XataClient
records = XataClient().records()
spiderman = records.getRecord("Avengers", "spidey")
print(spiderman.json())
# {"id": "spidey", "name": "Peter Parker", "job": "spiderman"}
# If the record with the Id does not exist, the status code will be 404
batman = records.getRecord("Avengers", "bruce-wayne")
print(batman.status_code)
# 404
The following example shows how to query the table Avengers and apply filters and sorts.
from xata.client import XataClient
xata = XataClient()
resp = xata.data().query("Avengers", {
"columns": ["name", "thumbnail"], # the columns we want returned
"filter": { "job": "spiderman" }, # optional filters to apply
"sort": { "name": "desc" } # optional sorting key and order (asc/desc)
})
assert resp.is_success()
print(resp["records"])
# [{"id": "spidey", "name": "Peter Parker", "job": "spiderman"}]
# Note it will be an array, even if there is only one record matching the filters
from xata.client import XataClient
xata = XataClient()
resp = xata.search_and_filter().queryTable("Avengers", {
"columns": ["name", "thumbnail"], # the columns we want returned
"filter": { "job": "spiderman" }, # optional filters to apply
"sort": { "name": "desc" } # optional sorting key and order (asc/desc)
})
# queryTable returns status code = 200
assert resp.status_code == 200
records = resp.json()
print(records["records"])
# [{"id": "spidey", "name": "Peter Parker", "job": "spiderman"}]
# Note it will be an array, even if there is only one record matching the filters
The following example shows how to paginate through your data and introduces two convenience methods, response.has_more_results() and response.get_cursor(), which were introduced in 1.x.
We want to limit the records per call to 25 and assume the people table holds more than 25 records, so we need to make multiple calls to page through the data.
from xata.client import XataClient
xata = XataClient()
records = xata.data().query("people", {
"page": {
"size": 25 # limit result set to 25 records
}
})
# do something with the data ..
# are more pages available ? If yes keep looping
# through until we reached the last page
while records.has_more_results():
    # fetch the next page ...
    records = xata.data().query("people", {
        "page": {
            "after": records.get_cursor() # get the next cursor
        }
    })
    # do something with the data ..
from xata.client import XataClient
xata = XataClient()
records = xata.data().queryTable("people", {
"page": {
"size": 25 # limit result set to 25 records
}
})
# do something with the data ...
# are more pages available ? If yes keep looping
# through until we reached the last page
more = records.json()["meta"]["page"]["more"]
while more:
    # fetch the next page ..
    records = xata.data().queryTable("people", {
        "page": {
            "after": records.json()["meta"]["page"]["cursor"] # next cursor
        }
    })
    # do something with the data ..
    more = records.json()["meta"]["page"]["more"]
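The cursor loop can also be wrapped in a generator so that callers iterate over records without handling pages themselves. The sketch below runs against an in-memory stand-in instead of a real database. fake_query and iter_records are hypothetical names, and the cursor is simply a stringified offset, which is an assumption made for the demo and not how Xata encodes cursors. The response shape with meta.page.more and meta.page.cursor follows the examples above.

```python
from typing import Any, Dict, Iterator

ROWS = [{"id": f"rec_{i}"} for i in range(60)]  # fake table holding 60 records


def fake_query(page: Dict[str, Any]) -> Dict[str, Any]:
    """In-memory stand-in for a query call; the cursor is a stringified offset."""
    size = page.get("size", 25)
    start = int(page.get("after", 0))
    chunk = ROWS[start:start + size]
    end = start + len(chunk)
    return {
        "records": chunk,
        "meta": {"page": {"cursor": str(end), "more": end < len(ROWS)}},
    }


def iter_records(size: int = 25) -> Iterator[Dict[str, Any]]:
    """Yield every record, fetching the next page only when it is needed."""
    resp = fake_query({"size": size})
    yield from resp["records"]
    while resp["meta"]["page"]["more"]:
        cursor = resp["meta"]["page"]["cursor"]
        resp = fake_query({"after": cursor, "size": size})
        yield from resp["records"]


all_ids = [record["id"] for record in iter_records()]
```

With 60 fake records and a page size of 25, the generator performs three calls and yields the records in order.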
from xata.client import XataClient
records = XataClient().records()
record = records.delete("Avengers", "captain-america")
print(record.status_code) # 204
from xata.client import XataClient
records = XataClient().records()
record = records.deleteRecord("Avengers", "captain-america")
print(record.status_code) # 204
The Bulk Processor is a helpful tool for performing bulk operations using the Xata Python client. It abstracts away the complexity of managing bulk operations, making it easier to perform large-scale insert operations. Using the Bulk Processor is recommended for bulk operations in Xata.
from xata.client import XataClient
xata = XataClient(api_key="REDACTED_API_KEY", db_name="my_db", branch_name="feature-042")
avengers = [
{"name": "Peter Parker", "job": "Spiderman"},
{"name": "Bruce Banner", "job": "Hulk"},
{"name": "Steve Rogers", "job": "Captain America"},
{"name": "Tony Stark", "job": "Iron Man"},
]
resp = xata.records().bulk_insert("Avengers", {"records": avengers})
assert resp.is_success()
from xata.client import XataClient
client = XataClient(api_key="REDACTED_API_KEY", db_name="my_db", branch_name="feature-042")
avengers = [
{"name": "Peter Parker", "job": "Spiderman"},
{"name": "Bruce Banner", "job": "Hulk"},
{"name": "Steve Rogers", "job": "Captain America"},
{"name": "Tony Stark", "job": "Iron Man"},
]
resp = client.records().bulkInsertTableRecords("Avengers", {"records": avengers})
assert resp.status_code == 200
In this scenario we upload a file stored on disk into a table Photos through the records API. For more examples, please consult the file attachment examples.
xata = XataClient()
upload = xata.records().insert("Photos", {
"title": "My new uploaded photo",
"photo": {
"name": "Nordkette, Innsbruck",
"mediaType": "image/jpeg",
"base64Content": "Tm9yZGtldHRlLCBJbm5zYnJ1Y2ssIFR5cm9sLCBBdXN0cmlhLCBMb3JlbSBJcHN1bSBsb3Jv",
"enablePublicUrl": True
}
})
assert upload.is_success()
xata = XataClient()
upload = xata.records().insertRecord("Photos", {
"title": "My new uploaded photo",
"photo": {
"name": "Nordkette, Innsbruck",
"mediaType": "image/jpeg",
"base64Content": "Tm9yZGtldHRlLCBJbm5zYnJ1Y2ssIFR5cm9sLCBBdXN0cmlhLCBMb3JlbSBJcHN1bSBsb3Jv",
"enablePublicUrl": True
}
})
assert upload.status_code == 201
The following snippet shows a very simple way to read a file from disk and encode it to base64:
import base64
file_name = "photos/nordkette.jpg"
with open(file_name, "rb") as f:
    file_content = f.read()
b64 = base64.b64encode(file_content).decode("ascii")
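Building on the snippet above, the following sketch assembles the whole file object used in the upload example from a path on disk. The helper file_to_attachment is a hypothetical name and not part of the SDK. The keys mirror the upload example, and guessing the media type from the file extension is an assumption that may not fit every file.

```python
import base64
import mimetypes
import tempfile
from pathlib import Path
from typing import Any, Dict


def file_to_attachment(path: str, public: bool = False) -> Dict[str, Any]:
    """Build a file column value from a file on disk (hypothetical helper)."""
    file_path = Path(path)
    # Guess the media type from the extension, fall back to a generic type
    media_type, _ = mimetypes.guess_type(file_path.name)
    return {
        "name": file_path.name,
        "mediaType": media_type or "application/octet-stream",
        "base64Content": base64.b64encode(file_path.read_bytes()).decode("ascii"),
        "enablePublicUrl": public,
    }


# Demo with a temporary file standing in for a real photo
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "nordkette.jpg"
    sample.write_bytes(b"hello")
    photo = file_to_attachment(str(sample), public=True)
```

The resulting dictionary can then be used as the value of the photo column in the record passed to the insert call.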
The ask endpoint uses search or similarity search algorithms to find relevant information from your database. Please refer to this page to get more information about the endpoint.
xata = XataClient()
answer = xata.data().ask("xata", "does xata have a python sdk")
assert answer.is_success()
print("Answer : %s" % answer["answer"])
print("Records: %s" % answer["records"])
print("Session: %s" % answer["sessionId"])
# Ask a follow up question using the sessionId from the previous question
follow_up = xata.data().ask_follow_up("xata", answer["sessionId"], "what is the best way to do bulk?")
assert follow_up.is_success()
print("Answer : %s" % follow_up["answer"])
print("Records: %s" % follow_up["records"])
client = XataClient(api_key="REDACTED_API_KEY", workspace_id="REDACTED_WS_ID")
client.set_db_and_branch_names("harry-potter", "main")
question_to_ask = "is harry potter able to fly?"
resp = client.search_and_filter().askTable(my_table_name, {
"question": question_to_ask
})
assert resp.status_code == 200
print("Answer : %s" % resp.json()["answer"])
print("Records: %s" % resp.json()["records"])
# [!] follow up questions only available in 1.x version of the SDK
from xata.client import XataClient
xata = XataClient()
user = xata.users().get()
print(user.status_code) # 200
from xata.client import XataClient
client = XataClient()
user = client.users().getUser()
print(user.status_code) # 200
To use the datetime data type in Xata, you must provide an RFC 3339 compliant string.
The Xata SDK provides a convenient to_rfc3339() helper function to simplify the submission of Python native datetime values. You can pass the datetime object as a parameter to the helper function, and it will convert the value to an RFC 3339 compliant string.
To specify a timezone, use the optional timezone argument. If no timezone is specified, UTC time is applied by default.
The to_rfc3339() helper function was introduced with the v0.9.0 release.
# import the helper function
from xata.helpers import to_rfc3339
from datetime import datetime
# with date and time
my_date = datetime.strptime("2023-03-20 13:42:00", "%Y-%m-%d %H:%M:%S")
print(to_rfc3339(my_date))
# > "2023-03-20T13:42:00+00:00"
# without time
date_without_time = datetime.strptime("2023-03-20", "%Y-%m-%d")
print(to_rfc3339(date_without_time))
# > "2023-03-20T00:00:00+00:00"
# With a timezone
from pytz import timezone
date_with_tz = datetime.strptime("2023-03-20 13:42:16", "%Y-%m-%d %H:%M:%S")
tz_europe_vienna = timezone("Europe/Vienna")
print(to_rfc3339(date_with_tz, tz_europe_vienna))
# > "2023-03-20T13:42:16+01:05"
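For comparison, the standard library can produce strings of the same shape from timezone-aware datetime values. The sketch below uses only the datetime module and is not a description of how to_rfc3339() is implemented. The fixed +01:00 offset is an assumption chosen for the demo.

```python
from datetime import datetime, timedelta, timezone

# A naive datetime interpreted as UTC
naive = datetime.strptime("2023-03-20 13:42:00", "%Y-%m-%d %H:%M:%S")
as_utc = naive.replace(tzinfo=timezone.utc).isoformat()

# A datetime with a fixed offset of one hour ahead of UTC
plus_one = timezone(timedelta(hours=1))
with_offset = datetime(2023, 3, 20, 13, 42, 16, tzinfo=plus_one).isoformat()

print(as_utc)       # 2023-03-20T13:42:00+00:00
print(with_offset)  # 2023-03-20T13:42:16+01:00
```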
The Xata SDK provides schema editing operations under the Table class.
You can create tables, add and delete columns, or set the full table schema.
from xata.client import XataClient
xata = XataClient()
table_schema = {
"columns": [
{
"name": "title",
"type": "string",
},
{
"name": "description",
"type": "text",
"notNull": True,
"defaultValue": "Default text"
}
]
}
assert xata.table().create("mytable").is_success()
resp = xata.table().set_schema("mytable", table_schema)
# if the operation failed, print the server response
assert resp.is_success(), resp
If you want to ingest multiple records into Xata, using bulk is the most efficient way. The BulkProcessor, a helper of the Python SDK, aims to make the process even simpler by abstracting away the complexity of juggling concurrent workers, chunking data, and maintaining queues.
You can use the BulkProcessor to, for example, ingest a CSV file into Xata or read documents from a queue and delegate the ingestion to the processor.
Two methods are available to put data in the processing queue:
- bp.put_records(":table", ":records") to add multiple records
- bp.put_record(":table", ":record") to add only one record
Adding multiple records at once with bp.put_records(":table", ":records") is more efficient, as it requires less locking of the internal data structures.
You can tweak the processor to your needs if necessary:
- thread_pool_size: How many data queue workers should be deployed (default: 4)
- batch_size: How many records per table should be pushed as batch (default: 25)
- flush_interval: After how many seconds should the per table queue be flushed (default: 5 seconds)
- processing_timeout: Cooldown period between batches (default: 0.025 seconds)
If data is coming slowly, e.g. > 1 record / second, it's reasonable to have fewer threads deployed and decrease the batch_size and flush_interval to get documents in faster.
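To illustrate what batch_size means in practice, the following sketch splits a list of records into batches of at most 25. It only demonstrates the chunking idea. The function chunk is a hypothetical helper and does not reflect the internals of the BulkProcessor, which also handles worker threads, queues, and flushing.

```python
from typing import Dict, Iterator, List


def chunk(records: List[Dict], batch_size: int = 25) -> Iterator[List[Dict]]:
    """Yield consecutive slices of records with at most batch_size items each."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(records), batch_size):
        yield records[start:start + batch_size]


records = [{"n": i} for i in range(60)]
batch_lengths = [len(batch) for batch in chunk(records)]
print(batch_lengths)  # [25, 25, 10]
```

A smaller batch_size produces more, smaller requests, which is the trade-off the paragraph above describes for slow data sources.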
from xata.client import XataClient
from xata.helpers import BulkProcessor
client = XataClient()
bp = BulkProcessor(client)
# The dict keys match the columns in the destination table "Users"
data = [
{"name": "Max Musterman", "email": "max@acme.co"},
{"name": "Ida von Klammer", "email": "ida@acme.co"},
# ... more records
{"name": "Mia Diaz", "email": "mia@acme.co"},
]
# Add records to processor
bp.put_records("Users", data)
# Ensure the Processing queue is flushed before the script terminates.
# This command will halt the script until all records have been pushed.
bp.flush_queue()
from xata.client import XataClient
from xata.helpers import BulkProcessor
client = XataClient()
bp = BulkProcessor(client)
# Sub to a queue and continuously read messages
while queue.subscribed():
    msg = queue.read()
    # Reading destination and data from queue
    table_name = msg["table"]
    record = msg["data"]
    # Add records to processor
    bp.put_record(table_name, record)
# Ensure the Processing queue is flushed before the script terminates.
# This command will halt the script until all records have been pushed.
bp.flush_queue()
The SDK builds on the requests package and includes built-in logging support. By setting the log level to debug, you can view the URL and status code of requests.
The BulkProcessor, just like the client, uses the logging package and will emit logs.
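A minimal way to turn on debug output is to configure the root logger with the standard logging module before creating the client. This sketch assumes the SDK loggers propagate to the root logger, which is the default behaviour of the logging package. The logger name my_app is a hypothetical name for your own code.

```python
import logging

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
    force=True,  # replace handlers configured earlier (Python 3.8+)
)

log = logging.getLogger("my_app")  # hypothetical logger for your own code
log.debug("debug logging is enabled")
```

Debug output can be verbose, so a higher level such as logging.INFO is usually a better fit outside of troubleshooting.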
If you are currently using a 0.x version of the Python SDK and intend to upgrade to 1.x, please consider the following breaking changes. We documented all changes made to promote the SDK to GA in the ticket xataio/xata-py#24.
Status: future breaking
xataio/xata-py#101
The response of an API call used to be of type requests.Response. This changed in favour of a custom class called ApiResponse. This class inherits from the built-in dict data type and hence allows you to drop the previously needed .json() method to access the data. The method .json() is still available for a non-breaking upgrade, but it is deprecated and will be removed with the next major release. The properties status_code and headers are replicated. For further convenience, you no longer need to check the status code; you can simply use the method is_success() to check if the request was successful.
In 0.x, getting a record required calling .json() to access the data.
post = xata.records().getRecord("Posts", "12345")
print(post.json()) # {"record": {"id": "12345", "title": "a blog post"}}
In 1.0.0, the response is a dict and ready to be used.
post = xata.records().get("Posts", "12345")
print(post) # {"record": {"id": "12345", "title": "a blog post"}}
# alternatively, the legacy way using .json()
post = xata.records().get("Posts", "12345")
print(post.json()) # {"record": {"id": "12345", "title": "a blog post"}}
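The idea of a response object that is also a dict can be sketched with a small subclass. DictResponse is an illustrative stand-in and not the SDK's ApiResponse class. Treating every 2xx status code as a success is an assumption of this sketch.

```python
from typing import Any, Dict, Optional


class DictResponse(dict):
    """Illustrative stand-in for a dict-based response, NOT the SDK class."""

    def __init__(
        self,
        status_code: int,
        body: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
    ) -> None:
        super().__init__(body or {})
        self.status_code = status_code
        self.headers = headers or {}

    def is_success(self) -> bool:
        # Assumption: any 2xx status code counts as success
        return 200 <= self.status_code < 300

    def json(self) -> Dict[str, Any]:
        # Legacy style accessor that returns a plain copy of the body
        return dict(self)


found = DictResponse(200, {"id": "12345", "title": "a blog post"})
missing = DictResponse(404, {"message": "not found"})

print(found["title"])        # a blog post
print(found.is_success())    # True
print(missing.is_success())  # False
```

Because the object is a dict, key access works directly on the response, while status_code and headers remain available as attributes.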
Status: breaking
xataio/xata-py#93
The API surface was clunky, with duplicate names that created long function calls, and it did not comply with PEP 8 function naming.
Listing all databases in 0.x
lst = xata.databases().getDatabaseList()
changed to a more concise call in 1.x:
lst = xata.databases().list()
Likewise, the function arguments were converted to the PEP 8 standard, lowercased and separated by an underscore. For example, SessionId becomes session_id.
Status: breaking
To align with the PEP 8 standard for naming exceptions, the following classes were renamed:
- UnauthorizedException to UnauthorizedError
- RateLimitException to RateLimitError
- ServerErrorException to XataServerError
If a request results in an HTTP status code outside the 2xx range, an error will be raised. The error depends on the status code:
- 401: UnauthorizedError
- 429: RateLimitError
- 500: XataServerError
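The mapping from status codes to error classes can be pictured as a lookup table. The sketch below defines local stand-in classes that reuse the documented names so that it runs without the SDK installed. The function error_for_status is hypothetical, and the sketch makes no claim about how the SDK treats status codes other than the three listed.

```python
from typing import Dict, Optional, Type


# Local stand-ins reusing the documented names (not imported from the SDK)
class UnauthorizedError(Exception):
    pass


class RateLimitError(Exception):
    pass


class XataServerError(Exception):
    pass


ERRORS_BY_STATUS: Dict[int, Type[Exception]] = {
    401: UnauthorizedError,
    429: RateLimitError,
    500: XataServerError,
}


def error_for_status(status_code: int) -> Optional[Type[Exception]]:
    """Return the error class documented for a status code, if any."""
    return ERRORS_BY_STATUS.get(status_code)


try:
    error_class = error_for_status(429)
    if error_class is not None:
        raise error_class("too many requests")
except RateLimitError as exc:
    handled = str(exc)
```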
Status: breaking
xataio/xata-py#17
The following methods were marked as deprecated with the 0.7.0 release of the SDK and were removed with the 1.0.0 major version:
client.get()
client.post()
client.put()
client.patch()
client.delete()
client.request()