diff options
Diffstat (limited to 'src/main/src/sync/BasicSyncTask.java')
-rw-r--r-- | src/main/src/sync/BasicSyncTask.java | 292 |
1 files changed, 292 insertions, 0 deletions
diff --git a/src/main/src/sync/BasicSyncTask.java b/src/main/src/sync/BasicSyncTask.java new file mode 100644 index 0000000..24f34a8 --- /dev/null +++ b/src/main/src/sync/BasicSyncTask.java @@ -0,0 +1,292 @@ +/* + * SSSync, a Simple and Stupid Synchronizer for data with multi-valued attributes + * Copyright (C) 2014 Ludovic Pouzenc <ludovic@pouzenc.fr> + * + * This file is part of SSSync. + * + * SSSync is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * SSSync is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with SSSync. If not, see <http://www.gnu.org/licenses/> + */ + +package sync; + + +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; + +import org.apache.log4j.Logger; + +import data.MVDataEntry; +import data.io.MVDataReader; +import data.io.MVDataWriter; + +/** + * Basic one-way synchronization code. Uses MVDataEntry semantics. + * Each entry has a key and a set of multi-valued attributes, like LDAP entries. + * Data source is a MVDataReader. Multiple source could be used via MVDataCombiner. + * <br/><br/> + * <b>Warnings :</b> needs MVDataReaders that give key-sorted results. This sync will try + * to delete entries that exists on destination side and don't exist at source side. + * Extra attributes in existing entries on destination side are preserved. + * Look like useful for account's failure password count for instance. + * <br/><br/> + * <b>Notes :</b> Null value and empty strings are not allowed in MVDataEntry, so they are not sync'ed. + * + * @author lpouzenc + */ +public class BasicSyncTask extends AbstractSyncTask { + private static final Logger logger = Logger.getLogger(BasicSyncTask.class.getName()); + + /** + * Source data stream (read-only) + */ + private final MVDataReader srcReader; + /** + * Destination data stream (read) + */ + private final MVDataReader dstReader; + /** + * Destination data stream (write) + */ + private final MVDataWriter dstWriter; + + /** + * If true, disable removal of data on destination side even if detected as obsolete + */ + private boolean skipEntryDelete; + + + private int maxInserts; + private int maxUpdates; + private int maxDeletes; + + private transient int curInserts; + private transient int curUpdates; + private transient int curDeletes; + + + /** + * BasicSyncTask constructor + * Assumes that the *Readers have iterators that returns entries sorted by lexicographical ascending key + * @param taskName Friendly name of this task (for tracing in log files) + * @param srcReader Source data stream (read-only) + * @param dstReader Destination data stream (read) + * @param dstWriter Destination data stream (write) + */ + public BasicSyncTask(String taskName, boolean skipDelete, MVDataReader srcReader, MVDataReader dstReader, MVDataWriter dstWriter) { + this.taskName = taskName; + this.srcReader = srcReader; + this.dstReader = dstReader; + this.dstWriter = dstWriter; + + this.maxInserts = 0; + this.maxUpdates = 0; + this.maxDeletes = 0; + } + + public Boolean call() { + logger.info("task " + taskName + " : starting " + (dryRun?"dry-run":"real") + " pass"); + // Better stack traces "call()" don't say "what" + boolean success = syncTaskRun(); + logger.info("task " + taskName + " : " + (success?"terminated successfully":"aborted")); + + return success; + } + + private boolean syncTaskRun() { + curInserts=0; + curUpdates=0; + curDeletes=0; + dstWriter.setDryRun(dryRun); + + Iterator<MVDataEntry> itSrc = srcReader.iterator(); + Iterator<MVDataEntry> itDst = dstReader.iterator(); + MVDataEntry src = null, dst = null; + boolean srcExhausted = false; + boolean dstExhausted = false; + boolean abort = false; + boolean done = false; + while ( !abort && !done ) { + + // Look-ahead srcReader if previous has been "poped" (or if never read yet) + if ( src == null ) { + if ( !srcExhausted ) { + srcExhausted = !itSrc.hasNext(); + } + if ( !srcExhausted ) { + try { + src=itSrc.next(); + logger.trace("src read : " + src); + } catch (Exception e) { + logger.error("Read failure detected on " + srcReader.getDataSourceName() + ". Aborting.", e); + // Escape from the while loop + abort=true; continue; + } + } + } + + // Look-ahead dstReader if previous has been "poped" (or if never read yet) + if ( dst == null ) { + if ( !dstExhausted ) { + dstExhausted = !itDst.hasNext(); + } + if ( !dstExhausted ) { + try { + dst = itDst.next(); + logger.trace("dst read : " + dst); + } catch (NoSuchElementException e) { + logger.error("Read failure detected on " + dstReader.getDataSourceName() + ". Aborting.", e); + // Escape from the while loop + abort=true; continue; + } + } + } + + // Error-free cases (no problems while reading data) + int compare; + if ( !srcExhausted && !dstExhausted ) { + // General case : check order precedence to take an action + compare = src.compareTo(dst); + } else if ( !srcExhausted && dstExhausted ) { + // Particular case : dst is exhausted, it's like ( src < dst ) + compare=-1; + } else if ( srcExhausted && !dstExhausted ) { + // Particular case : src is exhausted, it's like ( src > dst ) + compare=1; + } else /* ( srcExhausted && dstExhausted ) */ { + // Particular case : everything is synchronized + // Exit gracefully the while loop + done=true; continue; + } + + logger.trace("compare : " + compare); + + boolean actionRealized = false; + // Take an action (insert/update/delete) + if ( compare < 0 ) { + actionRealized = _insert(src); + src = null; + // preserve dst until src key is not greater + } else if ( compare > 0 ) { + // dst current entry doesn't exists anymore (src key is greater than dst key) + actionRealized = _delete(dst); + // preserve src until dst key is not greater + dst = null; + } else /* ( compare == 0 ) */ { + // src current entry already exists in dst, update it if necessary + Set<String> changedAttr = src.getChangedAttributes(dst); + if ( ! changedAttr.isEmpty() ) { + actionRealized = _update(src,dst,changedAttr); + } else { + // Already up-to-date, nothing to do + actionRealized = true; + } + // Both src and dst have been used + src = null; + dst = null; + } + abort = !actionRealized; + } /* while */ + + return !abort; + } /* _taskRunSync() */ + + private boolean _insert(MVDataEntry entry) { + + if ( maxInserts > 0 && curInserts >= maxInserts ) { + logger.error("Max insert limit reached (" + maxInserts + ")" ); + return false; + } + + logger.debug("dstWriter : Action\n-> Insert " + entry); + try { + dstWriter.insert(entry); + } catch (Exception e) { + logger.error("Exception occured while inserting", e); + return false; + } + + curInserts++; + return true; + } + + private boolean _update(MVDataEntry updatedEntry, MVDataEntry originalEntry, Set<String> attrToUpdate) { + if ( maxUpdates > 0 && curUpdates >= maxUpdates ) { + logger.error("Max update limit reached (" + maxUpdates + ")"); + return false; + } + + logger.debug("dstWriter : Action\n-> Update " + updatedEntry + "\n-> changed attributes : " + attrToUpdate); + try { + dstWriter.update(updatedEntry, originalEntry, attrToUpdate); + } catch (Exception e) { + logger.error("Exception occured while updating", e); + return false; + } + + curUpdates++; + return true; + } + + private boolean _delete(MVDataEntry entry) { + if ( skipEntryDelete ) { + logger.info("dstWriter : skipping deletion for key " + entry.getKey()); + return true; + } + + if ( maxDeletes > 0 && curDeletes >= maxDeletes ) { + logger.error("Max delete limit reached (" + maxDeletes + ")"); + return false; + } + logger.debug("dstWriter : Action\n-> Delete " + entry); + try { + dstWriter.delete(entry); + } catch (Exception e) { + logger.error("Exception occured while deleting", e); + return false; + } + + curDeletes++; + return true; + } + + // Boring accessors + + /** + * Setter to fix limits about operations counts (safeguard) + * @param maxInserts + * @param maxUpdates + * @param maxDeletes + */ + public void setOperationLimits(int maxInserts, int maxUpdates, int maxDeletes) { + this.maxInserts = maxInserts; + this.maxUpdates = maxUpdates; + this.maxDeletes = maxDeletes; + } + + /** + * @return the skipEntryDelete + */ + public boolean isSkipEntryDelete() { + return skipEntryDelete; + } + + /** + * @param skipEntryDelete the skipEntryDelete to set + */ + public void setSkipEntryDelete(boolean skipEntryDelete) { + this.skipEntryDelete = skipEntryDelete; + } + +} |