When we update the repository, we round-robin between partitions. We have multiple journals per container because we are serializing the data inline. If we write to only a single journal per disk partition, then we do not utilize the disk well, as the serialization from object to bytes is fairly expensive. Serializing inline allows us to change the schema as we need, and it also avoids the expense of converting the Provenance Event into an intermediate data structure, such as an Avro Record, just so that it can then be serialized to disk, and then doing the same thing again when we deserialize.

After some configurable period of time (30 seconds, by default), we take all journals and merge them into a single Provenance Event Log File. As this is occurring, we "roll over" the journals so that other threads can update the repository at the same time. When we roll over the journals, we compress and index the data. We do not compress data as it is written, as doing so would slow down throughput. In addition, if we shut down or lose power while writing to a compressed file, data may not be recoverable. We do not index data as it is written either, as doing so would also slow down throughput.

As we compress the data, we keep track of the "compression block index." We write 1 MB of data to a GZIP stream, and then we increment the compression block index. At the same time, we keep a mapping in a ".toc" (Table-of-Contents) file of compression block index to "compression block offset". This offset is the offset into the file where this block of events starts. This way, when we index the events, we are able to index the relevant fields as well as a pointer to the data. The pointer to the data is the Provenance Event Log File that the data is stored in, the ID of the event, and the compression block offset. As a result, if we have a Provenance Event Log File that is, say, 1 GB when compressed, and we want a specific record from it, we can simply seek to the block offset (say 980,028,872), wrap the FileInputStream with a GZIPInputStream, and start reading from there. We will have to read at most 1 MB of (decompressed) data. This allows us to access these records extremely quickly.

After each record is written, it is placed on a queue along with a pointer to the data. A separate thread will then pull this information from the queue and index the data in Lucene. We do this so that we can allow multiple threads to index the data at once, as the indexing is very computationally expensive and is actually the bottleneck of NiFi when processing extremely large volumes of data records. When all data has been written to the merged Provenance Event Log File, compressed, and indexed, we delete the original journal files.

As we index the data in Lucene, we "shard" the Lucene indices so that they do not grow beyond some configurable amount of space (default of 500 MB). Lucene stores the document ID as a 32-bit integer, rather than a 64-bit integer. As a result, a single index can contain a max of ~2 billion records. Sharding also means we are able to stripe the indexes themselves across the multiple disk partitions. When several threads are updating a particular index, access to that index is quite slow, and sharding allows us to avoid touching an index if we don't need to. The name of the directory where each Lucene index is stored is the timestamp of when the index was created.
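The block-compression scheme described above can be sketched in a few lines. This is only an illustration under stated assumptions, not NiFi's actual code: it uses Python's `gzip` module in place of Java's `FileInputStream` and `GZIPInputStream`, the function names `write_event_log` and `read_event` are hypothetical, the tab-separated record format is made up, and the table of contents is kept in a plain in-memory list rather than a ".toc" file.

```python
import gzip
import io

BLOCK_SIZE = 1024 * 1024  # 1 MB of uncompressed data per block, as in the text


def write_event_log(events, out, block_size=BLOCK_SIZE):
    """Write (event_id, payload) pairs as independently compressed GZIP blocks.

    Returns (toc, pointers):
      toc[block_index] is the byte offset in `out` where that block starts;
      pointers[event_id] is the start offset of the block holding the event.
    The record format (id, tab, payload, newline) is an assumption.
    """
    toc = []
    pointers = {}
    pending = []
    pending_bytes = 0

    def flush():
        nonlocal pending, pending_bytes
        if pending:
            toc.append(out.tell())  # offset where this block begins
            out.write(gzip.compress(b"".join(pending)))
            pending, pending_bytes = [], 0

    for event_id, payload in events:
        line = f"{event_id}\t{payload}\n".encode("utf-8")
        # Nothing has been written since the last flush, so the current
        # position is exactly where the block being filled will start.
        pointers[event_id] = out.tell()
        pending.append(line)
        pending_bytes += len(line)
        if pending_bytes >= block_size:
            flush()
    flush()  # write the final, possibly partial, block
    return toc, pointers


def read_event(log, block_offset, event_id):
    """Seek to a block offset and decompress from there until the event is found."""
    log.seek(block_offset)
    with gzip.GzipFile(fileobj=log, mode="rb") as block:
        for line in block:
            found_id, _, payload = line.decode("utf-8").rstrip("\n").partition("\t")
            if int(found_id) == event_id:
                return payload
    return None
```

Because each block is a complete GZIP member, decompression can begin at any block boundary, so looking up an event whose pointer is correct reads no more than one block's worth of decompressed data regardless of how large the file is.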