DEV Community

David Mezzetti for NeuML

Posted on • Edited on • Originally published at neuml.hashnode.dev

Tensor workflows

Many of the examples and use cases for txtai focus on transforming text. Makes sense as txt is even in the name! But that doesn't mean txtai only works with text.

This article will cover examples of how to efficiently process tensors using txtai workflows.

Install dependencies

Install txtai and all dependencies. We will install the api, pipeline and workflow optional extras packages, along with the datasets package

pip install txtai[api,pipeline,workflow] datasets
Enter fullscreen mode Exit fullscreen mode

Transform large tensor arrays

The first section attempts to apply a simple transform to a very large memory-mapped array (2,000,000 x 1024).

import numpy as np
import torch

# Generate large memory-mapped array
rows, cols = 2000000, 1024
data = np.memmap("data.npy", dtype=np.float32, mode="w+", shape=(rows, cols))
del data

# Open memory-mapped array
data = np.memmap("data.npy", dtype=np.float32, shape=(rows, cols))

# Create tensor
tensor = torch.from_numpy(data).to("cuda:0")

# Apply tanh transform to tensor
torch.tanh(tensor).shape
Enter fullscreen mode Exit fullscreen mode
torch.tanh(tensor).shape

RuntimeError: CUDA out of memory. Tried to allocate 7.63 GiB (GPU 0; 11.17 GiB total capacity; 7.63 GiB already allocated; 3.04 GiB free; 7.63 GiB reserved in total by PyTorch) If reserved memory is >> allocated memory try setting max_split_size_mb to avoid fragmentation.  See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF
Enter fullscreen mode Exit fullscreen mode
ls -l --block-size=MB data.npy
Enter fullscreen mode Exit fullscreen mode
-rw-r--r-- 1 root root 8192MB Dec  6 23:24 data.npy
Enter fullscreen mode Exit fullscreen mode

Not surprisingly this runs out of CUDA memory. The array needs 2,000,000 * 1024 * 4 = 8GB which exceeds the amount of GPU memory available.

One of the great things about NumPy and PyTorch arrays is that they can be sliced without having to copy data. Additionally, PyTorch has methods to work directly on NumPy arrays without copying data, in other words both NumPy arrays and PyTorch arrays can share the same memory. This opens the door to efficient processing of tensor data in place.

Let's try applying a simple tanh transform in batches over the array.

def process(x):
  print(x.shape)
  return torch.tanh(torch.from_numpy(x).to("cuda:0")).cpu().numpy()

# Split into 250,000 rows per call
batch = 250000
count = 0
for x in range(0, len(data), batch):
  for row in process(data[x : x + batch]):
    count += 1

print(count)
Enter fullscreen mode Exit fullscreen mode
(250000, 1024)
(250000, 1024)
(250000, 1024)
(250000, 1024)
(250000, 1024)
(250000, 1024)
(250000, 1024)
(250000, 1024)
2000000
Enter fullscreen mode Exit fullscreen mode

Iterating over the data array and selecting slices to operate on allows the transform to complete successfully! Each torch.from_numpy call is building a view of a portion the existing large NumPy data array.

Enter workflows

The next section takes the same array and shows how workflows can apply transformations to tensors.

from txtai.workflow import Task, Workflow

# Create workflow with a single task calling process for each batch
task = Task(process)
workflow = Workflow([task], batch)

# Run workflow
count = 0
for row in workflow(data):
  count += 1

print(count)
Enter fullscreen mode Exit fullscreen mode
(250000, 1024)
(250000, 1024)
(250000, 1024)
(250000, 1024)
(250000, 1024)
(250000, 1024)
(250000, 1024)
(250000, 1024)
2000000
Enter fullscreen mode Exit fullscreen mode

Workflows process the data in the same fashion as the code in the previous section. On top of that, workflows can handle text, images, video, audio, document, tensors and more. Workflow graphs can also be connected together to handle complex use cases.

Workflows with PyTorch models

The next example applies a PyTorch model to the same data. The model applies a series of transforms and outputs a single float per row.

from torch import nn

class Model(nn.Module):
    def __init__(self):
        super().__init__()

        self.gelu = nn.ReLU()
        self.linear1 = nn.Linear(1024, 512)
        self.dropout = nn.Dropout(0.5)
        self.norm = nn.LayerNorm(512)
        self.linear2 = nn.Linear(512, 1)

    def forward(self, inputs):
        outputs = self.gelu(inputs)
        outputs = self.linear1(outputs)
        outputs = self.dropout(outputs)
        outputs = self.norm(outputs)
        outputs = self.linear2(outputs)

        return outputs

model = Model().to("cuda:0")

def process(x):
  with torch.no_grad():
    outputs = model(torch.from_numpy(x).to("cuda:0")).cpu().numpy()
    print(outputs.shape)
    return outputs

# Create workflow with a single task calling model for each batch
task = Task(process)
workflow = Workflow([task], batch)

# Run workflow
count = 0
for row in workflow(data):
  count += 1

print(count)
Enter fullscreen mode Exit fullscreen mode
(250000, 1)
(250000, 1)
(250000, 1)
(250000, 1)
(250000, 1)
(250000, 1)
(250000, 1)
(250000, 1)
2000000
Enter fullscreen mode Exit fullscreen mode

Once again the data can be processed in batches using workflows, even with a more complex model. Let's try a more interesting example.

Workflows in parallel

Workflows consist of a series of tasks. Each task can output one to many outputs per input element. Multi-output tasks have options available to merge the data for downstream tasks.

The following example builds a workflow with a task having three separate actions. Each action takes text as an input an applies a sentiment classifier. This is followed by a task that merges the three outputs for each row using a mean transform. Essentially, this workflow builds a weighted sentiment classifier using the outputs of three models.

import time

from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForSequenceClassification

class Tokens:
    def __init__(self, texts):
        tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased-finetuned-sst-2-english")
        tokens = tokenizer(texts, padding=True, return_tensors="pt").to("cuda:0")

        self.inputs, self.attention = tokens["input_ids"], tokens["attention_mask"]

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, value):
        return (self.inputs[value], self.attention[value])

class Classify:
    def __init__(self, model):
        self.model = model

    def __call__(self, tokens):
        with torch.no_grad():
            inputs, attention = tokens
            outputs = self.model(input_ids=inputs, attention_mask=attention)
            outputs = outputs["logits"]

        return outputs

# Load reviews from the rotten tomatoes dataset
ds = load_dataset("rotten_tomatoes")
texts = ds["train"]["text"]

tokens = Tokens(texts)

model1 = AutoModelForSequenceClassification.from_pretrained("M-FAC/bert-tiny-finetuned-sst2")
model1 = model1.to("cuda:0")

model2 = AutoModelForSequenceClassification.from_pretrained("howey/electra-base-sst2")
model2 = model2.to("cuda:0")

model3 = AutoModelForSequenceClassification.from_pretrained("philschmid/MiniLM-L6-H384-uncased-sst2")
model3 = model3.to("cuda:0")

task1 = Task([Classify(model1), Classify(model2), Classify(model3)])
task2 = Task([lambda x: torch.sigmoid(x).mean(axis=1).cpu().numpy()])

workflow = Workflow([task1, task2], 250)

start = time.time()
for x in workflow(tokens):
  pass

print(f"Took {time.time() - start} seconds")
Enter fullscreen mode Exit fullscreen mode
Took 84.73194456100464 seconds
Enter fullscreen mode Exit fullscreen mode

Note that while the task actions are parallel, that doesn't necessarily mean the operations are concurrent. In the case above, the actions are are executed sequentially.

Workflows have an additional option to run task actions concurrently. The two supported modes are "thread" and "process". I/O bound actions will do better with multithreading and CPU bound actions will do better with multiprocessing. More can be read in the txtai documentation.

task1 = Task([Classify(model1), Classify(model2), Classify(model3)], concurrency="thread")
task2 = Task([lambda x: torch.sigmoid(x).mean(axis=1).cpu().numpy()])

workflow = Workflow([task1, task2], 250)

start = time.time()
for x in workflow(tokens):
  pass

print(f"Took {time.time() - start} seconds")
Enter fullscreen mode Exit fullscreen mode
Took 85.21102929115295 seconds
Enter fullscreen mode Exit fullscreen mode

In this case, concurrency doesn't improve performance. While the GIL is a factor, a bigger factor is that the GPU is already fully loaded. This method would be more beneficial if the system had a second GPU or the primary GPU had idle cycles.

Wrapping up

This article introduced a number of different ways to work with large-scale tensor data and process it efficiently. This article purposely didn't cover embeddings and pipelines to demonstrate how workflows can stand on their own. In addition to workflows, this article covered efficient methods to work with large tensor arrays in PyTorch and NumPy.

Top comments (0)