Observe operations

This commit is contained in:
M66B
2018-12-02 14:19:54 +01:00
parent 810fe66a78
commit 251ff98ea5
18 changed files with 1595 additions and 458 deletions

View File

@@ -40,6 +40,8 @@ import android.net.NetworkRequest;
import android.net.Uri;
import android.os.Build;
import android.os.Bundle;
import android.os.Handler;
import android.os.Looper;
import android.os.PowerManager;
import android.os.SystemClock;
import android.preference.PreferenceManager;
@@ -120,12 +122,10 @@ import androidx.annotation.Nullable;
import androidx.core.content.ContextCompat;
import androidx.lifecycle.LifecycleService;
import androidx.lifecycle.Observer;
import androidx.localbroadcastmanager.content.LocalBroadcastManager;
import static android.os.Process.THREAD_PRIORITY_BACKGROUND;
public class ServiceSynchronize extends LifecycleService {
private final Object lock = new Object();
private TupleAccountStats lastStats = null;
private ServiceManager serviceManager = new ServiceManager();
private static ExecutorService executor = Executors.newSingleThreadExecutor(Helper.backgroundThreadFactory);
@@ -149,9 +149,6 @@ public class ServiceSynchronize extends LifecycleService {
static final int PI_TRASH = 5;
static final int PI_IGNORED = 6;
static final String ACTION_SYNCHRONIZE_FOLDER = BuildConfig.APPLICATION_ID + ".SYNCHRONIZE_FOLDER";
static final String ACTION_PROCESS_OPERATIONS = BuildConfig.APPLICATION_ID + ".PROCESS_OPERATIONS";
@Override
public void onCreate() {
Log.i(Helper.TAG, "Service create version=" + BuildConfig.VERSION_NAME);
@@ -356,8 +353,6 @@ public class ServiceSynchronize extends LifecycleService {
db.endTransaction();
}
EntityOperation.process(context);
return null;
}
}.load(this, args);
@@ -734,14 +729,12 @@ public class ServiceSynchronize extends LifecycleService {
private void monitorAccount(final EntityAccount account, final ServiceState state) throws NoSuchProviderException {
final PowerManager pm = getSystemService(PowerManager.class);
final PowerManager.WakeLock wl0 = pm.newWakeLock(
PowerManager.PARTIAL_WAKE_LOCK,
BuildConfig.APPLICATION_ID + ":account." + account.id + ".monitor");
final PowerManager.WakeLock wlAccount = pm.newWakeLock(
PowerManager.PARTIAL_WAKE_LOCK, BuildConfig.APPLICATION_ID + ":account." + account.id);
try {
wl0.acquire();
wlAccount.acquire();
final DB db = DB.getInstance(this);
final ExecutorService executor = Executors.newSingleThreadExecutor(Helper.backgroundThreadFactory);
int backoff = CONNECT_BACKOFF_START;
while (state.running()) {
@@ -760,19 +753,15 @@ public class ServiceSynchronize extends LifecycleService {
final IMAPStore istore = (IMAPStore) isession.getStore(account.starttls ? "imap" : "imaps");
final Map<EntityFolder, IMAPFolder> folders = new HashMap<>();
List<Thread> syncs = new ArrayList<>();
List<Thread> idlers = new ArrayList<>();
List<Handler> handlers = new ArrayList<>();
try {
// Listen for store events
istore.addStoreListener(new StoreListener() {
PowerManager.WakeLock wl = pm.newWakeLock(
PowerManager.PARTIAL_WAKE_LOCK,
BuildConfig.APPLICATION_ID + ":account." + account.id + ".store");
@Override
public void notification(StoreEvent e) {
try {
wl.acquire();
wlAccount.acquire();
String type = (e.getMessageType() == StoreEvent.ALERT ? "alert" : "notice");
EntityLog.log(ServiceSynchronize.this, account.name + " " + type + ": " + e.getMessage());
if (e.getMessageType() == StoreEvent.ALERT) {
@@ -780,32 +769,28 @@ public class ServiceSynchronize extends LifecycleService {
state.error();
}
} finally {
wl.release();
wlAccount.release();
}
}
});
// Listen for folder events
istore.addFolderListener(new FolderAdapter() {
PowerManager.WakeLock wl = pm.newWakeLock(
PowerManager.PARTIAL_WAKE_LOCK,
BuildConfig.APPLICATION_ID + ":account." + account.id + ".folder");
@Override
public void folderCreated(FolderEvent e) {
try {
wl.acquire();
wlAccount.acquire();
Log.i(Helper.TAG, "Folder created=" + e.getFolder().getFullName());
reload(ServiceSynchronize.this, "folder created");
} finally {
wl.release();
wlAccount.release();
}
}
@Override
public void folderRenamed(FolderEvent e) {
try {
wl.acquire();
wlAccount.acquire();
Log.i(Helper.TAG, "Folder renamed=" + e.getFolder());
String old = e.getFolder().getFullName();
@@ -815,18 +800,18 @@ public class ServiceSynchronize extends LifecycleService {
reload(ServiceSynchronize.this, "folder renamed");
} finally {
wl.release();
wlAccount.release();
}
}
@Override
public void folderDeleted(FolderEvent e) {
try {
wl.acquire();
wlAccount.acquire();
Log.i(Helper.TAG, "Folder deleted=" + e.getFolder().getFullName());
reload(ServiceSynchronize.this, "folder deleted");
} finally {
wl.release();
wlAccount.release();
}
}
});
@@ -909,166 +894,150 @@ public class ServiceSynchronize extends LifecycleService {
Log.i(Helper.TAG, account.name + " folder " + folder.name + " flags=" + ifolder.getPermanentFlags());
// Synchronize folder
Thread sync = new Thread(new Runnable() {
PowerManager.WakeLock wl = pm.newWakeLock(
PowerManager.PARTIAL_WAKE_LOCK,
BuildConfig.APPLICATION_ID + ":account." + account.id + ".sync");
// Listen for new and deleted messages
ifolder.addMessageCountListener(new MessageCountAdapter() {
@Override
public void run() {
try {
wl.acquire();
public void messagesAdded(MessageCountEvent e) {
synchronized (folder) {
try {
wlAccount.acquire();
Log.i(Helper.TAG, folder.name + " messages added");
// Process pending operations
processOperations(folder, isession, istore, ifolder, state);
FetchProfile fp = new FetchProfile();
fp.add(FetchProfile.Item.ENVELOPE);
fp.add(FetchProfile.Item.FLAGS);
fp.add(FetchProfile.Item.CONTENT_INFO); // body structure
fp.add(UIDFolder.FetchProfileItem.UID);
fp.add(IMAPFolder.FetchProfileItem.HEADERS);
fp.add(IMAPFolder.FetchProfileItem.MESSAGE);
fp.add(FetchProfile.Item.SIZE);
fp.add(IMAPFolder.FetchProfileItem.INTERNALDATE);
ifolder.fetch(e.getMessages(), fp);
// Listen for new and deleted messages
ifolder.addMessageCountListener(new MessageCountAdapter() {
@Override
public void messagesAdded(MessageCountEvent e) {
synchronized (lock) {
for (Message imessage : e.getMessages())
try {
long id;
try {
wl.acquire();
Log.i(Helper.TAG, folder.name + " messages added");
FetchProfile fp = new FetchProfile();
fp.add(FetchProfile.Item.ENVELOPE);
fp.add(FetchProfile.Item.FLAGS);
fp.add(FetchProfile.Item.CONTENT_INFO); // body structure
fp.add(UIDFolder.FetchProfileItem.UID);
fp.add(IMAPFolder.FetchProfileItem.HEADERS);
fp.add(IMAPFolder.FetchProfileItem.MESSAGE);
fp.add(FetchProfile.Item.SIZE);
fp.add(IMAPFolder.FetchProfileItem.INTERNALDATE);
ifolder.fetch(e.getMessages(), fp);
for (Message imessage : e.getMessages())
try {
long id;
try {
db.beginTransaction();
id = synchronizeMessage(
ServiceSynchronize.this,
folder, ifolder, (IMAPMessage) imessage,
false, false, false);
db.setTransactionSuccessful();
} finally {
db.endTransaction();
}
downloadMessage(ServiceSynchronize.this, folder, ifolder, (IMAPMessage) imessage, id);
} catch (MessageRemovedException ex) {
Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
} catch (IOException ex) {
if (ex.getCause() instanceof MessageRemovedException)
Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
else
throw ex;
}
EntityOperation.process(ServiceSynchronize.this); // download small attachments
} catch (Throwable ex) {
Log.e(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
reportError(account.name, folder.name, ex);
db.folder().setFolderError(folder.id, Helper.formatThrowable(ex));
state.error();
db.beginTransaction();
id = synchronizeMessage(
ServiceSynchronize.this,
folder, ifolder, (IMAPMessage) imessage,
false, false, false);
db.setTransactionSuccessful();
} finally {
wl.release();
db.endTransaction();
}
}
}
@Override
public void messagesRemoved(MessageCountEvent e) {
synchronized (lock) {
try {
wl.acquire();
Log.i(Helper.TAG, folder.name + " messages removed");
for (Message imessage : e.getMessages())
try {
long uid = ifolder.getUID(imessage);
DB db = DB.getInstance(ServiceSynchronize.this);
int count = db.message().deleteMessage(folder.id, uid);
Log.i(Helper.TAG, "Deleted uid=" + uid + " count=" + count);
} catch (MessageRemovedException ex) {
Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
}
} catch (Throwable ex) {
Log.e(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
reportError(account.name, folder.name, ex);
db.folder().setFolderError(folder.id, Helper.formatThrowable(ex));
state.error();
db.beginTransaction();
downloadMessage(ServiceSynchronize.this, folder, ifolder, (IMAPMessage) imessage, id);
db.setTransactionSuccessful();
} finally {
wl.release();
db.endTransaction();
}
} catch (MessageRemovedException ex) {
Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
} catch (IOException ex) {
if (ex.getCause() instanceof MessageRemovedException)
Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
else
throw ex;
}
}
});
// Fetch e-mail
synchronizeMessages(account, folder, ifolder, state);
// Flags (like "seen") at the remote could be changed while synchronizing
// Listen for changed messages
ifolder.addMessageChangedListener(new MessageChangedListener() {
@Override
public void messageChanged(MessageChangedEvent e) {
synchronized (lock) {
try {
wl.acquire();
try {
Log.i(Helper.TAG, folder.name + " message changed");
FetchProfile fp = new FetchProfile();
fp.add(UIDFolder.FetchProfileItem.UID);
fp.add(IMAPFolder.FetchProfileItem.FLAGS);
ifolder.fetch(new Message[]{e.getMessage()}, fp);
long id;
try {
db.beginTransaction();
id = synchronizeMessage(
ServiceSynchronize.this,
folder, ifolder, (IMAPMessage) e.getMessage(),
false, false, false);
db.setTransactionSuccessful();
} finally {
db.endTransaction();
}
downloadMessage(ServiceSynchronize.this, folder, ifolder, (IMAPMessage) e.getMessage(), id);
} catch (MessageRemovedException ex) {
Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
} catch (IOException ex) {
if (ex.getCause() instanceof MessageRemovedException)
Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
else
throw ex;
}
} catch (Throwable ex) {
Log.e(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
reportError(account.name, folder.name, ex);
db.folder().setFolderError(folder.id, Helper.formatThrowable(ex));
state.error();
} finally {
wl.release();
}
}
}
});
} catch (Throwable ex) {
Log.e(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
reportError(account.name, folder.name, ex);
db.folder().setFolderError(folder.id, Helper.formatThrowable(ex));
state.error();
} finally {
wl.release();
} catch (Throwable ex) {
Log.e(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
reportError(account.name, folder.name, ex);
db.folder().setFolderError(folder.id, Helper.formatThrowable(ex));
state.error();
} finally {
wlAccount.release();
}
}
}
}, "sync." + folder.id);
sync.start();
syncs.add(sync);
@Override
public void messagesRemoved(MessageCountEvent e) {
synchronized (folder) {
try {
wlAccount.acquire();
Log.i(Helper.TAG, folder.name + " messages removed");
for (Message imessage : e.getMessages())
try {
long uid = ifolder.getUID(imessage);
DB db = DB.getInstance(ServiceSynchronize.this);
int count = db.message().deleteMessage(folder.id, uid);
Log.i(Helper.TAG, "Deleted uid=" + uid + " count=" + count);
} catch (MessageRemovedException ex) {
Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
}
} catch (Throwable ex) {
Log.e(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
reportError(account.name, folder.name, ex);
db.folder().setFolderError(folder.id, Helper.formatThrowable(ex));
state.error();
} finally {
wlAccount.release();
}
}
}
});
// Flags (like "seen") at the remote could be changed while synchronizing
// Listen for changed messages
ifolder.addMessageChangedListener(new MessageChangedListener() {
@Override
public void messageChanged(MessageChangedEvent e) {
synchronized (folder) {
try {
wlAccount.acquire();
try {
Log.i(Helper.TAG, folder.name + " message changed");
FetchProfile fp = new FetchProfile();
fp.add(UIDFolder.FetchProfileItem.UID);
fp.add(IMAPFolder.FetchProfileItem.FLAGS);
ifolder.fetch(new Message[]{e.getMessage()}, fp);
long id;
try {
db.beginTransaction();
id = synchronizeMessage(
ServiceSynchronize.this,
folder, ifolder, (IMAPMessage) e.getMessage(),
false, false, false);
db.setTransactionSuccessful();
} finally {
db.endTransaction();
}
try {
db.beginTransaction();
downloadMessage(ServiceSynchronize.this, folder, ifolder, (IMAPMessage) e.getMessage(), id);
db.setTransactionSuccessful();
} finally {
db.endTransaction();
}
} catch (MessageRemovedException ex) {
Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
} catch (IOException ex) {
if (ex.getCause() instanceof MessageRemovedException)
Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
else
throw ex;
}
} catch (Throwable ex) {
Log.e(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
reportError(account.name, folder.name, ex);
db.folder().setFolderError(folder.id, Helper.formatThrowable(ex));
state.error();
} finally {
wlAccount.release();
}
}
}
});
// Idle folder
if (capIdle) {
@@ -1078,9 +1047,8 @@ public class ServiceSynchronize extends LifecycleService {
try {
Log.i(Helper.TAG, folder.name + " start idle");
while (state.running()) {
Log.i(Helper.TAG, folder.name + " do idle");
Log.v(Helper.TAG, folder.name + " do idle");
ifolder.idle(false);
//Log.i(Helper.TAG, folder.name + " done idle");
}
} catch (Throwable ex) {
Log.e(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
@@ -1095,108 +1063,110 @@ public class ServiceSynchronize extends LifecycleService {
idler.start();
idlers.add(idler);
}
EntityOperation.sync(db, folder.id);
}
// Process folder actions
BroadcastReceiver processFolder = new BroadcastReceiver() {
@Override
public void onReceive(Context context, final Intent intent) {
executor.submit(new Runnable() {
PowerManager.WakeLock wl = pm.newWakeLock(
PowerManager.PARTIAL_WAKE_LOCK,
BuildConfig.APPLICATION_ID + ":account." + account.id + ".process");
// Observe folder operations
for (final EntityFolder folder : db.folder().getFolders(account.id)) {
Handler handler = new Handler(getMainLooper()) {
private List<Long> handling = new ArrayList<>();
private final PowerManager.WakeLock wlFolder = pm.newWakeLock(
PowerManager.PARTIAL_WAKE_LOCK, BuildConfig.APPLICATION_ID + ":folder." + folder.id);
private final ExecutorService executor = Executors.newSingleThreadExecutor(Helper.backgroundThreadFactory);
@Override
public void run() {
long fid = intent.getLongExtra("folder", -1);
try {
wl.acquire();
Log.i(Helper.TAG, "Process folder=" + fid + " intent=" + intent);
// Get folder
EntityFolder folder = null;
IMAPFolder ifolder = null;
for (EntityFolder f : folders.keySet())
if (f.id == fid) {
folder = f;
ifolder = folders.get(f);
break;
@Override
public void handleMessage(android.os.Message msg) {
Log.i(Helper.TAG, folder.name + " observe=" + msg.what);
if (msg.what == 0)
db.operation().liveOperations(folder.id).removeObservers(ServiceSynchronize.this);
else
db.operation().liveOperations(folder.id).observe(ServiceSynchronize.this, new Observer<List<EntityOperation>>() {
@Override
public void onChanged(List<EntityOperation> operations) {
boolean process = false;
List<Long> current = new ArrayList<>();
for (EntityOperation op : operations) {
if (!handling.contains(op.id) || op.error != null)
process = true;
current.add(op.id);
}
handling = current;
final boolean shouldClose = (folder == null);
if (handling.size() > 0 && process) {
Log.i(Helper.TAG, folder.name + " operations=" + operations.size());
executor.submit(new Runnable() {
@Override
public void run() {
try {
wlFolder.acquire();
Log.i(Helper.TAG, folder.name + " process");
try {
if (folder == null)
folder = db.folder().getFolder(fid);
// Get folder
EntityFolder ofolder = null;
IMAPFolder ifolder = null;
for (EntityFolder f : folders.keySet())
if (f.id == folder.id) {
ofolder = f;
ifolder = folders.get(f);
break;
}
Log.i(Helper.TAG, folder.name + " run " + (shouldClose ? "offline" : "online"));
final boolean shouldClose = (ofolder == null);
if (ifolder == null) {
// Prevent unnecessary folder connections
if (ACTION_PROCESS_OPERATIONS.equals(intent.getAction()))
if (db.operation().getOperationCount(fid) == 0)
return;
try {
if (ofolder == null)
ofolder = db.folder().getFolder(folder.id);
db.folder().setFolderState(folder.id, "connecting");
Log.i(Helper.TAG, ofolder.name + " run " + (shouldClose ? "offline" : "online"));
ifolder = (IMAPFolder) istore.getFolder(folder.name);
ifolder.open(Folder.READ_WRITE);
if (ifolder == null) {
// Prevent unnecessary folder connections
if (db.operation().getOperationCount(ofolder.id, null) == 0)
return;
db.folder().setFolderState(folder.id, "connected");
db.folder().setFolderError(folder.id, null);
}
db.folder().setFolderState(ofolder.id, "connecting");
if (ACTION_PROCESS_OPERATIONS.equals(intent.getAction()))
processOperations(folder, isession, istore, ifolder, state);
ifolder = (IMAPFolder) istore.getFolder(ofolder.name);
ifolder.open(Folder.READ_WRITE);
else if (ACTION_SYNCHRONIZE_FOLDER.equals(intent.getAction())) {
processOperations(folder, isession, istore, ifolder, state);
synchronizeMessages(account, folder, ifolder, state);
}
db.folder().setFolderState(ofolder.id, "connected");
db.folder().setFolderError(ofolder.id, null);
}
} catch (Throwable ex) {
Log.e(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
reportError(account.name, folder.name, ex);
db.folder().setFolderError(folder.id, Helper.formatThrowable(ex));
state.error();
} finally {
if (shouldClose) {
if (ifolder != null && ifolder.isOpen()) {
db.folder().setFolderState(folder.id, "closing");
try {
ifolder.close(false);
} catch (MessagingException ex) {
Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
processOperations(account, ofolder, isession, istore, ifolder, state);
} catch (Throwable ex) {
Log.e(Helper.TAG, ofolder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
reportError(account.name, ofolder.name, ex);
db.folder().setFolderError(ofolder.id, Helper.formatThrowable(ex));
state.error();
} finally {
if (shouldClose) {
if (ifolder != null && ifolder.isOpen()) {
db.folder().setFolderState(ofolder.id, "closing");
try {
ifolder.close(false);
} catch (MessagingException ex) {
Log.w(Helper.TAG, ofolder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
}
}
db.folder().setFolderState(ofolder.id, null);
}
}
} finally {
wlFolder.release();
}
}
}
db.folder().setFolderState(folder.id, null);
});
}
}
} finally {
wl.release();
}
}
});
}
};
// Listen for folder operations
IntentFilter f = new IntentFilter();
f.addAction(ACTION_SYNCHRONIZE_FOLDER);
f.addAction(ACTION_PROCESS_OPERATIONS);
f.addDataType("account/" + account.id);
LocalBroadcastManager lbm = LocalBroadcastManager.getInstance(ServiceSynchronize.this);
lbm.registerReceiver(processFolder, f);
for (EntityFolder folder : folders.keySet())
if (db.operation().getOperationCount(folder.id) > 0) {
Intent intent = new Intent();
intent.setType("account/" + account.id);
intent.setAction(ServiceSynchronize.ACTION_PROCESS_OPERATIONS);
intent.putExtra("folder", folder.id);
lbm.sendBroadcast(intent);
}
});
}
};
handler.sendEmptyMessage(1);
handlers.add(handler);
}
// Keep alive alarm receiver
BroadcastReceiver alarm = new BroadcastReceiver() {
@@ -1204,7 +1174,7 @@ public class ServiceSynchronize extends LifecycleService {
public void onReceive(Context context, Intent intent) {
// Receiver runs on main thread
// Receiver has a wake lock for ~10 seconds
EntityLog.log(context, account.name + " keep alive wake lock=" + wl0.isHeld());
EntityLog.log(context, account.name + " keep alive wake lock=" + wlAccount.isHeld());
state.release();
}
};
@@ -1242,19 +1212,22 @@ public class ServiceSynchronize extends LifecycleService {
pi);
try {
wl0.release();
wlAccount.release();
state.acquire();
} catch (InterruptedException ex) {
EntityLog.log(this, account.name + " waited state=" + state);
} finally {
wl0.acquire();
wlAccount.acquire();
}
}
} finally {
// Cleanup
am.cancel(pi);
unregisterReceiver(alarm);
lbm.unregisterReceiver(processFolder);
for (Handler handler : handlers)
handler.sendEmptyMessage(0);
handlers.clear();
}
Log.i(Helper.TAG, account.name + " done state=" + state);
@@ -1282,13 +1255,10 @@ public class ServiceSynchronize extends LifecycleService {
db.account().setAccountState(account.id, null);
}
// Stop syncs
for (Thread sync : syncs)
state.join(sync);
// Stop idlers
for (Thread idler : idlers)
state.join(idler);
idlers.clear();
for (EntityFolder folder : folders.keySet())
db.folder().setFolderState(folder.id, null);
@@ -1323,10 +1293,10 @@ public class ServiceSynchronize extends LifecycleService {
pi);
try {
wl0.release();
wlAccount.release();
state.acquire(2 * CONNECT_BACKOFF_AlARM * 60 * 1000L);
} finally {
wl0.acquire();
wlAccount.acquire();
}
} finally {
// Cleanup
@@ -1343,12 +1313,12 @@ public class ServiceSynchronize extends LifecycleService {
}
} finally {
EntityLog.log(this, account.name + " stopped");
wl0.release();
wlAccount.release();
}
}
private void processOperations(EntityFolder folder, Session isession, IMAPStore istore, IMAPFolder ifolder, ServiceState state) throws MessagingException, JSONException, IOException {
synchronized (lock) {
private void processOperations(EntityAccount account, EntityFolder folder, Session isession, IMAPStore istore, IMAPFolder ifolder, ServiceState state) throws MessagingException, JSONException, IOException {
synchronized (folder) {
try {
Log.i(Helper.TAG, folder.name + " start process");
@@ -1363,15 +1333,20 @@ public class ServiceSynchronize extends LifecycleService {
" msg=" + op.message +
" args=" + op.args);
EntityMessage message = db.message().getMessage(op.message);
// Fetch most recent copy of message
EntityMessage message = null;
if (op.message != null)
message = db.message().getMessage(op.message);
try {
if (message == null)
if (message == null && !EntityOperation.SYNC.equals(op.name))
throw new MessageRemovedException();
db.operation().setOperationError(op.id, null);
db.message().setMessageError(message.id, null);
if (message != null)
db.message().setMessageError(message.id, null);
if (message.uid == null &&
if (message != null && message.uid == null &&
(EntityOperation.SEEN.equals(op.name) ||
EntityOperation.DELETE.equals(op.name) ||
EntityOperation.MOVE.equals(op.name) ||
@@ -1380,6 +1355,8 @@ public class ServiceSynchronize extends LifecycleService {
JSONArray jargs = new JSONArray(op.args);
// Operations should use database transaction when needed
if (EntityOperation.SEEN.equals(op.name))
doSeen(folder, ifolder, message, jargs, db);
@@ -1413,6 +1390,9 @@ public class ServiceSynchronize extends LifecycleService {
else if (EntityOperation.ATTACHMENT.equals(op.name))
doAttachment(folder, op, ifolder, message, jargs, db);
else if (EntityOperation.SYNC.equals(op.name))
synchronizeMessages(account, folder, ifolder, state);
else
throw new MessagingException("Unknown operation name=" + op.name);
@@ -1701,8 +1681,6 @@ public class ServiceSynchronize extends LifecycleService {
} finally {
db.endTransaction();
}
EntityOperation.process(this);
} catch (MessagingException ex) {
db.identity().setIdentityError(ident.id, Helper.formatThrowable(ex));
@@ -1890,20 +1868,16 @@ public class ServiceSynchronize extends LifecycleService {
long fetch = SystemClock.elapsedRealtime();
Log.i(Helper.TAG, folder.name + " remote fetched=" + (SystemClock.elapsedRealtime() - fetch) + " ms");
for (int i = 0; i < imessages.length && state.running(); i++) {
Message imessage = imessages[i];
for (int i = 0; i < imessages.length && state.running(); i++)
try {
uids.remove(ifolder.getUID(imessage));
uids.remove(ifolder.getUID(imessages[i]));
} catch (MessageRemovedException ex) {
Log.w(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
} catch (Throwable ex) {
Log.e(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
reportError(account.name, folder.name, ex);
db.folder().setFolderError(folder.id, Helper.formatThrowable(ex));
}
}
// Delete local messages not at remote
Log.i(Helper.TAG, folder.name + " delete=" + uids.size());
@@ -1926,8 +1900,6 @@ public class ServiceSynchronize extends LifecycleService {
Log.i(Helper.TAG, folder.name + " add=" + imessages.length);
for (int i = imessages.length - 1; i >= 0 && state.running(); i -= SYNC_BATCH_SIZE) {
int from = Math.max(0, i - SYNC_BATCH_SIZE + 1);
//Log.i(Helper.TAG, folder.name + " update " + from + " .. " + i);
Message[] isub = Arrays.copyOfRange(imessages, from, i + 1);
// Full fetch new/changed messages only
@@ -1983,18 +1955,18 @@ public class ServiceSynchronize extends LifecycleService {
Log.i(Helper.TAG, folder.name + " download=" + imessages.length);
for (int i = imessages.length - 1; i >= 0 && state.running(); i -= DOWNLOAD_BATCH_SIZE) {
int from = Math.max(0, i - DOWNLOAD_BATCH_SIZE + 1);
//Log.i(Helper.TAG, folder.name + " download " + from + " .. " + i);
Message[] isub = Arrays.copyOfRange(imessages, from, i + 1);
// Fetch on demand
for (int j = isub.length - 1; j >= 0 && state.running(); j--)
try {
//Log.i(Helper.TAG, folder.name + " download index=" + (from + j) + " id=" + ids[from + j]);
db.beginTransaction();
if (ids[from + j] != null) {
downloadMessage(this, folder, ifolder, (IMAPMessage) isub[j], ids[from + j]);
Thread.sleep(20);
}
db.setTransactionSuccessful();
} catch (FolderClosedException ex) {
throw ex;
} catch (FolderClosedIOException ex) {
@@ -2002,6 +1974,7 @@ public class ServiceSynchronize extends LifecycleService {
} catch (Throwable ex) {
Log.e(Helper.TAG, folder.name + " " + ex + "\n" + Log.getStackTraceString(ex));
} finally {
db.endTransaction();
// Free memory
((IMAPMessage) isub[j]).invalidateHeaders();
}
@@ -2051,7 +2024,6 @@ public class ServiceSynchronize extends LifecycleService {
if (message == null) {
// Will fetch headers within database transaction
String msgid = helper.getMessageID();
String[] refs = helper.getReferences();
Log.i(Helper.TAG, "Searching for " + msgid);
for (EntityMessage dup : db.message().getMessageByMsgId(folder.account, msgid, found)) {
EntityFolder dfolder = db.folder().getFolder(dup.folder);
@@ -2302,9 +2274,7 @@ public class ServiceSynchronize extends LifecycleService {
private boolean started = false;
private int queued = 0;
private long lastLost = 0;
private EntityFolder outbox = null;
private ExecutorService queue = Executors.newSingleThreadExecutor(Helper.backgroundThreadFactory);
private ExecutorService executor = Executors.newSingleThreadExecutor(Helper.backgroundThreadFactory);
@Override
public void onCapabilitiesChanged(Network network, NetworkCapabilities capabilities) {
@@ -2380,8 +2350,7 @@ public class ServiceSynchronize extends LifecycleService {
state.runnable(new Runnable() {
PowerManager pm = getSystemService(PowerManager.class);
PowerManager.WakeLock wl = pm.newWakeLock(
PowerManager.PARTIAL_WAKE_LOCK,
BuildConfig.APPLICATION_ID + ":start");
PowerManager.PARTIAL_WAKE_LOCK, BuildConfig.APPLICATION_ID + ":main");
private List<ServiceState> threadState = new ArrayList<>();
@Override
@@ -2391,12 +2360,6 @@ public class ServiceSynchronize extends LifecycleService {
final DB db = DB.getInstance(ServiceSynchronize.this);
outbox = db.folder().getOutbox();
if (outbox == null) {
EntityLog.log(ServiceSynchronize.this, "No outbox");
return;
}
long ago = new Date().getTime() - lastLost;
if (ago < RECONNECT_BACKOFF)
try {
@@ -2409,19 +2372,69 @@ public class ServiceSynchronize extends LifecycleService {
}
// Start monitoring outbox
IntentFilter f = new IntentFilter();
f.addAction(ACTION_SYNCHRONIZE_FOLDER);
f.addAction(ACTION_PROCESS_OPERATIONS);
f.addDataType("account/outbox");
LocalBroadcastManager lbm = LocalBroadcastManager.getInstance(ServiceSynchronize.this);
lbm.registerReceiver(outboxReceiver, f);
Handler handler = null;
final EntityFolder outbox = db.folder().getOutbox();
if (outbox != null) {
db.folder().setFolderError(outbox.id, null);
db.folder().setFolderState(outbox.id, "connected");
db.folder().setFolderError(outbox.id, null);
handler = new Handler(Looper.getMainLooper()) {
@Override
public void handleMessage(android.os.Message msg) {
Log.i(Helper.TAG, outbox.name + " observe=" + msg.what);
lbm.sendBroadcast(new Intent(ACTION_PROCESS_OPERATIONS)
.setType("account/outbox")
.putExtra("folder", outbox.id));
if (msg.what == 0)
db.operation().liveOperations(outbox.id).removeObservers(ServiceSynchronize.this);
else {
db.operation().liveOperations(outbox.id).observe(ServiceSynchronize.this, new Observer<List<EntityOperation>>() {
private List<Long> handling = new ArrayList<>();
private ExecutorService executor = Executors.newSingleThreadExecutor(Helper.backgroundThreadFactory);
@Override
public void onChanged(List<EntityOperation> operations) {
boolean process = false;
List<Long> current = new ArrayList<>();
for (EntityOperation op : operations) {
if (!handling.contains(op.id) || op.error != null)
process = true;
current.add(op.id);
}
handling = current;
if (handling.size() > 0 && process) {
Log.i(Helper.TAG, outbox.name + " operations=" + operations.size());
executor.submit(new Runnable() {
PowerManager pm = getSystemService(PowerManager.class);
PowerManager.WakeLock wl = pm.newWakeLock(
PowerManager.PARTIAL_WAKE_LOCK, BuildConfig.APPLICATION_ID + ":outbox");
@Override
public void run() {
try {
wl.acquire();
Log.i(Helper.TAG, outbox.name + " process");
db.folder().setFolderState(outbox.id, "syncing");
processOperations(null, outbox, null, null, null, state);
db.folder().setFolderError(outbox.id, null);
} catch (Throwable ex) {
Log.e(Helper.TAG, outbox.name + " " + ex + "\n" + Log.getStackTraceString(ex));
reportError(null, outbox.name, ex);
db.folder().setFolderError(outbox.id, Helper.formatThrowable(ex));
} finally {
db.folder().setFolderState(outbox.id, null);
wl.release();
EntityLog.log(ServiceSynchronize.this, "Outbox wake lock=" + wl.isHeld());
}
}
});
}
}
});
}
}
};
handler.sendEmptyMessage(1);
}
// Start monitoring accounts
List<EntityAccount> accounts = db.account().getAccounts(true);
@@ -2463,9 +2476,11 @@ public class ServiceSynchronize extends LifecycleService {
threadState.clear();
// Stop monitoring outbox
lbm.unregisterReceiver(outboxReceiver);
Log.i(Helper.TAG, outbox.name + " unlisten operations");
db.folder().setFolderState(outbox.id, null);
if (outbox != null) {
Log.i(Helper.TAG, outbox.name + " unlisten operations");
handler.sendEmptyMessage(0);
db.folder().setFolderState(outbox.id, null);
}
EntityLog.log(ServiceSynchronize.this, "Main exited");
} catch (Throwable ex) {
@@ -2505,8 +2520,7 @@ public class ServiceSynchronize extends LifecycleService {
queue.submit(new Runnable() {
PowerManager pm = getSystemService(PowerManager.class);
PowerManager.WakeLock wl = pm.newWakeLock(
PowerManager.PARTIAL_WAKE_LOCK,
BuildConfig.APPLICATION_ID + ":reload");
PowerManager.PARTIAL_WAKE_LOCK, BuildConfig.APPLICATION_ID + ":manage");
@Override
public void run() {
@@ -2546,45 +2560,6 @@ public class ServiceSynchronize extends LifecycleService {
started = doStart;
}
private BroadcastReceiver outboxReceiver = new BroadcastReceiver() {
@Override
public void onReceive(final Context context, Intent intent) {
Log.v(Helper.TAG, outbox.name + " run operations");
executor.submit(new Runnable() {
PowerManager pm = getSystemService(PowerManager.class);
PowerManager.WakeLock wl = pm.newWakeLock(
PowerManager.PARTIAL_WAKE_LOCK,
BuildConfig.APPLICATION_ID + ":outbox");
@Override
public void run() {
try {
wl.acquire();
DB db = DB.getInstance(context);
try {
Log.i(Helper.TAG, outbox.name + " start operations");
db.folder().setFolderState(outbox.id, "syncing");
processOperations(outbox, null, null, null, state);
db.folder().setFolderError(outbox.id, null);
} catch (Throwable ex) {
Log.e(Helper.TAG, outbox.name + " " + ex + "\n" + Log.getStackTraceString(ex));
reportError(null, outbox.name, ex);
db.folder().setFolderError(outbox.id, Helper.formatThrowable(ex));
} finally {
Log.i(Helper.TAG, outbox.name + " end operations");
db.folder().setFolderState(outbox.id, null);
}
} finally {
wl.release();
EntityLog.log(ServiceSynchronize.this, "Outbox wake lock=" + wl.isHeld());
}
}
});
}
};
}
public static void init(Context context) {