summaryrefslogtreecommitdiff
path: root/src/main/src
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/src')
-rw-r--r--src/main/src/SSSync.java208
-rw-r--r--src/main/src/conf/ConfigConnectionBean.java111
-rw-r--r--src/main/src/conf/ConfigConnectionsBean.java45
-rw-r--r--src/main/src/conf/ConfigGlobalsBean.java41
-rw-r--r--src/main/src/conf/ConfigOpLimitsBean.java55
-rw-r--r--src/main/src/conf/ConfigRootBean.java73
-rw-r--r--src/main/src/conf/ConfigSrcOrDestBean.java96
-rw-r--r--src/main/src/conf/ConfigTaskBean.java80
-rw-r--r--src/main/src/conf/SSSyncConfParser.java65
-rw-r--r--src/main/src/conf/SSSyncConnectionsFactory.java61
-rw-r--r--src/main/src/conf/SSSyncTasksFactory.java147
-rw-r--r--src/main/src/data/io/ConnectionsHolder.java81
-rw-r--r--src/main/src/data/io/SafeDataReader.java155
-rw-r--r--src/main/src/sync/BasicSyncTask.java292
-rw-r--r--src/main/src/utils/JVMStatsDumper.java111
15 files changed, 1621 insertions, 0 deletions
diff --git a/src/main/src/SSSync.java b/src/main/src/SSSync.java
new file mode 100644
index 0000000..422c31e
--- /dev/null
+++ b/src/main/src/SSSync.java
@@ -0,0 +1,208 @@
+/*
+ * SSSync, a Simple and Stupid Synchronizer for data with multi-valued attributes
+ * Copyright (C) 2014 Ludovic Pouzenc <ludovic@pouzenc.fr>
+ *
+ * This file is part of SSSync.
+ *
+ * SSSync is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * SSSync is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with SSSync. If not, see <http://www.gnu.org/licenses/>
+ */
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PropertyConfigurator;
+
+import conf.ConfigConnectionsBean;
+import conf.ConfigGlobalsBean;
+import conf.ConfigRootBean;
+import conf.SSSyncConfParser;
+import conf.SSSyncConnectionsFactory;
+import conf.SSSyncTasksFactory;
+import data.io.ConnectionsHolder;
+
+import sync.BasicSyncTask;
+import utils.JVMStatsDumper;
+
+/**
+ * Main class for Simple and Stupid Sync'er
+ *
+ * @author lpouzenc
+ */
+public class SSSync {
+ private static final Logger logger = Logger.getLogger(SSSync.class.getName());
+
+ private static final String LOG_PROPERTIES_FILE = "conf/log4j.properties";
+ private static final String CONFIG_MAIN_FILE = "conf/sssync.yaml";
+ private static final String CONFIG_CONN_FILE = "conf/connections.yaml";
+
+ private static final int ERR_SUCCESS = 0;
+ private static final int ERR_CONFIG_PARSE_ERROR = 1;
+ private static final int ERR_CONN_INIT_ERROR = 2;
+ private static final int ERR_TASK_INIT_ERROR = 3;
+ private static final int ERR_DRYRUN_FAILURE = 4;
+ private static final int ERR_REALRUN_FAILURE = 5;
+ //TODO private static final int ERR_MAXTIME_REACHED = 6;
+
+ /**
+ * Main entry point. Takes care of cmdline parsing, config files interpretation,
+ * tasks setup and start.
+ *
+ * @param args
+ */
+ public static void main(String[] args) {
+ // log4j setup (first thing to do)
+ PropertyConfigurator.configure(LOG_PROPERTIES_FILE);
+ logger.info("Program start (user: '" + System.getProperty("user.name") +
+ "', cwd: '" + System.getProperty("user.dir") + "')");
+
+ //TODO use cmdline args for config file path
+ String mainConfigFile = CONFIG_MAIN_FILE;
+ String connConfigFile = CONFIG_CONN_FILE;
+
+ // Config parsing
+ ConfigRootBean confMain = null;
+ ConfigConnectionsBean confConn = null;
+ try {
+ confMain = SSSyncConfParser.loadMainConfig(mainConfigFile);
+ confConn = SSSyncConfParser.loadConnConfig(connConfigFile);
+ } catch (Exception e) {
+ logger.fatal("Exception while loading configuration", e);
+ end(ERR_CONFIG_PARSE_ERROR);
+ }
+ ConfigGlobalsBean confGlobals = confMain.getGlobals();
+
+ // Config dump if DEBUG level (or finer)
+ if ( !logger.getLevel().isGreaterOrEqual(Level.INFO) ) {
+ logger.debug("Current connection configuration :\n" + confConn);
+ logger.debug("Current main configuration :\n" + confMain);
+ }
+
+ // Connections init
+ logger.info("Connections initialization");
+ ConnectionsHolder connections = null;
+ try {
+ connections = SSSyncConnectionsFactory.setupConnections(confConn);
+ } catch (Exception e) {
+ logger.fatal("Exception while establishing connections", e);
+ end(ERR_CONN_INIT_ERROR);
+ }
+
+ // Suggest garbage collector to forget our passwords since we are connected
+ confConn=null;
+ System.gc();
+ JVMStatsDumper.logMemoryUsage();
+
+
+ // Tasks init
+ logger.info("Tasks initialization");
+ List<BasicSyncTask> tasks = null;
+ try {
+ tasks = SSSyncTasksFactory.setupTasks(connections, confMain);
+ } catch (Exception e) {
+ logger.fatal("Exception during tasks initialization", e);
+ end(ERR_TASK_INIT_ERROR);
+ }
+
+ logger.info("Tasks are ready to start");
+ JVMStatsDumper.logMemoryUsage();
+
+
+ // Tasks first (dry) run
+ if ( ! SSSync.safeTaskRun(tasks, confGlobals.getMaxExecTime(), true) ) {
+ logger.error("Dry-run pass has shown problems, skipping real synchronization");
+ end(ERR_DRYRUN_FAILURE);
+ }
+
+ // Tasks second (real) run
+ if ( SSSync.safeTaskRun(tasks, confGlobals.getMaxExecTime(), false) ) {
+ logger.error("Real-run pass has shown problems, data could be messed up !");
+ end(ERR_REALRUN_FAILURE);
+ }
+
+ // Clean-up
+ try {
+ connections.close();
+ } catch (IOException e) {
+ logger.info("Problem during connections closing");
+ }
+
+ // Normal exit
+ end(ERR_SUCCESS);
+ }
+
+ /**
+ * Method to run safely a sequence of tasks within a given time period.
+ * In a separate thread, it runs all the tasks sequentially.
+ *
+ * @param list
+ * @param timeOutInMinute
+ * @return
+ * @throws ExecutionException
+ * @throws InterruptedException
+ */
+ private static boolean safeTaskRun(List<BasicSyncTask> list, long timeOutInMinute, boolean dryRun) {
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ List<Future<Boolean>> results;
+ boolean aborted = false;
+
+ logger.info("Starting " + (dryRun?"dry-run":"real-run") + " synchronization pass");
+
+ for ( BasicSyncTask t : list ) {
+ t.setDryRun(dryRun);
+ }
+
+ try {
+ results = executor.invokeAll(list, timeOutInMinute, TimeUnit.MINUTES);
+ // Join all tasks, seeking for an unsuccessful execution
+ for (Future<Boolean> r: results) {
+ if ( ! r.get() ) {
+ aborted = true;
+ }
+ }
+ } catch (CancellationException e) {
+ logger.fatal("Global maximum execution time exhausted, aborting tasks !");
+ aborted = true;
+ } catch (InterruptedException e) {
+ logger.fatal("Worker thread for task execution was interrupted", e);
+ aborted = true;
+ } catch (ExecutionException e) {
+ logger.error("Exception during tasks execution", e.getCause());
+ aborted = true;
+ }
+
+ JVMStatsDumper.logMemoryUsage();
+ executor.shutdown();
+
+ return !aborted;
+ }
+
+ /**
+ * Helper function to always log the end of program
+ * @param result
+ */
+ private static void end(int result) {
+ JVMStatsDumper.logGCStats();
+ logger.info("Program end (result code: " + result + ")");
+ System.exit(result);
+ }
+
+}
diff --git a/src/main/src/conf/ConfigConnectionBean.java b/src/main/src/conf/ConfigConnectionBean.java
new file mode 100644
index 0000000..b43b56f
--- /dev/null
+++ b/src/main/src/conf/ConfigConnectionBean.java
@@ -0,0 +1,111 @@
+/*
+ * SSSync, a Simple and Stupid Synchronizer for data with multi-valued attributes
+ * Copyright (C) 2014 Ludovic Pouzenc <ludovic@pouzenc.fr>
+ *
+ * This file is part of SSSync.
+ *
+ * SSSync is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * SSSync is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with SSSync. If not, see <http://www.gnu.org/licenses/>
+ */
+
+package conf;
+
+import data.io.sql.SQLConnectionWrapper.DBMSType;
+
+/**
+ * Generated Configuration Bean
+ */
+public class ConfigConnectionBean {
+
+ public enum ConnectionType { jdbc, ldap }
+
+ private String id;
+ private ConnectionType type;
+ private DBMSType dbms;
+ private String ress;
+ private String host;
+ private int port;
+ private String user;
+ private String bind;
+ private String pass;
+ private String db;
+
+ public String getId() {
+ return id;
+ }
+ public void setId(String id) {
+ this.id = id;
+ }
+ public ConnectionType getType() {
+ return type;
+ }
+ public void setType(ConnectionType type) {
+ this.type = type;
+ }
+ public DBMSType getDbms() {
+ return dbms;
+ }
+ public void setDbms(DBMSType dbms) {
+ this.dbms = dbms;
+ }
+ public String getRess() {
+ return ress;
+ }
+ public void setRess(String ress) {
+ this.ress = ress;
+ }
+ public String getHost() {
+ return host;
+ }
+ public void setHost(String host) {
+ this.host = host;
+ }
+ public int getPort() {
+ return port;
+ }
+ public void setPort(int port) {
+ this.port = port;
+ }
+ public String getUser() {
+ return user;
+ }
+ public void setUser(String user) {
+ this.user = user;
+ }
+ public String getBind() {
+ return bind;
+ }
+ public void setBind(String bind) {
+ this.bind = bind;
+ }
+ public String getPass() {
+ return pass;
+ }
+ public void setPass(String pass) {
+ this.pass = pass;
+ }
+ public String getDb() {
+ return db;
+ }
+ public void setDb(String db) {
+ this.db = db;
+ }
+
+ @Override
+ public String toString() {
+ return "ConfigConnectionBean [id=" + id + ", type=" + type + ", dbms=" + dbms
+ + ", ress=" + ress + ", host=" + host + ", port=" + port
+ + ", user=" + user + ", bind=" + bind + ", pass=(obfuscated)]";
+ }
+
+}
diff --git a/src/main/src/conf/ConfigConnectionsBean.java b/src/main/src/conf/ConfigConnectionsBean.java
new file mode 100644
index 0000000..9fb034b
--- /dev/null
+++ b/src/main/src/conf/ConfigConnectionsBean.java
@@ -0,0 +1,45 @@
+/*
+ * SSSync, a Simple and Stupid Synchronizer for data with multi-valued attributes
+ * Copyright (C) 2014 Ludovic Pouzenc <ludovic@pouzenc.fr>
+ *
+ * This file is part of SSSync.
+ *
+ * SSSync is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * SSSync is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with SSSync. If not, see <http://www.gnu.org/licenses/>
+ */
+
+package conf;
+
+import java.util.List;
+
+/**
+ * Generated Configuration Bean
+ */
+public class ConfigConnectionsBean {
+
+ private List<ConfigConnectionBean> connections;
+
+ public List<ConfigConnectionBean> getConnections() {
+ return connections;
+ }
+
+ public void setConnections(List<ConfigConnectionBean> connections) {
+ this.connections = connections;
+ }
+
+ @Override
+ public String toString() {
+ return "ConfigConnectionsBean [connections=" + ConfigRootBean.listDump(connections,1) + "]";
+ }
+
+}
diff --git a/src/main/src/conf/ConfigGlobalsBean.java b/src/main/src/conf/ConfigGlobalsBean.java
new file mode 100644
index 0000000..256acee
--- /dev/null
+++ b/src/main/src/conf/ConfigGlobalsBean.java
@@ -0,0 +1,41 @@
+/*
+ * SSSync, a Simple and Stupid Synchronizer for data with multi-valued attributes
+ * Copyright (C) 2014 Ludovic Pouzenc <ludovic@pouzenc.fr>
+ *
+ * This file is part of SSSync.
+ *
+ * SSSync is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * SSSync is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with SSSync. If not, see <http://www.gnu.org/licenses/>
+ */
+
+package conf;
+
+/**
+ * Generated Configuration Bean
+ */
+public class ConfigGlobalsBean {
+ private int maxExecTime;
+
+ public int getMaxExecTime() {
+ return maxExecTime;
+ }
+
+ public void setMaxExecTime(int maxExecTime) {
+ this.maxExecTime = maxExecTime;
+ }
+
+ @Override
+ public String toString() {
+ return "ConfigGlobalsBean [maxExecTime=" + maxExecTime + "]";
+ }
+}
diff --git a/src/main/src/conf/ConfigOpLimitsBean.java b/src/main/src/conf/ConfigOpLimitsBean.java
new file mode 100644
index 0000000..8f68e8c
--- /dev/null
+++ b/src/main/src/conf/ConfigOpLimitsBean.java
@@ -0,0 +1,55 @@
+/*
+ * SSSync, a Simple and Stupid Synchronizer for data with multi-valued attributes
+ * Copyright (C) 2014 Ludovic Pouzenc <ludovic@pouzenc.fr>
+ *
+ * This file is part of SSSync.
+ *
+ * SSSync is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * SSSync is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with SSSync. If not, see <http://www.gnu.org/licenses/>
+ */
+
+package conf;
+
+/**
+ * Generated Configuration Bean
+ */
+public class ConfigOpLimitsBean {
+ private int insert;
+ private int update;
+ private int delete;
+
+ public int getInsert() {
+ return insert;
+ }
+ public void setInsert(int insert) {
+ this.insert = insert;
+ }
+ public int getUpdate() {
+ return update;
+ }
+ public void setUpdate(int update) {
+ this.update = update;
+ }
+ public int getDelete() {
+ return delete;
+ }
+ public void setDelete(int delete) {
+ this.delete = delete;
+ }
+
+ @Override
+ public String toString() {
+ return "ConfigOpLimitsBean [insert=" + insert + ", update=" + update
+ + ", delete=" + delete + "]";
+ }
+}
diff --git a/src/main/src/conf/ConfigRootBean.java b/src/main/src/conf/ConfigRootBean.java
new file mode 100644
index 0000000..acbbd49
--- /dev/null
+++ b/src/main/src/conf/ConfigRootBean.java
@@ -0,0 +1,73 @@
+/*
+ * SSSync, a Simple and Stupid Synchronizer for data with multi-valued attributes
+ * Copyright (C) 2014 Ludovic Pouzenc <ludovic@pouzenc.fr>
+ *
+ * This file is part of SSSync.
+ *
+ * SSSync is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * SSSync is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with SSSync. If not, see <http://www.gnu.org/licenses/>
+ */
+
+package conf;
+
+import java.util.List;
+
+/**
+ * Generated Configuration Bean
+ */
+public class ConfigRootBean {
+
+ private ConfigGlobalsBean globals;
+ private List<ConfigTaskBean> tasks;
+
+ public ConfigGlobalsBean getGlobals() {
+ return globals;
+ }
+ public void setGlobals(ConfigGlobalsBean globals) {
+ this.globals = globals;
+ }
+
+ public List<ConfigTaskBean> getTasks() {
+ return tasks;
+ }
+ public void setTasks(List<ConfigTaskBean> tasks) {
+ this.tasks = tasks;
+ }
+
+ @Override
+ public String toString() {
+ return "ConfigRootBean [globals=" + globals + ", tasks=" + listDump(tasks, 1) + "]";
+ }
+
+
+ public static <T> String listDump(List<T> list, int ident) {
+ StringBuffer buf = new StringBuffer();
+ buf.append('{');
+ for (T item : list) {
+ buf.append('\n');
+ for (int i = 0; i < ident; i++) {
+ buf.append('\t');
+ }
+ buf.append(item.toString());
+ buf.append(',');
+ }
+ buf.append('\n');
+ for (int i = 0; i < ident-1; i++) {
+ buf.append('\t');
+ }
+ buf.append('}');
+ return buf.toString();
+ }
+
+
+}
diff --git a/src/main/src/conf/ConfigSrcOrDestBean.java b/src/main/src/conf/ConfigSrcOrDestBean.java
new file mode 100644
index 0000000..5be1674
--- /dev/null
+++ b/src/main/src/conf/ConfigSrcOrDestBean.java
@@ -0,0 +1,96 @@
+/*
+ * SSSync, a Simple and Stupid Synchronizer for data with multi-valued attributes
+ * Copyright (C) 2014 Ludovic Pouzenc <ludovic@pouzenc.fr>
+ *
+ * This file is part of SSSync.
+ *
+ * SSSync is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * SSSync is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with SSSync. If not, see <http://www.gnu.org/licenses/>
+ */
+
+package conf;
+
+import data.filters.MVDataCombiner;
+
+/**
+ * Generated Configuration Bean
+ */
+public class ConfigSrcOrDestBean {
+
+ public enum SourceKind { csv, ldap, sorted_csv, sql };
+
+ private String name;
+ private SourceKind kind;
+ private String conn;
+ private MVDataCombiner.MVDataCombineMode mode;
+ private String query;
+ private String path;
+ private String attr;
+ private String base;
+
+ public String getName() {
+ return name;
+ }
+ public void setName(String name) {
+ this.name = name;
+ }
+ public SourceKind getKind() {
+ return kind;
+ }
+ public void setKind(SourceKind kind) {
+ this.kind = kind;
+ }
+ public String getConn() {
+ return conn;
+ }
+ public void setConn(String conn) {
+ this.conn = conn;
+ }
+ public MVDataCombiner.MVDataCombineMode getMode() {
+ return mode;
+ }
+ public void setMode(MVDataCombiner.MVDataCombineMode mode) {
+ this.mode = mode;
+ }
+ public String getQuery() {
+ return query;
+ }
+ public void setQuery(String query) {
+ this.query = query;
+ }
+ public String getPath() {
+ return path;
+ }
+ public void setPath(String path) {
+ this.path = path;
+ }
+ public String getAttr() {
+ return attr;
+ }
+ public void setAttr(String attr) {
+ this.attr = attr;
+ }
+ public String getBase() {
+ return base;
+ }
+ public void setBase(String base) {
+ this.base = base;
+ }
+
+ @Override
+ public String toString() {
+ return "ConfigSrcOrDestBean [name=" + name + ", kind=" + kind
+ + ", conn=" + conn + ", mode=" + mode + ", query=" + query
+ + ", path=" + path + ", attr=" + attr + ", base=" + base + "]";
+ }
+}
diff --git a/src/main/src/conf/ConfigTaskBean.java b/src/main/src/conf/ConfigTaskBean.java
new file mode 100644
index 0000000..ed34eee
--- /dev/null
+++ b/src/main/src/conf/ConfigTaskBean.java
@@ -0,0 +1,80 @@
+/*
+ * SSSync, a Simple and Stupid Synchronizer for data with multi-valued attributes
+ * Copyright (C) 2014 Ludovic Pouzenc <ludovic@pouzenc.fr>
+ *
+ * This file is part of SSSync.
+ *
+ * SSSync is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * SSSync is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with SSSync. If not, see <http://www.gnu.org/licenses/>
+ */
+
+package conf;
+
+import java.util.List;
+
+/**
+ * Generated Configuration Bean
+ */
+public class ConfigTaskBean {
+
+ private String name;
+ private ConfigOpLimitsBean opLimits;
+ private List<ConfigSrcOrDestBean> sources;
+ private ConfigSrcOrDestBean destination;
+ private boolean skipReadErrors;
+ private boolean skipEntryDelete;
+
+ public String getName() {
+ return name;
+ }
+ public void setName(String name) {
+ this.name = name;
+ }
+ public ConfigOpLimitsBean getOpLimits() {
+ return opLimits;
+ }
+ public void setOpLimits(ConfigOpLimitsBean opLimits) {
+ this.opLimits = opLimits;
+ }
+ public List<ConfigSrcOrDestBean> getSources() {
+ return sources;
+ }
+ public void setSources(List<ConfigSrcOrDestBean> sources) {
+ this.sources = sources;
+ }
+ public ConfigSrcOrDestBean getDestination() {
+ return destination;
+ }
+ public void setDestination(ConfigSrcOrDestBean destination) {
+ this.destination = destination;
+ }
+ public boolean isSkipReadErrors() {
+ return skipReadErrors;
+ }
+ public void setSkipReadErrors(boolean skipReadErrors) {
+ this.skipReadErrors = skipReadErrors;
+ }
+ public boolean isSkipEntryDelete() {
+ return skipEntryDelete;
+ }
+ public void setSkipEntryDelete(boolean skipDelete) {
+ this.skipEntryDelete = skipDelete;
+ }
+ @Override
+ public String toString() {
+ return "ConfigTaskBean [name=" + name + ", opLimits=" + opLimits
+ + ", sources=" + sources + ", destination=" + destination
+ + ", skipReadErrors=" + skipReadErrors + ", skipEntryDelete="
+ + skipEntryDelete + "]";
+ }
+}
diff --git a/src/main/src/conf/SSSyncConfParser.java b/src/main/src/conf/SSSyncConfParser.java
new file mode 100644
index 0000000..42dc760
--- /dev/null
+++ b/src/main/src/conf/SSSyncConfParser.java
@@ -0,0 +1,65 @@
+/*
+ * SSSync, a Simple and Stupid Synchronizer for data with multi-valued attributes
+ * Copyright (C) 2014 Ludovic Pouzenc <ludovic@pouzenc.fr>
+ *
+ * This file is part of SSSync.
+ *
+ * SSSync is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * SSSync is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with SSSync. If not, see <http://www.gnu.org/licenses/>
+ */
+
+package conf;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.text.ParseException;
+
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.Constructor;
+
+/**
+ * TODO javadoc
+ *
+ * @author lpouzenc
+ */
+public class SSSyncConfParser {
+
+ public static ConfigRootBean loadMainConfig(String mainConfigFile) throws FileNotFoundException, ParseException {
+ Yaml yamlMain = new Yaml(new Constructor(ConfigRootBean.class));
+
+ //TODO : try to prevent weird exceptions when config is not respecting the implicit grammar of the bean tree
+
+ ConfigRootBean confMain = (ConfigRootBean) yamlMain.load(new FileInputStream(mainConfigFile));
+
+ if ( confMain == null || confMain.getGlobals() == null ) {
+ throw new ParseException("Config parser has returned a null item", 0);
+ }
+
+ // TODO : check config sanity and completeness
+
+ return confMain;
+ }
+
+ public static ConfigConnectionsBean loadConnConfig(String connConfigFile) throws FileNotFoundException, ParseException {
+ Yaml yamlConn = new Yaml(new Constructor(ConfigConnectionsBean.class));
+
+ ConfigConnectionsBean confConn = (ConfigConnectionsBean) yamlConn.load(new FileInputStream(connConfigFile));
+
+ if ( confConn == null ) {
+ throw new ParseException("Config parser has return a null item", 0);
+ }
+
+ return confConn;
+ }
+
+}
diff --git a/src/main/src/conf/SSSyncConnectionsFactory.java b/src/main/src/conf/SSSyncConnectionsFactory.java
new file mode 100644
index 0000000..e747258
--- /dev/null
+++ b/src/main/src/conf/SSSyncConnectionsFactory.java
@@ -0,0 +1,61 @@
+/*
+ * SSSync, a Simple and Stupid Synchronizer for data with multi-valued attributes
+ * Copyright (C) 2014 Ludovic Pouzenc <ludovic@pouzenc.fr>
+ *
+ * This file is part of SSSync.
+ *
+ * SSSync is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * SSSync is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with SSSync. If not, see <http://www.gnu.org/licenses/>
+ */
+
+package conf;
+
+import data.io.ConnectionsHolder;
+import data.io.ldap.LDAPConnectionWrapper;
+import data.io.sql.SQLConnectionWrapper;
+
+/**
+ * TODO javadoc
+ *
+ * @author lpouzenc
+ */
+public class SSSyncConnectionsFactory {
+
+ /**
+ * Setup all connections described in config
+ * @return
+ * @throws Exception
+ */
+ public static ConnectionsHolder setupConnections(ConfigConnectionsBean confConn) throws Exception {
+ ConnectionsHolder connections = new ConnectionsHolder();
+
+ for ( ConfigConnectionBean conn : confConn.getConnections() ) {
+ switch (conn.getType()) {
+ case jdbc:
+ SQLConnectionWrapper connSQL = new SQLConnectionWrapper(conn.getDbms(), conn.getHost(), conn.getPort(), conn.getRess(), conn.getUser(), conn.getPass(), conn.getDb());
+ connections.putConnSQL(conn.getId(), connSQL);
+ break;
+ case ldap:
+ LDAPConnectionWrapper connLDAP = new LDAPConnectionWrapper(conn.getHost(), conn.getPort(), conn.getBind(), conn.getPass());
+ connections.putConnLDAP(conn.getId(), connLDAP);
+ break;
+ default:
+ //XXX : find better Exception type
+ throw new Exception("Bad config : conn '" + conn.getId() + "' unsupported type");
+ }
+ }
+
+ return connections;
+ }
+
+}
diff --git a/src/main/src/conf/SSSyncTasksFactory.java b/src/main/src/conf/SSSyncTasksFactory.java
new file mode 100644
index 0000000..de3e8f6
--- /dev/null
+++ b/src/main/src/conf/SSSyncTasksFactory.java
@@ -0,0 +1,147 @@
+/*
+ * SSSync, a Simple and Stupid Synchronizer for data with multi-valued attributes
+ * Copyright (C) 2014 Ludovic Pouzenc <ludovic@pouzenc.fr>
+ *
+ * This file is part of SSSync.
+ *
+ * SSSync is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * SSSync is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with SSSync. If not, see <http://www.gnu.org/licenses/>
+ */
+
+package conf;
+
+import java.io.File;
+import java.io.FileReader;
+import java.util.ArrayList;
+import java.util.List;
+
+import sync.BasicSyncTask;
+import data.filters.MVDataCombiner;
+import data.filters.MVDataCombiner.MVDataCombineMode;
+import data.io.ConnectionsHolder;
+import data.io.MVDataReader;
+import data.io.MVDataWriter;
+import data.io.SafeDataReader;
+import data.io.csv.CSVDataReader;
+import data.io.ldap.LDAPConnectionWrapper;
+import data.io.sql.SQLConnectionWrapper;
+
+/**
+ * TODO javadoc
+ *
+ * @author lpouzenc
+ */
+public class SSSyncTasksFactory {
+
+ /**
+ * Build tasks objects with all needed resources from a config beans tree
+ * @param conf
+ * @return
+ * @throws Exception
+ */
+ public static List<BasicSyncTask> setupTasks(ConnectionsHolder connections, ConfigRootBean confMain) throws Exception {
+ List<BasicSyncTask> tasks = new ArrayList<BasicSyncTask>();
+
+ // For each task...
+ for ( ConfigTaskBean confTask: confMain.getTasks() ) {
+ MVDataReader srcReader=null;
+
+ // Building all sources
+
+ List<ConfigSrcOrDestBean> confSources = confTask.getSources();
+ // See if we are in multiple source situation (then MVDataCombiner) or not (then simple MVDataReader)
+ if ( confSources.size() == 0 ) {
+ throw new Exception("Bad config : task '" + confTask.getName() + "' has no defined sources");
+ } else if ( confSources.size() == 1 ) {
+ srcReader = new SafeDataReader(_makeReader(connections, confSources.get(0), confTask.getName()), confTask.isSkipReadErrors());
+ } else {
+ List<MVDataReader> readers = new ArrayList<MVDataReader>();
+ List<MVDataCombineMode> mergeModes = new ArrayList<MVDataCombineMode>();
+
+ // For each source of the future MVDataCombiner...
+ for ( ConfigSrcOrDestBean confSource: confSources ) {
+ // Create and add the reader and his parameters
+ readers.add(new SafeDataReader(_makeReader(connections, confSource, confTask.getName()), confTask.isSkipReadErrors()));
+ mergeModes.add(confSource.getMode());
+ }
+
+ srcReader = new MVDataCombiner("srcCombiner", readers.toArray(new MVDataReader[0]), mergeModes.toArray(new MVDataCombineMode[0]));
+ }
+
+ // Building destination
+
+ MVDataReader dstReader=null;
+ MVDataWriter dstWriter=null;
+
+ ConfigSrcOrDestBean confDestination = confTask.getDestination();
+ switch ( confDestination.getKind() ) {
+ case ldap:
+ LDAPConnectionWrapper builder = connections.getLDAPConnectionBuilder(confDestination.getConn());
+ // TODO : configurable lookAhead
+ MVDataReader tmpReader = builder.newFlatReader(confDestination.getName()+"_reader", confDestination.getBase(), confDestination.getAttr(), 128);
+ dstReader = new SafeDataReader(tmpReader, false);
+ dstWriter = builder.newFlatWriter(confDestination.getBase(), confDestination.getAttr());
+ break;
+ default:
+ throw new Exception("Bad config : task '" + confTask.getName() + "' unsupported destination kind");
+ }
+
+ // Then building the sync task and add it to the task list
+ int maxInserts = confTask.getOpLimits().getInsert();
+ int maxUpdates = confTask.getOpLimits().getUpdate();
+ int maxDeletes = confTask.getOpLimits().getDelete();
+
+ BasicSyncTask task = new BasicSyncTask(confTask.getName(), false, srcReader, dstReader, dstWriter);
+ task.setOperationLimits(maxInserts, maxUpdates, maxDeletes);
+ task.setSkipEntryDelete(confTask.isSkipEntryDelete());
+ tasks.add(task);
+ }
+
+ return tasks;
+ }
+
+ /**
+ * Helper function to make a new reader from an existing connection
+ * @param confSource
+ * @param taskName
+ * @return
+ * @throws Exception
+ */
+ private static MVDataReader _makeReader(ConnectionsHolder connections, ConfigSrcOrDestBean confSource, String taskName) throws Exception {
+ MVDataReader reader=null;
+ switch (confSource.getKind()) {
+ case csv:
+ reader = new CSVDataReader(confSource.getName(), new FileReader(confSource.getPath()), false);
+ break;
+ case ldap:
+ LDAPConnectionWrapper ldapConnBuilder = connections.getLDAPConnectionBuilder(confSource.getConn());
+ //FIXME : if conf error, get...ConnectionBuilder could return null
+ //TODO : configurable lookAhead
+ reader = ldapConnBuilder.newFlatReader(confSource.getName(), confSource.getBase(), confSource.getAttr(), 128);
+ break;
+ case sorted_csv:
+ reader = new CSVDataReader(confSource.getName(), new FileReader(confSource.getPath()), true);
+ break;
+ case sql:
+ SQLConnectionWrapper sqlConnBuilder = connections.getSQLConnectionBuilder(confSource.getConn());
+ //TODO We assume the query config item is a filepath. It isn't checked anywhere.
+ reader = sqlConnBuilder.newReader(confSource.getName(), new File(confSource.getQuery()));
+ break;
+ default:
+ throw new Exception("Bad config : task '" + taskName + "' unsupported source kind");
+ }
+
+ return reader;
+ }
+
+}
diff --git a/src/main/src/data/io/ConnectionsHolder.java b/src/main/src/data/io/ConnectionsHolder.java
new file mode 100644
index 0000000..3a6e527
--- /dev/null
+++ b/src/main/src/data/io/ConnectionsHolder.java
@@ -0,0 +1,81 @@
+/*
+ * SSSync, a Simple and Stupid Synchronizer for data with multi-valued attributes
+ * Copyright (C) 2014 Ludovic Pouzenc <ludovic@pouzenc.fr>
+ *
+ * This file is part of SSSync.
+ *
+ * SSSync is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * SSSync is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with SSSync. If not, see <http://www.gnu.org/licenses/>
+ */
+
+package data.io;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+
+import data.io.ldap.LDAPConnectionWrapper;
+import data.io.sql.SQLConnectionWrapper;
+
+/**
+ * TODO javadoc
+ *
+ * @author lpouzenc
+ */
+public class ConnectionsHolder implements Closeable {
+
+ public final HashMap<String, LDAPConnectionWrapper> connMapLDAP;
+ public final HashMap<String, SQLConnectionWrapper> connMapSQL;
+
+ //TODO : with some refactoring, this class may disappear
+ /**
+ * Bean class to keep track of all opened connections in a single object
+ */
+ public ConnectionsHolder() {
+ this.connMapLDAP = new HashMap<String, LDAPConnectionWrapper>();
+ this.connMapSQL = new HashMap<String, SQLConnectionWrapper>();
+ }
+
+ public LDAPConnectionWrapper getLDAPConnectionBuilder(String conn) {
+ return connMapLDAP.get(conn);
+ }
+
+ public SQLConnectionWrapper getSQLConnectionBuilder(String conn) {
+ return connMapSQL.get(conn);
+ }
+
+ public void putConnLDAP(String connId, LDAPConnectionWrapper connLDAP) {
+ this.connMapLDAP.put(connId, connLDAP);
+ }
+
+ public void putConnSQL(String connId, SQLConnectionWrapper connSQL) {
+ this.connMapSQL.put(connId, connSQL);
+ }
+
+ /**
+ * Close all connections
+ */
+ @Override
+ public void close() throws IOException {
+ // XXX : this will stop at first uncloseable connection. It isn't a very interesting problem however.
+ for ( LDAPConnectionWrapper connLDAP: connMapLDAP.values() ) {
+ connLDAP.close();
+ }
+ for ( SQLConnectionWrapper connSQL: connMapSQL.values() ) {
+ connSQL.close();
+ }
+ }
+
+
+
+}
diff --git a/src/main/src/data/io/SafeDataReader.java b/src/main/src/data/io/SafeDataReader.java
new file mode 100644
index 0000000..2c5dda9
--- /dev/null
+++ b/src/main/src/data/io/SafeDataReader.java
@@ -0,0 +1,155 @@
+/*
+ * SSSync, a Simple and Stupid Synchronizer for data with multi-valued attributes
+ * Copyright (C) 2014 Ludovic Pouzenc <ludovic@pouzenc.fr>
+ *
+ * This file is part of SSSync.
+ *
+ * SSSync is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * SSSync is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with SSSync. If not, see <http://www.gnu.org/licenses/>
+ */
+
+package data.io;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.log4j.Logger;
+
+import data.MVDataEntry;
+
+/**
+ * Multi-valued "safe" stream reader proxy.
+ * Adds logging and skipReadError mode feature. Check if items are well ordered.
+ * Ensures consistency of hasNext() / next() even if source stream is faulty.
+ * Never returns null items but throw NoSuchElementException if no other choices.
+ *
+ * @author lpouzenc
+ */
+public class SafeDataReader extends AbstractMVDataReader {
+
+ private static final Logger logger = Logger.getLogger(SafeDataReader.class.getName());
+
+ private final MVDataReader src;
+ /**
+ * If true, continue even in case of read errors
+ */
+ private final boolean skipReadErrors;
+
+ private transient Iterator<MVDataEntry> srcIt;
+ private transient boolean abort;
+ private transient MVDataEntry previousData;
+
+
+ public SafeDataReader(MVDataReader src, boolean skipReadErrors) {
+ this.src = src;
+ this.dataSourceName = src.getDataSourceName();
+ this.skipReadErrors = skipReadErrors;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public Iterator<MVDataEntry> iterator() {
+ // Reset everything
+ srcIt = src.iterator();
+ abort = false;
+ previousData = null;
+
+ return this;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean hasNext() {
+ return (!abort && srcIt.hasNext());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public MVDataEntry next() {
+ boolean alreadyWarned=false;
+ boolean done=false;
+ MVDataEntry entry = null;
+
+ // Prepare an hint for read exception (knowledge of last successfully read entry could help)
+ String hint = ( previousData != null )?previousData.getKey():"(nothing)";
+
+ // Seek for the next valid entry
+ while (!this.abort && !done && srcIt.hasNext()) {
+
+ // Try to read next entry
+ try {
+ entry=src.next();
+ if ( entry == null ) throw new NoSuchElementException("Null item returned");
+ } catch (Exception e) {
+ logger.warn(src.getDataSourceName() + " : exception when seeking next valid entry after " + hint, e);
+ entry = null; // Make sure don't re-use a previous entry
+ }
+
+ // Sanity checks
+ boolean valid = ( entry != null && entry.isValid() );
+ //XXX Regex should be a parameter
+ if ( valid && !entry.getKey().matches("^\\p{Print}+$") ) {
+ logger.warn(src.getDataSourceName() + " : Invalid key found : '" + entry.getKey().replaceAll("[^\\p{Print}]", "?") + "' after " + hint);
+ valid = false;
+ }
+
+
+ // Two branches : If valid, check ordering then skip or done. If invalid : skip or abort.
+ if ( valid ) {
+ // Ensure that data.key is greater than previousData.key or abort
+ if ( previousData != null && entry.getKey().compareTo(previousData.getKey()) <= 0 ) {
+ //TODO : this is almost useless in case of reverse-sortered query because everything will be deleted by the Syncer before asking the second item
+ logger.error(src.getDataSourceName() + " : Input data is not well ordered but the sync task require it : '"
+ + entry.getKey() + "' is not lexicographically greater than '" + previousData.getKey() + "'");
+ // Escape the while loop
+ abort=true; continue;
+ }
+
+ // We have found a valid entry, so escape gracefully the loop
+ done=true;
+ } else {
+ // Log read problems and choose between skip or abort
+ if ( ! this.skipReadErrors ) {
+ logger.error(src.getDataSourceName() + " has returned an invalid entry after " + hint);
+ // Escape the while loop
+ abort=true; continue;
+ }
+ if ( !alreadyWarned ) {
+ alreadyWarned=true;
+ logger.info("Invalid entry read but skipReadErrors is enabled, will try to read next entry (warned only once)");
+ }
+
+ // We don't have a valid entry, give a chance to the next iteration
+ done=false;
+ } /* if ( valid )*/
+
+ } /* while */
+
+ // If we don't have found anything valid, throw exception (better semantics than returning null)
+ if (!done) {
+ throw new NoSuchElementException();
+ }
+
+ // Keep track of previous read record
+ // -> for hinting in log messages when bad things happens
+ // -> to check if entries are well ordered
+ previousData=entry;
+ return entry;
+ }
+}
diff --git a/src/main/src/sync/BasicSyncTask.java b/src/main/src/sync/BasicSyncTask.java
new file mode 100644
index 0000000..24f34a8
--- /dev/null
+++ b/src/main/src/sync/BasicSyncTask.java
@@ -0,0 +1,292 @@
+/*
+ * SSSync, a Simple and Stupid Synchronizer for data with multi-valued attributes
+ * Copyright (C) 2014 Ludovic Pouzenc <ludovic@pouzenc.fr>
+ *
+ * This file is part of SSSync.
+ *
+ * SSSync is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * SSSync is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with SSSync. If not, see <http://www.gnu.org/licenses/>
+ */
+
+package sync;
+
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+
+import data.MVDataEntry;
+import data.io.MVDataReader;
+import data.io.MVDataWriter;
+
+/**
+ * Basic one-way synchronization code. Uses MVDataEntry semantics.
+ * Each entry has a key and a set of multi-valued attributes, like LDAP entries.
+ * Data source is a MVDataReader. Multiple source could be used via MVDataCombiner.
+ * <br/><br/>
+ * <b>Warnings :</b> needs MVDataReaders that give key-sorted results. This sync will try
+ * to delete entries that exists on destination side and don't exist at source side.
+ * Extra attributes in existing entries on destination side are preserved.
+ * Look like useful for account's failure password count for instance.
+ * <br/><br/>
+ * <b>Notes :</b> Null value and empty strings are not allowed in MVDataEntry, so they are not sync'ed.
+ *
+ * @author lpouzenc
+ */
+public class BasicSyncTask extends AbstractSyncTask {
+ private static final Logger logger = Logger.getLogger(BasicSyncTask.class.getName());
+
+ /**
+ * Source data stream (read-only)
+ */
+ private final MVDataReader srcReader;
+ /**
+ * Destination data stream (read)
+ */
+ private final MVDataReader dstReader;
+ /**
+ * Destination data stream (write)
+ */
+ private final MVDataWriter dstWriter;
+
+ /**
+ * If true, disable removal of data on destination side even if detected as obsolete
+ */
+ private boolean skipEntryDelete;
+
+
+ private int maxInserts;
+ private int maxUpdates;
+ private int maxDeletes;
+
+ private transient int curInserts;
+ private transient int curUpdates;
+ private transient int curDeletes;
+
+
+ /**
+ * BasicSyncTask constructor
+ * Assumes that the *Readers have iterators that returns entries sorted by lexicographical ascending key
+ * @param taskName Friendly name of this task (for tracing in log files)
+ * @param srcReader Source data stream (read-only)
+ * @param dstReader Destination data stream (read)
+ * @param dstWriter Destination data stream (write)
+ */
+ public BasicSyncTask(String taskName, boolean skipDelete, MVDataReader srcReader, MVDataReader dstReader, MVDataWriter dstWriter) {
+ this.taskName = taskName;
+ this.srcReader = srcReader;
+ this.dstReader = dstReader;
+ this.dstWriter = dstWriter;
+
+ this.maxInserts = 0;
+ this.maxUpdates = 0;
+ this.maxDeletes = 0;
+ }
+
+ public Boolean call() {
+ logger.info("task " + taskName + " : starting " + (dryRun?"dry-run":"real") + " pass");
+ // Better stack traces "call()" don't say "what"
+ boolean success = syncTaskRun();
+ logger.info("task " + taskName + " : " + (success?"terminated successfully":"aborted"));
+
+ return success;
+ }
+
+ private boolean syncTaskRun() {
+ curInserts=0;
+ curUpdates=0;
+ curDeletes=0;
+ dstWriter.setDryRun(dryRun);
+
+ Iterator<MVDataEntry> itSrc = srcReader.iterator();
+ Iterator<MVDataEntry> itDst = dstReader.iterator();
+ MVDataEntry src = null, dst = null;
+ boolean srcExhausted = false;
+ boolean dstExhausted = false;
+ boolean abort = false;
+ boolean done = false;
+ while ( !abort && !done ) {
+
+ // Look-ahead srcReader if previous has been "poped" (or if never read yet)
+ if ( src == null ) {
+ if ( !srcExhausted ) {
+ srcExhausted = !itSrc.hasNext();
+ }
+ if ( !srcExhausted ) {
+ try {
+ src=itSrc.next();
+ logger.trace("src read : " + src);
+ } catch (Exception e) {
+ logger.error("Read failure detected on " + srcReader.getDataSourceName() + ". Aborting.", e);
+ // Escape from the while loop
+ abort=true; continue;
+ }
+ }
+ }
+
+ // Look-ahead dstReader if previous has been "poped" (or if never read yet)
+ if ( dst == null ) {
+ if ( !dstExhausted ) {
+ dstExhausted = !itDst.hasNext();
+ }
+ if ( !dstExhausted ) {
+ try {
+ dst = itDst.next();
+ logger.trace("dst read : " + dst);
+ } catch (NoSuchElementException e) {
+ logger.error("Read failure detected on " + dstReader.getDataSourceName() + ". Aborting.", e);
+ // Escape from the while loop
+ abort=true; continue;
+ }
+ }
+ }
+
+ // Error-free cases (no problems while reading data)
+ int compare;
+ if ( !srcExhausted && !dstExhausted ) {
+ // General case : check order precedence to take an action
+ compare = src.compareTo(dst);
+ } else if ( !srcExhausted && dstExhausted ) {
+ // Particular case : dst is exhausted, it's like ( src < dst )
+ compare=-1;
+ } else if ( srcExhausted && !dstExhausted ) {
+ // Particular case : src is exhausted, it's like ( src > dst )
+ compare=1;
+ } else /* ( srcExhausted && dstExhausted ) */ {
+ // Particular case : everything is synchronized
+ // Exit gracefully the while loop
+ done=true; continue;
+ }
+
+ logger.trace("compare : " + compare);
+
+ boolean actionRealized = false;
+ // Take an action (insert/update/delete)
+ if ( compare < 0 ) {
+ actionRealized = _insert(src);
+ src = null;
+ // preserve dst until src key is not greater
+ } else if ( compare > 0 ) {
+ // dst current entry doesn't exists anymore (src key is greater than dst key)
+ actionRealized = _delete(dst);
+ // preserve src until dst key is not greater
+ dst = null;
+ } else /* ( compare == 0 ) */ {
+ // src current entry already exists in dst, update it if necessary
+ Set<String> changedAttr = src.getChangedAttributes(dst);
+ if ( ! changedAttr.isEmpty() ) {
+ actionRealized = _update(src,dst,changedAttr);
+ } else {
+ // Already up-to-date, nothing to do
+ actionRealized = true;
+ }
+ // Both src and dst have been used
+ src = null;
+ dst = null;
+ }
+ abort = !actionRealized;
+ } /* while */
+
+ return !abort;
+ } /* _taskRunSync() */
+
+ private boolean _insert(MVDataEntry entry) {
+
+ if ( maxInserts > 0 && curInserts >= maxInserts ) {
+ logger.error("Max insert limit reached (" + maxInserts + ")" );
+ return false;
+ }
+
+ logger.debug("dstWriter : Action\n-> Insert " + entry);
+ try {
+ dstWriter.insert(entry);
+ } catch (Exception e) {
+ logger.error("Exception occured while inserting", e);
+ return false;
+ }
+
+ curInserts++;
+ return true;
+ }
+
+ private boolean _update(MVDataEntry updatedEntry, MVDataEntry originalEntry, Set<String> attrToUpdate) {
+ if ( maxUpdates > 0 && curUpdates >= maxUpdates ) {
+ logger.error("Max update limit reached (" + maxUpdates + ")");
+ return false;
+ }
+
+ logger.debug("dstWriter : Action\n-> Update " + updatedEntry + "\n-> changed attributes : " + attrToUpdate);
+ try {
+ dstWriter.update(updatedEntry, originalEntry, attrToUpdate);
+ } catch (Exception e) {
+ logger.error("Exception occured while updating", e);
+ return false;
+ }
+
+ curUpdates++;
+ return true;
+ }
+
+ private boolean _delete(MVDataEntry entry) {
+ if ( skipEntryDelete ) {
+ logger.info("dstWriter : skipping deletion for key " + entry.getKey());
+ return true;
+ }
+
+ if ( maxDeletes > 0 && curDeletes >= maxDeletes ) {
+ logger.error("Max delete limit reached (" + maxDeletes + ")");
+ return false;
+ }
+ logger.debug("dstWriter : Action\n-> Delete " + entry);
+ try {
+ dstWriter.delete(entry);
+ } catch (Exception e) {
+ logger.error("Exception occured while deleting", e);
+ return false;
+ }
+
+ curDeletes++;
+ return true;
+ }
+
+ // Boring accessors
+
+ /**
+ * Setter to fix limits about operations counts (safeguard)
+ * @param maxInserts
+ * @param maxUpdates
+ * @param maxDeletes
+ */
+ public void setOperationLimits(int maxInserts, int maxUpdates, int maxDeletes) {
+ this.maxInserts = maxInserts;
+ this.maxUpdates = maxUpdates;
+ this.maxDeletes = maxDeletes;
+ }
+
+ /**
+ * @return the skipEntryDelete
+ */
+ public boolean isSkipEntryDelete() {
+ return skipEntryDelete;
+ }
+
+ /**
+ * @param skipEntryDelete the skipEntryDelete to set
+ */
+ public void setSkipEntryDelete(boolean skipEntryDelete) {
+ this.skipEntryDelete = skipEntryDelete;
+ }
+
+}
diff --git a/src/main/src/utils/JVMStatsDumper.java b/src/main/src/utils/JVMStatsDumper.java
new file mode 100644
index 0000000..41f1d97
--- /dev/null
+++ b/src/main/src/utils/JVMStatsDumper.java
@@ -0,0 +1,111 @@
+/*
+ * SSSync, a Simple and Stupid Synchronizer for data with multi-valued attributes
+ * Copyright (C) 2014 Ludovic Pouzenc <ludovic@pouzenc.fr>
+ *
+ * This file is part of SSSync.
+ *
+ * SSSync is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * SSSync is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with SSSync. If not, see <http://www.gnu.org/licenses/>
+ */
+
+package utils;
+
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryPoolMXBean;
+import java.lang.management.MemoryUsage;
+import java.lang.management.RuntimeMXBean;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * TODO javadoc
+ *
+ * @author lpouzenc
+ */
+public class JVMStatsDumper {
+ private static final Logger logger = Logger.getLogger(JVMStatsDumper.class.getName());
+
+ public static void logGCStats() {
+ // Skip all string construction if will not print this stuff
+ if ( logger.getLevel().isGreaterOrEqual(Level.INFO) ) { return; }
+
+ long totalGarbageCollections = 0;
+ long garbageCollectionTime = 0;
+
+ final String gcDumpHeader="Dumping Garbage Collector statistics\n" +
+ "+--------------------+-----------------------------+\n" +
+ "+ GC Name + Count + Time (ms) +\n" +
+ "+--------------------+--------------+--------------+\n";
+
+ StringBuilder sb = new StringBuilder(1024);
+ sb.append(gcDumpHeader);
+
+ for(GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
+
+ long count = gc.getCollectionCount();
+ long time = gc.getCollectionTime();
+
+ sb.append(String.format("+ %18s + %,12d + %,12d +%n", gc.getName(), count, time));
+
+ if(count >= 0) totalGarbageCollections += count;
+ if(time >= 0) garbageCollectionTime += time;
+ }
+
+ sb.append("+ + + +\n");
+ sb.append(String.format("+ %18s + %,12d + %,12d +%n",
+ "Total", totalGarbageCollections, garbageCollectionTime
+ ));
+ sb.append("+--------------------+--------------+--------------+\n");
+
+ sb.append("JVM arguments : ");
+ RuntimeMXBean runtimeMxBean = ManagementFactory.getRuntimeMXBean();
+ for ( String arg : runtimeMxBean.getInputArguments() ) {
+ sb.append(arg + " ");
+ }
+
+ logger.debug(sb);
+ }
+
+ /**
+ * Helper function to log the current memory usage
+ */
+ public static void logMemoryUsage() {
+ // Skip all string construction if will not print this stuff
+ if ( logger.getLevel().isGreaterOrEqual(Level.INFO) ) { return; }
+
+ final String memDumpHeader="Dumping memory statistics\n" +
+ "+--------------------------------------------------------------------------------+\n" +
+ "+ + Current (kio) + Peak (kio) +\n" +
+ "+ Pool +-----------------------------------------------------------+\n" +
+ "+ + Used + Committed + Used + Committed +\n" +
+ "+--------------------+--------------+--------------+--------------+--------------+\n";
+
+ StringBuilder sb = new StringBuilder(1024);
+ sb.append(memDumpHeader);
+
+ for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
+ MemoryUsage peak = pool.getPeakUsage();
+ MemoryUsage curr = pool.getUsage();
+ sb.append(String.format("+ %18s + %,12d + %,12d + %,12d + %,12d +%n",
+ pool.getName(),curr.getUsed()/1024, curr.getCommitted()/1024, peak.getUsed()/1024, peak.getCommitted()/1024
+ ));
+ pool.resetPeakUsage(); //XXX Maybe this is not a global action and is useless on a temporary object used once
+ }
+ sb.append("+--------------------+--------------+--------------+--------------+--------------+\n");
+
+ logger.debug(sb);
+ }
+
+}