From 33b96d238aa9410453abe2bb7544dd731030a584 Mon Sep 17 00:00:00 2001 From: Stypox Date: Sun, 24 Nov 2024 13:54:51 +0100 Subject: [PATCH 1/2] Throttle loading subscriptions feed to avoid YouTube rate limits --- .../local/feed/service/FeedLoadManager.kt | 25 ++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/app/src/main/java/org/schabi/newpipe/local/feed/service/FeedLoadManager.kt b/app/src/main/java/org/schabi/newpipe/local/feed/service/FeedLoadManager.kt index 901ceadf7..970419d44 100644 --- a/app/src/main/java/org/schabi/newpipe/local/feed/service/FeedLoadManager.kt +++ b/app/src/main/java/org/schabi/newpipe/local/feed/service/FeedLoadManager.kt @@ -90,6 +90,10 @@ class FeedLoadManager(private val context: Context) { else -> feedDatabaseManager.outdatedSubscriptionsForGroup(groupId, outdatedThreshold) } + // like `currentProgress`, but counts the number of extractions that have begun, so they + // can be properly throttled every once in a while (see doOnNext below) + val extractionCount = AtomicInteger() + return outdatedSubscriptions .take(1) .doOnNext { @@ -105,6 +109,13 @@ class FeedLoadManager(private val context: Context) { .observeOn(Schedulers.io()) .flatMap { Flowable.fromIterable(it) } .takeWhile { !cancelSignal.get() } + .doOnNext { + // throttle extractions once every BATCH_SIZE to avoid being throttled + val previousCount = extractionCount.getAndIncrement() + if (previousCount != 0 && previousCount % BATCH_SIZE == 0) { + Thread.sleep(DELAY_BETWEEN_BATCHES_MILLIS.random()) + } + } .parallel(PARALLEL_EXTRACTIONS, PARALLEL_EXTRACTIONS * 2) .runOn(Schedulers.io(), PARALLEL_EXTRACTIONS * 2) .filter { !cancelSignal.get() } @@ -328,7 +339,19 @@ class FeedLoadManager(private val context: Context) { /** * How many extractions will be running in parallel. */ - private const val PARALLEL_EXTRACTIONS = 6 + private const val PARALLEL_EXTRACTIONS = 3 + + /** + * How many extractions to perform before waiting [DELAY_BETWEEN_BATCHES_MILLIS] to avoid + * being rate limited + */ + private const val BATCH_SIZE = 50 + + /** + * Wait a random delay in this range once every [BATCH_SIZE] extractions to avoid being + * rate limited + */ + private val DELAY_BETWEEN_BATCHES_MILLIS = (6000L..12000L) /** * Number of items to buffer to mass-insert in the database. From 726c12e9348da1e0d43c5aaae6ab31713f65114f Mon Sep 17 00:00:00 2001 From: Stypox Date: Sun, 24 Nov 2024 16:20:46 +0100 Subject: [PATCH 2/2] Only throttle YouTube feed loading --- .../local/feed/service/FeedLoadManager.kt | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/app/src/main/java/org/schabi/newpipe/local/feed/service/FeedLoadManager.kt b/app/src/main/java/org/schabi/newpipe/local/feed/service/FeedLoadManager.kt index 970419d44..9b0f177d5 100644 --- a/app/src/main/java/org/schabi/newpipe/local/feed/service/FeedLoadManager.kt +++ b/app/src/main/java/org/schabi/newpipe/local/feed/service/FeedLoadManager.kt @@ -17,6 +17,7 @@ import org.schabi.newpipe.database.subscription.NotificationMode import org.schabi.newpipe.database.subscription.SubscriptionEntity import org.schabi.newpipe.extractor.Info import org.schabi.newpipe.extractor.NewPipe +import org.schabi.newpipe.extractor.ServiceList import org.schabi.newpipe.extractor.feed.FeedInfo import org.schabi.newpipe.extractor.stream.StreamInfoItem import org.schabi.newpipe.ktx.getStringSafe @@ -90,9 +91,9 @@ class FeedLoadManager(private val context: Context) { else -> feedDatabaseManager.outdatedSubscriptionsForGroup(groupId, outdatedThreshold) } - // like `currentProgress`, but counts the number of extractions that have begun, so they - // can be properly throttled every once in a while (see doOnNext below) - val extractionCount = AtomicInteger() + // like `currentProgress`, but counts the number of YouTube extractions that have begun, so + // they can be properly throttled every once in a while (see doOnNext below) + val youtubeExtractionCount = AtomicInteger() return outdatedSubscriptions .take(1) @@ -109,11 +110,13 @@ class FeedLoadManager(private val context: Context) { .observeOn(Schedulers.io()) .flatMap { Flowable.fromIterable(it) } .takeWhile { !cancelSignal.get() } - .doOnNext { - // throttle extractions once every BATCH_SIZE to avoid being throttled - val previousCount = extractionCount.getAndIncrement() - if (previousCount != 0 && previousCount % BATCH_SIZE == 0) { - Thread.sleep(DELAY_BETWEEN_BATCHES_MILLIS.random()) + .doOnNext { subscriptionEntity -> + // throttle YouTube extractions once every BATCH_SIZE to avoid being rate limited + if (subscriptionEntity.serviceId == ServiceList.YouTube.serviceId) { + val previousCount = youtubeExtractionCount.getAndIncrement() + if (previousCount != 0 && previousCount % BATCH_SIZE == 0) { + Thread.sleep(DELAY_BETWEEN_BATCHES_MILLIS.random()) + } } } .parallel(PARALLEL_EXTRACTIONS, PARALLEL_EXTRACTIONS * 2) @@ -342,14 +345,14 @@ class FeedLoadManager(private val context: Context) { private const val PARALLEL_EXTRACTIONS = 3 /** - * How many extractions to perform before waiting [DELAY_BETWEEN_BATCHES_MILLIS] to avoid - * being rate limited + * How many YouTube extractions to perform before waiting [DELAY_BETWEEN_BATCHES_MILLIS] + * to avoid being rate limited */ private const val BATCH_SIZE = 50 /** - * Wait a random delay in this range once every [BATCH_SIZE] extractions to avoid being - * rate limited + * Wait a random delay in this range once every [BATCH_SIZE] YouTube extractions to avoid + * being rate limited */ private val DELAY_BETWEEN_BATCHES_MILLIS = (6000L..12000L)