bash - shell source




分叉/多線程過程| 巴什 (6)

haridsv的方法很棒,它提供了運行處理器插槽設置的靈活性,其中許多進程可以保持運行,新作業在作業完成時提交,從而保持整體負載。 以下是針對ngrid'作業''網格'的n-slot處理器的haridsv代碼的mods(我將其用於仿真模型的網格)然後一次測試輸出8個作業,運行總計為,提交,完成和剩餘

#!/bin/bash
########################################################################
# see haridsv on forking-multi-threaded-processes-bash
# loop over grid, submitting jobs in the background.
# As jobs complete new ones are set going to keep the number running
# up to n as much as possible, until it tapers off at the end.
#
# 8 jobs
ngrid=8
# 3 at a time
n=3
# running counts
running=0
completed=0
# previous values
prunning=0
pcompleted=0
#
########################################################################
# process monitoring functions
#
declare -a pids
#
function checkPids() {
echo  ${#pids[@]}
if [ ${#pids[@]} -ne 0 ]
then
    echo "Checking for pids: ${pids[@]}"
    local range=$(eval echo {0..$((${#pids[@]}-1))})
    local i
    for i in $range; do
        if ! kill -0 ${pids[$i]} 2> /dev/null; then
            echo "Done -- ${pids[$i]}"
            unset pids[$i]
            completed=$(expr $completed + 1)
        fi
    done
    pids=("${pids[@]}") # Expunge nulls created by unset.
    running=$((${#pids[@]}))
    echo "#PIDS :"$running
fi
}
#
function addPid() {
    desc=$1
    pid=$2
    echo " ${desc} - "$pid
    pids=(${pids[@]} $pid)
}
########################################################################
#
# Loop and report when job changes happen,
# keep going until all are completed.
#
idx=0
while [ $completed -lt ${ngrid} ]
do
#
    if [ $running -lt $n ] && [ $idx -lt ${ngrid} ]
    then
####################################################################
#
# submit a new process if less than n
# are running and we haven't finished...
#
# get desc for process
#
        name="job_"${idx}
# background execution
        sleep 3 &
        addPid $name $!
        idx=$(expr $idx + 1)
#
####################################################################
#
    fi
#
    checkPids
# if something changes...
    if [ ${running} -gt ${prunning} ] || \
       [ ${completed} -gt ${pcompleted} ]
    then
        remain=$(expr $ngrid - $completed)
        echo  " Running: "${running}" Submitted: "${idx}\
              " Completed: "$completed" Remaining: "$remain
    fi
# save counts to prev values
    prunning=${running}
    pcompleted=${completed}
#
    sleep 1
#
done
#
########################################################################

測試輸出:

 job_0 - 75257
1
Checking for pids: 75257
#PIDS :1
 Running: 1 Submitted: 1  Completed: 0 Remaining: 8
 job_1 - 75262
2
Checking for pids: 75257 75262
#PIDS :2
 Running: 2 Submitted: 2  Completed: 0 Remaining: 8
 job_2 - 75267
3
Checking for pids: 75257 75262 75267
#PIDS :3
 Running: 3 Submitted: 3  Completed: 0 Remaining: 8
3
Checking for pids: 75257 75262 75267
Done -- 75257
#PIDS :2
 Running: 2 Submitted: 3  Completed: 1 Remaining: 7
 job_3 - 75277
3
Checking for pids: 75262 75267 75277
Done -- 75262
#PIDS :2
 Running: 2 Submitted: 4  Completed: 2 Remaining: 6
 job_4 - 75283
3
Checking for pids: 75267 75277 75283
Done -- 75267
#PIDS :2
 Running: 2 Submitted: 5  Completed: 3 Remaining: 5
 job_5 - 75289
3
Checking for pids: 75277 75283 75289
#PIDS :3
 Running: 3 Submitted: 6  Completed: 3 Remaining: 5
3
Checking for pids: 75277 75283 75289
Done -- 75277
#PIDS :2
 Running: 2 Submitted: 6  Completed: 4 Remaining: 4
 job_6 - 75298
3
Checking for pids: 75283 75289 75298
Done -- 75283
#PIDS :2
 Running: 2 Submitted: 7  Completed: 5 Remaining: 3
 job_7 - 75304
3
Checking for pids: 75289 75298 75304
Done -- 75289
#PIDS :2
 Running: 2 Submitted: 8  Completed: 6 Remaining: 2
2
Checking for pids: 75298 75304
#PIDS :2
2
Checking for pids: 75298 75304
Done -- 75298
#PIDS :1
 Running: 1 Submitted: 8  Completed: 7 Remaining: 1
1
Checking for pids: 75304
Done -- 75304
#PIDS :0
 Running: 0 Submitted: 8  Completed: 8 Remaining: 0

我想讓我的代碼的一部分更有效率。 我正在考慮將它分成多個進程並讓它們一次執行50/100次而不是一次。

例如(偽):

for line in file;
do 
foo;
foo2;
foo3;
done

我想這個for循環運行多次。 我知道這可以通過分叉來完成。 它看起來像這樣嗎?

while(x <= 50)
parent(child pid)
{
   fork child()
}
child
{
   do 
   foo; foo2; foo3; 
   done
   return child_pid()
}

或者我是否以錯誤的方式思考這個問題?

謝謝!


使用GNU Parallel,您可以:

cat file | parallel 'foo {}; foo2 {}; foo3 {}'

這將在每個cpu核心上運行一個作業。 運行50做:

cat file | parallel -j 50 'foo {}; foo2 {}; foo3 {}'

觀看介紹視頻以了解更多信息:

http://www.youtube.com/playlist?list=PL284C9FF2488BC6D1


我不喜歡使用wait因為它會在進程退出之前被阻塞,這在有多個進程需要等待的時候並不理想,因為在當前進程完成之前我無法獲得狀態更新。 我更喜歡使用kill -0sleep的組合。

給定一系列要等待的pids ,我使用下面的waitPids()函數來獲得有關哪些pids仍未完成的持續反饋。

declare -a pids
waitPids() {
    while [ ${#pids[@]} -ne 0 ]; do
        echo "Waiting for pids: ${pids[@]}"
        local range=$(eval echo {0..$((${#pids[@]}-1))})
        local i
        for i in $range; do
            if ! kill -0 ${pids[$i]} 2> /dev/null; then
                echo "Done -- ${pids[$i]}"
                unset pids[$i]
            fi
        done
        pids=("${pids[@]}") # Expunge nulls created by unset.
        sleep 1
    done
    echo "Done!"
}

當我在後台啟動一個進程時,我使用下面的實用程序函數將其pid立即添加到pids數組中:

addPid() {
    desc=$1
    pid=$2
    echo "$desc -- $pid"
    pids=(${pids[@]} $pid)
}

這是一個展示如何使用的示例:

for i in {2..5}; do
    sleep $i &
    addPid "Sleep for $i" $!
done
waitPids

以下是反饋的看法:

Sleep for 2 -- 36271
Sleep for 3 -- 36272
Sleep for 4 -- 36273
Sleep for 5 -- 36274
Waiting for pids: 36271 36272 36273 36274
Waiting for pids: 36271 36272 36273 36274
Waiting for pids: 36271 36272 36273 36274
Done -- 36271
Waiting for pids: 36272 36273 36274
Done -- 36272
Waiting for pids: 36273 36274
Done -- 36273
Waiting for pids: 36274
Done -- 36274
Done!

我不知道在bash中有任何顯式的fork調用。 您可能想要做的是追加您想要在後台運行的命令。 您還可以在bash腳本中使用&定義的函數:

do_something_with_line()
{
  line=$1
  foo
  foo2
  foo3
}

for line in file
do
  do_something_with_line $line &
done

編輯 :為了限制同時後台進程的數量,你可以嘗試這樣的事情:

for line in file
do
  while [`jobs | wc -l` -ge 50 ]
  do
    sleep 5
  done
  do_something_with_line $line &
done

讓我試一試

for x in 1 2 3 ; do { echo a $x ; sleep 1 ; echo b $x ; } &  done ; sleep 10

並使用jobs來查看正在運行的內容。


這是我的線程控制功能:

#!/bin/bash
# This function just checks jobs in background, don't do more things.
# if jobs number is lower than MAX, then return to get more jobs;
# if jobs number is greater or equal to MAX, then wait, until someone finished.

# Usage:
#   thread_max 8
#   thread_max 0    # wait, until all jobs completed

thread_max() {
    local CHECK_INTERVAL="3s"
    local CUR_THREADS=
    local MAX=
    [[ $1 ]] && MAX=$1 || return 127

    # reset MAX value, 0 is easy to remember
    [ $MAX -eq 0 ] && {
        MAX=1
        DEBUG "waiting for all tasks finish"
    }

    while true; do
        CUR_THREADS=`jobs -p | wc -w`

        # workaround about jobs bug. If don't execute it explicitily,
        # CUR_THREADS will stick at 1, even no jobs running anymore.
        jobs &>/dev/null

        DEBUG "current thread amount: $CUR_THREADS"
        if [ $CUR_THREADS -ge $MAX ]; then
            sleep $CHECK_INTERVAL
        else
            return 0
        fi
    done
}




fork