Coming from traditional relational databases, one may be used to working with ids (usually auto-incremented) for identification and ordering, and to using them as references in constraints on the data. When the data is in one table or dataframe (on one machine), adding sequential/unique ids is pretty straightforward. What happens, though, when you have distributed data, split into partitions that might reside on different machines, as in Apache Spark? Depending on the need, we might be in a position where we can benefit from having unique, auto-increment-like id behavior in a Spark dataframe. There are a few ways to get it; let's discuss them, and the catch behind using them, in detail.

Since Spark 1.6 there is a function called monotonically_increasing_id(). It generates a new column with a unique 64-bit monotonic index for each row. The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. The current implementation puts the partition ID in the upper 31 bits, and the record number within each partition in the lower 33 bits. The assumption is that the Spark DataFrame has less than 1 billion partitions, and each partition has less than 8 billion records. As an example, consider a Spark DataFrame with two partitions, each with 3 records. This expression would return the following IDs: 0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.

If truly consecutive ids are needed, one option is a dfZipWithIndex-style helper that drops down to the RDD API, uses zipWithIndex, and rebuilds the DataFrame with the new rows and an extended schema:

Row.fromSeq(Seq(element._2 + offset) ++ element._1.toSeq)
Array(StructField(colName, LongType, false)) ++ df.schema.fields

Keep in mind that falling back to RDDs and then back to a dataframe can be quite expensive (a fuller sketch of such a helper is shown at the end of this post).

Starting in Spark 1.5, Window expressions were added to Spark. Instead of having to convert the DataFrame to an RDD, you can now use the row_number window function (from org.apache.spark.sql.functions) over a Window specification. Note that I found the above dfZipWithIndex to be significantly faster than the algorithm below. row_number() returns a sequential number starting at 1 within a window partition. In order to use row_number(), we need to move our data into one partition: the Window in both cases (sortable and not sortable data) consists basically of all the rows we currently have, so that the row_number() function can go over them and increment the row number. This can cause performance and memory issues - we can easily go OOM, depending on how much data and how much memory we have. Below is a small code snippet:

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._

df.withColumn("row_num", row_number().over(Window.partitionBy(lit(1)).orderBy(lit(1))))

Note that by using lit(1) for both the partitioning and the ordering, everything ends up in the same partition, and this seems to preserve the original ordering of the DataFrame.
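A few short sketches to make the options above concrete. First, monotonically_increasing_id(); the toy DataFrame, its two-partition layout, and the column name "id" are illustrative assumptions, and a SparkSession named spark is assumed to be in scope:

import org.apache.spark.sql.functions.monotonically_increasing_id

// Toy DataFrame with 6 rows spread over 2 partitions (illustrative only).
val df = spark.range(6).repartition(2)

// Add a unique - but not consecutive - 64-bit id per row.
val withId = df.withColumn("id", monotonically_increasing_id())

withId.show()
// Rows in the second partition get ids starting at 1L << 33 = 8589934592,
// matching the 0, 1, 2, 8589934592, ... pattern described above.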
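Second, the dfZipWithIndex helper whose fragments were quoted earlier could look roughly like the following. This is a sketch built around those two lines, not the exact original helper; the parameter names offset and colName and their defaults are assumptions:

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Adds a consecutive id column by zipping the underlying RDD with its index
// and rebuilding the DataFrame with the id prepended to each row and schema.
def dfZipWithIndex(df: DataFrame, offset: Long = 1L, colName: String = "id"): DataFrame = {
  val rowsWithIndex = df.rdd.zipWithIndex.map { element =>
    // element._1 is the original Row, element._2 its 0-based index.
    Row.fromSeq(Seq(element._2 + offset) ++ element._1.toSeq)
  }
  val schema = StructType(
    Array(StructField(colName, LongType, false)) ++ df.schema.fields
  )
  df.sqlContext.createDataFrame(rowsWithIndex, schema)
}

// Usage: ids start at `offset` and are consecutive across all partitions.
// val withConsecutiveIds = dfZipWithIndex(df)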
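Finally, for the "sortable data" case mentioned above: when a column defines the desired order, it can replace lit(1) in orderBy, while the single global partition (and its memory cost) stays the same. A sketch, assuming a hypothetical timestamp column named ts:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, row_number}

// Still one global partition, but rows are numbered by the ts column
// instead of the DataFrame's incidental order.
val ordered = df.withColumn(
  "row_num",
  row_number().over(Window.partitionBy(lit(1)).orderBy(col("ts")))
)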