Skip to content

Commit

Permalink
Add pre-processing capabilities (#18)
Browse files Browse the repository at this point in the history
* Add pre-processing capabilities
* Use Completable for preprocessing, since it is easier to skip when no transformations are necessary.
  • Loading branch information
bjornandre authored Jun 30, 2023
1 parent 6839d6d commit d9e329b
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 21 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

<!-- Dependency versions -->
<avro-buddy-core.version>1.1.0</avro-buddy-core.version>
<dapla-dlp-pseudo-func.version>1.2.2</dapla-dlp-pseudo-func.version>
<dapla-dlp-pseudo-func.version>1.2.3</dapla-dlp-pseudo-func.version>
<dapla-storage-client.version>5.1.2</dapla-storage-client.version>
<guava.version>31.1-jre</guava.version>
<jsonassert.version>1.5.1</jsonassert.version>
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/no/ssb/dlp/pseudo/core/StreamProcessor.java
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
package no.ssb.dlp.pseudo.core;

import io.reactivex.Completable;
import io.reactivex.Flowable;
import no.ssb.dlp.pseudo.core.map.RecordMapSerializer;

import java.io.InputStream;
import java.util.Map;

/**
 * Transforms a stream of records (e.g. CSV or JSON) into serialized output,
 * applying a record-by-record transformation.
 */
public interface StreamProcessor {
/**
 * Optional preliminary pass over the stream, used for preprocessing.
 * Implementations with preprocessing work return a Completable that completes
 * once the whole stream has been consumed; implementations with nothing to do
 * return an already-completed Completable so the pass can be skipped cheaply.
 */
<T> Completable init(InputStream is, RecordMapSerializer<T> serializer);
/**
 * Main pass: processes each record in the stream and emits one serialized
 * item per record.
 */
<T> Flowable<T> process(InputStream is, RecordMapSerializer<T> serializer);
/**
 * Transformation applied to a single record represented as a field-name to
 * value map; returns the (possibly transformed) record.
 */
@FunctionalInterface
public interface ItemProcessor {
Map<String, Object> process(Map<String, Object> r);
}
}
23 changes: 16 additions & 7 deletions src/main/java/no/ssb/dlp/pseudo/core/csv/CsvStreamProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.univocity.parsers.common.record.Record;
import com.univocity.parsers.csv.CsvParser;
import com.univocity.parsers.csv.CsvParserSettings;
import io.reactivex.Completable;
import io.reactivex.Emitter;
import io.reactivex.Flowable;
import lombok.RequiredArgsConstructor;
Expand All @@ -12,7 +13,6 @@
import no.ssb.dlp.pseudo.core.map.RecordMapProcessor;
import no.ssb.dlp.pseudo.core.map.RecordMapSerializer;

import java.io.IOException;
import java.io.InputStream;
import java.util.LinkedHashMap;
import java.util.Map;
Expand All @@ -24,12 +24,21 @@ public class CsvStreamProcessor implements StreamProcessor {

private final RecordMapProcessor recordMapProcessor;

/**
 * Preprocessing pass: drains the CSV stream once, feeding every record through
 * the record-map processor's init step. Completes immediately when no
 * preprocessors are registered, so the extra pass is skipped entirely.
 */
@Override
public <T> Completable init(InputStream is, RecordMapSerializer<T> serializer) {
    // Fast path: nothing to initialize, no need to touch the stream.
    if (!recordMapProcessor.hasPreprocessors()) {
        return Completable.complete();
    }
    return Completable.fromPublisher(processStream(is, serializer, recordMapProcessor::init));
}

/**
 * Main pass: runs every CSV record through the full record-map processing
 * chain and emits the serialized results.
 */
@Override
public <T> Flowable<T> process(InputStream is, RecordMapSerializer<T> serializer) {
    // Fix: the diff artifact left the superseded call `processStream(is, serializer)`
    // before this one, making this line unreachable — only the post-change call remains.
    return processStream(is, serializer, (map) -> recordMapProcessor.process(map));
}

<T> CsvProcessorContext<T> initCsvProcessorContext(InputStream is, RecordMapSerializer<T> serializer) throws IOException {
<T> CsvProcessorContext<T> initCsvProcessorContext(InputStream is, RecordMapSerializer<T> serializer) {
CsvParserSettings settings = new CsvParserSettings();
settings.detectFormatAutomatically();
settings.setHeaderExtractionEnabled(true);
Expand All @@ -38,19 +47,19 @@ <T> CsvProcessorContext<T> initCsvProcessorContext(InputStream is, RecordMapSeri
return new CsvProcessorContext<>(csvParser, serializer);
}

private <T> Flowable<T> processStream(InputStream is, RecordMapSerializer<T> serializer) {
private <T> Flowable<T> processStream(InputStream is, RecordMapSerializer<T> serializer, ItemProcessor processor) {
return Flowable.generate(
() -> initCsvProcessorContext(is, serializer),
(ctx, emitter) -> {this.processItem(ctx, emitter);}
(ctx, emitter) -> {this.processItem(ctx, emitter, processor);}
);
}

private <T> void processItem(CsvProcessorContext<T> ctx, Emitter<T> emitter) {
private <T> void processItem(CsvProcessorContext<T> ctx, Emitter<T> emitter, ItemProcessor processor) {
Record r = ctx.csvParser.parseNextRecord();
if (r != null) {
int position = ctx.currentPosition.getAndIncrement();
Map<String, Object> recordMap = r.fillFieldObjectMap(new LinkedHashMap<>());
Map<String, Object> processedRecord = recordMapProcessor.process(recordMap);
Map<String, Object> processedRecord = processor.process(recordMap);
emitter.onNext(ctx.getSerializer().serialize(processedRecord, position));
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ public Optional<PseudoFuncRuleMatch> match(FieldDescriptor field) {
return pseudoFuncs.findPseudoFunc(field);
}

/**
 * Initializes the pseudo function (if any) matching the given field, priming
 * it with the provided value. The value itself is always returned unchanged —
 * initialization never transforms it.
 *
 * @param field    the field being traversed
 * @param varValue the field's value, passed through untouched
 * @return {@code varValue}, unchanged
 */
public String init(FieldDescriptor field, String varValue) {
    // Idiom: Optional.isPresent()/get() replaced with ifPresent().
    pseudoFuncs.findPseudoFunc(field)
            .ifPresent(match -> match.getFunc().init(PseudoFuncInput.of(varValue)));
    return varValue;
}

private String process(PseudoOperation operation, FieldDescriptor field, String varValue) {

// TODO: This check is function type specific (e.g. only applies for FPE?)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,15 @@
* Example: 1) Perform pseudonymization and 2) log schema metrics
*/
public class ValueInterceptorChain implements ValueInterceptor {
// Interceptors that run only during the preliminary init/preprocessing pass.
private final List<ValueInterceptor> initChain = new ArrayList<>();
// Interceptors that run on every value during the main processing pass.
private final List<ValueInterceptor> chain = new ArrayList<>();
// NOTE(review): usage not visible in this excerpt — presumably shared state
// made available to interceptors; confirm against the full class.
private final Map<String, Serializable> context = new HashMap<>();

/**
 * Registers an interceptor that participates only in the preprocessing
 * (init) pass, not in the main processing chain.
 *
 * @param valueInterceptor the interceptor to add to the init chain
 * @return this chain, enabling fluent registration
 */
public ValueInterceptorChain preprocessor(ValueInterceptor valueInterceptor) {
initChain.add(valueInterceptor);
return this;
}

public ValueInterceptorChain register(ValueInterceptor valueInterceptor) {
chain.add(valueInterceptor);
return this;
Expand All @@ -26,6 +32,17 @@ public ValueInterceptorChain register(ValueInterceptor valueInterceptor, ValueIn
return this;
}

/**
 * Runs the value through every registered preprocessor in registration order,
 * threading the (possibly transformed) value from one interceptor to the next.
 */
public String init(FieldDescriptor field, String value) {
    String current = value;
    for (ValueInterceptor interceptor : initChain) {
        current = interceptor.apply(field, current);
    }
    return current;
}

/**
 * @return true if at least one preprocessor has been registered, i.e. the
 *         init pass has actual work to do and cannot be skipped
 */
public boolean hasPreprocessors() {
return !initChain.isEmpty();
}

@Override
public String apply(FieldDescriptor field, String value) {
for (ValueInterceptor vi : chain) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
public class PseudoFileSource {
private final MediaType providedMediaType;
private final MediaType mediaType;
private final InputStream inputStream;
private final Set<File> allFiles;
private final Collection<File> sourceFiles;

Expand All @@ -45,7 +44,6 @@ public PseudoFileSource(File file, MediaType sourceMediaType) {
if (sourceFiles.isEmpty()) {
throw new PseudoException("No files of type " + mediaType + " found");
}
inputStream = inputStreamOf(sourceFiles);
}
catch (IOException e) {
throw new PseudoException("Error initializing PseudoFileStream from file " + file, e);
Expand Down Expand Up @@ -75,7 +73,8 @@ public MediaType getMediaType() {
* @return a (possibly concatenated) input stream for the provided files
*/
public InputStream getInputStream() {
    // Fix: the diff artifact left the superseded `return inputStream;` line here,
    // which referenced the now-removed cached field and made the line below unreachable.
    // A fresh stream is built on every call since an input stream cannot be re-used.
    return inputStreamOf(sourceFiles);
}

/**
Expand Down
30 changes: 20 additions & 10 deletions src/main/java/no/ssb/dlp/pseudo/core/json/JsonStreamProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.reactivex.Completable;
import io.reactivex.Emitter;
import io.reactivex.Flowable;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import no.ssb.dlp.pseudo.core.PseudoOperation;
import no.ssb.dlp.pseudo.core.StreamProcessor;
import no.ssb.dlp.pseudo.core.map.RecordMap;
import no.ssb.dlp.pseudo.core.map.RecordMapProcessor;
Expand All @@ -35,25 +35,36 @@ public class JsonStreamProcessor implements StreamProcessor {

private final RecordMapProcessor recordMapProcessor;

/**
 * Preprocessing pass: consumes the JSON stream once, feeding every record
 * through the record-map processor's init step. Returns an already-completed
 * Completable when no preprocessors are registered, so the pass is skipped.
 */
@Override
public <T> Completable init(InputStream is, RecordMapSerializer<T> serializer) {
    return recordMapProcessor.hasPreprocessors()
            ? Completable.fromPublisher(processStream(is, serializer, recordMapProcessor::init))
            : Completable.complete();
}

/**
 * Main pass: runs every JSON record through the full record-map processing
 * chain and emits the serialized results.
 */
@Override
public <T> Flowable<T> process(InputStream is, RecordMapSerializer<T> serializer) {
    // Fix: the diff artifact left the superseded call
    // `processStream(PseudoOperation.DEPSEUDONYMIZE, is, serializer)` before this
    // one, making this line unreachable — only the post-change call remains.
    return processStream(is, serializer, (map) -> recordMapProcessor.process(map));
}

<T> JsonProcessorContext<T> initJsonProcessorContext(PseudoOperation operation, InputStream is, RecordMapSerializer<T> serializer) throws IOException {
<T> JsonProcessorContext<T> initJsonProcessorContext(InputStream is, RecordMapSerializer<T> serializer) throws IOException {
final JsonParser jsonParser = OBJECT_MAPPER.getFactory().createParser(is);
return new JsonProcessorContext<>(operation, jsonParser, serializer);
return new JsonProcessorContext<>(jsonParser, serializer);
}

private <T> Flowable<T> processStream(PseudoOperation operation, InputStream is, RecordMapSerializer<T> serializer) {
private <T> Flowable<T> processStream(InputStream is, RecordMapSerializer<T> serializer,
ItemProcessor processor) {
return Flowable.generate(
() -> initJsonProcessorContext(operation, is, serializer),
(ctx, emitter) -> {this.processItem(ctx, emitter);},
() -> initJsonProcessorContext(is, serializer),
(ctx, emitter) -> {this.processItem(ctx, emitter, processor);},
JsonProcessorContext::close
);
}

private <T> void processItem(JsonProcessorContext<T> ctx, Emitter<T> emitter) throws IOException {
private <T> void processItem(JsonProcessorContext<T> ctx, Emitter<T> emitter,
ItemProcessor processor) throws IOException {
JsonParser jsonParser = ctx.getJsonParser();
JsonToken jsonToken = jsonParser.nextToken();
while (jsonToken == JsonToken.START_ARRAY || jsonToken == JsonToken.END_ARRAY) {
Expand All @@ -63,7 +74,7 @@ private <T> void processItem(JsonProcessorContext<T> ctx, Emitter<T> emitter) th
if (jsonToken != null) {
int position = ctx.currentPosition.getAndIncrement();
Map<String, Object> r = OBJECT_MAPPER.readValue(jsonParser, RecordMap.class);
Map<String, Object> processedRecord = recordMapProcessor.process(r);
Map<String, Object> processedRecord = processor.process(r);
emitter.onNext(ctx.getSerializer().serialize(processedRecord, position));
}
else {
Expand All @@ -73,7 +84,6 @@ private <T> void processItem(JsonProcessorContext<T> ctx, Emitter<T> emitter) th

@Value
static class JsonProcessorContext<T> {
private final PseudoOperation operation;
private final JsonParser jsonParser;
private final RecordMapSerializer<T> serializer;
private final AtomicInteger currentPosition = new AtomicInteger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,15 @@
public class RecordMapProcessor {
// Chain of value interceptors applied to each traversed value.
private final ValueInterceptorChain valueInterceptorChain;

/**
 * Preprocessing pass: traverses every value in the record map through the
 * interceptor chain's init step, returning the resulting map.
 */
public Map<String, Object> init(Map<String, Object> r) {
return MapTraverser.traverse(r, valueInterceptorChain::init);
}

/**
 * Main pass: traverses every value in the record map through the interceptor
 * chain's apply step, returning the processed map.
 */
public Map<String, Object> process(Map<String, Object> r) {
return MapTraverser.traverse(r, valueInterceptorChain::apply);
}

/**
 * @return true if the underlying interceptor chain has any preprocessors
 *         registered, i.e. the init pass has work to do
 */
public boolean hasPreprocessors() {
return valueInterceptorChain.hasPreprocessors();
}
}

0 comments on commit d9e329b

Please sign in to comment.