summaryrefslogtreecommitdiff
path: root/src/connectors/src/data/io/csv/CSVDataReader.java
blob: 6dbc8ff4f8cec28d0eeb7cbae90f5c0d614ef808 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
/*
 * SSSync, a Simple and Stupid Synchronizer for data with multi-valued attributes
 * Copyright (C) 2014  Ludovic Pouzenc <ludovic@pouzenc.fr>
 *  
 * This file is part of SSSync.
 *
 *  SSSync is free software: you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation, either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  SSSync is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with SSSync.  If not, see <http://www.gnu.org/licenses/>
 */
package data.io.csv;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;

import data.MVDataEntry;
import data.io.AbstractMVDataReader;

/**
 * Stream-oriented reader from a particular CSV file.
 * <p>
 * Each CSV record provides a "key", an "attr" and a "values" column, the latter being
 * a ";"-separated list. Consecutive records sharing the same key are merged into a
 * single {@link MVDataEntry}. Entries come out in ascending lexicographical order:
 * either the caller declares the input as already sorted, or all the input lines are
 * sorted in memory when the reader is constructed.
 * <p>
 * This object is its own iterator: multiple simultaneous iterations on the same
 * instance are not supported.
 * 
 * @author lpouzenc
 */
public class CSVDataReader extends AbstractMVDataReader {

	/** Small unsorted sample (no header line, ends with an empty record) usable for demos. */
	public static final String CSV_DEMO = 
			//"key,attr,values\n" +
			"line3,hello,all;the;others\n" +
			"line1,from,csv1;csv1bis\n" +
			"line2,hello,all;the;world\n" +
			"line1,attr2,csv1\n" +
			",,\n";
	
	/** Format used when none is given: Excel flavor, fixed column names, surrounding spaces ignored. */
	public static final CSVFormat DEFAULT_CSV_FORMAT = CSVFormat.EXCEL
			.withHeader("key","attr","values")
			.withIgnoreSurroundingSpaces(true);
	
	private final CSVFormat format;
	private final Reader dataSourceStream;
	
	/** Entry already built by lookAhead() and not yet returned by next(), or null. */
	private transient MVDataEntry nextEntry;
	/** CSV record already read but belonging to the entry after the current one, or null. */
	private transient CSVRecord nextCSVRecord;
	/** Iterator on the current CSVParser, null until iterator() has been called. */
	private transient Iterator<CSVRecord> csvIt;
	/** True once a CSVParser has been plugged on dataSourceStream (which may then be partially consumed). */
	private transient boolean parsingStarted;


	/**
	 * Constructs a CSVDataReader object for parsing a CSV input given via dataSourceStream.
	 * @param dataSourceName A short string representing this reader (for logging)
	 * @param dataSourceStream A java.io.Reader from which read the actual CSV data, typically a FileReader.
	 *        If alreadySorted is false, it is entirely read then closed by this constructor.
	 * @param alreadySorted If false, memory cost is around 3 times the CSV file size !
	 * @param format Specify the exact format used to encode the CSV file (separators, escaping...)
	 * @throws IOException if the input can't be read while sorting it
	 */
	public CSVDataReader(String dataSourceName, Reader dataSourceStream, boolean alreadySorted, CSVFormat format) throws IOException {
		this.dataSourceName = dataSourceName;
		this.format = format;
		
		if ( alreadySorted ) {
			// Trust the caller : the stream is parsed as is
			this.dataSourceStream = dataSourceStream;
		} else {
			// Don't stack a second buffer if the caller already gives a buffered stream
			BufferedReader bufReader = ( dataSourceStream instanceof BufferedReader )
					? (BufferedReader) dataSourceStream
					: new BufferedReader(dataSourceStream);
			this.dataSourceStream = readAndSortLines(bufReader);
		}
	}

	/**
	 * Constructs a CSVDataReader object with default CSV format (for CSVParser).
	 * @param dataSourceName A short string representing this reader (for logging)
	 * @param dataSourceStream A java.io.Reader from which read the actual CSV data, typically a FileReader
	 * @param alreadySorted If false, memory cost is around 3 times the CSV file size !
	 * @throws IOException if the input can't be read while sorting it
	 */
	public CSVDataReader(String dataSourceName, Reader dataSourceStream, boolean alreadySorted) throws IOException {
		this(dataSourceName, dataSourceStream, alreadySorted, DEFAULT_CSV_FORMAT);
	}
	
	/**
	 * {@inheritDoc}
	 * Note : multiple iterators on the same instance are not supported.
	 * Requesting a second iterator needs a stream that can be rewound with reset()
	 * (always the case when the lines have been sorted by this reader).
	 * @throws RuntimeException if the stream can't be rewound for a new iteration,
	 *         or if the CSV parser can't be created
	 */
	@Override
	public Iterator<MVDataEntry> iterator() {
		// When a new iterator is requested, everything should be reset
		try {
			dataSourceStream.reset();
		} catch (IOException e) {
			// Many readers can't rewind (reset() unsupported, or no mark set).
			// This is harmless as long as no parser has consumed the stream yet,
			// but a later iteration would silently start in the middle of the data.
			if ( parsingStarted ) {
				throw new RuntimeException(e);
			}
		}
		
		CSVParser parser;
		try {
			parser = new CSVParser(dataSourceStream, format);
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
		parsingStarted = true;
		
		csvIt = parser.iterator();
		nextCSVRecord = null;
		nextEntry = null;
		return this;
	}
	
	/**
	 * {@inheritDoc}
	 * If iterator() has never been called, the iteration is started implicitly.
	 */
	@Override
	public boolean hasNext() {
		if ( csvIt == null ) {
			// Avoids a NullPointerException when used as a bare Iterator
			iterator();
		}
		if ( nextEntry == null ) {
			lookAhead();
		}
		return ( nextEntry != null );  
	}

	/**
	 * {@inheritDoc}
	 * @throws NoSuchElementException if there is no more entry to return
	 */
	@Override
	public MVDataEntry next() {
		if ( !hasNext() ) {
			throw new NoSuchElementException();
		}
		// Pop the lookahead record
		MVDataEntry res = nextEntry;
		nextEntry=null;
		// And return it
		return res;
	}
	
	/**
	 * In-memory file sorting, returned as a Reader on a single String.
	 * Lines are sorted in their natural String order and exact duplicate lines are kept only once.
	 * The sort is line-based : a quoted CSV value containing a line break would be torn apart.
	 * @param bufReader the stream to read all lines from, always closed by this method
	 * @return a Reader positioned at the beginning of the sorted lines, each one terminated by "\n"
	 * @throws IOException if reading or closing bufReader fails
	 */
	private static Reader readAndSortLines(BufferedReader bufReader) throws IOException {
		// Put all the CSV in memory, in a SortedSet
		SortedSet<String> lineSet = new TreeSet<String>();
		int sortedSize=0;
		try {
			String inputLine;
			while ((inputLine = bufReader.readLine()) != null) {
				// Count only lines really kept, duplicates are dropped by the set
				if ( lineSet.add(inputLine) ) {
					sortedSize += inputLine.length() + 1;
				}
			}
		} finally {
			// Closes also the underlying dataSourceStream, even if reading has failed
			bufReader.close();
		}

		// Put all sorted lines in a String
		StringBuilder allLines = new StringBuilder(sortedSize);
		for ( String line: lineSet) {
			allLines.append(line).append('\n');
		}
		lineSet = null; // Could help the GC if the input file is huge
		
		// Build a Java Reader from that String
		return new StringReader(allLines.toString());
	}
	
	/**
	 * Builds the next MVDataEntry (if any) and stores it in nextEntry.
	 * A MVDataEntry could be represented on many CSV lines.
	 * The key is repeated, the attr could change, the values should change (for given key/attr pair).
	 * The first record with a different key is kept in nextCSVRecord for the following call.
	 */
	private void lookAhead() {
		if ( nextEntry != null ) {
			return; // Already looked ahead, don't lose the pending entry
		}
		
		MVDataEntry currEntry = null;
		while ( true ) {
			// Try to get a valid CSVRecord (or reuse the one left by the previous call)
			if ( nextCSVRecord == null ) {
				nextCSVRecord = nextValidCSVRecord();
			}
			// If no more CSV data, maybe we have a remaining entry to return (currEntry), maybe not (null)
			if ( nextCSVRecord == null ) {
				break;
			}
			
			// Now we have a valid CSV line to put in a MVDataEntry
			String newKey = nextCSVRecord.get("key");
			
			// If no MVDataEntry yet, it's time to create it (we have data to put into)
			if ( currEntry == null ) {
				currEntry = new MVDataEntry(newKey);
			}
			// Keys are different, we are done (and we have remaining CSV data in nextCSVRecord)
			if ( !currEntry.getKey().equals(newKey) ) {
				break;
			}
			
			// CSV line key matches MVDataEntry key, appends attr/values on it
			currEntry.splitAndPut(nextCSVRecord.get("attr"), nextCSVRecord.get("values"), ";");
			nextCSVRecord = null; // Record consumed
		}
		
		nextEntry = currEntry;
	}
	
	/**
	 * Seek for the next valid record in the CSV file, skipping records with a null or empty key.
	 * @return the next valid CSVRecord, or null if the end of the CSV data is reached
	 */
	private CSVRecord nextValidCSVRecord() {
		while ( csvIt.hasNext() ) {
			CSVRecord candidate = csvIt.next();
			
			// Skip invalid and empty lines
			String key = candidate.get("key");
			if ( key != null && ! key.isEmpty() ) {
				return candidate;
			}
		}
		
		return null;
	}
}