Only throttle YouTube feed loading

This commit is contained in:
Stypox 2024-11-24 16:20:46 +01:00
parent 33b96d238a
commit 726c12e934
No known key found for this signature in database
GPG key ID: 4BDF1B40A49FDD23

View file

@ -17,6 +17,7 @@ import org.schabi.newpipe.database.subscription.NotificationMode
import org.schabi.newpipe.database.subscription.SubscriptionEntity import org.schabi.newpipe.database.subscription.SubscriptionEntity
import org.schabi.newpipe.extractor.Info import org.schabi.newpipe.extractor.Info
import org.schabi.newpipe.extractor.NewPipe import org.schabi.newpipe.extractor.NewPipe
import org.schabi.newpipe.extractor.ServiceList
import org.schabi.newpipe.extractor.feed.FeedInfo import org.schabi.newpipe.extractor.feed.FeedInfo
import org.schabi.newpipe.extractor.stream.StreamInfoItem import org.schabi.newpipe.extractor.stream.StreamInfoItem
import org.schabi.newpipe.ktx.getStringSafe import org.schabi.newpipe.ktx.getStringSafe
@ -90,9 +91,9 @@ class FeedLoadManager(private val context: Context) {
else -> feedDatabaseManager.outdatedSubscriptionsForGroup(groupId, outdatedThreshold) else -> feedDatabaseManager.outdatedSubscriptionsForGroup(groupId, outdatedThreshold)
} }
// like `currentProgress`, but counts the number of extractions that have begun, so they // like `currentProgress`, but counts the number of YouTube extractions that have begun, so
// can be properly throttled every once in a while (see doOnNext below) // they can be properly throttled every once in a while (see doOnNext below)
val extractionCount = AtomicInteger() val youtubeExtractionCount = AtomicInteger()
return outdatedSubscriptions return outdatedSubscriptions
.take(1) .take(1)
@ -109,13 +110,15 @@ class FeedLoadManager(private val context: Context) {
.observeOn(Schedulers.io()) .observeOn(Schedulers.io())
.flatMap { Flowable.fromIterable(it) } .flatMap { Flowable.fromIterable(it) }
.takeWhile { !cancelSignal.get() } .takeWhile { !cancelSignal.get() }
.doOnNext { .doOnNext { subscriptionEntity ->
// throttle extractions once every BATCH_SIZE to avoid being throttled // throttle YouTube extractions once every BATCH_SIZE to avoid being rate limited
val previousCount = extractionCount.getAndIncrement() if (subscriptionEntity.serviceId == ServiceList.YouTube.serviceId) {
val previousCount = youtubeExtractionCount.getAndIncrement()
if (previousCount != 0 && previousCount % BATCH_SIZE == 0) { if (previousCount != 0 && previousCount % BATCH_SIZE == 0) {
Thread.sleep(DELAY_BETWEEN_BATCHES_MILLIS.random()) Thread.sleep(DELAY_BETWEEN_BATCHES_MILLIS.random())
} }
} }
}
.parallel(PARALLEL_EXTRACTIONS, PARALLEL_EXTRACTIONS * 2) .parallel(PARALLEL_EXTRACTIONS, PARALLEL_EXTRACTIONS * 2)
.runOn(Schedulers.io(), PARALLEL_EXTRACTIONS * 2) .runOn(Schedulers.io(), PARALLEL_EXTRACTIONS * 2)
.filter { !cancelSignal.get() } .filter { !cancelSignal.get() }
@ -342,14 +345,14 @@ class FeedLoadManager(private val context: Context) {
private const val PARALLEL_EXTRACTIONS = 3 private const val PARALLEL_EXTRACTIONS = 3
/** /**
* How many extractions to perform before waiting [DELAY_BETWEEN_BATCHES_MILLIS] to avoid * How many YouTube extractions to perform before waiting [DELAY_BETWEEN_BATCHES_MILLIS]
* being rate limited * to avoid being rate limited
*/ */
private const val BATCH_SIZE = 50 private const val BATCH_SIZE = 50
/** /**
* Wait a random delay in this range once every [BATCH_SIZE] extractions to avoid being * Wait a random delay in this range once every [BATCH_SIZE] YouTube extractions to avoid
* rate limited * being rate limited
*/ */
private val DELAY_BETWEEN_BATCHES_MILLIS = (6000L..12000L) private val DELAY_BETWEEN_BATCHES_MILLIS = (6000L..12000L)