Connect Python to Xata

Learn how to connect your Python application to Xata's PostgreSQL platform using the native psycopg2 driver. Get started with Python and PostgreSQL for direct database operations.

Prerequisites

  • Xata account and project setup
  • Python 3.8+ installed
  • Basic knowledge of Python and SQL

Set Up Xata Database

First, set up your Xata database with a simple e-commerce schema (products, orders, and order items):

  1. Create a project and branch in the Xata console
  2. Navigate to the Queries tab in your branch
  3. Run the following SQL commands to create the initial schema:
-- Products that can be ordered.
CREATE TABLE products (
  -- Auto-incrementing surrogate key.
  id SERIAL PRIMARY KEY,
  name TEXT NOT NULL,
  -- Exact decimal with 2 fractional digits (maximum 99999.99).
  price NUMERIC(7,2) NOT NULL,
  -- Nullable: a product may have no rating.
  rating INTEGER
);

-- Order headers; the line items live in order_items.
CREATE TABLE orders (
  id SERIAL PRIMARY KEY,
  -- Filled in by the database when the INSERT does not supply a value.
  created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Line items linking an order to the products it contains.
CREATE TABLE order_items (
  order_id INTEGER REFERENCES orders(id),
  product_id INTEGER REFERENCES products(id),
  qty INTEGER NOT NULL,
  -- Composite key: a given product can appear at most once per order.
  PRIMARY KEY (order_id, product_id)
);

Create Python Project

Create a new Python project:

mkdir my-xata-app
cd my-xata-app
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

Initialize Xata Project

Initialize your Xata project configuration using the Xata CLI (it must be installed first):

xata init

This will create a .xata directory with your project configuration.

Store Your Credentials

Create a .env file in your project root to store your Xata connection string:

# .env
DATABASE_URL="postgresql://username:password@host:port/database"

Get your connection string from the Xata console or CLI:

xata branch url

Important: Never commit your .env file to version control. Add it to your .gitignore file.

Install Dependencies

Install the PostgreSQL driver and environment variable library:

pip install psycopg2-binary python-decouple

Create Database Connection

Create a database connection module:

# db/connection.py
import os
import psycopg2
from psycopg2.extras import RealDictCursor
from decouple import config
from contextlib import contextmanager

class DatabaseConnection:
    """Open short-lived psycopg2 connections using the DATABASE_URL setting.

    Every call to get_connection() opens a new connection and closes it on
    exit; nothing is pooled or cached on the instance.
    """

    def __init__(self):
        """Load the connection string via decouple's config().

        Raises:
            ValueError: If DATABASE_URL is missing or empty.
        """
        # Without a default, config() raises its own UndefinedValueError for
        # a missing key and the check below would only ever see an empty
        # value. Defaulting to '' routes both cases to the same ValueError.
        self.connection_string = config('DATABASE_URL', default='')
        if not self.connection_string:
            raise ValueError("DATABASE_URL environment variable is required")

    @contextmanager
    def get_connection(self):
        """Context manager yielding a new connection.

        The connection is rolled back if the body raises and is always
        closed on exit. Committing is the caller's responsibility.
        """
        conn = None
        try:
            conn = psycopg2.connect(self.connection_string)
            yield conn
        except Exception:
            # conn is still None when connect() itself failed.
            if conn is not None:
                conn.rollback()
            # Bare raise keeps the original traceback intact.
            raise
        finally:
            if conn is not None:
                conn.close()

    @contextmanager
    def get_cursor(self, cursor_factory=None):
        """Context manager yielding a cursor inside its own transaction.

        Commits when the body finishes normally. If the body (or the commit)
        raises, the exception propagates into get_connection(), which rolls
        the transaction back, so no second rollback is issued here.

        Args:
            cursor_factory: Optional psycopg2 cursor class passed through to
                conn.cursor(); None selects the driver's default cursor.
        """
        with self.get_connection() as conn:
            cursor = conn.cursor(cursor_factory=cursor_factory)
            try:
                yield cursor
                conn.commit()
            finally:
                cursor.close()

# Global database instance. It is created at import time, so importing this
# module fails immediately when DATABASE_URL is not configured.
db = DatabaseConnection()

Create Data Models

Create simple data classes for your entities:

# models/product.py
from dataclasses import dataclass
from decimal import Decimal
from typing import Optional

@dataclass
class Product:
    """In-memory representation of one row of the products table."""

    id: Optional[int]
    name: str
    price: Decimal
    rating: Optional[int]

    @classmethod
    def from_dict(cls, data: dict):
        """Build a Product from a mapping.

        'name' and 'price' must be present (KeyError otherwise); 'id' and
        'rating' fall back to None. The price is converted through str() so
        that a float input does not carry its binary rounding into Decimal.
        """
        product_id = data.get('id')
        name = data['name']
        price = Decimal(str(data['price']))
        rating = data.get('rating')
        return cls(product_id, name, price, rating)

    def to_dict(self):
        """Return the fields as a plain dict, with the price as a float."""
        return dict(
            id=self.id,
            name=self.name,
            price=float(self.price),
            rating=self.rating,
        )
# models/order.py
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, List
from .product import Product

@dataclass
class Order:
    """An order header; line items are attached through order_items."""

    id: Optional[int]
    created_at: Optional[datetime]
    order_items: Optional[List['OrderItem']] = None

    @classmethod
    def from_dict(cls, data: dict):
        """Build an Order from a mapping; absent keys become None.

        Only 'id' and 'created_at' are read, so order_items keeps its
        default of None.
        """
        order_id = data.get('id')
        placed_at = data.get('created_at')
        return cls(order_id, placed_at)

    def to_dict(self):
        """Return id and an ISO-8601 created_at string (None when unset)."""
        stamp = None
        if self.created_at:
            stamp = self.created_at.isoformat()
        return {'id': self.id, 'created_at': stamp}

@dataclass
class OrderItem:
    """One line of an order: a product and the quantity ordered."""

    order_id: int
    product_id: int
    qty: int
    product: Optional[Product] = None

    @classmethod
    def from_dict(cls, data: dict):
        """Build an OrderItem from a mapping.

        'order_id', 'product_id' and 'qty' are required (KeyError
        otherwise). A truthy 'product' entry is converted with
        Product.from_dict; otherwise product stays None.
        """
        order_id = data['order_id']
        product_id = data['product_id']
        qty = data['qty']
        nested = data.get('product')
        product = Product.from_dict(nested) if nested else None
        return cls(order_id, product_id, qty, product)

    def to_dict(self):
        """Return the fields as a plain dict, nesting the product if set."""
        product_dict = None
        if self.product:
            product_dict = self.product.to_dict()
        return {
            'order_id': self.order_id,
            'product_id': self.product_id,
            'qty': self.qty,
            'product': product_dict,
        }

Create Service Layer

Create service classes for database operations:

# services/product_service.py
from decimal import Decimal
from typing import List, Optional
from db.connection import db
from models.product import Product

class ProductService:
    """Queries and mutations for the products table.

    Every method obtains its own cursor from db.get_cursor() and maps result
    rows, selected as (id, name, price, rating), to Product instances.
    """

    @staticmethod
    def _row_to_product(row) -> Product:
        """Map one (id, name, price, rating) result row to a Product."""
        return Product(id=row[0], name=row[1], price=row[2], rating=row[3])

    def get_all_products(self) -> List[Product]:
        """Return every product, ordered by id."""
        with db.get_cursor() as cursor:
            cursor.execute("SELECT id, name, price, rating FROM products ORDER BY id")
            return [self._row_to_product(row) for row in cursor.fetchall()]

    def get_product_by_id(self, product_id: int) -> Optional[Product]:
        """Return the product with the given id, or None if there is none."""
        with db.get_cursor() as cursor:
            cursor.execute(
                "SELECT id, name, price, rating FROM products WHERE id = %s",
                (product_id,)
            )
            row = cursor.fetchone()
            if row is None:
                return None
            return self._row_to_product(row)

    def create_product(self, name: str, price: Decimal, rating: Optional[int] = None) -> Product:
        """Insert a product and return it as stored, including its new id."""
        with db.get_cursor() as cursor:
            cursor.execute(
                "INSERT INTO products (name, price, rating) VALUES (%s, %s, %s) RETURNING id, name, price, rating",
                (name, price, rating)
            )
            return self._row_to_product(cursor.fetchone())

    def update_product(self, product_id: int, name: Optional[str] = None,
                      price: Optional[Decimal] = None, rating: Optional[int] = None) -> Optional[Product]:
        """Update the supplied fields of a product.

        An argument left as None means "leave this column unchanged", so this
        method cannot reset rating to NULL. When no field is supplied the
        current row is returned without issuing an UPDATE.

        Returns:
            The updated product, or None if no product has this id.
        """
        # Column names come from this fixed table, never from caller input,
        # so interpolating them into the statement is safe; the values are
        # always passed as bound parameters.
        candidates = (("name", name), ("price", price), ("rating", rating))
        supplied = [(column, value) for column, value in candidates if value is not None]

        if not supplied:
            return self.get_product_by_id(product_id)

        set_clause = ', '.join(f"{column} = %s" for column, _ in supplied)
        params = [value for _, value in supplied]
        params.append(product_id)
        query = f"UPDATE products SET {set_clause} WHERE id = %s RETURNING id, name, price, rating"

        with db.get_cursor() as cursor:
            cursor.execute(query, params)
            row = cursor.fetchone()
            if row is None:
                return None
            return self._row_to_product(row)

    def delete_product(self, product_id: int) -> bool:
        """Delete a product by id; return True if a row was removed."""
        with db.get_cursor() as cursor:
            cursor.execute("DELETE FROM products WHERE id = %s", (product_id,))
            return cursor.rowcount > 0

    def get_products_by_rating(self, rating: int) -> List[Product]:
        """Return the products with exactly this rating, ordered by name."""
        with db.get_cursor() as cursor:
            cursor.execute(
                "SELECT id, name, price, rating FROM products WHERE rating = %s ORDER BY name",
                (rating,)
            )
            return [self._row_to_product(row) for row in cursor.fetchall()]

    def get_products_by_price_range(self, min_price: Decimal, max_price: Decimal) -> List[Product]:
        """Return products priced within [min_price, max_price], cheapest first."""
        with db.get_cursor() as cursor:
            cursor.execute(
                "SELECT id, name, price, rating FROM products WHERE price >= %s AND price <= %s ORDER BY price",
                (min_price, max_price)
            )
            return [self._row_to_product(row) for row in cursor.fetchall()]
# services/order_service.py
from typing import List, Optional, Dict, Any
from db.connection import db
from models.order import Order, OrderItem
from models.product import Product

class OrderService:
    """Queries and mutations for the orders and order_items tables."""

    def get_all_orders(self) -> List[Order]:
        """Return every order header, newest first (items are not loaded)."""
        with db.get_cursor() as cursor:
            cursor.execute("SELECT id, created_at FROM orders ORDER BY created_at DESC")
            return [
                Order(id=order_id, created_at=created_at)
                for order_id, created_at in cursor.fetchall()
            ]

    def get_order_by_id(self, order_id: int) -> Optional[Order]:
        """Return one order with its items and their products, or None."""
        with db.get_cursor() as cursor:
            # Header first: without it there is nothing to attach items to.
            cursor.execute("SELECT id, created_at FROM orders WHERE id = %s", (order_id,))
            header = cursor.fetchone()
            if header is None:
                return None

            found_id, created_at = header
            order = Order(id=found_id, created_at=created_at)

            # Line items joined with the product each one refers to.
            cursor.execute("""
                SELECT oi.order_id, oi.product_id, oi.qty, 
                       p.id as product_id, p.name, p.price, p.rating
                FROM order_items oi
                JOIN products p ON oi.product_id = p.id
                WHERE oi.order_id = %s
            """, (order_id,))

            order.order_items = [
                OrderItem(
                    order_id=item_order_id,
                    product_id=item_product_id,
                    qty=qty,
                    product=Product(id=p_id, name=p_name, price=p_price, rating=p_rating),
                )
                for item_order_id, item_product_id, qty, p_id, p_name, p_price, p_rating
                in cursor.fetchall()
            ]
            return order

    def create_order(self) -> Order:
        """Insert an order using only column defaults and return it."""
        with db.get_cursor() as cursor:
            cursor.execute(
                "INSERT INTO orders DEFAULT VALUES RETURNING id, created_at"
            )
            new_id, created_at = cursor.fetchone()
            return Order(id=new_id, created_at=created_at)

    def add_item_to_order(self, order_id: int, product_id: int, qty: int) -> Optional[OrderItem]:
        """Add a product to an order.

        Returns:
            The inserted OrderItem, or None when either the order or the
            product does not exist.
        """
        with db.get_cursor() as cursor:
            # Both referenced rows must exist; the order is checked first.
            existence_checks = (
                ("SELECT id FROM orders WHERE id = %s", order_id),
                ("SELECT id FROM products WHERE id = %s", product_id),
            )
            for sql, key in existence_checks:
                cursor.execute(sql, (key,))
                if cursor.fetchone() is None:
                    return None

            cursor.execute(
                "INSERT INTO order_items (order_id, product_id, qty) VALUES (%s, %s, %s) RETURNING order_id, product_id, qty",
                (order_id, product_id, qty)
            )
            saved_order_id, saved_product_id, saved_qty = cursor.fetchone()
            return OrderItem(
                order_id=saved_order_id,
                product_id=saved_product_id,
                qty=saved_qty,
            )

    def get_orders_with_product_details(self) -> List[Dict[str, Any]]:
        """Return one dict per order line with product name and line total.

        price and total are converted to float; rows are ordered by the
        order's created_at, newest first.
        """
        with db.get_cursor() as cursor:
            cursor.execute("""
                SELECT 
                    o.id as order_id,
                    o.created_at,
                    p.name as product_name,
                    p.price,
                    oi.qty,
                    (p.price * oi.qty) as total
                FROM orders o
                JOIN order_items oi ON o.id = oi.order_id
                JOIN products p ON oi.product_id = p.id
                ORDER BY o.created_at DESC
            """)

            return [
                {
                    'order_id': line[0],
                    'created_at': line[1],
                    'product_name': line[2],
                    'price': float(line[3]),
                    'qty': line[4],
                    'total': float(line[5]),
                }
                for line in cursor.fetchall()
            ]

Create Sample Application

Create a simple application to demonstrate the functionality:

# main.py
from decimal import Decimal
from services.product_service import ProductService
from services.order_service import OrderService

def main():
    """Run the demo: create products, place an order, then query and update.

    Any exception raised along the way is caught at this top-level boundary
    and printed as "Error: ...".
    """
    product_service = ProductService()
    order_service = OrderService()

    try:
        # Create sample products
        print("Creating sample products...")
        products = [
            product_service.create_product("Wireless Headphones", Decimal("99.99"), 5),
            product_service.create_product("Smartphone", Decimal("699.99"), 4),
            product_service.create_product("Laptop", Decimal("1299.99"), 5),
            product_service.create_product("Coffee Maker", Decimal("89.99"), 4)
        ]

        print(f"Created {len(products)} products")

        # Display all products
        print("\nAll products:")
        all_products = product_service.get_all_products()
        for product in all_products:
            # Compare against None so a rating of 0 is still shown, and keep
            # "/5" off the placeholder so unrated products do not print
            # "N/A/5".
            rating = f"{product.rating}/5" if product.rating is not None else "N/A"
            print(f"- {product.name}: ${product.price} (Rating: {rating})")

        # Get products by rating
        print("\nTop-rated products (5 stars):")
        top_products = product_service.get_products_by_rating(5)
        for product in top_products:
            print(f"- {product.name}: ${product.price}")

        # Create an order with items
        print("\nCreating an order...")
        order = order_service.create_order()
        print(f"Created order #{order.id}")

        # Add items to order
        if products:
            order_service.add_item_to_order(order.id, products[0].id, 2)  # 2 headphones
            if len(products) >= 3:
                order_service.add_item_to_order(order.id, products[2].id, 1)  # 1 laptop

        # Get order details with products
        print("\nOrder details:")
        order_details = order_service.get_orders_with_product_details()
        for detail in order_details:
            # total is a float; fix it to two decimals so amounts such as
            # 1299.90 are not printed as 1299.9.
            print(f"Order #{detail['order_id']}: {detail['product_name']} x{detail['qty']} = ${detail['total']:.2f}")

        # Update a product
        if products:
            print("\nUpdating product...")
            updated_product = product_service.update_product(
                products[0].id,
                name="Premium Wireless Headphones",
                price=Decimal("129.99")
            )
            if updated_product:
                print(f"Updated: {updated_product.name} - ${updated_product.price}")

        # Get products by price range
        print("\nProducts between $50 and $150:")
        mid_range_products = product_service.get_products_by_price_range(
            Decimal("50.00"), Decimal("150.00")
        )
        for product in mid_range_products:
            print(f"- {product.name}: ${product.price}")

    except Exception as e:
        print(f"Error: {e}")

if __name__ == "__main__":
    main()

Create Requirements File

Create a requirements.txt file:

# requirements.txt
psycopg2-binary>=2.9.0
python-decouple>=3.8

Run the Application

Install dependencies and run the application:

pip install -r requirements.txt
python main.py

Next Steps