Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share

I have a dataframe that contains 2 columns. For each row, I simply want to issue a Redis SET where the first column is the key and the second column is the value. I've done some research and I think I found the fastest way of doing this, by iterating over the columns:

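import struct
import uuid

def send_to_redis(df, r):
    # Pack each subscriber UUID into 16 raw bytes and each rounded score into one unsigned byte.
    df['bin_subscriber'] = df.apply(lambda row: uuid.UUID(row.subscriber).bytes, axis=1)
    df['bin_total_score'] = df.apply(lambda row: struct.pack('B', round(row.total_score)), axis=1)
    df = df[['bin_subscriber', 'bin_total_score']]
    with r.pipeline() as pipe:
        index = 0
        for subscriber, total_score in zip(df['bin_subscriber'], df['bin_total_score']):
            # Queue the command on the pipeline; calling r.set here would bypass it.
            pipe.set(subscriber, total_score)
            if (index + 1) % 2000 == 0:
                pipe.execute()
            index += 1
        # Flush whatever is left after the last full batch of 2000.
        pipe.execute()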

With this, I can send about 400-500k sets to Redis per minute. We may end up processing up to 300 million keys, which at this rate would take about half a day. Doable, but not ideal. Note that in the outer wrapper I download .parquet files from S3 one at a time and read each one into pandas through an io.BytesIO buffer.
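
As a rough check of that estimate, the arithmetic can be sketched as follows. The function name is illustrative only, and the rates are the 400-500k sets per minute quoted above.

```python
def hours_to_send(total_keys, keys_per_minute):
    """Return the hours needed to send total_keys at a steady rate."""
    return total_keys / keys_per_minute / 60


TOTAL_KEYS = 300_000_000

# Slower and faster ends of the observed throughput range.
hours_at_400k = hours_to_send(TOTAL_KEYS, 400_000)  # 12.5 hours
hours_at_500k = hours_to_send(TOTAL_KEYS, 500_000)  # 10.0 hours
```

So the full load would take roughly 10 to 12.5 hours at the current rate.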

import io

import boto3
import pandas
import redis

def process_file(s3_resource, r, bucket, key):
    # Download one parquet object into memory and load only the two needed columns.
    buffer = io.BytesIO()
    s3_object = s3_resource.Object(bucket, key)
    s3_object.download_fileobj(buffer)
    send_to_redis(
        pandas.read_parquet(buffer, columns=['subscriber', 'total_score']), r)

def main():
    args = get_args()
    s3_resource = boto3.resource('s3')
    r = redis.Redis()
    file_prefix = get_prefix(args)
    # Collect every .parquet key under the prefix, then process the files one at a time.
    s3_keys = [
        item.key for item in
        s3_resource.Bucket(args.bucket).objects.filter(Prefix=file_prefix)
        if item.key.endswith('.parquet')
    ]
    for key in s3_keys:
        process_file(s3_resource, r, args.bucket, key)



Is there a way to send this data to Redis without the use of iteration? Is it possible to send an entire blob of data to Redis and have Redis set the key and value for every 1st and 2nd value of the data blob? I imagine that would be slightly faster.
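
One way to get closer to "one blob per request" is the MSET command, which sets many keys in a single command. The sketch below is a minimal illustration, not a benchmarked solution: it assumes `r` is a redis-py client (whose `mset` method takes a dict), and the helper names `batched_mappings` and `send_with_mset` are made up for this example. The client still iterates over the rows; only the number of commands sent to the server shrinks.

```python
from itertools import islice


def batched_mappings(pairs, batch_size):
    """Yield dicts holding at most batch_size key/value pairs each."""
    iterator = iter(pairs)
    while True:
        batch = dict(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def send_with_mset(r, pairs, batch_size=2000):
    """Send each batch as one MSET command; return the number of commands sent."""
    sent = 0
    for mapping in batched_mappings(pairs, batch_size):
        r.mset(mapping)  # assumes a redis-py style client
        sent += 1
    return sent
```

With the dataframe above, the call could look like `send_with_mset(r, zip(df['bin_subscriber'], df['bin_total_score']))`. Whether this beats a pipeline of individual SET commands would need to be measured.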

The original parquet that I am pulling into Pandas is created via PySpark. I've tried the Spark-Redis plugin, which is extremely fast, but I have two problems with it. First, I'm not sure how to convert my data to the binary form above within a Spark dataframe itself. Second, the column name is added as a string to every single value, and that doesn't seem to be configurable. Having that label on every Redis object seems very space inefficient.

Any suggestions would be greatly appreciated!

Question from: https://stackoverflow.com/questions/66055935/fastest-way-to-send-dataframe-to-redis


1 Answer

Try Redis mass insertion: bulk-import the data with redis-cli --pipe.

  1. Create a new text file, input.txt, containing one Redis command per line:
SET Key0 Value0
SET Key1 Value1
...
SET Keyn Valuen
  2. Use redis-mass.py (see below) to convert the file to the Redis protocol and pipe it into Redis:
python redis-mass.py input.txt | redis-cli --pipe

redis-mass.py, taken from GitHub:

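#!/usr/bin/env python
"""
    redis-mass.py
    ~~~~~~~~~~~~~
    Prepares a newline-separated file of Redis commands for mass insertion.
    :copyright: (c) 2015 by Tim Simmons.
    :license: BSD, see LICENSE for more details.
"""
import sys

def proto(line):
    # Encode one command in the Redis protocol: "*<argc>", then "$<length>" and the
    # text of each argument, every part terminated by CRLF.
    # Lengths are character counts, so this assumes ASCII arguments.
    result = "*%s\r\n$%s\r\n%s\r\n" % (str(len(line)), str(len(line[0])), line[0])
    for arg in line[1:]:
        result += "$%s\r\n%s\r\n" % (str(len(arg)), arg)
    return result

if __name__ == "__main__":
    try:
        filename = sys.argv[1]
        f = open(filename, 'r')
    except IndexError:
        f = sys.stdin.readlines()

    for line in f:
        # Write without a trailing newline so no stray bytes separate the commands.
        sys.stdout.write(proto(line.rstrip().split(' ')))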
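
The script above splits text lines on spaces, so it cannot carry the raw binary keys and values from the question (packed UUID bytes may contain spaces, newlines, or non-ASCII bytes). A minimal sketch of a binary-safe variant is shown below, assuming the pairs are already available as bytes. The names `resp_command` and `write_set_commands` are hypothetical and not part of redis-mass.py.

```python
def resp_command(*args: bytes) -> bytes:
    """Encode one Redis command as a RESP array of bulk strings."""
    parts = [b"*%d\r\n" % len(args)]
    for arg in args:
        # Each argument is sent as "$<byte length>", CRLF, the raw bytes, CRLF.
        parts.append(b"$%d\r\n%s\r\n" % (len(arg), arg))
    return b"".join(parts)


def write_set_commands(pairs, out) -> int:
    """Write one SET per (key, value) pair to a binary stream; return the count."""
    count = 0
    for key, value in pairs:
        out.write(resp_command(b"SET", key, value))
        count += 1
    return count
```

A driver script could call `write_set_commands(zip(df['bin_subscriber'], df['bin_total_score']), sys.stdout.buffer)` and have its output piped into `redis-cli --pipe`, the same way as in step 2 above.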
