summaryrefslogtreecommitdiff
path: root/src/main/src/sync/BasicSyncTask.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/src/sync/BasicSyncTask.java')
-rw-r--r--src/main/src/sync/BasicSyncTask.java292
1 files changed, 292 insertions, 0 deletions
diff --git a/src/main/src/sync/BasicSyncTask.java b/src/main/src/sync/BasicSyncTask.java
new file mode 100644
index 0000000..24f34a8
--- /dev/null
+++ b/src/main/src/sync/BasicSyncTask.java
@@ -0,0 +1,292 @@
+/*
+ * SSSync, a Simple and Stupid Synchronizer for data with multi-valued attributes
+ * Copyright (C) 2014 Ludovic Pouzenc <ludovic@pouzenc.fr>
+ *
+ * This file is part of SSSync.
+ *
+ * SSSync is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * SSSync is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with SSSync. If not, see <http://www.gnu.org/licenses/>
+ */
+
+package sync;
+
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+
+import data.MVDataEntry;
+import data.io.MVDataReader;
+import data.io.MVDataWriter;
+
+/**
+ * Basic one-way synchronization code. Uses MVDataEntry semantics.
+ * Each entry has a key and a set of multi-valued attributes, like LDAP entries.
+ * Data source is a MVDataReader. Multiple source could be used via MVDataCombiner.
+ * <br/><br/>
+ * <b>Warnings :</b> needs MVDataReaders that give key-sorted results. This sync will try
+ * to delete entries that exists on destination side and don't exist at source side.
+ * Extra attributes in existing entries on destination side are preserved.
+ * Look like useful for account's failure password count for instance.
+ * <br/><br/>
+ * <b>Notes :</b> Null value and empty strings are not allowed in MVDataEntry, so they are not sync'ed.
+ *
+ * @author lpouzenc
+ */
+public class BasicSyncTask extends AbstractSyncTask {
+ private static final Logger logger = Logger.getLogger(BasicSyncTask.class.getName());
+
+ /**
+ * Source data stream (read-only)
+ */
+ private final MVDataReader srcReader;
+ /**
+ * Destination data stream (read)
+ */
+ private final MVDataReader dstReader;
+ /**
+ * Destination data stream (write)
+ */
+ private final MVDataWriter dstWriter;
+
+ /**
+ * If true, disable removal of data on destination side even if detected as obsolete
+ */
+ private boolean skipEntryDelete;
+
+
+ private int maxInserts;
+ private int maxUpdates;
+ private int maxDeletes;
+
+ private transient int curInserts;
+ private transient int curUpdates;
+ private transient int curDeletes;
+
+
+ /**
+ * BasicSyncTask constructor
+ * Assumes that the *Readers have iterators that returns entries sorted by lexicographical ascending key
+ * @param taskName Friendly name of this task (for tracing in log files)
+ * @param srcReader Source data stream (read-only)
+ * @param dstReader Destination data stream (read)
+ * @param dstWriter Destination data stream (write)
+ */
+ public BasicSyncTask(String taskName, boolean skipDelete, MVDataReader srcReader, MVDataReader dstReader, MVDataWriter dstWriter) {
+ this.taskName = taskName;
+ this.srcReader = srcReader;
+ this.dstReader = dstReader;
+ this.dstWriter = dstWriter;
+
+ this.maxInserts = 0;
+ this.maxUpdates = 0;
+ this.maxDeletes = 0;
+ }
+
+ public Boolean call() {
+ logger.info("task " + taskName + " : starting " + (dryRun?"dry-run":"real") + " pass");
+ // Better stack traces "call()" don't say "what"
+ boolean success = syncTaskRun();
+ logger.info("task " + taskName + " : " + (success?"terminated successfully":"aborted"));
+
+ return success;
+ }
+
+ private boolean syncTaskRun() {
+ curInserts=0;
+ curUpdates=0;
+ curDeletes=0;
+ dstWriter.setDryRun(dryRun);
+
+ Iterator<MVDataEntry> itSrc = srcReader.iterator();
+ Iterator<MVDataEntry> itDst = dstReader.iterator();
+ MVDataEntry src = null, dst = null;
+ boolean srcExhausted = false;
+ boolean dstExhausted = false;
+ boolean abort = false;
+ boolean done = false;
+ while ( !abort && !done ) {
+
+ // Look-ahead srcReader if previous has been "poped" (or if never read yet)
+ if ( src == null ) {
+ if ( !srcExhausted ) {
+ srcExhausted = !itSrc.hasNext();
+ }
+ if ( !srcExhausted ) {
+ try {
+ src=itSrc.next();
+ logger.trace("src read : " + src);
+ } catch (Exception e) {
+ logger.error("Read failure detected on " + srcReader.getDataSourceName() + ". Aborting.", e);
+ // Escape from the while loop
+ abort=true; continue;
+ }
+ }
+ }
+
+ // Look-ahead dstReader if previous has been "poped" (or if never read yet)
+ if ( dst == null ) {
+ if ( !dstExhausted ) {
+ dstExhausted = !itDst.hasNext();
+ }
+ if ( !dstExhausted ) {
+ try {
+ dst = itDst.next();
+ logger.trace("dst read : " + dst);
+ } catch (NoSuchElementException e) {
+ logger.error("Read failure detected on " + dstReader.getDataSourceName() + ". Aborting.", e);
+ // Escape from the while loop
+ abort=true; continue;
+ }
+ }
+ }
+
+ // Error-free cases (no problems while reading data)
+ int compare;
+ if ( !srcExhausted && !dstExhausted ) {
+ // General case : check order precedence to take an action
+ compare = src.compareTo(dst);
+ } else if ( !srcExhausted && dstExhausted ) {
+ // Particular case : dst is exhausted, it's like ( src < dst )
+ compare=-1;
+ } else if ( srcExhausted && !dstExhausted ) {
+ // Particular case : src is exhausted, it's like ( src > dst )
+ compare=1;
+ } else /* ( srcExhausted && dstExhausted ) */ {
+ // Particular case : everything is synchronized
+ // Exit gracefully the while loop
+ done=true; continue;
+ }
+
+ logger.trace("compare : " + compare);
+
+ boolean actionRealized = false;
+ // Take an action (insert/update/delete)
+ if ( compare < 0 ) {
+ actionRealized = _insert(src);
+ src = null;
+ // preserve dst until src key is not greater
+ } else if ( compare > 0 ) {
+ // dst current entry doesn't exists anymore (src key is greater than dst key)
+ actionRealized = _delete(dst);
+ // preserve src until dst key is not greater
+ dst = null;
+ } else /* ( compare == 0 ) */ {
+ // src current entry already exists in dst, update it if necessary
+ Set<String> changedAttr = src.getChangedAttributes(dst);
+ if ( ! changedAttr.isEmpty() ) {
+ actionRealized = _update(src,dst,changedAttr);
+ } else {
+ // Already up-to-date, nothing to do
+ actionRealized = true;
+ }
+ // Both src and dst have been used
+ src = null;
+ dst = null;
+ }
+ abort = !actionRealized;
+ } /* while */
+
+ return !abort;
+ } /* _taskRunSync() */
+
+ private boolean _insert(MVDataEntry entry) {
+
+ if ( maxInserts > 0 && curInserts >= maxInserts ) {
+ logger.error("Max insert limit reached (" + maxInserts + ")" );
+ return false;
+ }
+
+ logger.debug("dstWriter : Action\n-> Insert " + entry);
+ try {
+ dstWriter.insert(entry);
+ } catch (Exception e) {
+ logger.error("Exception occured while inserting", e);
+ return false;
+ }
+
+ curInserts++;
+ return true;
+ }
+
+ private boolean _update(MVDataEntry updatedEntry, MVDataEntry originalEntry, Set<String> attrToUpdate) {
+ if ( maxUpdates > 0 && curUpdates >= maxUpdates ) {
+ logger.error("Max update limit reached (" + maxUpdates + ")");
+ return false;
+ }
+
+ logger.debug("dstWriter : Action\n-> Update " + updatedEntry + "\n-> changed attributes : " + attrToUpdate);
+ try {
+ dstWriter.update(updatedEntry, originalEntry, attrToUpdate);
+ } catch (Exception e) {
+ logger.error("Exception occured while updating", e);
+ return false;
+ }
+
+ curUpdates++;
+ return true;
+ }
+
+ private boolean _delete(MVDataEntry entry) {
+ if ( skipEntryDelete ) {
+ logger.info("dstWriter : skipping deletion for key " + entry.getKey());
+ return true;
+ }
+
+ if ( maxDeletes > 0 && curDeletes >= maxDeletes ) {
+ logger.error("Max delete limit reached (" + maxDeletes + ")");
+ return false;
+ }
+ logger.debug("dstWriter : Action\n-> Delete " + entry);
+ try {
+ dstWriter.delete(entry);
+ } catch (Exception e) {
+ logger.error("Exception occured while deleting", e);
+ return false;
+ }
+
+ curDeletes++;
+ return true;
+ }
+
+ // Boring accessors
+
+ /**
+ * Setter to fix limits about operations counts (safeguard)
+ * @param maxInserts
+ * @param maxUpdates
+ * @param maxDeletes
+ */
+ public void setOperationLimits(int maxInserts, int maxUpdates, int maxDeletes) {
+ this.maxInserts = maxInserts;
+ this.maxUpdates = maxUpdates;
+ this.maxDeletes = maxDeletes;
+ }
+
+ /**
+ * @return the skipEntryDelete
+ */
+ public boolean isSkipEntryDelete() {
+ return skipEntryDelete;
+ }
+
+ /**
+ * @param skipEntryDelete the skipEntryDelete to set
+ */
+ public void setSkipEntryDelete(boolean skipEntryDelete) {
+ this.skipEntryDelete = skipEntryDelete;
+ }
+
+}