From 7646ad8a2b7d0809657a4f2a228443b4c992d070 Mon Sep 17 00:00:00 2001 From: Vyr Cossont Date: Fri, 31 Mar 2023 05:38:47 -0700 Subject: IndexingScheduler: fetch and import in batches (#24285) Co-authored-by: Claire --- app/workers/scheduler/indexing_scheduler.rb | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) (limited to 'app/workers/scheduler/indexing_scheduler.rb') diff --git a/app/workers/scheduler/indexing_scheduler.rb b/app/workers/scheduler/indexing_scheduler.rb index c42396629..1bbe9cd5d 100644 --- a/app/workers/scheduler/indexing_scheduler.rb +++ b/app/workers/scheduler/indexing_scheduler.rb @@ -6,17 +6,21 @@ class Scheduler::IndexingScheduler sidekiq_options retry: 0 + IMPORT_BATCH_SIZE = 1000 + SCAN_BATCH_SIZE = 10 * IMPORT_BATCH_SIZE + def perform return unless Chewy.enabled? indexes.each do |type| with_redis do |redis| - ids = redis.smembers("chewy:queue:#{type.name}") - - type.import!(ids) - - redis.pipelined do |pipeline| - ids.each { |id| pipeline.srem("chewy:queue:#{type.name}", id) } + redis.sscan_each("chewy:queue:#{type.name}", count: SCAN_BATCH_SIZE) do |ids| + redis.pipelined do + ids.each_slice(IMPORT_BATCH_SIZE) do |slice_ids| + type.import!(slice_ids) + redis.srem("chewy:queue:#{type.name}", slice_ids) + end + end end end end -- cgit From d5ad8b6422432b5a0e8f5268d662e6121186f532 Mon Sep 17 00:00:00 2001 From: Vyr Cossont Date: Fri, 31 Mar 2023 23:28:35 -0700 Subject: Fix Redis client and type errors introduced in #24285 (#24342) --- app/workers/scheduler/indexing_scheduler.rb | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) (limited to 'app/workers/scheduler/indexing_scheduler.rb') diff --git a/app/workers/scheduler/indexing_scheduler.rb b/app/workers/scheduler/indexing_scheduler.rb index 1bbe9cd5d..d622f5586 100644 --- a/app/workers/scheduler/indexing_scheduler.rb +++ b/app/workers/scheduler/indexing_scheduler.rb @@ -14,12 +14,10 @@ class Scheduler::IndexingScheduler indexes.each do |type| with_redis do |redis| - redis.sscan_each("chewy:queue:#{type.name}", count: SCAN_BATCH_SIZE) do |ids| - redis.pipelined do - ids.each_slice(IMPORT_BATCH_SIZE) do |slice_ids| - type.import!(slice_ids) - redis.srem("chewy:queue:#{type.name}", slice_ids) - end + redis.sscan_each("chewy:queue:#{type.name}", count: SCAN_BATCH_SIZE).each_slice(IMPORT_BATCH_SIZE) do |ids| + type.import!(ids) + redis.pipelined do |pipeline| + pipeline.srem("chewy:queue:#{type.name}", ids) end end end -- cgit