// Resolve the fsync warning threshold from system properties. The property named by
// ZOOKEEPER_FSYNC_WARNING_THRESHOLD_MS_PROPERTY is consulted first; Long.getLong yields
// null when that property is unset or not parseable as a long, in which case the property
// named by FSYNC_WARNING_THRESHOLD_MS_PROPERTY is read with a default of 1000.
Long fsyncWarningThreshold = Long.getLong(ZOOKEEPER_FSYNC_WARNING_THRESHOLD_MS_PROPERTY);
if (fsyncWarningThreshold == null) {
    fsyncWarningThreshold = Long.getLong(FSYNC_WARNING_THRESHOLD_MS_PROPERTY, 1000);
}
fsyncWarningThresholdMS = fsyncWarningThreshold;
/** * append an entry to the transaction log * @param hdr the header of the transaction * @param txn the transaction part of the entry * returns true iff something appended, otw false * 向事务日志追加一个条目 */ publicsynchronizedbooleanappend(TxnHeader hdr, Record txn)throws IOException { return append(hdr, txn, null); }
@Override publicsynchronizedbooleanappend(TxnHeader hdr, Record txn, TxnDigest digest)throws IOException { if (hdr == null) { returnfalse; } if (hdr.getZxid() <= lastZxidSeen) { LOG.warn( "Current zxid {} is <= {} for {}", hdr.getZxid(), lastZxidSeen, Request.op2String(hdr.getType())); } else { lastZxidSeen = hdr.getZxid(); } if (logStream == null) { LOG.info("Creating new log file: {}", Util.makeLogName(hdr.getZxid()));
logFileWrite = newFile(logDir, Util.makeLogName(hdr.getZxid())); fos = newFileOutputStream(logFileWrite); logStream = newBufferedOutputStream(fos); oa = BinaryOutputArchive.getArchive(logStream); FileHeaderfhdr=newFileHeader(TXNLOG_MAGIC, VERSION, dbId); fhdr.serialize(oa, "fileheader"); // Make sure that the magic number is written before padding. logStream.flush(); filePadding.setCurrentSize(fos.getChannel().position()); streamsToFlush.add(fos); } filePadding.padFile(fos.getChannel()); byte[] buf = Util.marshallTxnEntry(hdr, txn, digest); if (buf == null || buf.length == 0) { thrownewIOException("Faulty serialization for header " + "and txn"); } Checksumcrc= makeChecksumAlgorithm(); crc.update(buf, 0, buf.length); oa.writeLong(crc.getValue(), "txnEntryCRC"); Util.writeTxnBytes(oa, buf);