Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 21 additions & 42 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ The following databases are currently supported by VSB:

* [Pinecone](vsb/databases/pinecone/README.md)
* [pgvector](vsb/databases/pgvector/README.md)
* [OpenSearch](vsb/databases/opensearch/README.md)

> [!TIP]
> You can also display the list of supported databases using the following command:
Expand Down Expand Up @@ -158,6 +157,7 @@ Common parameters include the following:
* `--users=<int>`: Specify the number of concurrent users (connections) to simulate.
* `--skip_populate`: Skip populating the database with data and immediately perform the Run
phase.
* `--namespace_name`: Which namespace to write into (defaults to "")

## Use cases

Expand All @@ -174,68 +174,47 @@ range of vector databases. It can be used to perform a range of tasks, including
### Synthetic Workloads

Sometimes the workload you want to model doesn't exist in any provided dataset, or you want
to test a specific aspect of a database's performance. In these cases, you can use
a _synthetic_ workload to generate custom workloads with specific
characteristics.

There are three modes of synthetic workloads, however the most common is the
`synthetic-proportional` workload:

* `synthetic-proportional` workloads populate the database with an initial set of
records, then run a series of assorted request operations (inserts, queries, deletes, updates) in proportion to the given ratios.

* `synthetic` workloads generate a fixed number of records and queries with the given
distribution, then runs population and query phases in sequence.

* `synthetic-runbook` workloads generate a fixed number of records and queries, and
splits them into a series of multiple 'populate -> run' steps. This is useful
for testing how a database performs when data is loaded incrementally.

Some important parameters for synthetic workloads include:
to test a specific aspect of a database's performance. In these cases, you can use the `synthetic`, `synthetic-runbook`,
and `synthetic-proportional` workloads to generate custom workloads with specific characteristics. Some
important parameters for synthetic workloads include:

* `--synthetic_records`: The number of records to generate for the synthetic workload.
* `--synthetic_requests`: The number of requests to generate for the run phase of
the synthetic workload.
* `--synthetic_requests`: The number of requests to generate for the synthetic workload.
* `--synthetic_dimensions`: The dimensionality of generated vectors.
* `--synthetic_query_distribution`: The distribution of query/fetch IDs for synthetic proportional workloads.
* `--synthetic_record_ratio`: The distribution of record vectors in space for synthetic proportional workloads.
* `--synthetic_insert_ratio`: The proportion of insert operations for synthetic proportional workloads.
* `--synthetic_query_ratio`: The proportion of query operations for synthetic proportional workloads.
* `--synthetic_metadata`: The metadata schema to use for each record.

#### **Defining Synthetic Metadata**
Metadata is specified by providing multiple `--synthetic_metadata` flags describing a metadata field
with a key and supported value. Values can be `<# digits>n` for a random integer, `<# chars>s` for a
random string, `<# chars>s<# strings>l` for a random list of strings, or `b` for a random boolean.

Metadata can optionally be generated for each record in a synthetic workload.
Metadata is specified using one or more `--synthetic_metadata` flags. Each flag
defines a *metadata field* with a *name* and a *format specification*.
For example, `--synthetic_metadata=id:10n` generates a metadata field `id` with a random 10-digit integer.
`--synthetic_metadata=tags:5s10l` generates a metadata field `tags` with a list of 10 ranodm strings, each
5 characters long. `--synthetic_metadata=active:b` generates a metadata field `active` with a random boolean.

##### **Supported Metadata Types**
| Type | Format Spec | Example |
|------|------------------------|---------|
| Random integer with `#` digits | `<# digits>n` | `id:10n` → `{"id": 1234567890}` |
| Random alphanumeric string of `#` characters | `<# chars>s` | `username:8s` → `{"username": "aZb3Xy91"}` |
| List of `<# items>` strings, each of length `<# chars>` | `<# chars>s<# items>l` | `tags:5s10l` → `{"tags": ["apple", "delta", "omega", ...]}` |
| Random boolean (`true` or `false`) | `b` | `active:b` → `{"active": true}` |
You can see the full list of parameters by running `vsb --help`.

##### **Example Metadata Usage**
- `--synthetic_metadata=id:10n` → Generates a numeric `id` with 10 random digits*
- `--synthetic_metadata=tags:5s10l` → Generates a `tags` field containing a list of 10 random words, each 5 characters long.
- `--synthetic_metadata=username:8s` → Generates a random username with 8 characters.
- `--synthetic_metadata=active:b` → Generates an active/inactive flag as `true` or `false`.
`synthetic` workloads generate a fixed number of records and queries with the given distribution, then
runs population and query phases in sequence.

You can see the full list of parameters by running `vsb --help`.
`synthetic-runbook` workloads generate a fixed number of records and queries, and splits them into a series of
'populate -> run' steps. This is useful for testing how a database performs when data is loaded incrementally.

`synthetic-proportional` workloads populate the database with an initial set of records, then run a series of
assorted request operations (inserts, queries, deletes, updates) in proportion to the given ratios.

**Example**

The following command runs a synthetic workload against Pinecone, with 1,000 initial records,
then performs 100 requests in a zipfian query distribution, made up of 30% inserts,
50% queries, 10% deletes, and 10% updates:
The following command runs a synthetic workload against Pinecone, with 10,000 initial records,
a zipfian query distribution, and 30% inserts, 50% queries, 10% deletes, and 10% updates:

```shell
vsb --database=pinecone --pinecone_api_key=<API_KEY> \
--workload=synthetic-proportional \
--synthetic_records=1000 --synthetic_requests=100 \
--synthetic_records=10000 --synthetic_queries=1000000 \
--synthetic_insert_ratio=0.3 --synthetic_query_ratio=0.5 \
--synthetic_delete_ratio=0.1 --synthetic_update_ratio=0.1 \
--synthetic_dimensions=768 --synthetic_query_distribution=zipfian \
Expand Down
92 changes: 6 additions & 86 deletions vsb/cmdline_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ def add_vsb_cmdline_args(
"then the target will be distributed across all users. "
"Specify 0 for unlimited. Default is %(default)s.",
)
general_group.add_argument(
"--namespace_name",
type=str,
default="",
help="Name of namespace to use. One will be created if it does not exist. Default is vsb-<workload>.",
)

if include_locust_args:
general_group.add_argument(
Expand Down Expand Up @@ -278,97 +284,13 @@ def add_vsb_cmdline_args(
help="Name of Pinecone index to connect to. One will be created if it does not exist. Default is vsb-<workload>.",
env_var="VSB__PINECONE_INDEX_NAME",
)

pinecone_group.add_argument(
"--pinecone_index_spec",
type=json_to_pinecone_spec,
default={"serverless": {"cloud": "aws", "region": "us-east-1"}},
help="JSON spec of Pinecone index to create (if it does not exist). Default is %(default)s.",
)

opensearch_group = parser.add_argument_group(
"Options specific to OpenSearch database"
)
opensearch_group.add_argument(
"--opensearch_index_name",
type=str,
default=None,
help="Name of OpenSearch index to connect to the OpenSearch Collection. One will be created if it does not exist. Default is vsb-<workload>.",
env_var="VSB__OPENSEARCH_INDEX_NAME",
)
opensearch_group.add_argument(
"--opensearch_host",
type=str,
default="localhost",
help="opensearch host to connect to the OpenSearch Collection. Default is %(default)s.",
env_var="VSB__OPENSEARCH_HOST",
)
opensearch_group.add_argument(
"--opensearch_port",
type=int,
default=9200,
help="opensearch port to connect to the OpenSearch Collection. Default is %(default)s.",
env_var="VSB__OPENSEARCH_PORT",
)
opensearch_group.add_argument(
"--opensearch_region",
type=str,
default="us-east-1",
help="opensearch region to connect to the OpenSearch Collection. Default is %(default)s.",
env_var="VSB__OPENSEARCH_REGION",
)
opensearch_group.add_argument(
"--opensearch_service",
type=str,
default="aoss",
help="opensearch service to connect. Default is %(default)s. which is for Amazon OpenSearch Serverless. For Amazon OpenSearch Managed cluster, use 'es'.",
env_var="VSB__OPENSEARCH_SERVICE",
)
opensearch_group.add_argument(
"--aws_access_key",
type=str,
default=None,
help="AWS access key to connect to the OpenSearch Collection. Default is %(default)s.",
env_var="VSB__AWS_ACCESS_KEY",
)
opensearch_group.add_argument(
"--aws_secret_key",
type=str,
default=None,
help="AWS secret access key to connect to the OpenSearch Collection. Default is %(default)s.",
env_var="VSB__AWS_SECRET_KEY",
)
opensearch_group.add_argument(
"--aws_session_token",
type=str,
default=None,
help="AWS session token to connect to the OpenSearch Collection. Default is %(default)s.",
env_var="VSB__AWS_SESSION_TOKEN",
)
opensearch_group.add_argument(
"--opensearch_username",
type=str,
default=None,
help="Opensearch username to use. If specified then must also specify "
"--opensearch_password. Default is %(default)s.",
env_var="VSB__OPENSEARCH_USERNAME",
)
opensearch_group.add_argument(
"--opensearch_password",
type=str,
default=None,
help="Opensearch password to use. If specified then must also specify "
"--opensearch_username. Default is %(default)s.",
env_var="VSB__OPENSEARCH_PASSWORD",
)
opensearch_group.add_argument(
"--opensearch_use_tls",
action=argparse.BooleanOptionalAction,
default=True,
help="Should Opensearch use SSL / TLS to connect to the server. Default is %(default)s.",
env_var="VSB__OPENSEARCH_USE_TLS",
)

pgvector_group = parser.add_argument_group("Options specific to pgvector database")
pgvector_group.add_argument(
"--pgvector_host",
Expand Down Expand Up @@ -482,8 +404,6 @@ def validate_parsed_args(
"The following arguments must be specified when --database is "
"'pinecone'" + formatter.format_help(),
)
case "opensearch":
pass
case "pgvector":
pass
case _:
Expand Down
66 changes: 26 additions & 40 deletions vsb/databases/pinecone/pinecone.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import vsb
from vsb import logger
from pinecone import PineconeException, NotFoundException, UnauthorizedException
from pinecone import PineconeException, NotFoundException
from pinecone.grpc import PineconeGRPC, GRPCIndex
from tenacity import retry, stop_after_attempt, wait_exponential_jitter, after_log
import grpc.experimental.gevent as grpc_gevent
Expand All @@ -24,36 +24,37 @@ class PineconeNamespace(Namespace):
def __init__(self, index: GRPCIndex, namespace: str):
# TODO: Support multiple namespaces
self.index = index
self.namespace = Namespace

def insert_batch(self, batch: RecordList):
def insert_batch(self, batch: RecordList,namespace):
# Pinecone expects a list of dicts (or tuples).
dicts = [dict(rec) for rec in batch]
self.index.upsert(dicts)
self.index.upsert(dicts,namespace=namespace)

def update_batch(self, batch: list[Record]):
def update_batch(self, batch: list[Record],namespace):
# Pinecone treats insert and update as the same operation.
self.insert_batch(batch)
self.insert_batch(batch,namespace=namespace)

def search(self, request: SearchRequest) -> list[str]:
def search(self, request: SearchRequest,namespace) -> list[str]:
@retry(
wait=wait_exponential_jitter(initial=0.1, jitter=0.1),
stop=stop_after_attempt(5),
after=after_log(logger, logging.DEBUG),
)
def do_query_with_retry():
return self.index.query(
vector=request.values, top_k=request.top_k, filter=request.filter
vector=request.values, top_k=request.top_k, filter=request.filter,namespace=namespace
)

result = do_query_with_retry()
matches = [m["id"] for m in result["matches"]]
return matches

def fetch_batch(self, request: list[str]) -> list[Record]:
return self.index.fetch(request).vectors.values
def fetch_batch(self, request: list[str],namespace) -> list[Record]:
return self.index.fetch(request,namespace=namespace).vectors.values

def delete_batch(self, request: list[str]):
self.index.delete(request)
def delete_batch(self, request: list[str],namespace):
self.index.delete(request,namespace=namespace)


class PineconeDB(DB):
Expand All @@ -68,52 +69,37 @@ def __init__(
self.pc = PineconeGRPC(config["pinecone_api_key"])
self.skip_populate = config["skip_populate"]
self.overwrite = config["overwrite"]
self.index_name = config["pinecone_index_name"]
if self.index_name is None:
index_name = config["pinecone_index_name"]
if index_name is None:
# None specified, default to "vsb-<workload>"
self.index_name = f"vsb-{name}"
index_name = f"vsb-{name}"
spec = config["pinecone_index_spec"]
try:
self.index = self.pc.Index(name=self.index_name)
self.index = self.pc.Index(name=index_name)
self.created_index = False
except UnauthorizedException:
api_key = config["pinecone_api_key"]
masked_api_key = api_key[:4] + "*" * (len(api_key) - 8) + api_key[-4:]
logger.critical(
f"PineconeDB: Got UnauthorizedException when attempting to connect "
f"to index '{self.index_name}' using API key '{masked_api_key}' - check "
f"your API key and permissions"
)
raise StopUser()
except NotFoundException:
logger.info(
f"PineconeDB: Specified index '{self.index_name}' was not found, or the "
f"specified API key cannot access it. Creating new index '{self.index_name}'."
f"PineconeDB: Specified index '{index_name}' was not found, or the "
f"specified API key cannot access it. Creating new index '{index_name}'."
)
self.pc.create_index(
name=self.index_name,
dimension=dimensions,
metric=metric.value,
spec=spec,
name=index_name, dimension=dimensions, metric=metric.value, spec=spec
)
self.index = self.pc.Index(name=self.index_name)
self.index = self.pc.Index(name=index_name)
self.created_index = True

info = self.pc.describe_index(self.index_name)
info = self.pc.describe_index(index_name)
index_dims = info["dimension"]
if dimensions != index_dims:
raise ValueError(
f"PineconeDB index '{self.index_name}' has incorrect dimensions - expected:{dimensions}, found:{index_dims}"
f"PineconeDB index '{index_name}' has incorrect dimensions - expected:{dimensions}, found:{index_dims}"
)
index_metric = info["metric"]
if metric.value != index_metric:
raise ValueError(
f"PineconeDB index '{self.index_name}' has incorrect metric - expected:{metric.value}, found:{index_metric}"
f"PineconeDB index '{index_name}' has incorrect metric - expected:{metric.value}, found:{index_metric}"
)

def close(self):
self.index.close()

def get_batch_size(self, sample_record: Record) -> int:
# Return the largest batch size possible, based on the following
# constraints:
Expand Down Expand Up @@ -149,15 +135,15 @@ def initialize_population(self):
return
if not self.created_index and not self.overwrite:
msg = (
f"PineconeDB: Index '{self.index_name}' already exists - cowardly "
f"PineconeDB: Index '{self.index.name}' already exists - cowardly "
f"refusing to overwrite existing data. Specify --overwrite to "
f"delete it, or specify --skip_populate to skip population phase."
f"delete it, or specify --skip-populate to skip population phase."
)
logger.critical(msg)
raise StopUser()
try:
logger.info(
f"PineconeDB: Deleting existing index '{self.index_name}' before "
f"PineconeDB: Deleting existing index '{self.index.name}' before "
f"population (--overwrite=True)"
)
self.index.delete(delete_all=True)
Expand Down
Loading