Indexing File Attachments with the Python SDK

Extracting text from File Attachments and indexing it with Python.

Written by Kostas Botsas

Published on September 14, 2023

The all-new File Attachments column types enable storing files in your database alongside other common column data types. Once uploaded, the attachment lives in Xata's object store and can be accessed as a column value with additional metadata such as mediaType, size and attributes, which can be used in queries, filters and summaries. Xata's API provides several methods for File Access, including Authenticated, Public and Signed URLs.

The actual file content is not indexed by default, though. Indexing refers to the process of extracting a file's text content and storing it under a text column, where it can be queried and searched. How can we use Xata's advanced features such as free-text search and Ask AI on our file attachments' actual content?

In this article we explore methods for indexing File Attachments using Xata's Python SDK, which is now generally available.

## Overview

The xfileindex script in the xtools repo extracts content from text, CSV, or PDF file attachments stored in Xata and indexes it as raw text, which allows us to use the content with query, free-text search, and Ask AI capabilities.

python xfileindex.py --db https://{workspace}.{region}.xata.sh/db/{db} --table mytable --columns myfile,mymultiplefiles

The README file provides usage examples as well as the full list of parameters.

Python is a programming language that comes pre-installed on many popular OS distributions, but it can also be downloaded and installed on any platform. The Python package installer, pip, almost always exists alongside Python wherever it is installed (but there are other ways to install pip). Pip fetches modules from the official Python Package Index (PyPI), where you can find the Xata Python SDK package. It can be installed with the command pip install xata - check the documentation for more details.

With the Xata Python SDK installed, we can import the XataClient in our code (from xata.client import XataClient) and interact with our Xata database.
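For example, here is a minimal sketch of initializing the client; the API key and database URL below are placeholders to substitute with your own values:

from xata.client import XataClient

# Minimal client setup sketch: both values below are placeholders.
xata = XataClient(
    api_key="xau_mykey",  # or set the XATA_API_KEY environment variable
    db_url="https://{workspace}.{region}.xata.sh/db/{db}",
)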

Note that the Python version on your system (python --version) must be 3.8 or higher in order to use the script.
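For example, you can verify both prerequisites from your terminal:

python --version   # should report 3.8 or higher
pip install xata   # installs the Xata Python SDK from PyPI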

Now that we’ve covered how to use Xata with Python, let’s jump straight to the point of how to index file attachments.

We will assume that you’ve already uploaded some files in your database under the file or file[] column type. Check out the File Attachments documentation for more details on those column types.

Our script scrolls through the Xata database, looks for file or file[] columns with text, CSV or PDF content (mediaType), extracts the text and writes it into text columns in a target table. This is all you need to put the file content to work with query, search, and Ask AI features!

Here are the essential steps for running this script:

  1. Clone the xtools repo:

git clone https://github.com/xataio/xtools

This repo is a library of open source helper scripts provided by Xata, and we welcome community contributions.

  2. Navigate to the script's directory:

cd xtools/xfileindex

  3. Install the required packages using pip:

pip install -r requirements.txt

The script uses some extra packages (xata, PyPDF2) which are not typically available with the default Python installation on your system, so we need to make sure they are installed before proceeding.

  4. Set your Xata API key as an environment variable in your terminal. You can generate an API key in the account settings page. Alternatively, you can use a .env file to store the API key.

export XATA_API_KEY="xau_mykey"

  5. Gather a few parameters about your database and schema:

  • The database endpoint URL. You can find it on the Settings page of your database, under "Database endpoint".
  • The name of the table where your files are stored.
  • The name(s) of the column(s) where your files are stored.

  6. Knowing all of the above, run the script as follows:

python xfileindex.py --db https://{workspace}.{region}.xata.sh/db/{db} --table tablename --columns firstcolumn,secondcolumn

Let’s take a moment to review the output and dive deeper into how things work.

The script creates a new table mytableIndex by appending "Index" to the source table name. Alternatively, you can specify your preferred name for the target table by providing the dest parameter (--dest customTableName). Also, in case your table lives in a branch other than main, you can add the branch parameter (--branch dev).
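For example, the following invocation (with hypothetical table and branch names) writes the extracted text to a custom target table on the dev branch:

python xfileindex.py --db https://{workspace}.{region}.xata.sh/db/{db} --table mytable --columns myfile --dest customTableName --branch dev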

The script opens a “scroll”, a cursor-based query to the table, which goes through records looking for content under the specified columns (firstcolumn, secondcolumn). An intermediate function ensure_target_table makes sure the target table schema is compatible with the output that is going to be generated by the script, but you won’t typically be concerned about that - unless you want to customize the schema of the resulting table, in which case feel free to dive into the relevant code section.

Here is how Xata's “scroll” works in Python:

# Point the client at the database and branch, then page through the
# source table with a cursor-based "scroll" query.
xata = XataClient(db_url=f"{TARGET_DB}:{BRANCH}")
querypayload = {"columns": COLUMNS_TO_INDEX, "page": {"size": PAGE_SIZE}}
more = True
while more:
  response = xata.data().query(SOURCE_TABLE, querypayload)
  if response.is_success():
    process_response(xata, response)
    more = response.has_more_results()
    if more:
      # Follow the cursor to the next page of matching records.
      page = {"after": response.get_cursor(), "size": PAGE_SIZE}
      querypayload = {"columns": COLUMNS_TO_INDEX, "page": page}

An overview of the variables in use:

  • COLUMNS_TO_INDEX: an array with the content of the --columns input parameter.

  • PAGE_SIZE: the number of records to fetch from Xata with every scroll iteration. The query does not fetch the file content at this stage, just metadata about the file and its format. Using the maximum page size of 200, it fetches as many records as possible in a single round trip, which is usually the fastest approach.

  • SOURCE_TABLE: the content of the --table input parameter.

Alongside the actual records, you also get:

  • a flag, checked with has_more_results(), which tells you whether there are more records matching your query (i.e. containing content in the specified columns).
  • a cursor, retrieved with get_cursor(), which points the following query to the next page of results (if any).

Text extraction takes place in the function process_response(xata, response).

As shown earlier, the scroll query fetches records containing file metadata, but not the file content. After you find a record and column where a file lives, you can proceed to download the file’s content.

You can determine whether the file lives in a single file or file array column by retrieving the table schema and looking at the column type defined in it. To keep things simple, the script instead infers this from whether the column value in the response is a dictionary (single file) or a list (hence, potentially multiple files):

if column in record and type(record[column]) == dict:
    column_type = "single_file"
elif column in record and type(record[column]) == list:
    column_type = "multiple_files"

Depending on whether the column you're working with is a file or file[], you download the file content using the xata.files().get() or xata.files().get_item() call, respectively:

if column_type == "single_file":
  file = xata.files().get(
    SOURCE_TABLE,
    record["id"],
    column
  )
elif column_type == "multiple_files":
  file = xata.files().get_item(
    SOURCE_TABLE,
    record["id"],
    column,
    column_file["id"]
  )

The difference between the two methods is that content from single files (.get()) is retrieved using the combination of record ID and column name, while content from file arrays (.get_item()) additionally requires the ID of the file - since files stored in an array column also get a file ID (automatically or explicitly assigned) so they can be identified in the array.

File column metadata includes the mediaType attribute, which tells you the file's type. Use it to process content accordingly for plain text, CSV, or PDF files:

def process_file(file, mediaType):
  mediaType = mediaType.lower()
  if mediaType.startswith("text/plain") or mediaType.startswith("text/csv"):
    chunked_text = process_text_file(file)
  elif mediaType.startswith("application/pdf"):
    chunked_text = process_pdf_file(file)
  else:
    chunked_text = []
  return chunked_text

NOTE: CSV files are treated as plain text in the context of this script. We recommend using the purpose-built [Import CSV](/docs/csv-data/import-data) feature for ingesting CSV files into a schema that mirrors the CSV file structure.

Text files are split into chunks in order to fit into Xata's text column type, which can store a maximum payload size of 200KB, as stated on the limits page. The default value of the optional maxchunk parameter, which sets the MAX_TEXT_COLUMN_LENGTH variable, is 200000 (characters) to satisfy this requirement.

The process_text_file method then calls the wrap function from the textwrap module to create chunked text. The result is a list of chunks with the requested maximum width.

def process_text_file(file):
  # Split the decoded file content into chunks of at most
  # MAX_TEXT_COLUMN_LENGTH characters, preserving whitespace as-is.
  chunked_text = wrap(
    str(file.content.decode(ENCODING)),
    width=MAX_TEXT_COLUMN_LENGTH,
    drop_whitespace=False,
    break_on_hyphens=False,
    expand_tabs=False,
    replace_whitespace=False,
  )
  return chunked_text
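As a quick illustration of how wrap chunks text that has no convenient break points (using a tiny hypothetical width in place of MAX_TEXT_COLUMN_LENGTH):

from textwrap import wrap

# break_long_words defaults to True, so an unbroken string is split
# at exactly the requested width.
print(wrap("abcdefghij", width=4, drop_whitespace=False))
# ['abcd', 'efgh', 'ij']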

The process_pdf_file method calls the PdfReader function from the PyPDF2 module to extract text from each page of the PDF. Additionally, in case a page contains more text than the maxchunk parameter allows, it applies the same wrap function, which may split the PDF page into multiple chunks.

def process_pdf_file(file):
  with BytesIO(file.content) as open_pdf_file:
    reader = PdfReader(open_pdf_file)
    chunked_text = []
    for page_iterator in range(len(reader.pages)):
      pdf_page = reader.pages[page_iterator]
      extracted_text = pdf_page.extract_text()
      chunks = wrap(
        str(extracted_text),
        width=MAX_TEXT_COLUMN_LENGTH,
        drop_whitespace=False,
        break_on_hyphens=False,
        expand_tabs=False,
        replace_whitespace=False,
      )
      chunked_text.extend(chunks)
    return chunked_text

Similar functions can be implemented for other mediaTypes to support more file types, but that may require installing additional libraries and software (such as Pandoc).

Now that you have extracted text from the file attachment into a chunked_text array containing chunks of appropriate maximum length, it is time to ingest them into Xata. The ingest_chunks method does just that, while offering a few options for overwriting records and ingesting content in bulk.

In other words: do we prefer to overwrite text content for the same files if it already exists (i.e. when re-running the script), or do we always append new records?

Every record in a table is uniquely identified by the built-in id column. By specifying a deterministic id pattern you can overwrite records if they already exist. Otherwise, if you don't specify an ID for a record, Xata assigns one automatically, which means you write a new record every time you index a chunk of text, potentially creating duplicates if the script runs multiple times on the same tables. The preferred approach can be specified with the --id input parameter, with a value of either deterministic or random.

With the deterministic approach, record IDs are generated by concatenating the origin record's ID, the source table name, the source column name, and an iterator for each chunk of text from the file. For file[] columns, the ID of the file within the array is also included.

if ID_STRATEGY == "deterministic":
  if column_type == "single_file":
    chunk_rec_id = (
      f'{source_record["id"]}-{SOURCE_TABLE}-{column}-{chunk_iterator}'
    )
  elif column_type == "multiple_files":
    chunk_rec_id = f'{source_record["id"]}-{SOURCE_TABLE}-{column}-{column_file["id"]}-{chunk_iterator}'

The deterministic ID approach requires the use of the upsert method, or the update transaction operation with the upsert parameter set to true, to replace existing records.

With the non-deterministic (random) approach, you simply do not include any ID when writing to Xata with the insert method or insert transaction operation, which automatically assigns a new ID to the created record.

The mode parameter selects between the atomic and transaction indexing approaches. In both cases, when emitting text content to Xata, the response should be checked for the retriable status code 429, which is returned if you hit the branch rate limits.
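As a minimal sketch (not part of the script), a hypothetical retry helper with a short delay could wrap any of the write calls shown below:

import time

# Hypothetical helper: retry a write while Xata returns the retriable
# 429 status code, sleeping briefly between attempts.
def with_retry(write_fn, max_attempts=5, delay_seconds=1):
  resp = write_fn()
  attempts = 1
  while resp.status_code == 429 and attempts < max_attempts:
    time.sleep(delay_seconds)
    resp = write_fn()
    attempts += 1
  return resp

# Usage:
# resp = with_retry(lambda: xata.records().insert(TARGET_TABLE, content_record))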

“Atomic” means writing one record, a single chunk of text, with each HTTP/S request to Xata.

  • Atomic requests with deterministic record IDs use the upsert method:

if ID_STRATEGY == "deterministic":
  #...
  if MODE == "atomic":
    resp = xata.records().upsert(TARGET_TABLE, chunk_rec_id, content_record)
    if resp.is_success():
      print(
        "  id:",
        chunk_rec_id,
        "size:",
        len(content_record["content"]),
        "chars",
      )
    else:
      print("Response", resp.status_code, resp)
      while resp.status_code == 429:
        print("Throttled. Retrying...")
        resp = xata.records().upsert(TARGET_TABLE, chunk_rec_id, content_record)

  • Atomic requests with autogenerated record IDs use the insert method:

elif ID_STRATEGY == "random":
  #...
  if MODE == "atomic":
    resp = xata.records().insert(TARGET_TABLE, content_record)
    if resp.status_code == 201:
      print(
        "  id:",
        resp["id"],
        "size:",
        len(content_record["content"]),
        "chars",
      )
    else:
      print("Response", resp.status_code, resp)
      while resp.status_code == 429:
        print("Throttled. Retrying...")
        resp = xata.records().insert(TARGET_TABLE, content_record)

Atomic writes are useful for low-volume use cases and for troubleshooting, since a write error returns a specific response code and error context about the failing part of the request.

However, this approach does not usually perform well at scale, as it requires the client to initiate a network connection and pay the HTTP/S protocol overhead (headers, encryption) for a comparatively small payload.

By including multiple chunks of text in a single write request, we achieve a better "payload to overhead" ratio and our code ultimately runs faster. This can be done with the bulk_insert method or with transaction insert operations.

A distinctive difference between the two approaches in this particular case is that transactions can also perform multiple upsert (replace record) operations at once, while the bulk endpoint only supports creating new records, so it can only be used with the autogenerated ID approach.

All operations within a transaction request are rolled back (aborted) in case any operation fails.

Performance-wise, both methods are very close. Although we could use bulk inserts for autogenerated IDs, for uniformity the script uses the Transaction method for both deterministic and autogenerated IDs.
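For reference, a rough sketch of the bulk alternative for autogenerated IDs; it assumes the SDK's bulk_insert helper accepts a {"records": [...]} payload:

# Hedged sketch: write all chunks extracted from a file in one request.
records = [{"content": chunk} for chunk in chunked_text]
resp = xata.records().bulk_insert(TARGET_TABLE, {"records": records})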

The Transaction helper method handles the grouping of transaction operations.

if MODE == "transaction":
    trx = Transaction(xata)
  • Appending an “update” operation with deterministic record IDs using the upsert parameter (set to True):
if ID_STRATEGY == "deterministic":
  #...
  elif MODE == "transaction":
    trx.update(TARGET_TABLE, chunk_rec_id, content_record, True)
  • Appending an “insert” operation with random (autogenerated) record IDs:
elif ID_STRATEGY == "random":
  #...
  elif MODE == "transaction":
    trx.insert(TARGET_TABLE, content_record)

Once we reach the maximum number of operations that can run as a single transaction, as set by the tsize script parameter, we call the trx.run() method:

resp = trx.run()
if resp["status_code"] == 200:
  pass  # print success details
else:
  # error handling: retry while the response is the retriable 429 status
  while resp["status_code"] == 429:
    print("Throttled. Retrying...")
    # re-queue the operations that were not committed, then run again
    trx.operations = retriable_operations
    resp = trx.run()

Now that you’ve indexed the content of file attachments to Xata, you can unleash the full potential of advanced free-text search and Ask AI capabilities.

As hinted in the script output above, one of the file attachments in the indexed records was The Lord of the Rings (lotr.pdf).

Jumping to the Playground section in the Xata Web UI, you can ask the AI questions against the generated table mytableIndex:

import { getXataClient } from "./xata";
const xata = getXataClient();

const response = await xata.db.mytableIndex.ask("Why was the One Ring forged?", {
  rules: ["Only answer questions using the provided context."],
});

console.log(response);

Response:

{
  "answer": "The One Ring was forged by Sauron to be his master and to control the other Rings of Power.",
  "sessionId": "eh6adiosf10ct97qfahv70kn2k",
  "records": [
    "rec_cjobbv3l4agk1ks239dg-mytable-firstcolumn-15",
    "rec_cjobbv3l4agk1ks239dg-mytable-firstcolumn-517",
    "rec_cjobbv3l4agk1ks239dg-mytable-firstcolumn-263"
  ]
}

Go ahead and give it a try - you can upload this example file to your table and index it with this script:

python xfileindex.py --db https://ws-fdrujb.eu-central-1.xata.sh/db/myfilesdb --table mytable --columns myfiles
Created new table mytableIndex

Downloading file from table: mytable , record: rec_cjob8i3l4agk1ks239a0 , column: myfiles , filename: jokes.pdf
- Processing record: rec_cjob8i3l4agk1ks239a0 , column: myfiles , filename: jokes.pdf , type: application/pdf in 1 chunk:
  Indexed 1 / 1 chunk.

The result is one record with a single chunk of text, since the attachment is a one-page PDF whose text content fits within the character limit of a chunk (200000 characters).

In the Playground, we can ask questions on the file’s content:

import { getXataClient } from "./xata";
const xata = getXataClient();

const response = await xata.db.mytableIndex.ask("Why do I need a cat?",
  {
    rules: [
      "Only answer questions using the provided context."
    ]
  });

console.log(response);

Response:

{
  "answer": "You need a cat because it will remind you that you don't deserve unconditional love.",
  "sessionId": "53nseq27f905pev4lhs4duiv3k",
  "records": [
    "rec_cjob8i3l4agk1ks239a0-mytable-myfiles-0"
  ]
}

Time to upload your own text, CSV, and PDF files to Xata and get them indexed easily!

If you are looking to index some of your locally stored files directly into Xata without storing them as attachments first, we've got you covered: localfileindex is another flavor of this script that extracts text from your local files and indexes the contents to Xata.

Check out the readme for usage instructions.

Xata is committed to making data easy to work with. We will keep adding new features and enriching the file attachment column types and the SDKs.

If you have feedback or questions, you can reach out to us on Discord, X / Twitter or submit a support request.
