Drizzle Service

Bulk Operations Examples

Practical examples demonstrating how to use bulk operations for high-performance batch processing.

Basic Setup

First, let's set up service instances for our examples:

Service Setup
import { drizzleService } from 'drizzle-service/pg' // or 'drizzle-service/sqlite'
import { users, todos, products, orders, db } from './schema'

// Initialize the service: drizzleService(db) returns a factory that is then
// called once per table to build that table's service.
const service = drizzleService(db) 

// User service (table only, no extra options)
const userService = service(users) 

// Todo service with soft delete: the options name the `status` column and the
// values that mark a row as deleted ('canceled') or not deleted ('active').
const todoService = service(todos, {
  soft: {
    field: 'status',
    deletedValue: 'canceled',
    notDeletedValue: 'active'
  }
})

// Product service for e-commerce examples
const productService = service(products) 

// Order service for transaction examples
const orderService = service(orders) 

Bulk Create Operations

Basic Bulk Creation

Create Multiple Users
// Sample payload: three users to insert with a single call
const newUsers = [
  { email: 'alice@company.com', name: 'Alice Johnson', role: 'developer', department: 'engineering' },
  { email: 'bob@company.com', name: 'Bob Smith', role: 'designer', department: 'product' },
  { email: 'carol@company.com', name: 'Carol Davis', role: 'manager', department: 'engineering' }
]

// bulkCreate resolves to an [error, rows] tuple, destructured here
const [error, createdUsers] = await userService.bulkCreate(newUsers)

if (!error) {
  // Report the count, then one line per created user
  console.log(`Successfully created ${createdUsers.length} users`)
  for (const user of createdUsers) {
    console.log(`- ${user.name} (${user.email})`)
  }
} else {
  console.error('Bulk create failed:', error.message)
}

Large Dataset Import

Import Large Dataset with Batching
/**
 * Imports parsed CSV rows as users, 500 rows per `userService.bulkCreate` call.
 *
 * Each row is mapped to a user object (lower-cased email, joined first/last
 * name, role defaulting to 'user', parsed join date) and tagged with the
 * 1-based batch number and its 1-based position in `csvData`.
 *
 * A failed batch is logged and recorded; later batches are still attempted.
 *
 * @param csvData Parsed CSV rows; the fields read are `email`, `firstName`,
 *   `lastName`, `role`, `department` and `joinDate`.
 * @returns One entry per batch: `{ batch, success: true, count }` or
 *   `{ batch, success: false, error }`.
 */
const importUsersFromCSV = async (csvData: any[]) => {
  const batchSize = 500 // Process 500 records at a time
  const results: Array<{ batch: number; success: boolean; count?: number; error?: unknown }> = []

  for (let start = 0; start < csvData.length; start += batchSize) {
    const batchNumber = Math.floor(start / batchSize) + 1
    const batch = csvData.slice(start, start + batchSize)

    // Transform CSV data to user objects
    const userBatch = batch.map((row, offset) => ({
      email: row.email.toLowerCase(),
      name: `${row.firstName} ${row.lastName}`,
      // `||` is deliberate: an empty CSV cell ('') also falls back to 'user'
      role: row.role || 'user',
      department: row.department,
      joinDate: new Date(row.joinDate),
      metadata: {
        importBatch: batchNumber,
        // Derived from the slice offset rather than csvData.indexOf(row):
        // indexOf is O(n) per row and returns the first match when the same
        // row object appears more than once.
        originalRowNumber: start + offset + 1
      }
    }))

    const [error, created] = await userService.bulkCreate(userBatch)

    if (error) {
      console.error(`Batch ${batchNumber} failed:`, error.message)
      results.push({ batch: batchNumber, success: false, error })
    } else {
      console.log(`Batch ${batchNumber}: ${created.length} users created`)
      results.push({ batch: batchNumber, success: true, count: created.length })
    }
  }

  return results
}

// Usage
// NOTE(review): parseCsvFile is not defined in this example — presumably it
// resolves to an array of row objects carrying the fields importUsersFromCSV
// reads (email, firstName, lastName, role, department, joinDate); confirm
// against the CSV parser you use.
const csvUsers = await parseCsvFile('users.csv')
// importResults holds the array that importUsersFromCSV returns for these rows
const importResults = await importUsersFromCSV(csvUsers)

Bulk Create with Validation

/**
 * Validates and normalizes product records, then inserts them with one
 * `productService.bulkCreate` call.
 *
 * Validation happens before any database work: the first invalid record
 * throws and nothing is inserted.
 *
 * @param productData Raw product records; `name` and `price` are validated,
 *   `sku` is generated with `generateSKU(name)` when missing.
 * @returns `{ success: true, products, message }` on success, or
 *   `{ success: false, error }` when the bulk insert reports an error.
 * @throws Error when a name is missing/shorter than 3 characters or a price
 *   is not a finite number greater than 0.
 */
const createProducts = async (productData: any[]) => {
  // Pre-validate data
  const validatedProducts = productData.map(product => {
    if (!product.name || product.name.length < 3) {
      throw new Error(`Product name too short: ${product.name}`)
    }

    // Coerce once so numeric strings keep working, then reject anything that
    // is not a finite positive number. A bare `price <= 0` lets undefined,
    // NaN and non-numeric strings through and would store NaN as the price.
    const price = Number(product.price)
    if (!Number.isFinite(price) || price <= 0) {
      throw new Error(`Invalid price for ${product.name}: ${product.price}`)
    }

    return {
      ...product,
      sku: product.sku || generateSKU(product.name),
      price: Math.round(price * 100) / 100, // Round to 2 decimal places
      // Listed after the spread, so these override any incoming values
      createdAt: new Date(),
      status: 'active'
    }
  })

  const [error, created] = await productService.bulkCreate(validatedProducts)

  if (error) {
    console.error('Product creation failed:', error.message)
    return { success: false, error: error.message }
  }

  return {
    success: true,
    products: created,
    message: `Created ${created.length} products successfully`
  }
}

Bulk Update Operations

Basic Bulk Updates

// Role changes to apply: one { id, changes } entry per user
const userPromotions = [
  {
    id: 'user-1',
    changes: { role: 'manager', promotedAt: new Date() }
  },
  {
    id: 'user-2',
    changes: { role: 'manager', promotedAt: new Date() }
  },
  {
    id: 'user-3',
    changes: { role: 'senior-developer', promotedAt: new Date() }
  }
]

// bulkUpdate resolves to an [error, rows] tuple
const [error, updatedUsers] = await userService.bulkUpdate(userPromotions)

if (!error) {
  console.log('Successfully updated user roles:')
  for (const user of updatedUsers) {
    console.log(`- ${user.name}: ${user.role}`)
  }
} else {
  console.error('Bulk update failed:', error.message)
}

Conditional Bulk Updates

/**
 * Raises the priority of todos that are due soon.
 *
 * Loads todos whose due date falls within the next 3 days, computes the
 * whole days remaining (rounded up) and sets priority to 'urgent' when at
 * most 1 day remains or 'high' when at most 2 days remain. Only todos whose
 * priority actually changes are sent to `todoService.bulkUpdate`.
 *
 * NOTE(review): `sql` is not imported in this example — presumably it is
 * drizzle-orm's `sql` template tag; confirm. The interval syntax in the
 * query looks PostgreSQL-specific.
 * NOTE(review): a todo already marked 'urgent' with 2 days left is rewritten
 * to 'high' by this rule — confirm that downgrade is intended.
 *
 * @returns `{ success: true, updatedCount, message }` (updatedCount is 0 when
 *   nothing needed changing) or `{ success: false, error }` on failure.
 */
const updateTodoPriorities = async () => {
  // Find todos due soon
  const upcomingTodos = await todoService.findAll({
    custom: sql`${todos.dueDate} BETWEEN NOW() AND NOW() + INTERVAL '3 days'`
  })

  const msPerDay = 1000 * 60 * 60 * 24
  const now = Date.now()

  // Build the updates and drop unchanged todos in one pass; the previous
  // map + filter re-searched the whole list for every update (O(n^2)).
  const priorityUpdates = upcomingTodos.flatMap(todo => {
    const daysUntilDue = Math.ceil((new Date(todo.dueDate).getTime() - now) / msPerDay)

    let newPriority = todo.priority
    if (daysUntilDue <= 1) {
      newPriority = 'urgent'
    } else if (daysUntilDue <= 2) {
      newPriority = 'high'
    }

    if (newPriority === todo.priority) {
      return []
    }

    return [{
      id: todo.id,
      changes: {
        priority: newPriority,
        priorityUpdatedAt: new Date(),
        autoUpdated: true
      }
    }]
  })

  if (priorityUpdates.length === 0) {
    // Same shape as the success path so callers can always read `success`
    return { success: true, updatedCount: 0, message: 'No priority updates needed' }
  }

  const [error, updated] = await todoService.bulkUpdate(priorityUpdates)

  if (error) {
    console.error('Priority update failed:', error.message)
    return { success: false, error }
  }

  return {
    success: true,
    updatedCount: updated.length,
    message: `Updated priorities for ${updated.length} todos`
  }
}

Bulk Status Updates

/**
 * Applies external processing results to orders and notifies customers whose
 * orders were fulfilled.
 *
 * Each result becomes a status update ('fulfilled' or 'failed') that also
 * records the processing time, notes and, for successes, the tracking number.
 * After the bulk update, a shipping notification is sent per fulfilled order.
 * Notification failures are logged and counted instead of rejecting the
 * call, because the order updates have already been written by then.
 *
 * @param processingResults Items with `orderId`, `success`, `notes` and, for
 *   successes, `trackingNumber`.
 * @returns `null` when the bulk update fails; otherwise the counts
 *   `{ totalUpdated, fulfilled, failed, notificationsFailed }`.
 */
const processOrderUpdates = async (processingResults: any[]) => {
  const statusUpdates = processingResults.map(result => ({
    id: result.orderId,
    changes: {
      status: result.success ? 'fulfilled' : 'failed',
      processedAt: new Date(),
      processingNotes: result.notes,
      trackingNumber: result.success ? result.trackingNumber : null
    }
  }))

  const [error, updatedOrders] = await orderService.bulkUpdate(statusUpdates)

  if (error) {
    console.error('Order status update failed:', error.message)
    return null
  }

  // Send notifications for fulfilled orders. allSettled lets every send be
  // attempted; the async wrapper turns a synchronous throw into a rejection.
  const fulfilledOrders = updatedOrders.filter(order => order.status === 'fulfilled')
  const deliveries = await Promise.allSettled(
    fulfilledOrders.map(async order =>
      sendShippingNotification(order.customerEmail, order.trackingNumber)
    )
  )

  let notificationsFailed = 0
  deliveries.forEach((delivery, index) => {
    if (delivery.status === 'rejected') {
      notificationsFailed++
      console.error(
        `Shipping notification failed for order ${fulfilledOrders[index]?.id}:`,
        delivery.reason
      )
    }
  })

  return {
    totalUpdated: updatedOrders.length,
    fulfilled: fulfilledOrders.length,
    failed: updatedOrders.filter(order => order.status === 'failed').length,
    notificationsFailed
  }
}

Bulk Delete Operations

Basic Bulk Soft Delete

// Ids of the todos to cancel
const todoIdsToCancel = ['todo-1', 'todo-2', 'todo-3', 'todo-4']

// todoService was created with a `soft` option, so this delete is expected
// to flip the status column rather than remove rows.
// bulkDelete resolves to a result object; success, affectedCount and message are read below.
const result = await todoService.bulkDelete(todoIdsToCancel)

if (!result.success) {
  console.error('Bulk delete failed')
} else {
  console.log(`${result.affectedCount} todos marked as canceled`)
  console.log(result.message)
}

Cleanup Operations

/**
 * Deletes up to 1000 users who have not logged in for over two years (or have
 * never logged in), archiving their data first.
 *
 * If `archiveUserData` rejects, the error propagates and nothing is deleted.
 *
 * NOTE(review): `sql` and `archiveUserData` are not defined in this example —
 * presumably drizzle-orm's `sql` tag and a project helper; confirm.
 *
 * @returns `{ scanned, deleted, success }`; when no inactive users are found
 *   the result also carries `message` and zero counts.
 */
const cleanupInactiveUsers = async () => {
  const twoYearsAgo = new Date()
  twoYearsAgo.setFullYear(twoYearsAgo.getFullYear() - 2)

  // Find inactive users. The condition is parenthesized so the OR keeps its
  // meaning if the service combines `custom` with other conditions via AND.
  const inactiveUsers = await userService.findAll({
    custom: sql`(${users.lastLoginAt} < ${twoYearsAgo} OR ${users.lastLoginAt} IS NULL)`,
    limit: 1000 // Process in manageable chunks
  })

  if (inactiveUsers.length === 0) {
    return { message: 'No inactive users found', scanned: 0, deleted: 0, success: true }
  }

  // Map the ids once and use the same list for archiving and deleting
  const userIds = inactiveUsers.map(u => u.id)

  // Archive user data before deletion
  await archiveUserData(userIds)

  // Bulk delete inactive users
  const result = await userService.bulkDelete(userIds)

  return {
    scanned: inactiveUsers.length,
    deleted: result.affectedCount,
    success: result.success
  }
}

Cascade Delete Operations

/**
 * Hard-deletes the given users together with their todos and orders.
 *
 * Order of work: related todos, then related orders, then the users. If a
 * child delete reports `success: false`, the function stops before touching
 * the users so they are not removed while their related rows remain.
 *
 * NOTE(review): the three deletes are separate calls, not one transaction —
 * a failure part-way leaves earlier deletes in place. `inArray` is not
 * imported in this example; presumably it is drizzle-orm's operator.
 *
 * @param userIds Ids of the users to remove; an empty list is a no-op.
 * @returns `{ success, usersDeleted, todosDeleted, ordersDeleted }`, or
 *   `{ success: false, error }` when a step fails or throws.
 */
const deleteUsersWithData = async (userIds: string[]) => {
  // Nothing to do; also avoids building an IN () condition with no values
  if (userIds.length === 0) {
    return { success: true, usersDeleted: 0, todosDeleted: 0, ordersDeleted: 0 }
  }

  try {
    // First, delete related todos
    const userTodos = await todoService.findAll({
      custom: inArray(todos.userId, userIds)
    })

    if (userTodos.length > 0) {
      const todoIds = userTodos.map(todo => todo.id)
      const todoResult = await todoService.bulkHardDelete(todoIds)
      if (!todoResult.success) {
        return { success: false, error: 'Failed to delete related todos' }
      }
      console.log(`Deleted ${todoIds.length} related todos`)
    }

    // Delete user orders
    const userOrders = await orderService.findAll({
      custom: inArray(orders.userId, userIds)
    })

    if (userOrders.length > 0) {
      const orderIds = userOrders.map(order => order.id)
      const orderResult = await orderService.bulkHardDelete(orderIds)
      if (!orderResult.success) {
        return { success: false, error: 'Failed to delete related orders' }
      }
      console.log(`Deleted ${orderIds.length} related orders`)
    }

    // Finally, delete users
    const result = await userService.bulkHardDelete(userIds)

    return {
      success: result.success,
      usersDeleted: result.affectedCount,
      todosDeleted: userTodos.length,
      ordersDeleted: userOrders.length
    }
  } catch (error: unknown) {
    console.error('Cascade delete failed:', error)
    // A thrown value is not guaranteed to be an Error, so narrow before reading .message
    return { success: false, error: error instanceof Error ? error.message : String(error) }
  }
}

Bulk Upsert Operations

Product Catalog Updates

/**
 * Loads an external product feed into the catalog.
 *
 * Feed items are mapped to product rows and inserted with
 * `productService.bulkCreate`; this is a plain insert, not a native upsert.
 * When the insert reports an error (for example a unique-constraint
 * conflict), the work is handed to `handleProductUpsertConflicts`.
 *
 * Returned rows are classified by timestamps: a row counts as "updated" only
 * when it has an `updatedAt` that differs in time from `createdAt`.
 *
 * NOTE(review): `handleProductUpsertConflicts` is not defined in this
 * example, and `parseFloat` yields NaN for non-numeric prices — confirm the
 * feed is validated upstream.
 *
 * @param productFeed Feed items with `sku`, `name`, `description`, `price`,
 *   `categoryId` and `quantity`.
 * @returns `{ total, inserted, updated, message }`, or whatever
 *   `handleProductUpsertConflicts` resolves to on the error path.
 */
const updateProductCatalog = async (productFeed: any[]) => {
  const productData = productFeed.map(item => ({
    sku: item.sku,
    name: item.name,
    description: item.description,
    price: parseFloat(item.price),
    categoryId: item.categoryId,
    inStock: item.quantity > 0,
    quantity: item.quantity,
    lastUpdated: new Date()
  }))

  // Use bulkCreate for new products or handle conflicts manually
  const [error, upserted] = await productService.bulkCreate(productData)

  if (error) {
    console.error('Product catalog update failed:', error.message)
    // Handle unique constraint violations by trying individual updates
    return await handleProductUpsertConflicts(productData)
  }

  // Compare timestamps by value. `createdAt === updatedAt` on two Date
  // objects compares references, so equal times were never seen as equal.
  const toMillis = (value: unknown): number =>
    value instanceof Date ? value.getTime() : new Date(value as string | number).getTime()
  const wasUpdated = (product: any): boolean =>
    Boolean(product.updatedAt) && toMillis(product.createdAt) !== toMillis(product.updatedAt)

  const updated = upserted.filter(wasUpdated)
  const inserted = upserted.filter(product => !wasUpdated(product))

  return {
    total: upserted.length,
    inserted: inserted.length,
    updated: updated.length,
    message: `Processed ${upserted.length} products: ${inserted.length} new, ${updated.length} updated`
  }
}

User Synchronization

/**
 * Synchronizes local users with an external HR system.
 *
 * HR records are mapped to user rows and inserted with
 * `userService.bulkCreate`; on an insert error the work is handed to
 * `handleUserSyncConflicts`. Afterwards, local users that have an
 * `employeeId` missing from the HR data are marked 'inactive'.
 *
 * NOTE(review): only the first 10000 local users are inspected
 * (`findAll({ limit: 10000 })`); `handleUserSyncConflicts` is not defined in
 * this example.
 *
 * @param hrUsers HR records with `id`, `email`, `firstName`, `lastName`,
 *   `department`, `role`, `active` and `startDate`.
 * @returns `{ success: true, synchronized, deactivated }`; when the
 *   deactivation update fails, `{ success: false, synchronized,
 *   deactivated: 0, error }`; on the conflict path, whatever
 *   `handleUserSyncConflicts` resolves to.
 */
const syncUsersFromHR = async (hrUsers: any[]) => {
  const userData = hrUsers.map(hrUser => ({
    employeeId: hrUser.id,
    email: hrUser.email,
    name: `${hrUser.firstName} ${hrUser.lastName}`,
    department: hrUser.department,
    role: hrUser.role,
    status: hrUser.active ? 'active' : 'inactive',
    joinDate: new Date(hrUser.startDate),
    lastSyncedAt: new Date()
  }))

  // Use bulkCreate for new users or handle updates separately
  const [error, synced] = await userService.bulkCreate(userData)

  if (error) {
    console.error('User sync failed:', error.message)
    // Handle conflicts by trying individual operations
    return await handleUserSyncConflicts(userData)
  }

  // Deactivate users not present in HR system. A Set gives constant-time
  // membership checks instead of scanning the HR id list for every user.
  const hrEmployeeIds = new Set(hrUsers.map(u => u.id))
  const allUsers = await userService.findAll({ limit: 10000 })
  const usersToDeactivate = allUsers
    .filter(user => user.employeeId && !hrEmployeeIds.has(user.employeeId))
    .map(user => ({
      id: user.id,
      changes: { status: 'inactive', lastSyncedAt: new Date() }
    }))

  if (usersToDeactivate.length > 0) {
    // bulkUpdate reports failure through its tuple; surface it rather than
    // claiming users were deactivated when the update did not go through.
    const [deactivateError] = await userService.bulkUpdate(usersToDeactivate)
    if (deactivateError) {
      console.error('User deactivation failed:', deactivateError.message)
      return {
        success: false,
        synchronized: synced.length,
        deactivated: 0,
        error: deactivateError.message
      }
    }
  }

  return {
    success: true,
    synchronized: synced.length,
    deactivated: usersToDeactivate.length
  }
}

Error Handling and Recovery

Robust Bulk Processing

/**
 * Runs a bulk operation with retries and exponential backoff.
 *
 * The operation is retried when it returns an error whose message contains
 * 'deadlock' or 'timeout', or when it throws. Any other returned error fails
 * immediately. The wait before retry k+1 is `baseDelayMs * 2^(k-1)`, i.e.
 * 1s then 2s with the defaults.
 *
 * @param operation Function returning the `[error, result]` tuple used by the
 *   service's bulk methods.
 * @param retryCount Maximum number of attempts (default 3). Values below 1
 *   make no attempt and yield 'Max retries exceeded'.
 * @param baseDelayMs Delay before the first retry in milliseconds
 *   (default 1000); doubled for each later retry.
 * @returns `{ success: true, data }` on success, otherwise
 *   `{ success: false, error }` with the last error message.
 */
const safeBulkOperation = async <T>(
  operation: () => Promise<[Error | null, T | null]>,
  retryCount = 3,
  baseDelayMs = 1000
): Promise<{ success: boolean; data?: T; error?: string }> => {
  const isRetryable = (message: string): boolean =>
    message.includes('deadlock') || message.includes('timeout')

  // Exponential backoff: baseDelayMs, 2x, 4x, ...
  const backoff = (attempt: number): Promise<void> =>
    new Promise<void>(resolve => setTimeout(resolve, baseDelayMs * 2 ** (attempt - 1)))

  for (let attempt = 1; attempt <= retryCount; attempt++) {
    const isLastAttempt = attempt === retryCount

    try {
      const [error, result] = await operation()

      if (!error) {
        // `data` is typed as optional T, so a null result is reported as absent
        return { success: true, data: result ?? undefined }
      }

      if (isLastAttempt || !isRetryable(error.message)) {
        return { success: false, error: error.message }
      }

      console.log(`Attempt ${attempt} failed, retrying...`, error.message)
    } catch (error: unknown) {
      if (isLastAttempt) {
        return {
          success: false,
          error: error instanceof Error ? error.message : String(error)
        }
      }

      console.log(`Attempt ${attempt} failed with exception, retrying...`)
    }

    await backoff(attempt)
  }

  // Only reachable when retryCount < 1
  return { success: false, error: 'Max retries exceeded' }
}

// Usage with bulk create: wraps userService.bulkCreate in safeBulkOperation
// and resolves to whatever safeBulkOperation returns.
const createUsersWithRetry = async (userData: any[]) => {
  const maxAttempts = 3 // Retry up to 3 times
  const runBulkCreate = () => userService.bulkCreate(userData)

  return await safeBulkOperation(runBulkCreate, maxAttempts)
}

Partial Failure Handling

/**
 * Applies user updates in batches of 50, isolating failures.
 *
 * Each batch goes through `userService.bulkUpdate`. When a batch fails, its
 * entries are retried one at a time with `userService.update` so that only
 * the entries that really fail are reported as failed.
 *
 * @param updates Entries of the form `{ id, changes }`.
 * @returns `{ successful, failed, total }` where `successful` holds the
 *   updated rows, `failed` holds `{ id, error }` per rejected entry and
 *   `total` is the number of requested updates.
 */
const processUpdatesWithErrorHandling = async (updates: any[]) => {
  // Explicit element types: a bare `[]` in an object literal infers `never[]`
  // under strict mode, which makes the push calls below fail to type-check.
  const results: {
    successful: any[]
    failed: Array<{ id: any; error: string }>
    total: number
  } = {
    successful: [],
    failed: [],
    total: updates.length
  }

  // Split into smaller batches to isolate failures
  const batchSize = 50

  for (let i = 0; i < updates.length; i += batchSize) {
    const batch = updates.slice(i, i + batchSize)

    const [error, updated] = await userService.bulkUpdate(batch)

    if (!error) {
      results.successful.push(...updated)
      continue
    }

    // If batch fails, try individual updates
    console.log(`Batch failed, trying individual updates...`)

    for (const update of batch) {
      const [individualError, individualResult] = await userService.update(
        update.id,
        update.changes
      )

      if (individualError) {
        results.failed.push({
          id: update.id,
          error: individualError.message
        })
      } else {
        results.successful.push(individualResult)
      }
    }
  }

  return results
}

Performance Optimization

Memory-Efficient Processing

/**
 * Streams records from an async source into `userService.bulkCreate` in
 * batches of 1000, so only one batch is buffered at a time.
 *
 * A failed batch is logged and counted, and processing continues with the
 * next batch. Each time the running total passes another multiple of 10000 a
 * progress line is logged, followed by a 100 ms pause to ease load on the
 * database.
 *
 * @param dataSource Async iterable yielding the records to insert.
 * @returns `{ totalProcessed, errors, success }` where `errors` counts failed
 *   batches and `success` is true only when no batch failed.
 */
const processLargeDataset = async (dataSource: AsyncIterable<any>) => {
  const batchSize = 1000
  const progressInterval = 10000
  let batch: any[] = []
  let processedCount = 0
  let errorCount = 0
  let nextProgressMark = progressInterval

  // Sends the buffered batch and resets the buffer. Shared by the main loop
  // and the final partial batch so both log and count failures the same way.
  const flushBatch = async (): Promise<void> => {
    const pending = batch
    batch = []

    const [error, results] = await userService.bulkCreate(pending)

    if (error) {
      console.error(`Batch failed at count ${processedCount}:`, error.message)
      errorCount++
      return
    }

    processedCount += results.length
  }

  for await (const item of dataSource) {
    batch.push(item)

    if (batch.length < batchSize) {
      continue
    }

    await flushBatch()

    // Optional: Add delay to prevent overwhelming the database.
    // Checked by threshold instead of `processedCount % 10000 === 0`, which
    // fired at 0 after failed batches and never fired once the total stopped
    // being an exact multiple.
    if (processedCount >= nextProgressMark) {
      console.log(`Processed ${processedCount} records...`)
      await new Promise(resolve => setTimeout(resolve, 100))
      nextProgressMark = (Math.floor(processedCount / progressInterval) + 1) * progressInterval
    }
  }

  // Process remaining items
  if (batch.length > 0) {
    await flushBatch()
  }

  return {
    totalProcessed: processedCount,
    errors: errorCount,
    success: errorCount === 0
  }
}