Bulk Operations Examples
Practical examples demonstrating how to use bulk operations for high-performance batch processing
Basic Setup
First, let's set up service instances for our examples:
import { drizzleService } from 'drizzle-service/pg' // or 'drizzle-service/sqlite'
import { sql, inArray } from 'drizzle-orm' // SQL helpers used in the examples below
import { users, todos, products, orders, db } from './schema'
// Initialize the service
const service = drizzleService(db)
// User service
const userService = service(users)
// Todo service with soft delete
const todoService = service(todos, {
soft: {
field: 'status',
deletedValue: 'canceled',
notDeletedValue: 'active'
}
})
// Product service for e-commerce examples
const productService = service(products)
// Order service for transaction examples
const orderService = service(orders)
Bulk Create Operations
Basic Bulk Creation
// Prepare user data
const newUsers = [
{
email: 'alice@company.com',
name: 'Alice Johnson',
role: 'developer',
department: 'engineering'
},
{
email: 'bob@company.com',
name: 'Bob Smith',
role: 'designer',
department: 'product'
},
{
email: 'carol@company.com',
name: 'Carol Davis',
role: 'manager',
department: 'engineering'
}
]
// Bulk create users
const [error, createdUsers] = await userService.bulkCreate(newUsers)
if (error) {
console.error('Bulk create failed:', error.message)
} else {
  console.log(`Successfully created ${createdUsers.length} users`)
  createdUsers.forEach(user => {
    console.log(`- ${user.name} (${user.email})`)
  })
}
Large Dataset Import
// Import a large CSV dataset
const importUsersFromCSV = async (csvData: any[]) => {
const batchSize = 500 // Process 500 records at a time
const results = []
for (let i = 0; i < csvData.length; i += batchSize) {
const batch = csvData.slice(i, i + batchSize)
    // Transform CSV data to user objects
    const userBatch = batch.map((row, idx) => ({
      email: row.email.toLowerCase(),
      name: `${row.firstName} ${row.lastName}`,
      role: row.role || 'user',
      department: row.department,
      joinDate: new Date(row.joinDate),
      metadata: {
        importBatch: Math.floor(i / batchSize) + 1,
        originalRowNumber: i + idx + 1 // Derived from the batch offset; avoids an O(n²) indexOf scan
      }
    }))
const [error, created] = await userService.bulkCreate(userBatch)
if (error) {
console.error(`Batch ${Math.floor(i / batchSize) + 1} failed:`, error.message)
results.push({ batch: Math.floor(i / batchSize) + 1, success: false, error })
} else {
console.log(`Batch ${Math.floor(i / batchSize) + 1}: ${created.length} users created`)
results.push({ batch: Math.floor(i / batchSize) + 1, success: true, count: created.length })
}
}
return results
}
// Usage
const csvUsers = await parseCsvFile('users.csv')
const importResults = await importUsersFromCSV(csvUsers)
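The parseCsvFile helper above is assumed rather than provided. A minimal sketch using Node's built-in fs, assuming a header row and no quoted commas, might look like this:

import { readFile } from 'node:fs/promises'

// Naive CSV parser: first line is the header, fields contain no quoted commas
const parseCsvFile = async (path: string): Promise<Record<string, string>[]> => {
  const raw = await readFile(path, 'utf8')
  const [headerLine, ...lines] = raw.trim().split('\n')
  const headers = headerLine.split(',').map(h => h.trim())
  return lines.map(line => {
    const values = line.split(',')
    return Object.fromEntries(headers.map((h, i) => [h, values[i]?.trim() ?? '']))
  })
}

For production imports, a dedicated CSV library that handles quoting and escaping is a safer choice.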
Bulk Create with Validation
// Create products with price validation
const createProducts = async (productData: any[]) => {
  // Pre-validate data; note that an invalid record throws and aborts the whole batch
const validatedProducts = productData.map(product => {
if (!product.name || product.name.length < 3) {
throw new Error(`Product name too short: ${product.name}`)
}
if (product.price <= 0) {
throw new Error(`Invalid price for ${product.name}: ${product.price}`)
}
return {
...product,
sku: product.sku || generateSKU(product.name),
price: Math.round(product.price * 100) / 100, // Round to 2 decimal places
createdAt: new Date(),
status: 'active'
}
})
const [error, created] = await productService.bulkCreate(validatedProducts)
if (error) {
console.error('Product creation failed:', error.message)
return { success: false, error: error.message }
}
return {
success: true,
products: created,
message: `Created ${created.length} products successfully`
}
}
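The generateSKU helper used above is not defined in these examples. A hypothetical implementation that derives a SKU from the product name plus a random suffix:

// Hypothetical helper: prefix from the product name, random base-36 suffix
const generateSKU = (name: string): string => {
  const prefix = name.toUpperCase().replace(/[^A-Z0-9]/g, '').slice(0, 6)
  const suffix = Math.random().toString(36).slice(2, 8).toUpperCase()
  return `${prefix}-${suffix}`
}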
Bulk Update Operations
Basic Bulk Updates
// Promote multiple users to managers
const userPromotions = [
{ id: 'user-1', changes: { role: 'manager', promotedAt: new Date() } },
{ id: 'user-2', changes: { role: 'manager', promotedAt: new Date() } },
{ id: 'user-3', changes: { role: 'senior-developer', promotedAt: new Date() } }
]
const [error, updatedUsers] = await userService.bulkUpdate(userPromotions)
if (error) {
console.error('Bulk update failed:', error.message)
} else {
console.log('Successfully updated user roles:')
updatedUsers.forEach(user => {
console.log(`- ${user.name}: ${user.role}`)
})
}
Conditional Bulk Updates
// Update todo priorities based on due dates
const updateTodoPriorities = async () => {
// Find todos due soon
const upcomingTodos = await todoService.findAll({
custom: sql`${todos.dueDate} BETWEEN NOW() AND NOW() + INTERVAL '3 days'`
})
  // Prepare priority updates, skipping todos whose priority is already correct
  const priorityUpdates = upcomingTodos.flatMap(todo => {
    const daysUntilDue = Math.ceil(
      (new Date(todo.dueDate).getTime() - Date.now()) / (1000 * 60 * 60 * 24)
    )
    let newPriority = todo.priority
    if (daysUntilDue <= 1) {
      newPriority = 'urgent'
    } else if (daysUntilDue <= 2) {
      newPriority = 'high'
    }
    if (newPriority === todo.priority) return []
    return [{
      id: todo.id,
      changes: {
        priority: newPriority,
        priorityUpdatedAt: new Date(),
        autoUpdated: true
      }
    }]
  })
if (priorityUpdates.length === 0) {
return { message: 'No priority updates needed' }
}
const [error, updated] = await todoService.bulkUpdate(priorityUpdates)
if (error) {
console.error('Priority update failed:', error.message)
return { success: false, error }
}
return {
success: true,
updatedCount: updated.length,
message: `Updated priorities for ${updated.length} todos`
}
}
Bulk Status Updates
// Update order statuses based on external processing results
const processOrderUpdates = async (processingResults: any[]) => {
const statusUpdates = processingResults.map(result => ({
id: result.orderId,
changes: {
status: result.success ? 'fulfilled' : 'failed',
processedAt: new Date(),
processingNotes: result.notes,
trackingNumber: result.success ? result.trackingNumber : null
}
}))
const [error, updatedOrders] = await orderService.bulkUpdate(statusUpdates)
if (error) {
console.error('Order status update failed:', error.message)
return null
}
// Send notifications for fulfilled orders
const fulfilledOrders = updatedOrders.filter(order => order.status === 'fulfilled')
await Promise.all(
fulfilledOrders.map(order =>
sendShippingNotification(order.customerEmail, order.trackingNumber)
)
)
return {
totalUpdated: updatedOrders.length,
fulfilled: fulfilledOrders.length,
failed: updatedOrders.filter(order => order.status === 'failed').length
}
}
Bulk Delete Operations
Basic Bulk Soft Delete
// Mark multiple todos as canceled
const todoIdsToCancel = ['todo-1', 'todo-2', 'todo-3', 'todo-4']
const result = await todoService.bulkDelete(todoIdsToCancel)
if (result.success) {
console.log(`${result.affectedCount} todos marked as canceled`)
console.log(result.message)
} else {
  console.error('Bulk delete failed:', result.message)
}
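Because todoService was configured with soft deletes, bulkDelete only flips the status field to 'canceled'. When rows should be removed permanently, use bulkHardDelete (shown again in the cascade examples below) with the same call shape:

// Permanently remove the rows instead of marking them canceled
const hardResult = await todoService.bulkHardDelete(todoIdsToCancel)
if (hardResult.success) {
  console.log(`${hardResult.affectedCount} todos permanently deleted`)
}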
Cleanup Operations
// Delete users who haven't logged in for over 2 years
const cleanupInactiveUsers = async () => {
const twoYearsAgo = new Date()
twoYearsAgo.setFullYear(twoYearsAgo.getFullYear() - 2)
// Find inactive users
const inactiveUsers = await userService.findAll({
custom: sql`${users.lastLoginAt} < ${twoYearsAgo} OR ${users.lastLoginAt} IS NULL`,
limit: 1000 // Process in manageable chunks
})
if (inactiveUsers.length === 0) {
return { message: 'No inactive users found' }
}
// Archive user data before deletion
await archiveUserData(inactiveUsers.map(u => u.id))
// Bulk delete inactive users
const userIds = inactiveUsers.map(u => u.id)
const result = await userService.bulkDelete(userIds)
return {
scanned: inactiveUsers.length,
deleted: result.affectedCount,
success: result.success
}
}
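The archiveUserData call above is application-specific and not part of the service API. As a placeholder, a sketch that looks the rows up by id with inArray and snapshots them to a timestamped JSON file before deletion:

import { writeFile } from 'node:fs/promises'

// Placeholder archiver: dump the full user rows to a JSON file before deleting them
const archiveUserData = async (userIds: string[]) => {
  const rows = await userService.findAll({
    custom: inArray(users.id, userIds)
  })
  const path = `archive/users-${Date.now()}.json`
  await writeFile(path, JSON.stringify(rows, null, 2))
  console.log(`Archived ${rows.length} users to ${path}`)
}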
Cascade Delete Operations
// Delete users and all their related data
const deleteUsersWithData = async (userIds: string[]) => {
try {
// First, delete related todos
const userTodos = await todoService.findAll({
custom: inArray(todos.userId, userIds)
})
if (userTodos.length > 0) {
const todoIds = userTodos.map(todo => todo.id)
await todoService.bulkHardDelete(todoIds)
console.log(`Deleted ${todoIds.length} related todos`)
}
// Delete user orders
const userOrders = await orderService.findAll({
custom: inArray(orders.userId, userIds)
})
if (userOrders.length > 0) {
const orderIds = userOrders.map(order => order.id)
await orderService.bulkHardDelete(orderIds)
console.log(`Deleted ${orderIds.length} related orders`)
}
// Finally, delete users
const result = await userService.bulkHardDelete(userIds)
return {
success: result.success,
usersDeleted: result.affectedCount,
todosDeleted: userTodos.length,
ordersDeleted: userOrders.length
}
} catch (error) {
console.error('Cascade delete failed:', error)
    return { success: false, error: error instanceof Error ? error.message : String(error) }
}
}
Bulk Upsert Operations
Product Catalog Updates
// Update product catalog from external feed
const updateProductCatalog = async (productFeed: any[]) => {
const productData = productFeed.map(item => ({
sku: item.sku,
name: item.name,
description: item.description,
price: parseFloat(item.price),
categoryId: item.categoryId,
inStock: item.quantity > 0,
quantity: item.quantity,
lastUpdated: new Date()
}))
// Use bulkCreate for new products or handle conflicts manually
const [error, upserted] = await productService.bulkCreate(productData)
if (error) {
console.error('Product catalog update failed:', error.message)
// Handle unique constraint violations by trying individual updates
return await handleProductUpsertConflicts(productData)
}
  // Heuristic split: assumes the service maintains createdAt/updatedAt timestamps,
  // so a row whose updatedAt differs from createdAt was updated rather than inserted
  const inserted = upserted.filter(p => !p.updatedAt || p.createdAt === p.updatedAt)
  const updated = upserted.filter(p => p.updatedAt && p.createdAt !== p.updatedAt)
return {
total: upserted.length,
inserted: inserted.length,
updated: updated.length,
message: `Processed ${upserted.length} products: ${inserted.length} new, ${updated.length} updated`
}
}
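handleProductUpsertConflicts is referenced above but not defined. One possible fallback, reusing the findAll and update calls shown elsewhere in these examples and assuming sku is a unique column, is to process each record individually:

// Fallback: upsert one record at a time, keyed on the (assumed unique) SKU
const handleProductUpsertConflicts = async (productData: any[]) => {
  const results = { inserted: 0, updated: 0, failed: [] as string[] }
  for (const product of productData) {
    const [existing] = await productService.findAll({
      custom: sql`${products.sku} = ${product.sku}`,
      limit: 1
    })
    if (existing) {
      const [error] = await productService.update(existing.id, product)
      if (error) results.failed.push(product.sku)
      else results.updated++
    } else {
      const [error] = await productService.bulkCreate([product])
      if (error) results.failed.push(product.sku)
      else results.inserted++
    }
  }
  return results
}

This trades throughput for per-record error isolation, so reserve it for the conflict path rather than the main import.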
User Synchronization
// Synchronize users from external HR system
const syncUsersFromHR = async (hrUsers: any[]) => {
const userData = hrUsers.map(hrUser => ({
employeeId: hrUser.id,
email: hrUser.email,
name: `${hrUser.firstName} ${hrUser.lastName}`,
department: hrUser.department,
role: hrUser.role,
status: hrUser.active ? 'active' : 'inactive',
joinDate: new Date(hrUser.startDate),
lastSyncedAt: new Date()
}))
// Use bulkCreate for new users or handle updates separately
const [error, synced] = await userService.bulkCreate(userData)
if (error) {
console.error('User sync failed:', error.message)
// Handle conflicts by trying individual operations
return await handleUserSyncConflicts(userData)
}
  // Deactivate users not present in the HR system (Set lookup avoids an O(n²) scan)
  const hrEmployeeIds = new Set(hrUsers.map(u => u.id))
  const allUsers = await userService.findAll({ limit: 10000 })
  const usersToDeactivate = allUsers
    .filter(user => user.employeeId && !hrEmployeeIds.has(user.employeeId))
.map(user => ({
id: user.id,
changes: { status: 'inactive', lastSyncedAt: new Date() }
}))
if (usersToDeactivate.length > 0) {
await userService.bulkUpdate(usersToDeactivate)
}
return {
success: true,
synchronized: synced.length,
deactivated: usersToDeactivate.length
}
}
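handleUserSyncConflicts can follow the same per-record pattern as handleProductUpsertConflicts above, keyed on employeeId instead of sku:

// Same fallback shape as the product example, keyed on the HR employeeId
const handleUserSyncConflicts = async (userData: any[]) => {
  const results = { inserted: 0, updated: 0, failed: [] as string[] }
  for (const user of userData) {
    const [existing] = await userService.findAll({
      custom: sql`${users.employeeId} = ${user.employeeId}`,
      limit: 1
    })
    if (existing) {
      const [error] = await userService.update(existing.id, user)
      if (error) results.failed.push(user.employeeId)
      else results.updated++
    } else {
      const [error] = await userService.bulkCreate([user])
      if (error) results.failed.push(user.employeeId)
      else results.inserted++
    }
  }
  return results
}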
Error Handling and Recovery
Robust Bulk Processing
const safeBulkOperation = async <T>(
operation: () => Promise<[Error | null, T | null]>,
retryCount = 3
): Promise<{ success: boolean; data?: T; error?: string }> => {
for (let attempt = 1; attempt <= retryCount; attempt++) {
try {
const [error, result] = await operation()
if (error) {
// Check if error is retryable
if (error.message.includes('deadlock') || error.message.includes('timeout')) {
if (attempt < retryCount) {
console.log(`Attempt ${attempt} failed, retrying...`, error.message)
            await new Promise(resolve => setTimeout(resolve, attempt * 1000)) // Linear backoff between retries
continue
}
}
return { success: false, error: error.message }
}
      return { success: true, data: result ?? undefined }
} catch (error) {
if (attempt === retryCount) {
return {
success: false,
error: error instanceof Error ? error.message : String(error)
}
}
console.log(`Attempt ${attempt} failed with exception, retrying...`)
await new Promise(resolve => setTimeout(resolve, attempt * 1000))
}
}
return { success: false, error: 'Max retries exceeded' }
}
// Usage with bulk create
const createUsersWithRetry = async (userData: any[]) => {
return await safeBulkOperation(
() => userService.bulkCreate(userData),
3 // Retry up to 3 times
)
}
Partial Failure Handling
// Process updates with individual error handling
const processUpdatesWithErrorHandling = async (updates: any[]) => {
  const results = {
    successful: [] as any[],
    failed: [] as { id: string; error: string }[],
    total: updates.length
  }
// Split into smaller batches to isolate failures
const batchSize = 50
for (let i = 0; i < updates.length; i += batchSize) {
const batch = updates.slice(i, i + batchSize)
const [error, updated] = await userService.bulkUpdate(batch)
if (error) {
// If batch fails, try individual updates
      console.log(`Batch starting at index ${i} failed, trying individual updates...`)
for (const update of batch) {
const [individualError, individualResult] = await userService.update(
update.id,
update.changes
)
if (individualError) {
results.failed.push({
id: update.id,
error: individualError.message
})
} else {
results.successful.push(individualResult)
}
}
} else {
results.successful.push(...updated)
}
}
return results
}
Performance Optimization
Memory-Efficient Processing
// Process very large datasets without memory issues
const processLargeDataset = async (dataSource: AsyncIterable<any>) => {
const batchSize = 1000
let batch = []
let processedCount = 0
let errorCount = 0
for await (const item of dataSource) {
batch.push(item)
if (batch.length >= batchSize) {
const [error, results] = await userService.bulkCreate(batch)
if (error) {
console.error(`Batch failed at count ${processedCount}:`, error.message)
errorCount++
} else {
processedCount += results.length
}
// Clear batch for next iteration
batch = []
// Optional: Add delay to prevent overwhelming the database
if (processedCount % 10000 === 0) {
console.log(`Processed ${processedCount} records...`)
await new Promise(resolve => setTimeout(resolve, 100))
}
}
}
// Process remaining items
if (batch.length > 0) {
const [error, results] = await userService.bulkCreate(batch)
if (!error) {
processedCount += results.length
} else {
errorCount++
}
}
return {
totalProcessed: processedCount,
errors: errorCount,
success: errorCount === 0
}
}
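Driving processLargeDataset with an async generator keeps only one batch in memory at a time. For example, streaming one JSON record per line from an NDJSON file (readUsersLineByLine is hypothetical):

import { createReadStream } from 'node:fs'
import { createInterface } from 'node:readline'

// Hypothetical streaming source: yields one parsed record per non-empty line
async function* readUsersLineByLine(path: string) {
  const rl = createInterface({ input: createReadStream(path) })
  for await (const line of rl) {
    if (line.trim()) yield JSON.parse(line)
  }
}

const report = await processLargeDataset(readUsersLineByLine('users.ndjson'))
console.log(report) // { totalProcessed, errors, success }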