Connect Python to Xata
Learn how to connect your Python application to Xata's PostgreSQL platform using the native psycopg2 driver. Get started with Python and PostgreSQL for direct database operations.
Prerequisites
- Xata account and project setup
- Python 3.8+ installed
- Basic knowledge of Python and SQL
Setup Xata Database
First, set up your Xata database with the common e-commerce dataset:
- Create a project and branch in the Xata console
- Navigate to the Queries tab in your branch
- Run the following SQL commands to create the initial schema:
CREATE TABLE products (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
price NUMERIC(7,2) NOT NULL,
rating INTEGER
);
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE order_items (
order_id INTEGER REFERENCES orders(id),
product_id INTEGER REFERENCES products(id),
qty INTEGER NOT NULL,
PRIMARY KEY (order_id, product_id)
);
Create Python Project
Create a new Python project:
mkdir my-xata-app
cd my-xata-app
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
Initialize Xata Project
Initialize your Xata project configuration:
xata init
This will create a .xata
directory with your project configuration.
Store Your Credentials
Create a .env
file in your project root to store your Xata connection string:
# .env
DATABASE_URL="postgresql://username:password@host:port/database"
Get your connection string from the Xata console or CLI:
xata branch url
Important: Never commit your .env
file to version control. Add it to your .gitignore
file.
Install Dependencies
Install the PostgreSQL driver and environment variable library:
pip install psycopg2-binary python-decouple
Create Database Connection
Create a database connection module:
# db/connection.py
import os
import psycopg2
from psycopg2.extras import RealDictCursor
from decouple import config
from contextlib import contextmanager
class DatabaseConnection:
def __init__(self):
self.connection_string = config('DATABASE_URL')
if not self.connection_string:
raise ValueError("DATABASE_URL environment variable is required")
@contextmanager
def get_connection(self):
"""Context manager for database connections"""
conn = None
try:
conn = psycopg2.connect(self.connection_string)
yield conn
except Exception as e:
if conn:
conn.rollback()
raise e
finally:
if conn:
conn.close()
@contextmanager
def get_cursor(self, cursor_factory=None):
"""Context manager for database cursors"""
with self.get_connection() as conn:
cursor = conn.cursor(cursor_factory=cursor_factory)
try:
yield cursor
conn.commit()
except Exception as e:
conn.rollback()
raise e
finally:
cursor.close()
# Global database instance
db = DatabaseConnection()
Create Data Models
Create simple data classes for your entities:
# models/product.py
from dataclasses import dataclass
from decimal import Decimal
from typing import Optional
@dataclass
class Product:
id: Optional[int]
name: str
price: Decimal
rating: Optional[int]
@classmethod
def from_dict(cls, data: dict):
return cls(
id=data.get('id'),
name=data['name'],
price=Decimal(str(data['price'])),
rating=data.get('rating')
)
def to_dict(self):
return {
'id': self.id,
'name': self.name,
'price': float(self.price),
'rating': self.rating
}
# models/order.py
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, List
from .product import Product
@dataclass
class Order:
id: Optional[int]
created_at: Optional[datetime]
order_items: Optional[List['OrderItem']] = None
@classmethod
def from_dict(cls, data: dict):
return cls(
id=data.get('id'),
created_at=data.get('created_at')
)
def to_dict(self):
return {
'id': self.id,
'created_at': self.created_at.isoformat() if self.created_at else None
}
@dataclass
class OrderItem:
order_id: int
product_id: int
qty: int
product: Optional[Product] = None
@classmethod
def from_dict(cls, data: dict):
return cls(
order_id=data['order_id'],
product_id=data['product_id'],
qty=data['qty'],
product=Product.from_dict(data['product']) if data.get('product') else None
)
def to_dict(self):
return {
'order_id': self.order_id,
'product_id': self.product_id,
'qty': self.qty,
'product': self.product.to_dict() if self.product else None
}
Create Service Layer
Create service classes for database operations:
# services/product_service.py
from decimal import Decimal
from typing import List, Optional
from db.connection import db
from models.product import Product
class ProductService:
def get_all_products(self) -> List[Product]:
"""Get all products from the database"""
with db.get_cursor() as cursor:
cursor.execute("SELECT id, name, price, rating FROM products ORDER BY id")
products = []
for row in cursor.fetchall():
products.append(Product(
id=row[0],
name=row[1],
price=row[2],
rating=row[3]
))
return products
def get_product_by_id(self, product_id: int) -> Optional[Product]:
"""Get a product by its ID"""
with db.get_cursor() as cursor:
cursor.execute(
"SELECT id, name, price, rating FROM products WHERE id = %s",
(product_id,)
)
row = cursor.fetchone()
if row:
return Product(
id=row[0],
name=row[1],
price=row[2],
rating=row[3]
)
return None
def create_product(self, name: str, price: Decimal, rating: Optional[int] = None) -> Product:
"""Create a new product"""
with db.get_cursor() as cursor:
cursor.execute(
"INSERT INTO products (name, price, rating) VALUES (%s, %s, %s) RETURNING id, name, price, rating",
(name, price, rating)
)
row = cursor.fetchone()
return Product(
id=row[0],
name=row[1],
price=row[2],
rating=row[3]
)
def update_product(self, product_id: int, name: Optional[str] = None,
price: Optional[Decimal] = None, rating: Optional[int] = None) -> Optional[Product]:
"""Update an existing product"""
# Build dynamic update query
updates = []
params = []
if name is not None:
updates.append("name = %s")
params.append(name)
if price is not None:
updates.append("price = %s")
params.append(price)
if rating is not None:
updates.append("rating = %s")
params.append(rating)
if not updates:
return self.get_product_by_id(product_id)
params.append(product_id)
query = f"UPDATE products SET {', '.join(updates)} WHERE id = %s RETURNING id, name, price, rating"
with db.get_cursor() as cursor:
cursor.execute(query, params)
row = cursor.fetchone()
if row:
return Product(
id=row[0],
name=row[1],
price=row[2],
rating=row[3]
)
return None
def delete_product(self, product_id: int) -> bool:
"""Delete a product by ID"""
with db.get_cursor() as cursor:
cursor.execute("DELETE FROM products WHERE id = %s", (product_id,))
return cursor.rowcount > 0
def get_products_by_rating(self, rating: int) -> List[Product]:
"""Get products by rating"""
with db.get_cursor() as cursor:
cursor.execute(
"SELECT id, name, price, rating FROM products WHERE rating = %s ORDER BY name",
(rating,)
)
products = []
for row in cursor.fetchall():
products.append(Product(
id=row[0],
name=row[1],
price=row[2],
rating=row[3]
))
return products
def get_products_by_price_range(self, min_price: Decimal, max_price: Decimal) -> List[Product]:
"""Get products within a price range"""
with db.get_cursor() as cursor:
cursor.execute(
"SELECT id, name, price, rating FROM products WHERE price >= %s AND price <= %s ORDER BY price",
(min_price, max_price)
)
products = []
for row in cursor.fetchall():
products.append(Product(
id=row[0],
name=row[1],
price=row[2],
rating=row[3]
))
return products
# services/order_service.py
from typing import List, Optional, Dict, Any
from db.connection import db
from models.order import Order, OrderItem
from models.product import Product
class OrderService:
def get_all_orders(self) -> List[Order]:
"""Get all orders"""
with db.get_cursor() as cursor:
cursor.execute("SELECT id, created_at FROM orders ORDER BY created_at DESC")
orders = []
for row in cursor.fetchall():
orders.append(Order(
id=row[0],
created_at=row[1]
))
return orders
def get_order_by_id(self, order_id: int) -> Optional[Order]:
"""Get an order by ID with its items"""
with db.get_cursor() as cursor:
# Get order details
cursor.execute("SELECT id, created_at FROM orders WHERE id = %s", (order_id,))
row = cursor.fetchone()
if not row:
return None
order = Order(id=row[0], created_at=row[1])
# Get order items with product details
cursor.execute("""
SELECT oi.order_id, oi.product_id, oi.qty,
p.id as product_id, p.name, p.price, p.rating
FROM order_items oi
JOIN products p ON oi.product_id = p.id
WHERE oi.order_id = %s
""", (order_id,))
order_items = []
for item_row in cursor.fetchall():
product = Product(
id=item_row[3],
name=item_row[4],
price=item_row[5],
rating=item_row[6]
)
order_item = OrderItem(
order_id=item_row[0],
product_id=item_row[1],
qty=item_row[2],
product=product
)
order_items.append(order_item)
order.order_items = order_items
return order
def create_order(self) -> Order:
"""Create a new order"""
with db.get_cursor() as cursor:
cursor.execute(
"INSERT INTO orders DEFAULT VALUES RETURNING id, created_at"
)
row = cursor.fetchone()
return Order(
id=row[0],
created_at=row[1]
)
def add_item_to_order(self, order_id: int, product_id: int, qty: int) -> Optional[OrderItem]:
"""Add an item to an order"""
with db.get_cursor() as cursor:
# Check if order and product exist
cursor.execute("SELECT id FROM orders WHERE id = %s", (order_id,))
if not cursor.fetchone():
return None
cursor.execute("SELECT id FROM products WHERE id = %s", (product_id,))
if not cursor.fetchone():
return None
# Add order item
cursor.execute(
"INSERT INTO order_items (order_id, product_id, qty) VALUES (%s, %s, %s) RETURNING order_id, product_id, qty",
(order_id, product_id, qty)
)
row = cursor.fetchone()
return OrderItem(
order_id=row[0],
product_id=row[1],
qty=row[2]
)
def get_orders_with_product_details(self) -> List[Dict[str, Any]]:
"""Get orders with product details and calculated totals"""
with db.get_cursor() as cursor:
cursor.execute("""
SELECT
o.id as order_id,
o.created_at,
p.name as product_name,
p.price,
oi.qty,
(p.price * oi.qty) as total
FROM orders o
JOIN order_items oi ON o.id = oi.order_id
JOIN products p ON oi.product_id = p.id
ORDER BY o.created_at DESC
""")
results = []
for row in cursor.fetchall():
results.append({
'order_id': row[0],
'created_at': row[1],
'product_name': row[2],
'price': float(row[3]),
'qty': row[4],
'total': float(row[5])
})
return results
Create Sample Application
Create a simple application to demonstrate the functionality:
# main.py
from decimal import Decimal
from services.product_service import ProductService
from services.order_service import OrderService
def main():
product_service = ProductService()
order_service = OrderService()
try:
# Create sample products
print("Creating sample products...")
products = [
product_service.create_product("Wireless Headphones", Decimal("99.99"), 5),
product_service.create_product("Smartphone", Decimal("699.99"), 4),
product_service.create_product("Laptop", Decimal("1299.99"), 5),
product_service.create_product("Coffee Maker", Decimal("89.99"), 4)
]
print(f"Created {len(products)} products")
# Display all products
print("\nAll products:")
all_products = product_service.get_all_products()
for product in all_products:
rating = product.rating if product.rating else "N/A"
print(f"- {product.name}: ${product.price} (Rating: {rating}/5)")
# Get products by rating
print("\nTop-rated products (5 stars):")
top_products = product_service.get_products_by_rating(5)
for product in top_products:
print(f"- {product.name}: ${product.price}")
# Create an order with items
print("\nCreating an order...")
order = order_service.create_order()
print(f"Created order #{order.id}")
# Add items to order
if products:
order_service.add_item_to_order(order.id, products[0].id, 2) # 2 headphones
if len(products) >= 3:
order_service.add_item_to_order(order.id, products[2].id, 1) # 1 laptop
# Get order details with products
print("\nOrder details:")
order_details = order_service.get_orders_with_product_details()
for detail in order_details:
print(f"Order #{detail['order_id']}: {detail['product_name']} x{detail['qty']} = ${detail['total']}")
# Update a product
if products:
print("\nUpdating product...")
updated_product = product_service.update_product(
products[0].id,
name="Premium Wireless Headphones",
price=Decimal("129.99")
)
if updated_product:
print(f"Updated: {updated_product.name} - ${updated_product.price}")
# Get products by price range
print("\nProducts between $50 and $150:")
mid_range_products = product_service.get_products_by_price_range(
Decimal("50.00"), Decimal("150.00")
)
for product in mid_range_products:
print(f"- {product.name}: ${product.price}")
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
main()
Create Requirements File
Create a requirements.txt
file:
# requirements.txt
psycopg2-binary>=2.9.0
python-decouple>=3.8
Run the Application
Install dependencies and run the application:
pip install -r requirements.txt
python main.py
Next Steps
- Explore Xata branching for development workflows
- Learn about schema changes with zero downtime
- Join our Discord community for help and support