Skip to content

Commit

Permalink
RANGER-5102: Add config param for writing audits to HDFS in append mode
Browse files Browse the repository at this point in the history
  • Loading branch information
kumaab committed Feb 24, 2025
1 parent aaaef8f commit dfde509
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@
public abstract class AbstractRangerAuditWriter implements RangerAuditWriter {
private static final Logger logger = LoggerFactory.getLogger(AbstractRangerAuditWriter.class);

public static final String PROP_FILESYSTEM_DIR = "dir";
public static final String PROP_FILESYSTEM_SUBDIR = "subdir";
public static final String PROP_FILESYSTEM_FILE_NAME_FORMAT = "filename.format";
public static final String PROP_FILESYSTEM_FILE_ROLLOVER = "file.rollover.sec";
public static final String PROP_FILESYSTEM_ROLLOVER_PERIOD = "file.rollover.period";
public static final String PROP_FILESYSTEM_FILE_EXTENSION = ".log";
public static final String PROP_FILESYSTEM_DIR = "dir";
public static final String PROP_FILESYSTEM_SUBDIR = "subdir";
public static final String PROP_FILESYSTEM_FILE_NAME_FORMAT = "filename.format";
public static final String PROP_FILESYSTEM_FILE_ROLLOVER = "file.rollover.sec";
public static final String PROP_FILESYSTEM_ROLLOVER_PERIOD = "file.rollover.period";
public static final String PROP_FILESYSTEM_FILE_EXTENSION = ".log";
public static final String PROP_IS_APPEND_ENABLED = "file.append.enabled";

public Configuration conf = null;
public FileSystem fileSystem = null;
public Map<String, String> auditConfigs = null;
Expand All @@ -66,7 +68,7 @@ public abstract class AbstractRangerAuditWriter implements RangerAuditWriter {
public boolean rollOverByDuration = false;
public volatile FSDataOutputStream ostream = null; // output stream wrapped in logWriter
private boolean isHFlushCapableStream = false;
protected boolean reUseLastLogFile = false;
protected boolean reUseLastLogFile = false;

@Override
public void init(Properties props, String propPrefix, String auditProviderName, Map<String,String> auditConfigs) {
Expand Down Expand Up @@ -136,8 +138,7 @@ public Configuration createConfiguration() {
return conf;
}

public void createParents(Path pathLogfile, FileSystem fileSystem)
throws Exception {
public void createParents(Path pathLogfile, FileSystem fileSystem) throws Exception {
logger.info("Creating parent folder for " + pathLogfile);
Path parentPath = pathLogfile != null ? pathLogfile.getParent() : null;

Expand Down Expand Up @@ -178,11 +179,13 @@ public void init(Properties props, String propPrefix) {
logFileNameFormat = "%app-type%_ranger_audit_%hostname%" + fileExtension;
}

reUseLastLogFile = MiscUtil.getBooleanProperty(props, propPrefix + "." + PROP_IS_APPEND_ENABLED, false);
logFolder = logFolderProp + "/" + logSubFolder;

logger.info("logFolder=" + logFolder + ", destName=" + auditProviderName);
logger.info("logFileNameFormat=" + logFileNameFormat + ", destName="+ auditProviderName);
logger.info("config=" + auditConfigs.toString());
logger.info("logFolder={}, destName={}", logFolder, auditProviderName);
logger.info("logFileNameFormat={}, destName={}", logFileNameFormat, auditProviderName);
logger.info("config={}", auditConfigs.toString());
logger.info("isAppendEnabled = {}", reUseLastLogFile);

rolloverPeriod = MiscUtil.getStringProperty(props, propPrefix + "." + PROP_FILESYSTEM_ROLLOVER_PERIOD);
rollingTimeUtil = RollingTimeUtil.getInstance();
Expand Down Expand Up @@ -228,7 +231,8 @@ public void closeFileIfNeeded() {
setNextRollOverTime();

currentFileName = null;
reUseLastLogFile = false;
auditPath = null;
fullPath = null;
}

if (logger.isDebugEnabled()) {
Expand All @@ -248,12 +252,14 @@ public PrintWriter createWriter() throws Exception {

if (logWriter == null) {
boolean appendMode = false;
// if append is supported, reuse last log file
if (reUseLastLogFile && isAppendEnabled()) {
logger.info("Appending to last log file. auditPath = {}", fullPath);

// if append is supported and enabled via config param, reuse last log file
if (auditPath != null && reUseLastLogFile && isAppendEnabled()) {
try {
ostream = fileSystem.append(auditPath);
appendMode = true;

logger.info("Appending to last log file. auditPath = {}", fullPath);
} catch (Exception e){
logger.error("Failed to append to file {} due to {}", fullPath, e.getMessage());
logger.info("Falling back to create a new log file!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,12 @@ public PrintWriter run() throws Exception {
logger.error("Stream encountered errors while writing audits to HDFS!");
closeWriter();
resetWriter();
reUseLastLogFile = true;
return false;
}
} catch (Exception e) {
logger.error("Exception encountered while writing audits to HDFS!", e);
closeWriter();
resetWriter();
reUseLastLogFile = true;
return false;
} finally {
if (logger.isDebugEnabled()) {
Expand All @@ -129,7 +127,6 @@ public PrintWriter run() throws Exception {
if (out != null) {
out.flush();
}
//closeWriter();
}

return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,23 @@

package org.apache.ranger.audit.utils;

import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

import java.io.IOException;
import java.util.Map;
import java.util.HashMap;
import java.util.Properties;
import java.util.Collections;
import java.io.PrintWriter;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.when;

public class RangerJSONAuditWriterTest {
Expand All @@ -46,58 +47,118 @@ public void setup(){
props.setProperty("test.dir", "/tmp");
auditConfigs = new HashMap<>();
auditConfigs.put(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
auditConfigs.put("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem");
}

@Test
public void checkReUseFlagInStreamErrors() throws Exception {
public void verifyAppendToFileWhenEnabledWithConfig() throws Exception {
RangerJSONAuditWriter jsonAuditWriter = spy(new RangerJSONAuditWriter());

setup();
props.setProperty("test.file.append.enabled", "true");
jsonAuditWriter.init(props, "test", "localfs", auditConfigs);

assertTrue(jsonAuditWriter.logJSON(Collections.singleton("This event will be logged in write(create) mode!")));
assertTrue(jsonAuditWriter.reUseLastLogFile);

when(jsonAuditWriter.getLogFileStream()).thenThrow(new IOException("Unable to fetch log file stream!"));
assertFalse(jsonAuditWriter.logJSON(Collections.singleton("This event will not be logged due to exception!")));

assertNull(jsonAuditWriter.ostream);
assertNull(jsonAuditWriter.logWriter);
assertNotNull(jsonAuditWriter.auditPath);
assertNotNull(jsonAuditWriter.fullPath);

reset(jsonAuditWriter);
assertTrue(jsonAuditWriter.logJSON(Collections.singleton("Last log file will be opened in append mode and this event will be written")));
assertTrue(jsonAuditWriter.logJSON(Collections.singleton("This event will also be written in append mode")));

jsonAuditWriter.fileSystem.deleteOnExit(jsonAuditWriter.auditPath);
}

@Test
public void verifyFileRolloverWithAppend() throws Exception {
RangerJSONAuditWriter jsonAuditWriter = spy(new RangerJSONAuditWriter());
PrintWriter out = mock(PrintWriter.class);

setup();
props.setProperty("test.file.rollover.enable.periodic.rollover", "true");
props.setProperty("test.file.rollover.periodic.rollover.check.sec", "2");
props.setProperty("test.file.append.enabled", "true");
// rollover log file after this interval
jsonAuditWriter.fileRolloverSec = 5; // in seconds
jsonAuditWriter.init(props, "test", "localfs", auditConfigs);

assertFalse(jsonAuditWriter.reUseLastLogFile);
when(jsonAuditWriter.getLogFileStream()).thenReturn(out);
when(out.checkError()).thenReturn(true);
assertFalse(jsonAuditWriter.logJSON(Collections.singleton("This event will not be logged!")));
assertTrue(jsonAuditWriter.reUseLastLogFile);
assertTrue(jsonAuditWriter.logJSON(Collections.singleton("This event will be logged in write(create) mode!")));

when(jsonAuditWriter.getLogFileStream()).thenThrow(new IOException("Unable to fetch log file stream!"));
assertFalse(jsonAuditWriter.logJSON(Collections.singleton("This event will not be logged due to exception!")));

assertNull(jsonAuditWriter.ostream);
assertNull(jsonAuditWriter.logWriter);
assertNotNull(jsonAuditWriter.auditPath);
assertNotNull(jsonAuditWriter.fullPath);

reset(jsonAuditWriter);

assertTrue(jsonAuditWriter.logJSON(Collections.singleton("Last log file will be opened in append mode and this event will be written")));
assertTrue(jsonAuditWriter.logJSON(Collections.singleton("This event will also be written in append mode")));
Path auditPath1 = jsonAuditWriter.auditPath;

Thread.sleep(6000);

// rollover should have happened
assertTrue(jsonAuditWriter.reUseLastLogFile);
assertNull(jsonAuditWriter.ostream);
assertNull(jsonAuditWriter.logWriter);
assertNull(jsonAuditWriter.auditPath);
assertNull(jsonAuditWriter.fullPath);

assertTrue(jsonAuditWriter.logJSON(Collections.singleton("Second file created since rollover happened!")));

// ensure the same rolled over file is not used for append
assertNotEquals(auditPath1, jsonAuditWriter.auditPath);

// cleanup
jsonAuditWriter.fileSystem.deleteOnExit(auditPath1);
jsonAuditWriter.fileSystem.deleteOnExit(jsonAuditWriter.auditPath);
jsonAuditWriter.logJSON(Collections.singleton("cleaning up!"));
jsonAuditWriter.closeWriter();
}

@Test
public void checkAppendtoFileWhenExceptionsOccur() throws Exception {
public void verifyNoAppendToFileWhenDisabledWithConfig() throws Exception {
RangerJSONAuditWriter jsonAuditWriter = spy(new RangerJSONAuditWriter());

setup();

props.setProperty("test.file.append.enabled", "false");
jsonAuditWriter.init(props, "test", "localfs", auditConfigs);
jsonAuditWriter.createFileSystemFolders();
// File creation should fail with an exception which will trigger append next time.
when(jsonAuditWriter.fileSystem.create(jsonAuditWriter.auditPath))
.thenThrow(new IOException("Creation not allowed!"));
jsonAuditWriter.logJSON(Collections.singleton("This event will not be logged!"));
jsonAuditWriter.fileSystem.deleteOnExit(jsonAuditWriter.auditPath);

assertTrue(jsonAuditWriter.reUseLastLogFile);
when(jsonAuditWriter.fileSystem.create(jsonAuditWriter.auditPath)).thenThrow(new IOException("Creation not allowed at this time!"));
assertFalse(jsonAuditWriter.logJSON(Collections.singleton("This event will not be logged!")));
assertFalse(jsonAuditWriter.reUseLastLogFile);
assertNull(jsonAuditWriter.ostream);
assertNull(jsonAuditWriter.logWriter);
assertNotNull(jsonAuditWriter.auditPath);
assertNotNull(jsonAuditWriter.fullPath);

jsonAuditWriter.fileSystem = mock(FileSystem.class);
when(jsonAuditWriter.fileSystem
.hasPathCapability(jsonAuditWriter.auditPath, CommonPathCapabilities.FS_APPEND)).thenReturn(true);
Path auditPath1 = jsonAuditWriter.auditPath;

reset(jsonAuditWriter);
assertTrue(jsonAuditWriter.logJSON(Collections.singleton("This event should be written to a newly created file in write mode!")));
assertTrue(jsonAuditWriter.logJSON(Collections.singleton("This event should also be written to the previous file")));
assertFalse(jsonAuditWriter.reUseLastLogFile);

// cleanup
jsonAuditWriter.fileSystem.deleteOnExit(auditPath1);
jsonAuditWriter.fileSystem.deleteOnExit(jsonAuditWriter.auditPath);
// this will lead to an exception since append is called on mocks
jsonAuditWriter.logJSON(Collections.singleton(
"This event should be appended but won't be as appended we use mocks."));
}


@Test
public void checkFileRolloverAfterThreshold() throws Exception {
public void verifyFileRolloverAfterThreshold() throws Exception {
RangerJSONAuditWriter jsonAuditWriter = spy(new RangerJSONAuditWriter());

setup();
Expand All @@ -107,17 +168,18 @@ public void checkFileRolloverAfterThreshold() throws Exception {
jsonAuditWriter.fileRolloverSec = 5; // in seconds
jsonAuditWriter.init(props, "test", "localfs", auditConfigs);


assertTrue(jsonAuditWriter.logJSON(Collections.singleton("First file created and added this line!")));
jsonAuditWriter.fileSystem.deleteOnExit(jsonAuditWriter.auditPath); // cleanup
Path auditPath1 = jsonAuditWriter.auditPath;

Thread.sleep(6000);

assertFalse(jsonAuditWriter.reUseLastLogFile);
assertNull(jsonAuditWriter.ostream);
assertNull(jsonAuditWriter.logWriter);

assertTrue(jsonAuditWriter.logJSON(Collections.singleton("Second file created since rollover happened!")));
jsonAuditWriter.fileSystem.deleteOnExit(jsonAuditWriter.auditPath); // cleanup

// cleanup
jsonAuditWriter.fileSystem.deleteOnExit(auditPath1);
jsonAuditWriter.fileSystem.deleteOnExit(jsonAuditWriter.auditPath);
jsonAuditWriter.closeWriter();
}
}

0 comments on commit dfde509

Please sign in to comment.