summaryrefslogtreecommitdiff
path: root/src/core/src/data/filters/MVDataCombiner.java
blob: 1b2eb3f9847ed4026135b41b1c7b1054ed2a449e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
/*
 * SSSync, a Simple and Stupid Synchronizer for data with multi-valued attributes
 * Copyright (C) 2014  Ludovic Pouzenc <ludovic@pouzenc.fr>
 *  
 * This file is part of SSSync.
 *
 *  SSSync is free software: you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation, either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  SSSync is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with SSSync.  If not, see <http://www.gnu.org/licenses/>
 */

package data.filters;

import java.util.Iterator;
import java.util.NoSuchElementException;

import data.MVDataEntry;
import data.io.AbstractMVDataReader;
import data.io.MVDataReader;

/**
 * Combines arbitrary number of MVData* sources while behaving same as AbstractMVDataReader.
 * This could enable a sync implementation to merge multiple sources
 *  before sync'ing in a transparent manner.
 * To prevent memory consumption, this assumes that all sources will be read
 *  with lexicographical ascending order on the "key" field.
 *  
 * @author lpouzenc
 */
public class MVDataCombiner extends AbstractMVDataReader {
	
	public enum MVDataCombineMode { PRIMARY_SOURCE, MERGE_APPEND, MERGE_REPLACE, OVERRIDE };
	
	private final MVDataReader[] readers;
	private final MVDataCombineMode[] mergeModes;
	
	private transient Iterator<MVDataEntry>[] readerIterators;
	private transient MVDataEntry[] lookAheadData;
	private transient String lastKey;
	
	
	public MVDataCombiner(String dataSourceName, MVDataReader[] readers, MVDataCombineMode mergeModes[]) {
		if ( readers == null || mergeModes == null || (mergeModes.length != readers.length) ) {
			throw new IllegalArgumentException("readers and mergeModes arrays should have same size");
		}
		if ( ! (mergeModes.length > 0) || mergeModes[0] != MVDataCombineMode.PRIMARY_SOURCE ) {
			throw new IllegalArgumentException("MVDataCombiner first mergeModes should always be PRIMARY_SOURCE");
		}
		
		this.dataSourceName = dataSourceName;
		this.readers = readers.clone();
		this.mergeModes = mergeModes.clone();
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	@SuppressWarnings("unchecked") /* for new Iterator[...] */
	public Iterator<MVDataEntry> iterator() {
		// Be cautious to reset everything
		readerIterators = new Iterator[readers.length];
		for (int i=0; i<readers.length;i++) {
			readerIterators[i] = readers[i].iterator();
		}
		lookAheadData = new MVDataEntry[readers.length];
		lastKey = null;
		
		return this;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public boolean hasNext() {
		for ( MVDataEntry line : lookAheadData ) {
			if ( line != null ) { return true; }
		}
		for ( MVDataReader reader : readers ) {
			if ( reader.hasNext() ) { return true; }
		}
		return false;
	}

	/**
	 * {@inheritDoc}
	 */
	@Override
	public MVDataEntry next() {

		final String currentKey = lookAheadAll();
	
		// Check if there was unsorted lines in source data
		if ( lastKey != null && (currentKey.compareTo(lastKey) < 0) ) {
				//XXX : this is checked here and in SafeDataReader (redundant), but both are optionnal...
				throw new UnsupportedOperationException("At least one data source is out of order. " +
						"Data sources are excepted to be read sorted by MVDataEntry key (ascending lexicogrpahical order)");
		}
		
		// Merge all data sources for key currentKey
		MVDataEntry result = null;
		for ( int i=0; i<lookAheadData.length; i++) {
			if ( lookAheadData[i] != null && lookAheadData[i].getKey().equals(currentKey) ) {
				if ( result == null ) {
					result = lookAheadData[i];
				} else {
					//XXX : some items in LDAP could have constrains like : "not multi-valued". Force MERGE_REPLACE mode ?
					//FIXME : honor all Combine modes
					result.mergeValues( (mergeModes[i] == MVDataCombineMode.MERGE_APPEND ),lookAheadData[i]);
				}
				lookAheadData[i]=null; // "Pop" the used entry
			}
		}
		
		lastKey = currentKey;
		
		return result;
	}

	private String lookAheadAll() {
		String minKey=null;
		
		// Feed the look-ahead buffer (look forward by 1 value for each reader)
		for ( int i=0; i<lookAheadData.length; i++) {
			if ( lookAheadData[i] == null && readerIterators[i].hasNext() ) {
				lookAheadData[i] = readerIterators[i].next();
			}
		}
		
		// Find the least RelData key from look-ahead buffers
		for (MVDataEntry entry: lookAheadData) {
			if ( entry != null ) {
				final String minKeyCandidate = entry.getKey();
				if ( minKey == null || minKey.compareTo(minKeyCandidate) > 0 ) {
					minKey = minKeyCandidate;
				}
			}
		}
		
		// Sanity checks
		if ( minKey == null ) {
			// Every reader is empty and look-ahead buffer is empty (hasNext() should have said false)
			throw new NoSuchElementException();
		}
		
		return minKey;
	}

	// Boring accessors
	
	public String getLastKey() {
		return lastKey;
	}
}