summaryrefslogtreecommitdiff
path: root/src/main/src/data/io/SafeDataReader.java
blob: 2c5dda988fc09f277064799d6c81a93c05085605 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
/*
 * SSSync, a Simple and Stupid Synchronizer for data with multi-valued attributes
 * Copyright (C) 2014  Ludovic Pouzenc <ludovic@pouzenc.fr>
 *  
 * This file is part of SSSync.
 *
 *  SSSync is free software: you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation, either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  SSSync is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with SSSync.  If not, see <http://www.gnu.org/licenses/>
 */

package data.io;

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.regex.Pattern;

import org.apache.log4j.Logger;

import data.MVDataEntry;

/**
 * Multi-valued "safe" stream reader proxy.
 * <p>
 * Wraps another {@link MVDataReader} and adds logging, an optional "skip read errors"
 * mode, and a check that entries are returned in strictly increasing key order.
 * Never returns null items : {@link #next()} throws {@link NoSuchElementException}
 * when no valid entry can be produced.
 * <p>
 * Caveat : {@link #hasNext()} only reflects the state of the source iterator (and of
 * the abort flag). If every remaining source entry is invalid, {@link #hasNext()}
 * answers true and the following {@link #next()} throws {@link NoSuchElementException}.
 * <p>
 * {@link #iterator()} must be called before {@link #hasNext()} or {@link #next()},
 * because it is the only place where the source iterator is obtained.
 * 
 * @author lpouzenc
 */
public class SafeDataReader extends AbstractMVDataReader {

	private static final Logger logger = Logger.getLogger(SafeDataReader.class.getName());

	//XXX Regex should be a parameter
	/**
	 * A key is accepted only if it is non-empty and made of printable characters.
	 * Compiled once instead of on every entry read.
	 */
	private static final Pattern VALID_KEY_PATTERN = Pattern.compile("^\\p{Print}+$");
	/**
	 * Matches any single non-printable character (used to sanitize keys before logging them).
	 */
	private static final Pattern NON_PRINTABLE_PATTERN = Pattern.compile("[^\\p{Print}]");

	/**
	 * The wrapped reader, entries are read from the iterator it provides.
	 */
	private final MVDataReader src;
	/**
	 * If true, continue even in case of read errors
	 */
	private final boolean skipReadErrors;
	
	/**
	 * Current iterator on the source, (re)created by each call to {@link #iterator()}.
	 */
	private transient Iterator<MVDataEntry> srcIt;
	/**
	 * Set as soon as a fatal problem is seen (invalid entry without skipReadErrors, or
	 * badly ordered entry). Once set, nothing more is read until {@link #iterator()} resets it.
	 */
	private transient boolean abort;
	/**
	 * Last entry successfully returned by {@link #next()}, null if none yet.
	 */
	private transient MVDataEntry previousData;


	/**
	 * Builds a safe proxy around an existing reader.
	 * 
	 * @param src the reader to wrap, must not be null (its data source name is read immediately)
	 * @param skipReadErrors if true, invalid or unreadable entries are skipped instead of aborting the read
	 */
	public SafeDataReader(MVDataReader src, boolean skipReadErrors) {
		this.src = src;
		// NOTE(review): dataSourceName is presumably a field inherited from AbstractMVDataReader
		this.dataSourceName = src.getDataSourceName();
		this.skipReadErrors = skipReadErrors;
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * Asks the source for a fresh iterator and resets the abort flag and the
	 * "previous entry" memory. This object itself is returned as the iterator.
	 */
	@Override
	public Iterator<MVDataEntry> iterator() {
		// Reset everything
		srcIt = src.iterator();
		abort = false;
		previousData = null;
		
		return this;
	}

	/**
	 * {@inheritDoc}
	 * <p>
	 * Answers false once the read has been aborted, otherwise delegates to the source iterator.
	 */
	@Override
	public boolean hasNext() {
		return (!abort && srcIt.hasNext());
	}
	
	/**
	 * {@inheritDoc}
	 * <p>
	 * Seeks the next valid entry of the source. Invalid or unreadable entries are either
	 * skipped (skipReadErrors enabled) or make the whole read abort. A valid entry whose key
	 * is not strictly greater than the previously returned one always aborts the read.
	 * 
	 * @return the next valid entry, never null
	 * @throws NoSuchElementException if the source is exhausted or the read has been aborted
	 *         before a valid entry could be found
	 */
	@Override
	public MVDataEntry next() {
		boolean alreadyWarned = false;
		MVDataEntry found = null;
		
		// Prepare an hint for read exception (knowledge of last successfully read entry could help)
		final String hint = ( previousData != null ) ? previousData.getKey() : "(nothing)";
		
		// Seek for the next valid entry
		while ( !abort && found == null && srcIt.hasNext() ) {
			final MVDataEntry entry = readCheckedEntry(hint);
			
			if ( entry == null ) {
				// Read problem : choose between skip or abort
				if ( !skipReadErrors ) {
					logger.error(src.getDataSourceName() + " has returned an invalid entry after " + hint);
					abort = true;
				} else if ( !alreadyWarned ) {
					alreadyWarned = true;
					logger.info("Invalid entry read but skipReadErrors is enabled, will try to read next entry (warned only once)");
				}
				// When skipping, the next loop iteration gives a chance to the next entry
			} else if ( previousData != null && entry.getKey().compareTo(previousData.getKey()) <= 0 ) {
				// Ensure that data.key is greater than previousData.key or abort
				//TODO : this is almost useless in case of reverse-sorted query because everything will be deleted by the Syncer before asking the second item
				logger.error(src.getDataSourceName() + " : Input data is not well ordered but the sync task require it : '"
						+ entry.getKey() + "' is not lexicographically greater than '" + previousData.getKey() + "'");
				abort = true;
			} else {
				// We have found a valid entry, so escape gracefully the loop
				found = entry;
			}
		}

		// If we don't have found anything valid, throw exception (better semantics than returning null)
		if ( found == null ) {
			throw new NoSuchElementException();
		}
		
		// Keep track of previous read record 
		// -> for hinting in log messages when bad things happens
		// -> to check if entries are well ordered
		previousData = found;
		return found;
	}

	/**
	 * Reads one entry from the source iterator and runs the sanity checks on it.
	 * Read exceptions and invalid keys are logged here (warn level).
	 * 
	 * @param hint key of the last successfully returned entry, only used in log messages
	 * @return the entry if it could be read, is valid and has a printable key, null otherwise
	 */
	private MVDataEntry readCheckedEntry(String hint) {
		final MVDataEntry entry;
		
		// Try to read next entry, from the very same iterator that has answered hasNext()
		try {
			entry = srcIt.next();
			if ( entry == null ) throw new NoSuchElementException("Null item returned");
		} catch (Exception e) {
			logger.warn(src.getDataSourceName() + " : exception when seeking next valid entry after " + hint, e);
			return null;
		}
		
		// Sanity checks
		if ( !entry.isValid() ) {
			return null;
		}
		final String key = entry.getKey();
		if ( !VALID_KEY_PATTERN.matcher(key).matches() ) {
			// Non-printable characters are replaced before logging to keep the log readable
			logger.warn(src.getDataSourceName() + " : Invalid key found : '" + NON_PRINTABLE_PATTERN.matcher(key).replaceAll("?") + "' after " + hint);
			return null;
		}
		
		return entry;
	}
}