/* * SSSync, a Simple and Stupid Synchronizer for data with multi-valued attributes * Copyright (C) 2014 Ludovic Pouzenc * * This file is part of SSSync. * * SSSync is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * SSSync is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with SSSync. If not, see */ package sync; import java.util.Iterator; import java.util.NoSuchElementException; import java.util.Set; import org.apache.log4j.Logger; import data.MVDataEntry; import data.io.MVDataReader; import data.io.MVDataWriter; /** * Basic one-way synchronization code. Uses MVDataEntry semantics. * Each entry has a key and a set of multi-valued attributes, like LDAP entries. * Data source is a MVDataReader. Multiple source could be used via MVDataCombiner. *

* Warnings : needs MVDataReaders that give key-sorted results. This sync will try * to delete entries that exists on destination side and don't exist at source side. * Extra attributes in existing entries on destination side are preserved. * Look like useful for account's failure password count for instance. *

* Notes : Null value and empty strings are not allowed in MVDataEntry, so they are not sync'ed. * * @author lpouzenc */ public class BasicSyncTask extends AbstractSyncTask { private static final Logger logger = Logger.getLogger(BasicSyncTask.class.getName()); /** * Source data stream (read-only) */ private final MVDataReader srcReader; /** * Destination data stream (read) */ private final MVDataReader dstReader; /** * Destination data stream (write) */ private final MVDataWriter dstWriter; /** * If true, disable removal of data on destination side even if detected as obsolete */ private boolean skipEntryDelete; private int maxInserts; private int maxUpdates; private int maxDeletes; private transient int curInserts; private transient int curUpdates; private transient int curDeletes; /** * BasicSyncTask constructor * Assumes that the *Readers have iterators that returns entries sorted by lexicographical ascending key * @param taskName Friendly name of this task (for tracing in log files) * @param srcReader Source data stream (read-only) * @param dstReader Destination data stream (read) * @param dstWriter Destination data stream (write) */ public BasicSyncTask(String taskName, boolean skipDelete, MVDataReader srcReader, MVDataReader dstReader, MVDataWriter dstWriter) { this.taskName = taskName; this.srcReader = srcReader; this.dstReader = dstReader; this.dstWriter = dstWriter; this.maxInserts = 0; this.maxUpdates = 0; this.maxDeletes = 0; } public Boolean call() { logger.info("task " + taskName + " : starting " + (dryRun?"dry-run":"real") + " pass"); // Better stack traces "call()" don't say "what" boolean success = syncTaskRun(); logger.info("task " + taskName + " : " + (success?"terminated successfully":"aborted")); return success; } private boolean syncTaskRun() { curInserts=0; curUpdates=0; curDeletes=0; dstWriter.setDryRun(dryRun); Iterator itSrc = srcReader.iterator(); Iterator itDst = dstReader.iterator(); MVDataEntry src = null, dst = null; boolean srcExhausted = false; boolean dstExhausted = false; boolean abort = false; boolean done = false; while ( !abort && !done ) { // Look-ahead srcReader if previous has been "poped" (or if never read yet) if ( src == null ) { if ( !srcExhausted ) { srcExhausted = !itSrc.hasNext(); } if ( !srcExhausted ) { try { src=itSrc.next(); logger.trace("src read : " + src); } catch (Exception e) { logger.error("Read failure detected on " + srcReader.getDataSourceName() + ". Aborting.", e); // Escape from the while loop abort=true; continue; } } } // Look-ahead dstReader if previous has been "poped" (or if never read yet) if ( dst == null ) { if ( !dstExhausted ) { dstExhausted = !itDst.hasNext(); } if ( !dstExhausted ) { try { dst = itDst.next(); logger.trace("dst read : " + dst); } catch (NoSuchElementException e) { logger.error("Read failure detected on " + dstReader.getDataSourceName() + ". Aborting.", e); // Escape from the while loop abort=true; continue; } } } // Error-free cases (no problems while reading data) int compare; if ( !srcExhausted && !dstExhausted ) { // General case : check order precedence to take an action compare = src.compareTo(dst); } else if ( !srcExhausted && dstExhausted ) { // Particular case : dst is exhausted, it's like ( src < dst ) compare=-1; } else if ( srcExhausted && !dstExhausted ) { // Particular case : src is exhausted, it's like ( src > dst ) compare=1; } else /* ( srcExhausted && dstExhausted ) */ { // Particular case : everything is synchronized // Exit gracefully the while loop done=true; continue; } logger.trace("compare : " + compare); boolean actionRealized = false; // Take an action (insert/update/delete) if ( compare < 0 ) { actionRealized = _insert(src); src = null; // preserve dst until src key is not greater } else if ( compare > 0 ) { // dst current entry doesn't exists anymore (src key is greater than dst key) actionRealized = _delete(dst); // preserve src until dst key is not greater dst = null; } else /* ( compare == 0 ) */ { // src current entry already exists in dst, update it if necessary Set changedAttr = src.getChangedAttributes(dst); if ( ! changedAttr.isEmpty() ) { actionRealized = _update(src,dst,changedAttr); } else { // Already up-to-date, nothing to do actionRealized = true; } // Both src and dst have been used src = null; dst = null; } abort = !actionRealized; } /* while */ return !abort; } /* _taskRunSync() */ private boolean _insert(MVDataEntry entry) { if ( maxInserts > 0 && curInserts >= maxInserts ) { logger.error("Max insert limit reached (" + maxInserts + ")" ); return false; } logger.debug("dstWriter : Action\n-> Insert " + entry); try { dstWriter.insert(entry); } catch (Exception e) { logger.error("Exception occured while inserting", e); return false; } curInserts++; return true; } private boolean _update(MVDataEntry updatedEntry, MVDataEntry originalEntry, Set attrToUpdate) { if ( maxUpdates > 0 && curUpdates >= maxUpdates ) { logger.error("Max update limit reached (" + maxUpdates + ")"); return false; } logger.debug("dstWriter : Action\n-> Update " + updatedEntry + "\n-> changed attributes : " + attrToUpdate); try { dstWriter.update(updatedEntry, originalEntry, attrToUpdate); } catch (Exception e) { logger.error("Exception occured while updating", e); return false; } curUpdates++; return true; } private boolean _delete(MVDataEntry entry) { if ( skipEntryDelete ) { logger.info("dstWriter : skipping deletion for key " + entry.getKey()); return true; } if ( maxDeletes > 0 && curDeletes >= maxDeletes ) { logger.error("Max delete limit reached (" + maxDeletes + ")"); return false; } logger.debug("dstWriter : Action\n-> Delete " + entry); try { dstWriter.delete(entry); } catch (Exception e) { logger.error("Exception occured while deleting", e); return false; } curDeletes++; return true; } // Boring accessors /** * Setter to fix limits about operations counts (safeguard) * @param maxInserts * @param maxUpdates * @param maxDeletes */ public void setOperationLimits(int maxInserts, int maxUpdates, int maxDeletes) { this.maxInserts = maxInserts; this.maxUpdates = maxUpdates; this.maxDeletes = maxDeletes; } /** * @return the skipEntryDelete */ public boolean isSkipEntryDelete() { return skipEntryDelete; } /** * @param skipEntryDelete the skipEntryDelete to set */ public void setSkipEntryDelete(boolean skipEntryDelete) { this.skipEntryDelete = skipEntryDelete; } }