python-将生成器拆分为多个块,而无需预先遍历

(这个问题与此相关,但是这些问题是在发电机前进行的,这正是我要避免的问题)

我想将生成器拆分为多个块。 要求是:

  • 不要填充数据块:如果剩余元素的数量小于数据块大小,则最后一个数据块必须较小。
  • 不要事先走生成器:计算元素是昂贵的,并且必须仅通过使用函数来完成,而不是通过分块器来完成
  • 这当然意味着:不要在内存中累积(无列表)

我尝试了以下代码:

def head(iterable, max=10):
    for cnt, el in enumerate(iterable):
        yield el
        if cnt >= max:
            break

def chunks(iterable, size=10):
    i = iter(iterable)
    while True:
        yield head(i, size)

# Sample generator: the real data is much more complex, and expensive to compute
els = xrange(7)

for n, chunk in enumerate(chunks(els, 3)):
    for el in chunk:
        print 'Chunk %3d, value %d' % (n, el)

这以某种方式起作用:

Chunk   0, value 0
Chunk   0, value 1
Chunk   0, value 2
Chunk   1, value 3
Chunk   1, value 4
Chunk   1, value 5
Chunk   2, value 6
^CTraceback (most recent call last):
  File "xxxx.py", line 15, in <module>
    for el in chunk:
  File "xxxx.py", line 2, in head
    for cnt, el in enumerate(iterable):
KeyboardInterrupt

Buuuut ...因为2964171943525745745665,它永远不会停止(我必须按chunks)。无论何时使用完发电机,我都想停止该循环,但是我不知道如何检测这种情况。 我试图提出一个异常:

class NoMoreData(Exception):
    pass

def head(iterable, max=10):
    for cnt, el in enumerate(iterable):
        yield el
        if cnt >= max:
            break
    if cnt == 0 : raise NoMoreData()

def chunks(iterable, size=10):
    i = iter(iterable)
    while True:
        try:
            yield head(i, size)
        except NoMoreData:
            break

# Sample generator: the real data is much more complex, and expensive to compute    
els = xrange(7)

for n, chunk in enumerate(chunks(els, 2)):
    for el in chunk:
        print 'Chunk %3d, value %d' % (n, el)

但是然后仅在使用方的上下文中引发异常,这不是我想要的(我想保持使用方代码干净)

Chunk   0, value 0
Chunk   0, value 1
Chunk   0, value 2
Chunk   1, value 3
Chunk   1, value 4
Chunk   1, value 5
Chunk   2, value 6
Traceback (most recent call last):
  File "xxxx.py", line 22, in <module>
    for el in chunk:
  File "xxxx.py", line 9, in head
    if cnt == 0 : raise NoMoreData
__main__.NoMoreData()

如何在chunks功能中检测到发电机已耗尽,而没有行走?

dangonfast asked 2020-07-13T04:56:01Z
10个解决方案
62 votes

一种方法是窥视第一个元素(如果有),然后创建并返回实际的生成器。

def head(iterable, max=10):
    first = next(iterable)      # raise exception when depleted
    def head_inner():
        yield first             # yield the extracted first element
        for cnt, el in enumerate(iterable):
            yield el
            if cnt + 1 >= max:  # cnt + 1 to include first
                break
    return head_inner()

只需在您的itertools.chain生成器中使用它,并像使用自定义异常一样捕获for异常即可。


更新:这是另一个版本,使用itertools.chain替换了大多数itertools.chain函数和for循环。 实际上,这个简单的for循环与原始代码中笨拙的while-try-next-except-break构造具有完全相同的作用,因此结果更具可读性。

def chunks(iterable, size=10):
    iterator = iter(iterable)
    for first in iterator:    # stops when iterator is depleted
        def chunk():          # construct generator for next chunk
            yield first       # yield element from for loop
            for more in islice(iterator, size - 1):
                yield more    # yield more elements from the iterator
        yield chunk()         # in outer generator, yield next chunk

使用itertools.chain替换内部发电机,我们可以得到比这更短的东西:

def chunks(iterable, size=10):
    iterator = iter(iterable)
    for first in iterator:
        yield chain([first], islice(iterator, size - 1))
tobias_k answered 2020-07-13T04:56:25Z
10 votes

创建组/块而不预走发生器的另一种方法是在使用g对象的关键功能上使用g。 由于g对象与可迭代对象无关,因此可以轻松生成块,而无需任何有关可迭代对象的知识。

g的每次迭代都会调用count对象的groupby方法,并通过将当前计数值除以块的大小来生成组/块键(后跟块中的项)。

from itertools import groupby, count

def chunks(iterable, size=10):
    c = count()
    for _, g in groupby(iterable, lambda _: next(c)//size):
        yield g

生成器函数产生的每个组/块g是一个迭代器。 但是,由于groupby对所有组都使用共享的迭代器,因此不能将组迭代器存储在列表或任何容器中,因此应在使用下一个之前使用每个组迭代器。

Moses Koledoye answered 2020-07-13T04:56:55Z
6 votes

由于(在CPython中)使用了纯C级内置函数,因此我可以提出最快的解决方案。 这样,就不需要Python字节码来生成每个块(除非在Python中实现了底层生成器),这具有巨大的性能优势。 它确实在返回每个块之前先遍历了每个块,但是它没有对要返回的块进行任何预遍历:

# Py2 only to get generator based map
from future_builtins import map

from itertools import islice, repeat, starmap, takewhile
# operator.truth is *significantly* faster than bool for the case of
# exactly one positional argument
from operator import truth

def chunker(n, iterable):  # n is size of each chunk; last chunk may be smaller
    return takewhile(truth, map(tuple, starmap(islice, repeat((iter(iterable), n)))))

由于有点密集,因此可以使用扩展版本进行说明:

def chunker(n, iterable):
    iterable = iter(iterable)
    while True:
        x = tuple(islice(iterable, n))
        if not x:
            return
        yield x

enumerate中包装对chunker的呼叫,可以让您在需要时为数据块编号。

ShadowRanger answered 2020-07-13T04:57:24Z
2 votes

如何使用itertools.islice

import itertools

els = iter(xrange(7))

print list(itertools.islice(els, 2))
print list(itertools.islice(els, 2))
print list(itertools.islice(els, 2))
print list(itertools.islice(els, 2))

这使:

[0, 1]
[2, 3]
[4, 5]
[6]
warvariuc answered 2020-07-13T04:57:49Z
2 votes
from itertools import islice
def chunk(it, n):
    '''
    # returns chunks of n elements each

    >>> list(chunk(range(10), 3))
    [
        [0, 1, 2, ],
        [3, 4, 5, ],
        [6, 7, 8, ],
        [9, ]
    ]

    >>> list(chunk(list(range(10)), 3))
    [
        [0, 1, 2, ],
        [3, 4, 5, ],
        [6, 7, 8, ],
        [9, ]
    ]
    '''
    def _w(g):
        return lambda: tuple(islice(g, n))
    return iter(_w(iter(it)), ())
igiroux answered 2020-07-13T04:58:04Z
2 votes

在以更高的速度为500k +行的数据库插入制定解决方案时,开始意识到此方案的有用性。

生成器处理来自源的数据,并逐行“屈服”它; 然后另一台生成器将输出分组,然后逐块“屈服”。 第二个生成器仅知道块大小,仅此而已。

下面是一个突出概念的示例:

#!/usr/bin/python

def firstn_gen(n):
    num = 0
    while num < n:
        yield num
        num += 1

def chunk_gen(some_gen, chunk_size=7):
    res_chunk = []
    for count, item in enumerate(some_gen, 1):
        res_chunk.append(item)
        if count % chunk_size == 0:
            yield res_chunk
            res_chunk[:] = []
    else:
        yield res_chunk


if __name__ == '__main__':
    for a_chunk in chunk_gen(firstn_gen(33)):
        print(a_chunk)

在Python 2.7.12中测试:

[0, 1, 2, 3, 4, 5, 6]
[7, 8, 9, 10, 11, 12, 13]
[14, 15, 16, 17, 18, 19, 20]
[21, 22, 23, 24, 25, 26, 27]
[28, 29, 30, 31, 32]
Down the Stream answered 2020-07-13T04:58:37Z
1 votes

我遇到了同样的问题,但是找到了比这里提到的解决方案更简单的解决方案:

def chunker(iterable, chunk_size):
    els = iter(iterable)
    while True:
        next_el = next(els)
        yield chain([next_el], islice(els, chunk_size - 1))

for i, chunk in enumerate(chunker(range(11), 2)):
    for el in chunk:
        print(i, el)

# Prints the following:
0 0
0 1
1 2
1 3
2 4
2 5
3 6
3 7
4 8
4 9
5 10
santon answered 2020-07-13T04:58:57Z
0 votes

您已经说过不希望将内容存储在内存中,这是否意味着您无法为当前块构建中间列表?

为什么不遍历生成器并在块之间插入标记值? 消费者(或合适的包装器)可以忽略该标记:

class Sentinel(object):
    pass

def chunk(els, size):
    for i, el in enumerate(els):
        yield el
        if i > 0 and i % size == 0:
            yield Sentinel
answered 2020-07-13T04:59:22Z
0 votes

用发电机编辑其他解决方案

您不应该在迭代器中执行list(chunk(range(7)),而只需对其进行迭代并在每次迭代时更新块号:

def chunk(it, maxv):
    n = 0
    for i in it:
        yield n // mavx, i
        n += 1

如果要使用发电机,则可以:

def chunk(a, maxv):
    def inner(it, maxv, l):
        l[0] = False
        for i in range(maxv):
            yield next(it)
        l[0] = True
        raise StopIteration
    it = iter(a)
    l = [True]
    while l[0] == True:
        yield inner(it, maxv, l)
    raise StopIteration

与一个迭代。

测试:在python 2.7和3.4上:

for i in chunk(range(7), 3):
    print 'CHUNK'
    for a in i:
        print a

给出:

CHUNK
0
1
2
CHUNK
3
4
5
CHUNK
6

并在2.7上:

for i in chunk(xrange(7), 3):
    print 'CHUNK'
    for a in i:
        print a

给出相同的结果。

但要注意:list(chunk(range(7))在2.7和3.4上被阻止

Serge Ballesta answered 2020-07-13T05:00:17Z
0 votes

受到Moses Koledoye的回答的启发,我试图提供一种使用itertools.groupby的解决方案,但不需要每个步骤都进行划分。

以下函数可用作groupby的键,它仅返回一个布尔值,该布尔值在预定义的调用次数之后翻转。

def chunks(chunksize=3):

    def flag_gen():
        flag = False
        while True:
            for num in range(chunksize):
                yield flag
            flag = not flag

    flag_iter = flag_gen()

    def flag_func(*args, **kwargs):
        return next(flag_iter)

    return flag_func

可以这样使用:

from itertools import groupby

my_long_generator = iter("abcdefghijklmnopqrstuvwxyz")

chunked_generator = groupby(my_long_generator, key=chunks(chunksize=5))

for flag, chunk in chunked_generator:
    print("Flag is {f}".format(f=flag), list(chunk))

输出:

Flag is False ['a', 'b', 'c', 'd', 'e']
Flag is True ['f', 'g', 'h', 'i', 'j']
Flag is False ['k', 'l', 'm', 'n', 'o']
Flag is True ['p', 'q', 'r', 's', 't']
Flag is False ['u', 'v', 'w', 'x', 'y']
Flag is True ['z']

我做了一个小提琴来演示此代码。

Andrew Martin answered 2020-07-13T05:00:54Z
translate from https://stackoverflow.com:/questions/24527006/split-a-generator-into-chunks-without-pre-walking-it