fast download pausing

* fast download pausing
* fix UI thread blocking when calling pause()
* check running threads before start the download
* fix null pointer exception in onDestroy in the download service, without calling onCreate method (android 8)
This commit is contained in:
kapodamy 2018-12-05 01:03:56 -03:00
parent 9f4a7e664f
commit e2aa36d083
5 changed files with 241 additions and 118 deletions

View file

@ -5,6 +5,7 @@ import android.util.Log;
import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
import java.net.HttpURLConnection;
import java.nio.channels.ClosedByInterruptException;
@ -13,14 +14,16 @@ import us.shandian.giga.util.Utility;
import static org.schabi.newpipe.BuildConfig.DEBUG;
public class DownloadInitializer implements Runnable {
public class DownloadInitializer extends Thread {
private final static String TAG = "DownloadInitializer";
final static int mId = 0;
private DownloadMission mMission;
private HttpURLConnection mConn;
DownloadInitializer(@NonNull DownloadMission mission) {
mMission = mission;
mConn = null;
}
@Override
@ -32,10 +35,12 @@ public class DownloadInitializer implements Runnable {
try {
mMission.currentThreadCount = mMission.threadCount;
HttpURLConnection conn = mMission.openConnection(mId, -1, -1);
mConn = mMission.openConnection(mId, -1, -1);
mMission.establishConnection(mId, mConn);
if (!mMission.running || Thread.interrupted()) return;
mMission.length = Utility.getContentLength(conn);
mMission.length = Utility.getContentLength(mConn);
if (mMission.length == 0) {
@ -44,7 +49,7 @@ public class DownloadInitializer implements Runnable {
}
// check for dynamic generated content
if (mMission.length == -1 && conn.getResponseCode() == 200) {
if (mMission.length == -1 && mConn.getResponseCode() == 200) {
mMission.blocks = 0;
mMission.length = 0;
mMission.fallback = true;
@ -56,50 +61,54 @@ public class DownloadInitializer implements Runnable {
}
} else {
// Open again
conn = mMission.openConnection(mId, mMission.length - 10, mMission.length);
mConn = mMission.openConnection(mId, mMission.length - 10, mMission.length);
mMission.establishConnection(mId, mConn);
int code = conn.getResponseCode();
if (!mMission.running || Thread.interrupted()) return;
if (code == 206) {
if (mMission.currentThreadCount > 1) {
mMission.blocks = mMission.length / DownloadMission.BLOCK_SIZE;
synchronized (mMission.blockState) {
if (mConn.getResponseCode() == 206) {
if (mMission.currentThreadCount > 1) {
mMission.blocks = mMission.length / DownloadMission.BLOCK_SIZE;
if (mMission.currentThreadCount > mMission.blocks) {
mMission.currentThreadCount = (int) mMission.blocks;
if (mMission.currentThreadCount > mMission.blocks) {
mMission.currentThreadCount = (int) mMission.blocks;
}
if (mMission.currentThreadCount <= 0) {
mMission.currentThreadCount = 1;
}
if (mMission.blocks * DownloadMission.BLOCK_SIZE < mMission.length) {
mMission.blocks++;
}
} else {
// if one thread is solicited don't calculate blocks, is useless
mMission.blocks = 1;
mMission.fallback = true;
mMission.unknownLength = false;
}
if (mMission.currentThreadCount <= 0) {
mMission.currentThreadCount = 1;
}
if (mMission.blocks * DownloadMission.BLOCK_SIZE < mMission.length) {
mMission.blocks++;
if (DEBUG) {
Log.d(TAG, "http response code = " + mConn.getResponseCode());
}
} else {
// if one thread is solicited don't calculate blocks, is useless
mMission.blocks = 1;
// Fallback to single thread
mMission.blocks = 0;
mMission.fallback = true;
mMission.unknownLength = false;
mMission.currentThreadCount = 1;
if (DEBUG) {
Log.d(TAG, "falling back due http response code = " + mConn.getResponseCode());
}
}
if (DEBUG) {
Log.d(TAG, "http response code = " + code);
}
} else {
// Fallback to single thread
mMission.blocks = 0;
mMission.fallback = true;
mMission.unknownLength = false;
mMission.currentThreadCount = 1;
if (DEBUG) {
Log.d(TAG, "falling back due http response code = " + code);
for (long i = 0; i < mMission.currentThreadCount; i++) {
mMission.threadBlockPositions.add(i);
mMission.threadBytePositions.add(0L);
}
}
}
for (long i = 0; i < mMission.currentThreadCount; i++) {
mMission.threadBlockPositions.add(i);
mMission.threadBytePositions.add(0L);
if (!mMission.running || Thread.interrupted()) return;
}
File file;
@ -112,7 +121,7 @@ public class DownloadInitializer implements Runnable {
file = new File(file, mMission.name);
// if the name is used by "something", delete it
// if the name is used by another process, delete it
if (file.exists() && !file.isFile() && !file.delete()) {
mMission.notifyError(DownloadMission.ERROR_FILE_CREATION, null);
return;
@ -131,14 +140,16 @@ public class DownloadInitializer implements Runnable {
af.seek(mMission.offsets[mMission.current]);
af.close();
if (Thread.interrupted()) return;
if (!mMission.running || Thread.interrupted()) return;
mMission.running = false;
break;
} catch (InterruptedIOException | ClosedByInterruptException e) {
return;
} catch (Exception e) {
if (e instanceof ClosedByInterruptException) {
return;
} else if (e instanceof IOException && e.getMessage().contains("Permission denied")) {
if (!mMission.running) return;
if (e instanceof IOException && e.getMessage().contains("Permission denied")) {
mMission.notifyError(DownloadMission.ERROR_PERMISSION_DENIED, e);
return;
}
@ -150,11 +161,26 @@ public class DownloadInitializer implements Runnable {
return;
}
//try again
Log.e(TAG, "initializer failed, retrying", e);
}
}
// hide marquee in the progress bar
mMission.done++;
mMission.start();
}
@Override
public void interrupt() {
super.interrupt();
if (mConn != null) {
try {
mConn.disconnect();
} catch (Exception e) {
// nothing to do
}
}
}
}

View file

@ -122,13 +122,13 @@ public class DownloadMission extends Mission {
private transient boolean mWritingToFile;
@SuppressWarnings("UseSparseArrays")// LongSparseArray is not serializable
private final HashMap<Long, Boolean> blockState = new HashMap<>();
final HashMap<Long, Boolean> blockState = new HashMap<>();
final List<Long> threadBlockPositions = new ArrayList<>();
final List<Long> threadBytePositions = new ArrayList<>();
private transient boolean deleted;
int currentThreadCount;
private transient Thread[] threads = null;
private transient Thread[] threads = new Thread[0];
private transient Thread init = null;
@ -238,9 +238,8 @@ public class DownloadMission extends Mission {
* @param rangeEnd range end
* @return a {@link java.net.URLConnection URLConnection} linking to the URL.
* @throws IOException if an I/O exception occurs.
* @throws HttpError if the the http response is not satisfiable
*/
HttpURLConnection openConnection(int threadId, long rangeStart, long rangeEnd) throws IOException, HttpError {
HttpURLConnection openConnection(int threadId, long rangeStart, long rangeEnd) throws IOException {
URL url = new URL(urls[current]);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setInstanceFollowRedirects(true);
@ -250,29 +249,45 @@ public class DownloadMission extends Mission {
if (rangeEnd > 0) req += rangeEnd;
conn.setRequestProperty("Range", req);
if (DEBUG) {
Log.d(TAG, threadId + ":" + conn.getRequestProperty("Range"));
Log.d(TAG, threadId + ":Content-Length=" + conn.getContentLength() + " Code:" + conn.getResponseCode());
}
}
conn.connect();
return conn;
}
/**
* @param threadId id of the calling thread
* @param conn Opens and establish the communication
* @throws IOException if an error occurred connecting to the server.
* @throws HttpError if the HTTP Status-Code is not satisfiable
*/
void establishConnection(int threadId, HttpURLConnection conn) throws IOException, HttpError {
conn.connect();
int statusCode = conn.getResponseCode();
if (DEBUG) {
Log.d(TAG, threadId + ":Content-Length=" + conn.getContentLength() + " Code:" + statusCode);
}
switch (statusCode) {
case 204:
case 205:
case 207:
throw new HttpError(conn.getResponseCode());
case 416:
return;// let the download thread handle this error
default:
if (statusCode < 200 || statusCode > 299) {
throw new HttpError(statusCode);
}
}
return conn;
}
private void notify(int what) {
Message m = new Message();
m.what = what;
@ -389,6 +404,11 @@ public class DownloadMission extends Mission {
*/
public void start() {
if (running || current >= urls.length) return;
// ensure that the previous state is completely paused.
joinForThread(init);
for (Thread thread : threads) joinForThread(thread);
enqueued = false;
running = true;
errCode = ERROR_NOTHING;
@ -400,7 +420,7 @@ public class DownloadMission extends Mission {
init = null;
if (threads == null) {
if (threads.length < 1) {
threads = new Thread[currentThreadCount];
}
@ -428,39 +448,37 @@ public class DownloadMission extends Mission {
recovered = true;
enqueued = false;
if (init != null && init != Thread.currentThread() && init.isAlive()) {
init.interrupt();
try {
init.join();
} catch (InterruptedException e) {
// nothing to do
if (postprocessingRunning) {
if (DEBUG) {
Log.w(TAG, "pause during post-processing is not applicable.");
}
return;
}
resetState();
if (init != null && init.isAlive()) {
init.interrupt();
synchronized (blockState) {
resetState();
}
return;
}
if (DEBUG && blocks == 0) {
Log.w(TAG, "pausing a download that can not be resumed.");
Log.w(TAG, "pausing a download that can not be resumed (range requests not allowed by the server).");
}
if (threads == null || Thread.interrupted()) {
if (threads == null || Thread.currentThread().isInterrupted()) {
writeThisToFile();
return;
}
if (postprocessingRunning) return;
// wait for all threads are suspended before save the state
runAsync(-1, () -> {
try {
for (Thread thread : threads) {
if (thread == Thread.currentThread()) continue;
if (thread.isAlive()) {
thread.interrupt();
thread.join();
thread.join(5000);
}
}
} catch (Exception e) {
@ -492,7 +510,7 @@ public class DownloadMission extends Mission {
threadBlockPositions.clear();
threadBytePositions.clear();
blockState.clear();
threads = null;
threads = new Thread[0];
Utility.writeToFile(metadata, DownloadMission.this);
}
@ -571,28 +589,61 @@ public class DownloadMission extends Mission {
}
/**
* run a method in a new thread
* run a new thread
*
* @param id id of new thread (used for debugging only)
* @param who the object whose {@code run} method is invoked when this thread is started
* @return the created thread
* @param who the Runnable whose {@code run} method is invoked.
*/
private Thread runAsync(int id, Runnable who) {
private void runAsync(int id, Runnable who) {
runAsync(id, new Thread(who));
}
/**
* run a new thread
*
* @param id id of new thread (used for debugging only)
* @param who the Thread whose {@code run} method is invoked when this thread is started
* @return the passed thread
*/
private Thread runAsync(int id, Thread who) {
// known thread ids:
// -2: state saving by notifyProgress() method
// -1: wait for saving the state by pause() method
// 0: initializer
// >=1: any download thread
Thread thread = new Thread(who);
if (DEBUG) {
thread.setName(String.format("[%s] id = %s filename = %s", TAG, id, name));
who.setName(String.format("%s[%s] %s", TAG, id, name));
}
thread.start();
return thread;
who.start();
return who;
}
private void joinForThread(Thread thread) {
if (thread == null || !thread.isAlive()) return;
if (thread == Thread.currentThread()) return;
if (DEBUG) {
Log.w(TAG, "a thread is !still alive!: " + thread.getName());
}
// still alive, this should not happen.
// Possible reasons:
// slow device
// the user is spamming start/pause buttons
// start() method called quickly after pause()
try {
thread.join(10000);
} catch (InterruptedException e) {
Log.d(TAG, "timeout on join : " + thread.getName());
throw new RuntimeException("A thread is still running:\n" + thread.getName());
}
}
static class HttpError extends Exception {
int statusCode;
@ -602,7 +653,7 @@ public class DownloadMission extends Mission {
@Override
public String getMessage() {
return "Http status code: " + String.valueOf(statusCode);
return "HTTP " + String.valueOf(statusCode);
}
}
}

View file

@ -14,16 +14,19 @@ import static org.schabi.newpipe.BuildConfig.DEBUG;
* Runnable to download blocks of a file until the file is completely downloaded,
* an error occurs or the process is stopped.
*/
public class DownloadRunnable implements Runnable {
public class DownloadRunnable extends Thread {
private static final String TAG = DownloadRunnable.class.getSimpleName();
private final DownloadMission mMission;
private final int mId;
private HttpURLConnection mConn;
DownloadRunnable(DownloadMission mission, int id) {
if (mission == null) throw new NullPointerException("mission is null");
mMission = mission;
mId = id;
mConn = null;
}
@Override
@ -47,12 +50,7 @@ public class DownloadRunnable implements Runnable {
return;
}
while (mMission.errCode == DownloadMission.ERROR_NOTHING && mMission.running && blockPosition < mMission.blocks) {
if (Thread.currentThread().isInterrupted()) {
mMission.pause();
return;
}
while (mMission.running && mMission.errCode == DownloadMission.ERROR_NOTHING && blockPosition < mMission.blocks) {
if (DEBUG && retry) {
Log.d(TAG, mId + ":retry is true. Resuming at " + blockPosition);
@ -83,8 +81,9 @@ public class DownloadRunnable implements Runnable {
long start = blockPosition * DownloadMission.BLOCK_SIZE;
long end = start + DownloadMission.BLOCK_SIZE - 1;
long offset = mMission.getThreadBytePosition(mId);
start += mMission.getThreadBytePosition(mId);
start += offset;
if (end >= mMission.length) {
end = mMission.length - 1;
@ -93,14 +92,21 @@ public class DownloadRunnable implements Runnable {
long total = 0;
try {
HttpURLConnection conn = mMission.openConnection(mId, start, end);
mConn = mMission.openConnection(mId, start, end);
mMission.establishConnection(mId, mConn);
// check if the download can be resumed
if (mConn.getResponseCode() == 416 && offset > 0) {
retryCount--;
throw new DownloadMission.HttpError(416);
}
// The server may be ignoring the range request
if (conn.getResponseCode() != 206) {
mMission.notifyError(new DownloadMission.HttpError(conn.getResponseCode()));
if (mConn.getResponseCode() != 206) {
mMission.notifyError(new DownloadMission.HttpError(mConn.getResponseCode()));
if (DEBUG) {
Log.e(TAG, mId + ":Unsupported " + conn.getResponseCode());
Log.e(TAG, mId + ":Unsupported " + mConn.getResponseCode());
}
break;
@ -108,7 +114,8 @@ public class DownloadRunnable implements Runnable {
f.seek(mMission.offsets[mMission.current] + start);
is = conn.getInputStream();
is = mConn.getInputStream();
byte[] buf = new byte[DownloadMission.BUFFER_SIZE];
int len;
@ -121,18 +128,17 @@ public class DownloadRunnable implements Runnable {
if (DEBUG && mMission.running) {
Log.d(TAG, mId + ":position " + blockPosition + " finished, " + total + " bytes downloaded");
mMission.setThreadBytePosition(mId, 0L);
}
// if the download is paused, save progress for this thread
if (!mMission.running) {
mMission.setThreadBytePosition(mId, total);
break;
}
if (mMission.running)
mMission.setThreadBytePosition(mId, 0L);// clear byte position for next block
else
mMission.setThreadBytePosition(mId, total);// download paused, save progress for this block
} catch (Exception e) {
mMission.setThreadBytePosition(mId, total);
if (e instanceof ClosedByInterruptException) break;
if (!mMission.running || e instanceof ClosedByInterruptException) break;
if (retryCount++ >= mMission.maxRetry) {
mMission.notifyError(e);
@ -147,29 +153,43 @@ public class DownloadRunnable implements Runnable {
}
}
try {
f.close();
} catch (Exception err) {
// ¿ejected media storage? ¿file deleted? ¿storage ran out of space?
}
try {
if (is != null) is.close();
} catch (Exception err) {
// nothing to do
}
try {
f.close();
} catch (Exception err) {
// ¿ejected media storage? ¿file deleted? ¿storage ran out of space?
}
if (DEBUG) {
Log.d(TAG, "thread " + mId + " exited from main download loop");
}
if (mMission.errCode == DownloadMission.ERROR_NOTHING && mMission.running) {
if (DEBUG) {
Log.d(TAG, "no error has happened, notifying");
}
mMission.notifyFinished();
}
if (DEBUG && !mMission.running) {
Log.d(TAG, "The mission has been paused. Passing.");
}
}
@Override
public void interrupt() {
super.interrupt();
try {
if (mConn != null) mConn.disconnect();
} catch (Exception e) {
// nothing to do
}
}
}

View file

@ -18,30 +18,33 @@ import static org.schabi.newpipe.BuildConfig.DEBUG;
/**
* Single-threaded fallback mode
*/
public class DownloadRunnableFallback implements Runnable {
public class DownloadRunnableFallback extends Thread {
private static final String TAG = "DownloadRunnableFallback";
private final DownloadMission mMission;
private int retryCount = 0;
private final int mId = 1;
private InputStream is;
private RandomAccessFile f;
private int mRetryCount = 0;
private InputStream mIs;
private RandomAccessFile mF;
private HttpURLConnection mConn;
DownloadRunnableFallback(@NonNull DownloadMission mission) {
mMission = mission;
is = null;
f = null;
mIs = null;
mF = null;
mConn = null;
}
private void dispose() {
try {
if (is != null) is.close();
if (mIs != null) mIs.close();
} catch (IOException e) {
// nothing to do
}
try {
if (f != null) f.close();
if (mF != null) mF.close();
} catch (IOException e) {
// ¿ejected media storage? ¿file deleted? ¿storage ran out of space?
}
@ -63,27 +66,36 @@ public class DownloadRunnableFallback implements Runnable {
try {
long rangeStart = (mMission.unknownLength || start < 1) ? -1 : start;
HttpURLConnection conn = mMission.openConnection(1, rangeStart, -1);
mConn = mMission.openConnection(mId, rangeStart, -1);
mMission.establishConnection(mId, mConn);
// check if the download can be resumed
if (mConn.getResponseCode() == 416 && start > 0) {
start = 0;
mRetryCount--;
throw new DownloadMission.HttpError(416);
}
// secondary check for the file length
if (!mMission.unknownLength)
mMission.unknownLength = Utility.getContentLength(conn) == -1;
mMission.unknownLength = Utility.getContentLength(mConn) == -1;
f = new RandomAccessFile(mMission.getDownloadedFile(), "rw");
f.seek(mMission.offsets[mMission.current] + start);
mF = new RandomAccessFile(mMission.getDownloadedFile(), "rw");
mF.seek(mMission.offsets[mMission.current] + start);
is = conn.getInputStream();
mIs = mConn.getInputStream();
byte[] buf = new byte[64 * 1024];
int len = 0;
while (mMission.running && (len = is.read(buf, 0, buf.length)) != -1) {
f.write(buf, 0, len);
while (mMission.running && (len = mIs.read(buf, 0, buf.length)) != -1) {
mF.write(buf, 0, len);
start += len;
mMission.notifyProgress(len);
}
// if thread goes interrupted check if the last part is written. This avoid re-download the whole file
// if thread goes interrupted check if the last part mIs written. This avoid re-download the whole file
done = len == -1;
} catch (Exception e) {
dispose();
@ -91,9 +103,9 @@ public class DownloadRunnableFallback implements Runnable {
// save position
mMission.setThreadBytePosition(0, start);
if (e instanceof ClosedByInterruptException) return;
if (!mMission.running || e instanceof ClosedByInterruptException) return;
if (retryCount++ >= mMission.maxRetry) {
if (mRetryCount++ >= mMission.maxRetry) {
mMission.notifyError(e);
return;
}
@ -110,4 +122,18 @@ public class DownloadRunnableFallback implements Runnable {
mMission.setThreadBytePosition(0, start);
}
}
@Override
public void interrupt() {
super.interrupt();
if (mConn != null) {
try {
mConn.disconnect();
} catch (Exception e) {
// nothing to do
}
}
}
}

View file

@ -235,7 +235,7 @@ public class DownloadManagerService extends Service {
if (icDownloadDone != null) icDownloadDone.recycle();
if (icDownloadFailed != null) icDownloadFailed.recycle();
icLauncher.recycle();
if (icLauncher != null) icLauncher.recycle();
}
@Override