I posted this question on Stack Overflow and thought I would get the dev.to community's perspective as well.
When we (egghead.io) release a course, we send an email blast to all of our users who have opted into commercial communication. When this happens, our server starts returning 500s because of timed-out requests. We use a Sidekiq worker (BroadcastMessageSendWorker) to create jobs that will send the email we have written. We use Sidekiq Pro so we can use the bulk processing feature.
ruby '2.6.5'
gem 'pg', '= 1.2.2'
gem 'rails', '= 5.2.4.1'
gem 'sidekiq', '= 5.2.2'
gem 'sidekiq-pro', '= 4.0.4'
class BroadcastMessageSendWorker
  include Sidekiq::Worker

  def perform(message_guid)
    ActiveRecord::Base.connection_pool.with_connection do
      message = BroadcastMessage.find(message_guid)

      message.with_lock do
        return unless message.pending?

        message.pickup!

        if message.contacts.count == 0
          message.finish!
          return
        end

        batch = Sidekiq::Batch.new
        batch.on(:complete, self.class, 'guid' => message_guid)

        batch.jobs do
          # We can't use `uniq` or `DISTINCT` with find_in_batches because after 1000 records it
          # will start blowing up. Instead, use an in-memory `seen` index
          seen = Set.new({})

          message.contacts.select(:id).find_in_batches do |contact_batch|
            args = contact_batch.pluck(:id).map do |contact_id|
              next unless seen.add?(contact_id) # add? returns nil if the object is already in the set

              [message_guid, contact_id]
            end

            Sidekiq::Client.push_bulk('class' => BroadcastMessageDeliverWorker, 'args' => args.compact)
          end
        end

        message.update(batch_id: batch.bid)
      end
    end
  end

  def on_complete(_, options)
    message = BroadcastMessage.find(options['guid'])
    message.finish! if message.sending?
  end
end
We are building an in-memory set to be sure that we don't send two of the same email to a user. ScoutAPM is telling us that the message.contacts.select(:id) line is taking a long time (contacts joins our users table, so this is somewhat expected).
I analyzed this query:
Subquery Scan on contacts  (cost=226960.78..230344.36 rows=52055 width=32) (actual time=555.876..692.685 rows=87926 loops=1)
  Filter: (NOT (hashed SubPlan 1))
  ->  CTE Scan on base_contacts  (cost=224403.49..226485.69 rows=104110 width=264) (actual time=523.530..636.032 rows=87926 loops=1)
        CTE base_contacts
          ->  Gather  (cost=189856.23..224403.49 rows=104110 width=306) (actual time=523.525..554.679 rows=87926 loops=1)
                Workers Planned: 2
                Workers Launched: 2
                ->  Parallel Hash Left Join  (cost=188856.23..212992.49 rows=43379 width=306) (actual time=524.667..557.537 rows=29309 loops=3)
                      Hash Cond: (contacts_1.user_id = users.id)
                      Filter: ((contacts_1.user_id IS NULL) OR (users.can_contact AND ((users.managed_subscription_id IS NULL) OR CASE WHEN (users.managed_subscription_id = ANY ('{2,236,690}'::integer[])) THEN false ELSE true END)))
                      Rows Removed by Filter: 12924
                      ->  Parallel Seq Scan on contacts contacts_1  (cost=149225.21..168513.90 rows=47078 width=306) (actual time=272.862..365.114 rows=42233 loops=3)
                            Filter: ((NOT (hashed SubPlan 2)) AND (NOT (hashed SubPlan 3)))
                            Rows Removed by Filter: 108423
                            SubPlan 2
                              ->  Seq Scan on mailkick_opt_outs mailkick_opt_outs_1  (cost=0.00..2147.74 rows=71817 width=22) (actual time=0.044..16.912 rows=71898 loops=3)
                                    Filter: (active AND (list IS NULL))
                                    Rows Removed by Filter: 19576
                            SubPlan 3
                              ->  Nested Loop  (cost=0.43..146644.75 rows=101271 width=4) (actual time=0.098..142.573 rows=325264 loops=3)
                                    ->  Seq Scan on broadcast_messages  (cost=0.00..9.80 rows=1 width=4) (actual time=0.066..0.085 rows=1 loops=3)
                                          Filter: (signature = 'broadcast_message_signature'::text)
                                          Rows Removed by Filter: 63
                                    ->  Index Scan using index_ahoy_messages_on_broadcast_message_id on ahoy_messages  (cost=0.43..144633.82 rows=200113 width=8) (actual time=0.030..107.063 rows=325264 loops=3)
                                          Index Cond: (broadcast_message_id = broadcast_messages.id)
                                          Filter: ((user_type)::text = 'ClassType'::text)
                      ->  Parallel Hash  (cost=36562.34..36562.34 rows=176534 width=9) (actual time=106.742..106.742 rows=141443 loops=3)
                            Buckets: 131072  Batches: 8  Memory Usage: 3168kB
                            ->  Parallel Seq Scan on users  (cost=0.00..36562.34 rows=176534 width=9) (actual time=0.044..74.643 rows=141443 loops=3)
  SubPlan 1
    ->  Seq Scan on mailkick_opt_outs  (cost=0.00..2376.43 rows=72345 width=22) (actual time=0.011..14.309 rows=74331 loops=1)
          Filter: (active AND ((list IS NULL) OR ((list)::text = 'javascript'::text)))
          Rows Removed by Filter: 17143
Planning Time: 0.458 ms
Execution Time: 715.945 ms
The Parallel Seq Scan is taking a lot of time, but I don't know how to speed it up.
My first thought is to split this worker into different ranges of IDs and query the database at different times to reduce the load on the database. So instead of querying message.contacts I would query message.contacts.where('id > 1 AND id < 10000'), then message.contacts.where('id > 10001 AND id < 20000'), and so on until we reached the max id.
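Something like this rough sketch is what I'm picturing (BroadcastMessageDeliverRangeWorker is a made-up name here; perform_in is Sidekiq's built-in scheduling):

```ruby
RANGE_SIZE = 10_000 # contacts per range (arbitrary for the sketch)
STAGGER    = 30     # seconds between ranges (arbitrary for the sketch)

max_id = message.contacts.maximum(:id)
(1..max_id).step(RANGE_SIZE).each_with_index do |start_id, index|
  # Each job queries only its own slice of IDs, scheduled a bit apart,
  # so the one big SELECT is split up and spread over time.
  BroadcastMessageDeliverRangeWorker.perform_in(
    index * STAGGER, message_guid, start_id, start_id + RANGE_SIZE - 1
  )
end
```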
This feels naive. How do I either speed this query up or spread it out over time?
I also thought of adding a multi-column index on users.can_contact and users.managed_subscription_id (the columns in the plan's filter), but I hadn't tried that yet.
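Roughly, the migration I have in mind (the column pair is my reading of the plan's filter clause, so treat it as a guess):

```ruby
class AddContactabilityIndexToUsers < ActiveRecord::Migration[5.2]
  disable_ddl_transaction! # required for algorithm: :concurrently

  def change
    # Concurrent build avoids locking writes on a large users table.
    add_index :users, [:can_contact, :managed_subscription_id],
              algorithm: :concurrently
  end
end
```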
Top comments (3)
Hi Ian, interesting conundrum. The query plan is much more complicated than what it seems from the worker's code; I guess that .contacts isn't that straightforward. At first glance there's a lot going on in that worker.
Let's see if we can tackle it a piece at a time:
this sets an exclusive lock on the row for the duration of the transaction opened by .with_lock. So we know that this blocks all transactions from accessing that row, but it also waits if the row is being accessed by another transaction. I'm not sure you need to lock the row for so long, as the state machine methods (let's call them that :D) seem to be clustered at the top. Could it make sense to only lock the row around those methods and prepare the batch outside the lock?
Basically, once you start preparing the batches you're done with message; I think you can release the lock by then.
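As a sketch of what I mean (assuming pickup! returns something truthy on success, as the bang methods of most state machine gems do):

```ruby
def perform(message_guid)
  message = BroadcastMessage.find(message_guid)

  # Hold the row lock only for the state transition...
  picked_up = message.with_lock do
    message.pending? && message.pickup!
  end
  return unless picked_up

  # ...and prepare the batch outside the transaction.
  if message.contacts.count == 0
    message.finish!
    return
  end

  batch = Sidekiq::Batch.new
  batch.on(:complete, self.class, 'guid' => message_guid)
  batch.jobs do
    # enqueue BroadcastMessageDeliverWorker jobs as before
  end
  message.update(batch_id: batch.bid)
end
```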
this issues a SELECT COUNT(*), which seems fine, but it can probably be optimized by using if message.contacts.none?, which issues a SELECT ... LIMIT 1. You may want to test both speeds, as I'm uncertain about the schema.
Here you're loading contacts in batches, using an in-memory set to make sure you haven't seen the contact_id already. I have a question: why can there be duplicates in the first place if the contact ids come from message.contacts? I had immediately thought of a DISTINCT on the query, but you said that's too slow as well.
Another issue with that set is that you're storing all the IDs in memory for the entire duration of the job. According to the plan, you have tens of thousands of them.
I then re-read the code and thought about a different approach to the whole thing (aside from the long locking).
Job #1 splits the contacts into ranges of IDs (the start and finish parameters to give to find_in_batches). It then creates a bunch of jobs #2. This, as you say, would reduce the running time by splitting multiple concurrent jobs into smaller SELECTs.
You still have to solve the problem of the duplication, but there might be a solution if you use Redis and its atomic transactions: redis.io/topics/transactions
The other idea, untested, is to have job #1 split the ids and queue job #2 N times (one for each pair of start and finish offsets). Job #2 doesn't directly call the "send email" jobs BUT prepares data in Redis. When the batch of jobs #2 is completed, you fire off the batches of job #3. When job #3 starts, it will check with Redis whether it actually has to send the email (the "no" will come from Redis basically telling it "someone else already sent an email to this contact id").
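A rough sketch of the Redis check in job #3 (REDIS here stands for whatever shared client you already have; setnx is atomic, so only the first claimer gets true back):

```ruby
class BroadcastMessageDeliverWorker
  include Sidekiq::Worker

  def perform(message_guid, contact_id)
    # SETNX ("SET if Not eXists") returns true only for the first caller,
    # so two concurrent workers can't both claim the same contact.
    claimed = REDIS.setnx("broadcast:#{message_guid}:contact:#{contact_id}", 1)
    return unless claimed # someone else already sent (or is sending) this one

    # ... actually deliver the email ...
    # (you'd probably also EXPIRE these keys so they don't pile up forever)
  end
end
```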
Let me know if any of this makes sense :D
Just found this Sidekiq wiki page called Really Complex Workflows with Batches.
@rhymes already gave a good answer for the performance-related questions.
Looking at the implementation of the worker, just a thought - you may also find github.com/fatkodima/sidekiq-itera... helpful for your use case. It will make the implementation easier, and you will be able to iterate over a set of records and be able to stop/resume/throttle.
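For illustration, roughly what the worker could look like with it (method names as I recall them from that gem's README, so double-check against your version):

```ruby
class BroadcastMessageSendWorker
  include Sidekiq::Worker
  include SidekiqIteration::Iteration

  def build_enumerator(message_guid, cursor:)
    message = BroadcastMessage.find(message_guid)
    # Iterates in batches and checkpoints the cursor between them, which is
    # what lets the job be interrupted, resumed, or throttled safely.
    active_record_batches_enumerator(message.contacts.select(:id), cursor: cursor)
  end

  def each_iteration(contact_batch, message_guid)
    args = contact_batch.map { |contact| [message_guid, contact.id] }
    Sidekiq::Client.push_bulk('class' => BroadcastMessageDeliverWorker, 'args' => args)
  end
end
```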