Skip to content

Commit

Permalink
RANGER-5102: Add config param for writing audits to HDFS in append mo…
Browse files Browse the repository at this point in the history
…de (#513)

- added new config param to enable/disable append mode
- added unit tests
  • Loading branch information
kumaab authored Feb 24, 2025
1 parent 324d5db commit 87666d3
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,13 @@
public abstract class AbstractRangerAuditWriter implements RangerAuditWriter {
private static final Logger logger = LoggerFactory.getLogger(AbstractRangerAuditWriter.class);

public static final String PROP_FILESYSTEM_DIR = "dir";
public static final String PROP_FILESYSTEM_SUBDIR = "subdir";
public static final String PROP_FILESYSTEM_FILE_NAME_FORMAT = "filename.format";
public static final String PROP_FILESYSTEM_FILE_ROLLOVER = "file.rollover.sec";
public static final String PROP_FILESYSTEM_ROLLOVER_PERIOD = "file.rollover.period";
public static final String PROP_FILESYSTEM_FILE_EXTENSION = ".log";
public static final String PROP_FILESYSTEM_DIR = "dir";
public static final String PROP_FILESYSTEM_SUBDIR = "subdir";
public static final String PROP_FILESYSTEM_FILE_NAME_FORMAT = "filename.format";
public static final String PROP_FILESYSTEM_FILE_ROLLOVER = "file.rollover.sec";
public static final String PROP_FILESYSTEM_ROLLOVER_PERIOD = "file.rollover.period";
public static final String PROP_FILESYSTEM_FILE_EXTENSION = ".log";
public static final String PROP_IS_APPEND_ENABLED = "file.append.enabled";

public Configuration conf;
public FileSystem fileSystem;
Expand Down Expand Up @@ -225,18 +226,19 @@ public void init(Properties props, String propPrefix) {
logFileNameFormat = "%app-type%_ranger_audit_%hostname%" + fileExtension;
}

logFolder = logFolderProp + "/" + logSubFolder;
reUseLastLogFile = MiscUtil.getBooleanProperty(props, propPrefix + "." + PROP_IS_APPEND_ENABLED, false);
logFolder = logFolderProp + "/" + logSubFolder;

logger.info("logFolder={}, destName={}", logFolder, auditProviderName);
logger.info("logFileNameFormat={}, destName={}", logFileNameFormat, auditProviderName);
logger.info("config={}", auditConfigs);
logger.info("logFolder = {}, destName = {}", logFolder, auditProviderName);
logger.info("logFileNameFormat = {}, destName = {}", logFileNameFormat, auditProviderName);
logger.info("config = {}", auditConfigs);
logger.info("isAppendEnabled = {}", reUseLastLogFile);

rolloverPeriod = MiscUtil.getStringProperty(props, propPrefix + "." + PROP_FILESYSTEM_ROLLOVER_PERIOD);
rollingTimeUtil = RollingTimeUtil.getInstance();

//file.rollover.period is used for rolling over. If it could compute the next roll over time using file.rollover.period
//it fall back to use file.rollover.sec for find next rollover time. If still couldn't find default will be 1day window
//for rollover.
//file.rollover.period is used for rolling over. If it could compute the next rollover time using file.rollover.period
//it fallbacks to use file.rollover.sec for find next rollover time. If still couldn't find default will be 1day window for rollover.
if (StringUtils.isEmpty(rolloverPeriod)) {
rolloverPeriod = rollingTimeUtil.convertRolloverSecondsToRolloverPeriod(fileRolloverSec);
}
Expand Down Expand Up @@ -272,7 +274,8 @@ public void closeFileIfNeeded() {
setNextRollOverTime();

currentFileName = null;
reUseLastLogFile = false;
auditPath = null;
fullPath = null;
}

logger.debug("<== AbstractRangerAuditWriter.closeFileIfNeeded()");
Expand All @@ -290,13 +293,13 @@ public PrintWriter createWriter() throws Exception {
if (logWriter == null) {
boolean appendMode = false;

// if append is supported, reuse last log file
if (reUseLastLogFile && isAppendEnabled()) {
logger.info("Appending to last log file. auditPath = {}", fullPath);

// if append is supported and enabled via config param, reuse last log file
if (auditPath != null && reUseLastLogFile && isAppendEnabled()) {
try {
ostream = fileSystem.append(auditPath);
appendMode = true;

logger.info("Appending to last log file. auditPath = {}", fullPath);
} catch (Exception e) {
logger.error("Failed to append to file {} due to {}", fullPath, e.getMessage());
logger.info("Falling back to create a new log file!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,25 +119,20 @@ public synchronized boolean logJSON(final Collection<String> events) throws Exce
closeWriter();
resetWriter();

reUseLastLogFile = true;

return false;
}
} catch (Exception e) {
logger.error("Exception encountered while writing audits to HDFS!", e);
closeWriter();
resetWriter();

reUseLastLogFile = true;

return false;
} finally {
logger.debug("Flushing HDFS audit. Event Size:{}", events.size());

if (out != null) {
out.flush();
}
//closeWriter();
}

return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,22 @@

package org.apache.ranger.audit.utils;

import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;

import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

Expand All @@ -47,77 +48,135 @@ public void setup() {
props.setProperty("test.dir", "/tmp");

auditConfigs.put(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS);
auditConfigs.put("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem");
}

@Test
public void checkReUseFlagInStreamErrors() throws Exception {
public void verifyAppendToFileWhenEnabledWithConfig() throws Exception {
RangerJSONAuditWriter jsonAuditWriter = spy(new RangerJSONAuditWriter());
PrintWriter out = mock(PrintWriter.class);

setup();
props.setProperty("test.file.append.enabled", "true");
jsonAuditWriter.init(props, "test", "localfs", auditConfigs);

assertTrue(jsonAuditWriter.logJSON(Collections.singleton("This event will be logged in write(create) mode!")));
assertTrue(jsonAuditWriter.reUseLastLogFile);

when(jsonAuditWriter.getLogFileStream()).thenThrow(new IOException("Unable to fetch log file stream!"));
assertFalse(jsonAuditWriter.logJSON(Collections.singleton("This event will not be logged due to exception!")));

assertNull(jsonAuditWriter.ostream);
assertNull(jsonAuditWriter.logWriter);
assertNotNull(jsonAuditWriter.auditPath);
assertNotNull(jsonAuditWriter.fullPath);

reset(jsonAuditWriter);
assertTrue(jsonAuditWriter.logJSON(Collections.singleton("Last log file will be opened in append mode and this event will be written")));
assertTrue(jsonAuditWriter.logJSON(Collections.singleton("This event will also be written in append mode")));

jsonAuditWriter.fileSystem.deleteOnExit(jsonAuditWriter.auditPath);
}

@Test
public void verifyFileRolloverWithAppend() throws Exception {
RangerJSONAuditWriter jsonAuditWriter = spy(new RangerJSONAuditWriter());

setup();
props.setProperty("test.file.rollover.enable.periodic.rollover", "true");
props.setProperty("test.file.rollover.periodic.rollover.check.sec", "2");
props.setProperty("test.file.append.enabled", "true");
// rollover log file after this interval
jsonAuditWriter.fileRolloverSec = 5; // in seconds
jsonAuditWriter.init(props, "test", "localfs", auditConfigs);
assertFalse(jsonAuditWriter.reUseLastLogFile);

when(jsonAuditWriter.getLogFileStream()).thenReturn(out);
when(out.checkError()).thenReturn(true);
assertFalse(jsonAuditWriter.logJSON(Collections.singleton("This event will not be logged!")));
assertTrue(jsonAuditWriter.reUseLastLogFile);
assertTrue(jsonAuditWriter.logJSON(Collections.singleton("This event will be logged in write(create) mode!")));

when(jsonAuditWriter.getLogFileStream()).thenThrow(new IOException("Unable to fetch log file stream!"));
assertFalse(jsonAuditWriter.logJSON(Collections.singleton("This event will not be logged due to exception!")));

assertNull(jsonAuditWriter.ostream);
assertNull(jsonAuditWriter.logWriter);
assertNotNull(jsonAuditWriter.auditPath);
assertNotNull(jsonAuditWriter.fullPath);

reset(jsonAuditWriter);

assertTrue(jsonAuditWriter.logJSON(Collections.singleton("Last log file will be opened in append mode and this event will be written")));
assertTrue(jsonAuditWriter.logJSON(Collections.singleton("This event will also be written in append mode")));
Path auditPath1 = jsonAuditWriter.auditPath;

Thread.sleep(6000);

// rollover should have happened
assertTrue(jsonAuditWriter.reUseLastLogFile);
assertNull(jsonAuditWriter.ostream);
assertNull(jsonAuditWriter.logWriter);
assertNull(jsonAuditWriter.auditPath);
assertNull(jsonAuditWriter.fullPath);

assertTrue(jsonAuditWriter.logJSON(Collections.singleton("Second file created since rollover happened!")));

// ensure the same rolled over file is not used for append
assertNotEquals(auditPath1, jsonAuditWriter.auditPath);

// cleanup
jsonAuditWriter.fileSystem.deleteOnExit(auditPath1);
jsonAuditWriter.fileSystem.deleteOnExit(jsonAuditWriter.auditPath);
jsonAuditWriter.logJSON(Collections.singleton("cleaning up!"));
jsonAuditWriter.closeWriter();
}

@Test
public void checkAppendtoFileWhenExceptionsOccur() throws Exception {
public void verifyNoAppendToFileWhenDisabledWithConfig() throws Exception {
RangerJSONAuditWriter jsonAuditWriter = spy(new RangerJSONAuditWriter());

setup();

props.setProperty("test.file.append.enabled", "false");
jsonAuditWriter.init(props, "test", "localfs", auditConfigs);
jsonAuditWriter.createFileSystemFolders();

// File creation should fail with an exception which will trigger append next time.
when(jsonAuditWriter.fileSystem.create(jsonAuditWriter.auditPath)).thenThrow(new IOException("Creation not allowed!"));
jsonAuditWriter.logJSON(Collections.singleton("This event will not be logged!"));
jsonAuditWriter.fileSystem.deleteOnExit(jsonAuditWriter.auditPath);
assertTrue(jsonAuditWriter.reUseLastLogFile);
when(jsonAuditWriter.fileSystem.create(jsonAuditWriter.auditPath)).thenThrow(new IOException("Creation not allowed at this time!"));
assertFalse(jsonAuditWriter.logJSON(Collections.singleton("This event will not be logged!")));
assertFalse(jsonAuditWriter.reUseLastLogFile);
assertNull(jsonAuditWriter.ostream);
assertNull(jsonAuditWriter.logWriter);
assertNotNull(jsonAuditWriter.auditPath);
assertNotNull(jsonAuditWriter.fullPath);

jsonAuditWriter.fileSystem = mock(FileSystem.class);
when(jsonAuditWriter.fileSystem.hasPathCapability(jsonAuditWriter.auditPath, CommonPathCapabilities.FS_APPEND)).thenReturn(true);
Path auditPath1 = jsonAuditWriter.auditPath;

reset(jsonAuditWriter);
assertTrue(jsonAuditWriter.logJSON(Collections.singleton("This event should be written to a newly created file in write mode!")));
assertTrue(jsonAuditWriter.logJSON(Collections.singleton("This event should also be written to the previous file")));
assertFalse(jsonAuditWriter.reUseLastLogFile);

// cleanup
jsonAuditWriter.fileSystem.deleteOnExit(auditPath1);
jsonAuditWriter.fileSystem.deleteOnExit(jsonAuditWriter.auditPath);
// this will lead to an exception since append is called on mocks
jsonAuditWriter.logJSON(Collections.singleton("This event should be appended but won't be as appended we use mocks."));
}

@Test
public void checkFileRolloverAfterThreshold() throws Exception {
public void verifyFileRolloverAfterThreshold() throws Exception {
RangerJSONAuditWriter jsonAuditWriter = spy(new RangerJSONAuditWriter());

setup();

props.setProperty("test.file.rollover.enable.periodic.rollover", "true");
props.setProperty("test.file.rollover.periodic.rollover.check.sec", "2");
// rollover log file after this interval

jsonAuditWriter.fileRolloverSec = 5; // in seconds
jsonAuditWriter.init(props, "test", "localfs", auditConfigs);

assertTrue(jsonAuditWriter.logJSON(Collections.singleton("First file created and added this line!")));
Path auditPath1 = jsonAuditWriter.auditPath;

jsonAuditWriter.fileSystem.deleteOnExit(jsonAuditWriter.auditPath); // cleanup
Thread.sleep(6000);
assertFalse(jsonAuditWriter.reUseLastLogFile);

assertNull(jsonAuditWriter.ostream);
assertNull(jsonAuditWriter.logWriter);

assertTrue(jsonAuditWriter.logJSON(Collections.singleton("Second file created since rollover happened!")));

jsonAuditWriter.fileSystem.deleteOnExit(jsonAuditWriter.auditPath); // cleanup
// cleanup
jsonAuditWriter.fileSystem.deleteOnExit(auditPath1);
jsonAuditWriter.fileSystem.deleteOnExit(jsonAuditWriter.auditPath);
jsonAuditWriter.closeWriter();
}
}

0 comments on commit 87666d3

Please sign in to comment.