Skip to content

Optimize Data Sync Script Performance

Data synchronization scripts often face severe performance bottlenecks when processing large volumes of data. This article presents a comprehensive optimization plan that achieves 162x performance improvement by addressing N+1 query problems, implementing batch operations, and leveraging in-memory computation.

The Problem

The original data synchronization script had severe performance bottlenecks:

Issue TypeDescriptionImpact
N+1 Query ProblemEach record queries related tables individuallyHigh database connection overhead, low throughput
Frequent CommitsCOMMIT after each INSERT/UPDATEHigh transaction overhead
Single Record OperationsInsert/update records one by oneExcessive network round-trips

❌ Bad Practice: Row-by-Row Processing

python
# BAD: Query once per record - 1000 queries for 1000 records!
for row in rows:
    detail_data = get_detail_data(source_conn, record_id, category)
    check_sql = "SELECT * FROM target_table WHERE ref_id = %s"
    cursor.execute(check_sql, (ref_id,))
    existing_data = cursor.fetchone()
    
    if existing_data:
        cursor.execute(update_sql, params)
    else:
        cursor.execute(insert_sql, params)
    target_conn.commit()

Performance Impact:

  • 3000+ queries for 1000 records
  • ~60 seconds per batch
  • High database connection overhead
  • Excessive network round-trips

✅ Good Practice: Batch Operations

python
# GOOD: Batch query + batch write
category_record_ids = defaultdict(list)
for row in rows:
    category_record_ids[row['category']].append(row['record_id'])

# Batch query all related data
detail_data_map = batch_get_detail_data(source_conn, category_record_ids)
existing_data_map = batch_check_target_table(target_conn, rows)

# Batch insert/update
insert_data, update_data = collect_batch_data(rows, existing_data_map)
batch_insert_or_update(target_conn, insert_data, update_data)

Performance Impact:

  • Only 5 queries for 1000 records
  • ~0.37 seconds per batch
  • 162x faster

Optimization Strategies

1. Batch Query Optimization

Before:

python
# ❌ Query once per record
for row in rows:
    detail_data = get_detail_data(source_conn, record_id, category)

After:

python
# ✅ Group by category and batch query
category_record_ids = defaultdict(list)
for row in rows:
    category_record_ids[row['category']].append(row['record_id'])

detail_data_map = batch_get_detail_data(source_conn, category_record_ids)

Key Implementation:

python
def batch_get_detail_data(source_conn, category_record_ids):
    detail_data_map = {}
    for category, record_ids in category_record_ids.items():
        if not record_ids:
            continue
        placeholders = ','.join(['%s'] * len(record_ids))
        
        if category == 1:
            sql = f"SELECT * FROM detail_table_a WHERE id IN ({placeholders})"
        elif category == 2:
            sql = f"SELECT * FROM detail_table_b WHERE id IN ({placeholders})"
        
        with source_conn.cursor() as cursor:
            cursor.execute(sql, tuple(record_ids))
            for row in cursor.fetchall():
                detail_data_map[f"{category}_{row['id']}"] = row
    return detail_data_map

2. Target Table Batch Query

Before:

python
# ❌ Check existence row by row
for row in rows:
    check_sql = "SELECT * FROM target_table WHERE ref_id = %s"
    cursor.execute(check_sql, (ref_id,))
    existing_data = cursor.fetchone()

After:

python
# ✅ Batch query all at once
ref_ids = [row['ref_id'] for row in rows]
placeholders = ','.join(['%s'] * len(ref_ids))
check_sql = f"SELECT * FROM target_table WHERE ref_id IN ({placeholders})"

with target_conn.cursor() as cursor:
    cursor.execute(check_sql, tuple(ref_ids))
    existing_data_map = {row['ref_id']: row for row in cursor.fetchall()}

3. Batch Insert and Update

Before:

python
# ❌ Single record operations with frequent commits
for row in rows:
    if exists:
        cursor.execute(update_sql, params)
    else:
        cursor.execute(insert_sql, params)
    target_conn.commit()

After:

python
# ✅ Batch operations with single commit
insert_data = []
update_data = []

for row in rows:
    if existing_data and has_changes(row, existing_data):
        update_data.append(params)
    elif not existing_data:
        insert_data.append(params)

if insert_data:
    cursor.executemany(insert_sql, insert_data)
    target_conn.commit()

if update_data:
    insert_update_sql = """
        INSERT INTO target_table (...) VALUES (...)
        ON DUPLICATE KEY UPDATE col1=VALUES(col1), col2=VALUES(col2)
    """
    cursor.executemany(insert_update_sql, update_data)
    target_conn.commit()

4. Field Comparison Optimization

Before:

python
# ❌ Separate query for field comparison
check_sql = "SELECT tag_field1 FROM rel_table WHERE record_id = %s"
cursor.execute(check_sql, (record_id,))
if cursor.fetchone():
    return False

After:

python
# ✅ Use fields from main query
existing_tag_fields = {
    'tag_field1': row.get('tag_field1'),
    'tag_field2': row.get('tag_field2'),
}

Performance Comparison

Query Comparison

Operation TypeBefore (per 1000 records)After (per 1000 records)Optimization Rate
Business detail query1000 times4 times99.6%
Target table query1000 times1 time99.9%
Related table query1000 times0 times100%
Total3000 times5 times99.8%

Write Operation Comparison

Operation TypeBefore (per 1000 records)After (per 1000 records)Optimization Rate
INSERTUp to 1000 times1 time99.9%
UPDATEUp to 1000 times1 time99.9%
COMMITUp to 2000 times2 times99.9%

Performance Test Results

MetricBeforeAfterImprovement
Time per batch (1000 records)~60 seconds~0.37 seconds162x
Average time per record~60 ms~0.37 ms162x

Key Technical Points

1. Map Structure for O(1) Lookup

python
# ✅ Use composite key for fast lookup
key = f"{category}_{record_id}"
detail_data = detail_data_map.get(key)

2. ON DUPLICATE KEY UPDATE

sql
INSERT INTO target_table (col1, col2, ..., ref_id)
VALUES (%s, %s, ..., %s)
ON DUPLICATE KEY UPDATE
    col1 = VALUES(col1),
    col2 = VALUES(col2)

Prerequisite: ref_id must have a unique index or primary key.

3. In-Memory Field Comparison

python
# ✅ Compare in memory before update
has_changes = False
for i, field_name in enumerate(field_names):
    if params[i] != existing_data.get(field_name):
        has_changes = True
        break

if has_changes:
    update_data.append(params)

Best Practices

1. Index Optimization

Table NameFieldIndex Type
main_tableid, record_id, create_timeNormal index
target_tableref_idUnique index
detail_table_a/bidPrimary key
rel_tablerecord_idPrimary key

2. Transaction Management

python
# Commit in batches to avoid locking
BATCH_SIZE = 1000
for i in range(0, len(data), BATCH_SIZE):
    batch = data[i:i+BATCH_SIZE]
    cursor.executemany(sql, batch)
    target_conn.commit()

3. Error Handling

python
try:
    cursor.executemany(sql, data)
    target_conn.commit()
except Exception as e:
    target_conn.rollback()
    logger.error(f"Batch operation failed: {e}")

Common Pitfalls to Avoid

❌ Pitfall 1: Ignoring Indexes

python
# Bad: Missing index on filter column
sql = "SELECT * FROM large_table WHERE status = %s"  # No index on status

# Good: Ensure proper indexing
# CREATE INDEX idx_large_table_status ON large_table(status)

❌ Pitfall 2: SELECT * in Batch Queries

python
# Bad: SELECT * returns unnecessary columns
sql = "SELECT * FROM detail_table WHERE id IN (...)"

# Good: Select only needed columns
sql = "SELECT id, name, value FROM detail_table WHERE id IN (...)"

❌ Pitfall 3: Not Handling Master-Slave Delay

python
# Bad: Read from slave immediately after write
write_to_master(data)
read_from_slave(data_id)  # May get stale data

# Good: Read from master for consistency-critical operations
read_from_master(data_id)

Real-World Results

Production Test Results

Data SizeBeforeAfterImprovement
1000 records~60 seconds~0.37 seconds162x
10000 records~600 seconds~3.7 seconds162x
100000 records~6000 seconds~37 seconds162x

Summary

When optimizing data synchronization scripts:

  1. Batch queries - Reduce N+1 queries to O(1) category-based queries
  2. Batch writes - Use executemany() and ON DUPLICATE KEY UPDATE
  3. In-memory computation - Compare fields locally before updating
  4. Reuse query results - Avoid redundant queries by reusing JOIN results
  5. Proper indexing - Ensure filter columns have appropriate indexes

Remember: The key to performance improvement is changing from single record operations to batch operations, which reduces database interactions by 99.9%.

Quick Checklist

Before deploying optimized data sync scripts:

  • [ ] No N+1 queries in loops
  • [ ] Using batch query with IN clause
  • [ ] Using executemany() for batch writes
  • [ ] Using ON DUPLICATE KEY UPDATE for upserts
  • [ ] Filter columns have proper indexes
  • [ ] Transaction batch size is configured
  • [ ] Error handling is implemented
  • [ ] Testing with production-scale data