Bulk Processor
If you want to ingest multiple records into Xata, bulk ingestion is the most efficient way. The BulkProcessor, a helper of the Python SDK, aims to make the process even simpler by abstracting away the complexity of juggling concurrent workers, chunking data, and maintaining queues.
You can use the BulkProcessor to, for example, ingest a CSV file into Xata, or to read documents from a queue and delegate the ingestion to the processor.
Two methods are available to put data in the processing queue:
bp.put_records(":table", ":records") to add multiple records
bp.put_record(":table", ":record") to add only one record
Where possible, prefer the multi-record variant: bp.put_records(":table", ":records") is more efficient because it requires less locking of the internal data structures.
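To illustrate why the batched call needs less locking, here is a minimal sketch using a toy queue. ToyRecordQueue and its lock counter are hypothetical and only mimic the idea; they are not the SDK's actual internals, which are not described here.

```python
import threading


class ToyRecordQueue:
    """Illustrative stand-in for a lock-protected queue; NOT the SDK's internals."""

    def __init__(self):
        self._lock = threading.Lock()
        self.records = []
        self.lock_acquisitions = 0  # how many times the lock was taken

    def put_record(self, record):
        # One lock round-trip per record.
        with self._lock:
            self.lock_acquisitions += 1
            self.records.append(record)

    def put_records(self, records):
        # One lock round-trip for the whole batch.
        with self._lock:
            self.lock_acquisitions += 1
            self.records.extend(records)


rows = [{"name": f"user-{i}"} for i in range(100)]

one_by_one = ToyRecordQueue()
for row in rows:
    one_by_one.put_record(row)

batched = ToyRecordQueue()
batched.put_records(rows)

print(one_by_one.lock_acquisitions, batched.lock_acquisitions)
```

In this toy model, adding 100 records one at a time takes the lock 100 times, while the batched call takes it once. The real savings depend on how the processor is implemented.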
Configuration Options
You can tweak the processor to your needs if necessary:
thread_pool_size: How many data queue workers should be deployed (default: 4)
batch_size: How many records per table should be pushed as a batch (default: 25)
flush_interval: After how many seconds the per-table queue should be flushed (default: 5 seconds)
processing_timeout: Cooldown period between batches (default: 0.025 seconds)
If data arrives slowly, e.g. less than 1 record per second, it is reasonable to deploy fewer threads and to decrease the batch_size and flush_interval so that documents get in faster.
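As a rough sketch of that tuning advice, the helper below picks settings based on the expected ingest rate. The function suggest_processor_options is hypothetical, the reduced values are illustrative rather than recommended, and passing the options as keyword arguments to the BulkProcessor constructor is an assumption based on the option names listed above.

```python
# Defaults as documented above.
DEFAULT_OPTIONS = {
    "thread_pool_size": 4,
    "batch_size": 25,
    "flush_interval": 5,
    "processing_timeout": 0.025,
}


def suggest_processor_options(records_per_second):
    """Hypothetical helper: choose processor settings from the expected rate."""
    options = dict(DEFAULT_OPTIONS)
    if records_per_second < 1:
        # Slow trickle: fewer workers, smaller batches, more frequent flushes.
        # These numbers are illustrative only.
        options.update(thread_pool_size=1, batch_size=5, flush_interval=1)
    return options


options = suggest_processor_options(records_per_second=0.5)

# Assumed usage (requires the xata package and valid credentials):
# bp = BulkProcessor(XataClient(), **options)
```

Keeping the options in a plain dict makes it easy to log or override them before the processor is created.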
How to ingest many records
```python
from xata.client import XataClient
from xata.helpers import BulkProcessor

client = XataClient()
bp = BulkProcessor(client)

# The dict keys match the columns in the destination table "Users"
data = [
    {"name": "Max Musterman", "email": "max@acme.co"},
    {"name": "Ida von Klammer", "email": "ida@acme.co"},
    # ... more records
    {"name": "Mia Diaz", "email": "mia@acme.co"},
]

# Add records to processor
bp.put_records("Users", data)

# Ensure the processing queue is flushed before the script terminates.
# This command will halt the script until all records have been pushed.
bp.flush_queue()
```
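The same pattern can be applied to the CSV use case mentioned earlier. The following is a minimal sketch, assuming the CSV header row matches the column names of the destination table. The function ingest_csv and its chunk_size parameter are hypothetical; bp can be any object that offers put_records() and flush_queue(), such as a BulkProcessor.

```python
import csv
from itertools import islice


def ingest_csv(bp, table, path, chunk_size=100):
    """Read a CSV file and hand its rows to the processor in chunks.

    Returns the number of rows that were queued.
    """
    total = 0
    with open(path, newline="", encoding="utf-8") as handle:
        reader = csv.DictReader(handle)  # header row supplies the dict keys
        while True:
            chunk = list(islice(reader, chunk_size))
            if not chunk:
                break
            bp.put_records(table, chunk)
            total += len(chunk)
    # Block until everything that was queued has been pushed.
    bp.flush_queue()
    return total


# Assumed usage (requires the xata package and valid credentials):
# ingest_csv(BulkProcessor(XataClient()), "Users", "users.csv")
```

Note that csv.DictReader yields every value as a string, so columns with numeric or boolean types would need to be converted before the rows are queued.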
How to ingest data from a queue
```python
from xata.client import XataClient
from xata.helpers import BulkProcessor

client = XataClient()
bp = BulkProcessor(client)

# `queue` is a placeholder for your own message queue consumer;
# it is not provided by the Xata SDK.
# Subscribe to a queue and continuously read messages
while queue.subscribed():
    msg = queue.read()

    # Reading destination and data from queue
    table_name = msg["table"]
    record = msg["data"]

    # Add record to processor
    bp.put_record(table_name, record)

# Ensure the processing queue is flushed before the script terminates.
# This command will halt the script until all records have been pushed.
bp.flush_queue()
```
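For a version of this loop that can be tried locally, here is a minimal sketch built on Python's standard queue.Queue. The function drain_queue and the STOP sentinel are hypothetical; the message shape ({"table": ..., "data": ...}) follows the example above, and bp can be any object that offers put_record() and flush_queue().

```python
import queue

STOP = None  # hypothetical sentinel that tells the consumer to stop


def drain_queue(bp, messages):
    """Forward messages from a queue.Queue to the processor until STOP arrives.

    Returns the number of records that were forwarded.
    """
    forwarded = 0
    while True:
        msg = messages.get()  # blocks until a message is available
        if msg is STOP:
            break
        bp.put_record(msg["table"], msg["data"])
        forwarded += 1
    # Push whatever is still buffered before returning.
    bp.flush_queue()
    return forwarded


messages = queue.Queue()
messages.put({"table": "Users", "data": {"name": "Max Musterman"}})
messages.put(STOP)

# Assumed usage (requires the xata package and valid credentials):
# drain_queue(BulkProcessor(XataClient()), messages)
```

A sentinel gives the producer an explicit way to end the loop, which guarantees that flush_queue() is reached before the script exits.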
Logging
The BulkProcessor, just like the client, uses the logging package and will emit logs.
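Because these logs go through Python's standard logging package, they can be surfaced with the usual configuration. The sketch below configures the root logger; which logger names and levels the SDK uses is not stated here, so the INFO level is an assumption you may need to adjust.

```python
import logging

# Send log records from every library that uses `logging` to stderr.
logging.basicConfig(
    level=logging.INFO,  # assumed level; lower it to DEBUG for more detail
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    force=True,  # replace handlers installed earlier (Python 3.8+)
)
```

Run this once at startup, before creating the client or the processor, so that their messages are captured from the beginning.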