Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 89 additions & 23 deletions cmd/benchmark/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"math/rand/v2"
"net/rpc"
"strconv"
"sync"
Expand Down Expand Up @@ -122,14 +123,17 @@ func (pbc *ParallelBenchmarkClient) getOrAddVertex(externalID string, conn *rpc.
"external_id": externalID,
}
var resp rpcTypes.AddVertexResponse
err := conn.Call("QueryManager.AddVertex", &rpcTypes.AddVertexRequest{
Properties: properties,
}, &resp)
rtt := time.Since(start)

if err != nil {
return "", err
for {
err := conn.Call("QueryManager.AddVertex", &rpcTypes.AddVertexRequest{
Properties: properties,
}, &resp)
if err == nil {
break
}
log.Printf("Failed to call add vertex")
time.Sleep(500 * time.Millisecond)
}
rtt := time.Since(start)

// Store the vertex ID (use LoadOrStore to handle race condition)
actual, _ := pbc.vertexIDMap.LoadOrStore(externalID, resp.VertexID)
Expand Down Expand Up @@ -165,11 +169,18 @@ func (pbc *ParallelBenchmarkClient) addEdge(fromVertexID, toVertexID string, wei
"weight": strconv.Itoa(weight),
}
var resp rpcTypes.AddEdgeResponse
err = conn.Call("QueryManager.AddEdge", &rpcTypes.AddEdgeRequest{
FromVertexID: fromInternal,
ToVertexID: toInternal,
Properties: properties,
}, &resp)
for {
err := conn.Call("QueryManager.AddEdge", &rpcTypes.AddEdgeRequest{
FromVertexID: fromInternal,
ToVertexID: toInternal,
Properties: properties,
}, &resp)
if err == nil {
break
}
log.Printf("RPC call to AddEdge failed")
time.Sleep(500 * time.Millisecond)
}
rtt := time.Since(start)

if err != nil {
Expand Down Expand Up @@ -215,17 +226,19 @@ func (pbc *ParallelBenchmarkClient) runBFSQueries(checkpointNum int) {
// Run BFS
start := time.Now()
var resp rpcTypes.BFSResponse
err := conn.Call("QueryManager.BFS", &rpcTypes.BFSRequest{
StartVertexID: startVertexID,
Radius: pbc.bfsRadius,
Timestamp: types.Timestamp(float64(time.Now().Unix())),
}, &resp)
rtt := time.Since(start)

if err != nil {
log.Printf("Error running BFS for vertex %s: %v", vertexID, err)
continue
for {
err := conn.Call("QueryManager.BFS", &rpcTypes.BFSRequest{
StartVertexID: startVertexID,
Radius: pbc.bfsRadius,
Timestamp: types.Timestamp(float64(time.Now().Unix())),
}, &resp)
if err == nil {
break
}
log.Printf("Failed to call BFS on query manager")
time.Sleep(500 * time.Millisecond)
}
rtt := time.Since(start)

bfsResultSet := make(map[types.VertexId]bool)
for _, v := range resp.Vertices {
Expand All @@ -241,7 +254,7 @@ func (pbc *ParallelBenchmarkClient) runBFSQueries(checkpointNum int) {
return true
},
)

log.Printf("BFS result for vertex %s at checkpoint %d: %d vertices", vertexID, checkpointNum, len(bfsResult))

// Record measurement
Expand Down Expand Up @@ -327,6 +340,59 @@ func (pbc *ParallelBenchmarkClient) Run(workload []string) BenchmarkResults {
// Give a bit more time for any in-flight operations
time.Sleep(2000 * time.Millisecond)

// perform a bunch of move operations
// Get a connection for move operations
moveConn := pbc.getConn()
for i := 0; i < 10; i++ {
// choose two random shards
s1 := rand.IntN(len(pbc.cfg.Shards))
s2 := rand.IntN(len(pbc.cfg.Shards))
if s1 == s2 {
s2 = (s1 + 1) % len(pbc.cfg.Shards)
}

// choose vertices from the first shard
var fetchResp rpcTypes.FetchAllResponse
for {
err := moveConn.Call("QueryManager.FetchAll", rpcTypes.FetchAllRequest{}, &fetchResp)
if err == nil {
break
}
log.Printf("Failed to fetch all")
time.Sleep(100 * time.Millisecond)
}
if len(fetchResp.ShardVertices[s1]) > 0 {
// choose a subset of the vertices
vertices := make([]types.VertexId, max(0, int(0.1*(float64(rand.IntN(len(fetchResp.ShardVertices[s1])))))))
for i := range vertices {
vertices[i] = fetchResp.ShardVertices[s1][i].VertexID
}

if len(vertices) > 0 {
// Run MoveVertices
start := time.Now()
var resp rpcTypes.MoveVerticesResponse
for {
err := moveConn.Call("QueryManager.MoveVertices", &rpcTypes.MoveVerticesRequest{
Vertices: vertices,
Shard: types.ShardId(s2),
}, &resp)
if err == nil {
break
}
log.Printf("failed to move: %w", err)
time.Sleep(100 * time.Millisecond)
}
rtt := time.Since(start)
log.Printf("Elapsed for move call (size %d): %v", len(vertices), rtt)
}
}
}
pbc.putConn(moveConn)

// wait for ops to finish
time.Sleep(2000 * time.Millisecond)

// Run BFS queries
pbc.runBFSQueries(checkpointIdx + 1)
pbc.checkpointOps[checkpointIdx] = int64(currentOp)
Expand Down
2 changes: 1 addition & 1 deletion configs/3_shard_3_replica_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
query_manager:
host: "localhost"
port: 9090
bfs_type: "naive" # Options: "naive", "optimized"
bfs_type: "optimized" # Options: "naive", "optimized"

# Shard configuration - each shard stores a partition of the graph
# Each shard has multiple replicas that use Raft for consensus
Expand Down
1 change: 1 addition & 0 deletions internal/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ type ShardId uint32
type VertexId string
type Timestamp float64
type Properties map[string]string
type BFSId int
64 changes: 0 additions & 64 deletions logs/benchmark.log

This file was deleted.

Loading